Hive 数据倾斜和优化
本文及后续所有文章都以 3.1.2 做为版本讲解和入门学习
关键词 | 情形 |
---|---|
group by | 当某一个表分区重复数据较多,会导致数据倾斜,非常耗时 |
join | 当小表连接大表,处理大表的 Map 会慢且分配到的 Reduce 也会慢 |
count() | count 会统计,交由一个 Reduce 来做,数据量大会很慢 |
# group by 解决方案
再 hive 里输入如下:
set hive.groupby.skewindata=true;
或在 hive-site.xml 里添加如下:
<property>
<name>hive.groupby.skewindata</name>
<value>true</value>
</property>
2
3
4
hive.groupby.skewindata=true:数据倾斜时负载均衡,当选项设定为 true,生成的查询计划会有两个 MRJob:
第一个 MRJob 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 GroupBy Key 有可能被分到不同的 Reduce 中,从而达到负载均衡的目的;
第二个 MRJob 再根据预处理的数据结果按照 GroupBy Key 分布到 Reduce 中(这个过程可以保证相同的 GroupBy Key 被分到统一个 Reduce 中),最后完成最终的聚合操作。
由上面可以看出,起到至关重要的作用其实时第二个参数的设置,它使计算变成了两个 MapReduce,现在第一个中再 shuffle 过程 partition 时随鸡给 key 打标机,使每个 key 随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同 key 没有分配到相同 reduce 上,所以需要第二次的 MapReduce,这次就回归正常 shuffle,但是数据分布不均匀的问题再第一次有很大的改善,因此基本解决数据倾斜。
# join 解决方案
再 hive 里输入如下:
set hive.auto.convert.join=true;
# 其中一个表大小小于25mb时,自动启用mapjoin
set hive.mapjoin.smalltable.filesize=25mb
2
3
或在 hive-site.xml 里添加如下:
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.mapjoin.smalltable.filesize</name>
<value>50mb</value>
</property>
2
3
4
5
6
7
8
当使用以上参数设置时,需要注意 sql 写的时候,小表一定要在前面(最左侧)。
mapjoin 的主要意思就是,当连接两个表是一个比较小的表和一个特大的表的时候,我们把比较小的 table 直接放到内存中去,然后再对比较大的表格进行 map 操作。join 就发生在 map 操作的时候,每当扫描一个大的 table 中的数据,就要去查看小表的数据,哪条与之相符,继而进行连接。这里的 join 并不会设计 reduce 操作。map 端 join 的优势就是在于没有 shuffle。
还有种 sql 优化的方式,就是有条件的话,先对大表进行条件过滤,后对他 join。
# count distinct 优化
优化前
select count(distinct id) from table_name
优化后
select count(*) from (select distinct id from table_name) tmp;
优化前的 sql 会启用一个 Reduce 任务,优化后的会启用两个 Reduce 任务。优化后的 Reduce 必须设置 mapred.reduce.tasks 数量才有效,否则默认还是 1 个。而 count 一定之启用 1 个 Reduce。
日常统计场景中,我们经常会对一段十七内的字段进行消重并统计数量,SQL 语句类似于 优化前 的语句,这条语句是从一个表的符合 where 条件的记录中统计不重复的 id 的总数。该语句转化为 MapReduce 任务后执行示意图如下:
由于引入了 distinct,因此在 Map 阶段无法利用 combine 对输出结果消重,必须将 id 作为 key 输出,在 Reduce 阶段再对来自于不同 Map 相同 key 的结果进行消重,计入最终统计值。
我们看到任务运行时 Reduce 个数为 1,对于统计大量数据量时,这会导致最终 Map 的全部输出由单个 Reuduce 处理,这唯一的 Reduce 需要 shuffle 大量的数据,并且进行排序聚合等处理,这使得它称为整个任务的 IO 和运算瓶颈。
经过上述分析后,我们可以尝试显示的增大 Reduce 任务个数来提高 Reduce 阶段的并发,使每一个 Reduce 的数据量控制在我们预想的范围。
set mapred.reduce.tasks=100
但这个调正不会影响 count 这种 "全聚合 (full aggregates)" 的任务,它会忽略用户指定的
我们利用 hive 对嵌套语句的支持,将原来一个 MapReduce 任务转为两个任务,在第一阶段选出全部的非重复 id,在第二阶段再对这些已消重的 id 进行统计。这样在第一阶段我们可以同过增大 Reduce 的并发数,并发处理 Map 输出。在第二阶段,由于 id 已经消重,因此 count (*) 操作 Map 阶段不需要输出原 id 数据,只输出一个合并后的计数即可。这样即使第二阶段 Hive 强制指定一个 Reduce 任务,极少量的 Map 输出数据也不会使单一的 Reduce 任务成为瓶颈,改进后的视图如下:
# 调整切片数(map 任务数)
Hive 底层自动对小文件做了优化,用了 CombineTextInputFormat,将多个小文件切片合成一片,对应也就只会启动一个 Map 任务。
合并完成的切片大小,如果 > mapred.max.split.size 的大小,就启动一个新的切片任务。默认 mapred.max.split.size=134217728(128MB)
# JVM 重用
# 默认是1个
set mapred.job.reuse.jvm.num.task=20
2
JVM 重用是 hadoop 调优参数内容,对 hive 的性能具有非常大的影响,特别是对于很难避免小文件的场景或者 task 特别多的场景,这类场景大多数执行事件都很短。这时 jvm 的启动过程可能会造成相当大的开销,尤其时执行的 Job 包含由成千上万个 task 任务的情况。
JVM 重用可以使得一个 JVM 进程在同一个 JOB 中重新使用 N 次后才会销毁。
# 启用严格模式
在 hive 里面可以通过严格模式防止用户执行那些可能产生意想不到的不好的效果的查询,从而保护 hive 的集群。
用户可以通过以下来设置严格模式,改成 unstrict 则为非严格模式。
set hive.mapred.mode=strict
在严格模式下,用户在运行如下查询的时候会报错:
- 分区表的查询语句没有使用分区字段来限制
- 使用了 order by 但没有使用 limit 语句。(如果不使用 limit,会对查询结果进行全局排序,消耗时间长)
- 产生了笛卡尔积
# 关闭推测执行机制
因为在测试环境下我们都把应用程序跑通了,如果还加上推测执行,如果有一个数据分片本来就会发生数据倾斜,执行事时间就是比其他的时间长,那么 hive 就会把这个执行时间长的 job 当作运行失败,继而又产生一个相同的 job 去运行,后果可想而知,可以通过如下设置关闭推测执行机制。该机制默认时开启的。
set mapreduce.map.speculative=false;
set mapreduce.reduce.speculative=false;
set hive.mapred.reduce.tasks.speculative.execution=false;
2
3