Hive中SQL数据倾斜及优化的示例分析

14次阅读
没有评论

这篇文章主要介绍了 Hive 中 SQL 数据倾斜及优化的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让丸趣 TV 小编带着大家一起了解一下。

1 数据倾斜的原因 1.1 操作:

关键词

情形

后果

Join

其中一个表较小,

但是 key 集中

分发到某一个或几个 Reduce 上的数据远高于平均值

大表与大表,但是分桶的判断字段 0 值或空值过多

这些空值都由一个 reduce 处理,灰常慢

group by

group by 维度过小,

某值的数量过多

处理某值的 reduce 灰常耗时

Count Distinct

某特殊值过多

处理此特殊值的 reduce 耗时

1.2 原因:

1)、key 分布不均匀

2)、业务数据本身的特性

3)、建表时考虑不周

4)、某些 SQL 语句本身就有数据倾斜

1.3 表现:

任务进度长时间维持在 99%(或 100%),查看任务监控页面,发现只有少量(1 个或几个)reduce 子任务未完成。因为其处理的数据量和其他 reduce 差异过大。

单一 reduce 的记录数与平均记录数差异过大,通常可能达到 3 倍甚至更多。最长时长远大于平均时长。

2 数据倾斜的解决方案 2.1 参数调节:

hive.map.aggr=true

Map 端部分聚合,相当于 Combiner

hive.groupby.skewindata=true

有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

2.2 SQL 语句调节:

如何 Join:

关于驱动表的选取,选用 join key 分布最均匀的表作为驱动表

做好列裁剪和 filter 操作,以达到两表做 join 的时候,数据量相对变小的效果。

大小表 Join:

使用 map join 让小的维度表(1000 条以下的记录条数)先进内存。在 map 端完成 reduce.

大表 Join 大表:

把空值的 key 变成一个字符串加上随机数,把倾斜的数据分到不同的 reduce 上,由于 null 值关联不上,处理后并不影响最终结果。

count distinct 大量相同特殊值

count distinct 时,将值为空的情况单独处理,如果是计算 count distinct,可以不用处理,直接过滤,在最后结果中加 1。如果还有其他计算,需要进行 group by,可以先将值为空的记录单独处理,再和其他计算结果进行 union。

group by 维度过小:

采用 sum() group by 的方式来替换 count(distinct)完成计算。

特殊情况特殊处理:

在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后 union 回去。

3 典型的业务场景 3.1 空值产生的数据倾斜

场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的 user_id 关联,会碰到数据倾斜的问题。

解决方法 1: user_id 为空的不参与关联(红色字体为修改后)

select * from log a
 join users b
 on a.user_id is not null
 and a.user_id = b.user_id
union all
select * from log a
 where a.user_id is null;

解决方法 2  :赋与空值分新的 key 值

select *
 from log a
 left outer join users b
 on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;

结论:方法 2 比方法 1 效率更好,不但 io 少了,而且作业数也少了。解决方法 1 中 log 读取两次,jobs 是 2。解决方法 2 job 数是 1。这个优化适合无效 id (比如 -99 ,’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的 reduce 上 , 解决数据倾斜问题。

3.2 不同数据类型关联产生数据倾斜

场景:用户表中 user_id 字段为 int,log 表中 user_id 字段既有 string 类型也有 int 类型。当按照 user_id 进行两个表的 Join 操作时,默认的 Hash 操作会按 int 型的 id 来进行分配,这样会导致所有 string 类型 id 的记录都分配到一个 Reducer 中。

解决方法:把数字类型转换成字符串类型

select * from users a
 left outer join logs b
 on a.usr_id = cast(b.user_id as string)

3.3 小表不小不大,怎么用 map join 解决倾斜问题

使用 map join 解决小表 (记录数少) 关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到 map join 会出现 bug 或异常,这时就需要特别的处理。  以下例子:

select * from log a
 left outer join users b
 on a.user_id = b.user_id;

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

解决方法:

select /*+mapjoin(x)*/* from log a
 left outer join ( select /*+mapjoin(c)*/d.*
 from ( select distinct user_id from log ) c
 join users d
 on c.user_id = d.user_id
 ) x
 on a.user_id = b.user_id;

假如,log 里 user_id 有上百万个,这就又回到原来 map join 问题。所幸,每日的会员 uv 不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

3.4GROUP BY 替代 COUNT(DISTINCT)达到优化效果

计算 uv 的时候,经常会用到 COUNT(DISTINCT),但在数据比较倾斜的时候 COUNT(DISTINCT)  会比较慢。这时可以尝试用 GROUP BY 改写代码计算 uv。

INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329)

SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid

关于 COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:

测试数据:169857 条

# 统计每日 IP 
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS IP FROM logdfs WHERE logdate= 2014_12_29  
耗时:24.805 seconds 
#统计每日 IP(改造) 
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate= 2014_12_29) tmp; 
耗时:46.833 seconds

测试结果表名:明显改造后的语句比之前耗时,这是因为改造后的语句有 2 个 SELECT,多了一个 job,这样在数据量小的时候,数据不会存在倾斜问题。

3.5 解决 Hive 对 UNION ALL 优化的短板

Hive 对 union all 的优化的特性:对 union all 优化只局限于非嵌套查询。

消灭子查询内的 group by

  示例 1:子查询内有 group by 

SELECT * FROM 
(SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3)t3 
GROUP BY c1,c2,c3

从业务逻辑上说,子查询内的 GROUP BY 怎么都看显得多余(功能上的多余,除非有 COUNT(DISTINCT)),如果不是因为 Hive Bug 或者性能上的考量(曾经出现如果不执行子查询 GROUP BY,数据得不到正确的结果的 Hive Bug)。所以这个 Hive 按经验转换成如下所示:

SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2)t3 GROUP BY c1,c2,c3

调优结果:经过测试,并未出现 union all 的 Hive Bug,数据是一致的。MapReduce 的 作业数由 3 减少到 1。 

  t1 相当于一个目录,t2 相当于一个目录,对 Map/Reduce 程序来说,t1,t2 可以作为 Map/Reduce 作业的 mutli inputs。这可以通过一个 Map/Reduce 来解决这个问题。Hadoop 的 计算框架,不怕数据多,就怕作业数多。

但如果换成是其他计算平台如 Oracle,那就不一定了,因为把大的输入拆成两个输入,分别排序汇总后 merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排 序比冒泡排序的性能更优)。

消灭子查询内的 COUNT(DISTINCT),MAX,MIN。

SELECT * FROM 
(SELECT * FROM t1 
UNION ALL SELECT c1,c2,c3 COUNT(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3 
GROUP BY c1,c2,c3;

由于子查询里头有 COUNT(DISTINCT)操作,直接去 GROUP BY 将达不到业务目标。这时采用 临时表消灭 COUNT(DISTINCT)作业不但能解决倾斜问题,还能有效减少 jobs。

INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3; 
SELECT c1,c2,c3,SUM(income),SUM(uv) FROM 
(SELECT c1,c2,c3,income,0 AS uv FROM t1 
UNION ALL 
SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2) t3 
GROUP BY c1,c2,c3;

job 数是 2,减少一半,而且两次 Map/Reduce 比 COUNT(DISTINCT)效率更高。

  调优结果:千万级别的类目表,member 表,与 10 亿级得商品表关联。原先 1963s 的任务经过调整,1152s 即完成。

消灭子查询内的 JOIN

SELECT * FROM 
(SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x 
GROUP BY c1,c2;

上面代码运行会有 5 个 jobs。加入先 JOIN 生存临时表的话 t5,然后 UNION ALL,会变成 2 个 jobs。

INSERT OVERWRITE TABLE t5 
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id; 
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);

调优结果显示:针对千万级别的广告位表,由原先 5 个 Job 共 15 分钟,分解为 2 个 job 一个 8-10 分钟,一个 3 分钟。

4 总结

使 map 的输出数据更均匀的分布到 reduce 中去,是我们的最终目标。由于 Hash 算法的局限性,按 key Hash 会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:

1、采样 log 表,哪些 user_id 比较倾斜,得到一个结果表 tmp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。

2、数据的分布符合社会学统计规则,贫富不均。倾斜的 key 不会太多,就像一个社会的富人不多,奇特的人不多一样。所以 tmp1 记录数会很少。把 tmp1 和 users 做 map join 生成 tmp2, 把 tmp2 读到 distribute file cache。这是一个 map 过程。

3、map 读入 users 和 log,假如记录来自 log, 则检查 user_id 是否在 tmp2 里,如果是,输出到本地文件 a, 否则生成 user_id,value 的 key,value 对,假如记录来自 member, 生成 user_id,value 的 key,value 对,进入 reduce 阶段。

4、最终把 a 文件,把 Stage3 reduce 阶段输出的文件合并起写到 hdfs。

如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:

1、对于 join,在判断小表不大于 1G 的情况下,使用 map join

2、对于 group by 或 distinct,设定  hive.groupby.skewindata=true

3、尽量使用上述的 SQL 语句调节进行优化

hadoop 处理数据的过程,有几个显著的特征:
不怕数据多,就怕数据倾斜。
对 jobs 数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个 jobs,没半小时是跑不完的。map reduce 作业初始化的时间是比较长的。
对 sum,count 来说,不存在数据倾斜问题。
对 count(distinct), 效率较低,数据量一多,准出问题,如果是多 count(distinct)效率更低。

优化可以从几个方面着手:
好的模型设计事半功倍。
解决数据倾斜问题。
减少 job 数。
设置合理的 map reduce 的 task 数,能有效提升性能。(比如,10w+ 级别的计算,用 160 个 reduce,那是相当的浪费,1 个足够)。
自己动手写 sql 解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true; 这是通用的算法优化,但算法优化总是漠视业务,习惯性提供通用的解决方法。Etl 开发人员更了解业务,更了解数据,所以通过业务逻辑解决倾斜的方法往往更精确,更有效。
对 count(distinct)采取漠视的方法,尤其数据大的时候很容易产生倾斜问题,不抱侥幸心理。自己动手,丰衣足食。
对小文件进行合并,是行至有效的提高调度效率的方法,假如我们的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的影响。
优化时把握整体,单个作业最优不如整体最优。

细节上就是:
去除查询中不需要的 column
Where 条件判断等在 TableScan 阶段就进行过滤
利用 Partition 信息,只读取符合条件的 Partition
Map 端 join,以大表作驱动,小表载入所有 mapper 内存中
调整 Join 顺序,确保以大表作为驱动表
对于数据分布不均衡的表 Group by 时,为避免数据集中到少数的 reducer 上,分成两个 map-reduce 阶段。第一个阶段先用 Distinct 列进行 shuffle,然后在 reduce 端部分聚合,减小数据规模,第二个 map-reduce 阶段再按 group-by 列聚合。
在 map 端用 hash 进行部分聚合,减小 reduce 端数据处理规模。

感谢你能够认真阅读完这篇文章,希望丸趣 TV 小编分享的“Hive 中 SQL 数据倾斜及优化的示例分析”这篇文章对大家有帮助,同时也希望大家多多支持丸趣 TV,关注丸趣 TV 行业资讯频道,更多相关知识等着你来学习!