hive中sql优化有哪些

2020-07-01 00:00:00 数据 文件 优化 合并 倾斜

目标:在有限的资源下提升执行效率;

hive表的优化:分区

hive查询优化:

1、join优化:

hive.optimize.skewjoin=true;如果是join过程中出现倾斜 应该设置为true;

set hive.skewjoin.key=100000; 这个是join的键对应的记录条数,超过这个值则会进行优化;

2、mapjoin优化

set hive.auto.convert.join=true;

hive.mapjoin.smalltable.filesize默认值是25mb;

select/*+mapjoin(A)*/f.a, f.b from A t join B f on (f.a=t.a);

3、bucket join

两个表以相同方式划分桶,两个表的桶个数是倍数关系

create table order(cid int, price float) clustered by(cid) into 32 buckets;

create table customer(id int, first string) clustered by(id) into 32 buckets;

select price fromorder t join customers s on t.cid = s.id;

4、group by优化

hive.group.skewindata=true; 如果是group by过程出现倾斜,应该设置为true;

set hive.groupby.mapaggr.checkinterval=100000; 这个是group的键对应的记录条数超过这个值则会进行优化

5、count distinct优化

优化前:

selectcount(distinctid) from tablename;

select a, sum(b), count(distinct c), count(distinct d) fromtestgroupby a;

优化后:

select count(1) from (selectidfrom tablename groupbyid) tmp;

select a, sum(b) as b, count(c) as c, count(d) as d

from(

select a,0 as b, c, null as d from test group by a,c

union all

select a,0 as b, null as c, d from test group by a, d

union all

select a,b,null as c, null as d from test

)tmp1 group by a;

6、Hive job优化

每个查询被Hive转化为多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=8;

job合并输出小文件

set hive.merge.smallfiles.avgsize=256000000;当输出文件平均大小小于该值,启动新job合并文件;

set hive.merge.size.per.task=64000000;合并之后的文件大小;

7、JVM重利用

set mapred.job.reuse.jvm.num.tasks=20;

JVM重利用可以是Job长时间保留slot,直到作业结束,这在对于有较多任务和较多小文件的任务时非常有意义的,减少执行时间。但是这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他的作业可能就需要等待。

8、压缩数据

set hive.exec.compress.intermediate=true;

set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

set hive.intermediate.compression.type=BLOCK;

9、Hive Map优化

(1) 默认map个数

default_num = total_size / block_size;

(2)期望大小

goal_num = mapred.map.tasks;

(3)设置处理的文件大小

split_size=max(mapred.min.split.size, block_size)

split_num = total_size / split_size;

(4)计算map个数

compute_map_num = min(split_num, max(default_num, goal_num))

设置map个数总结如下:

(1) 如果想增加map个数,则设置mapred.map.tasks为一个较大的值

(2) 如果想减少map个数,则设置mapred.min.split.size为一个较大的值

情况1:输入文件size巨大,但不是小文件

增大mapred.min.split.size的值

情况2:输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize。


map端聚合

set hive.map.aggr=true;

10、Hive Shuffle优化

11、Hive Reduce优化

set mapred.reduce.tasks=10;直接设置

hive.exec.reducers.max 默认999

hive.exec.reducers.bytes.per.reducer 默认1G

numTasks = min(maxReducers, input.size / perReducer)

maxReducers = hive.exec.reducers.max

perReducer=hive.exec.reducers.bytes.per.reducer

12、列裁剪和分区裁剪

只获取我们需要的列和分区

13、优先过滤,把条件先放在where条件中,减少数据再进行关联

14、sort by 替换order by,配合distribute by一起使用

15、group by 代替distinct去重数据,会有多个mapreduce执行,大数据量情况下比较好用;

16、join优化

小表前置,因为hive在解析sql的时候会把个表放进内存;

17、多表关联时尽量key相同,会当成同一个mr任务执行

select a.event_type,a.event_code,a.event_desc,b.upload_time
from calendar_event_code a
inner join (
select event_type,upload_time from calendar_record_log
where pt_date = 20190225
) b on a.event_type = b.event_type
inner join (
select event_type,upload_time from calendar_record_log_2
where pt_date = 20190225
) c on a.event_type = c.event_type;

18、倾斜均衡配置项

这个配置与上面group by的倾斜均衡配置项异曲同工,通过hive.optimize.skewjoin来配置,默认false。

如果开启了,在join过程中Hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.tasks参数还可以控制第二个job的mapper数量,默认10000。 再重复一遍,通过自带的配置项经常不能解决数据倾斜问题。join是数据倾斜的重灾区,后面还要介绍在SQL层面处理倾斜的各种方法。

19、优化SQL处理join数据倾斜

单独处理倾斜key

这其实是上面处理空值方法的拓展,不过倾斜的key变成了有意义的。一般来讲倾斜的key都很少,我们可以将它们抽样出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如0~9),后再进行聚合。SQL语句与上面的相仿,不再赘述。

不同数据类型

这种情况不太常见,主要出现在相同业务含义的列发生过逻辑上的变化时。 举个例子,假如我们有一旧一新两张日历记录表,旧表的记录类型字段是(event_type int),新表的是(event_type string)。为了兼容旧版记录,新表的event_type也会以字符串形式存储旧版的值,比如'17'。当这两张表join时,经常要耗费很长时间。其原因就是如果不转换类型,计算key的hash值时默认是以int型做的,这就导致所有“真正的”string型key都分配到一个reducer上。所以要注意类型转换:

select a.uid,a.event_type,b.record_data
from calendar_record_log a
left outer join (
select uid,event_type from calendar_record_log_2
where pt_date = 20190228
) b on a.uid = b.uid and b.event_type = cast(a.event_type as string)
where a.pt_date = 20190228;

build table过大

有时,build table会大到无法直接使用map join的地步,比如全量用户维度表,而使用普通join又有数据分布不均的问题。这时就要充分利用probe table的限制条件,削减build table的数据量,再使用map join解决。代价就是需要进行两次join。举个例子:

select /*+mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_info
from calendar_record_log a
left outer join (
select /*+mapjoin(s)*/ t.uid,t.status,t.extra_info
from (select distinct uid from calendar_record_log where pt_date = 20190228) s
inner join user_info t on s.uid = t.uid
) b on a.uid = b.uid
where a.pt_date = 20190228

MapReduce优化

调整mapper数

一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数;至于大量小文件的情况,得参考下面“合并小文件”一节的方法处理。

调整reducer数

reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。

reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)

合并小文件

  • 输入阶段合并 需要更改Hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。 这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.nodemapred.min.split.size.per.rack,含义是单节点和单机架上的小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并。具体逻辑可以参看Hive源码中的对应类。
  • 输出阶段合并 直接将hive.merge.mapfileshive.merge.mapredfiles都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。 另外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。

严格模式

所谓严格模式,就是强制不允许用户执行3种有风险的HiveSQL语句,一旦执行会直接失败。这3种语句是:

  • 查询分区表时不限定分区列的语句;
  • 两表join产生了笛卡尔积的语句;
  • 用order by来排序但没有指定limit的语句。

要开启严格模式,需要将参数hive.mapred.mode设为strict

采用合适的存储格式

在HiveSQL的create table语句中,可以使用stored as ...指定表的存储格式。Hive表支持的存储格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。

相关文章