首页  ·  知识 ·  大数据
大厂都在用的Hive优化
TMH_ITBOY  简书  Hadoop  编辑:落日666号   图片来源:网络
使用Hive过程中,面对各种各样的查询需求,需要具有针对性的优化下面内容就给大家分别介绍下。

前言

        Hive作为大数据分析领域常用的仓库工具,即使是现在流式计算如火如荼背景下,Hive依然倍受各大厂商挚爱。使用Hive过程中,面对各种各样的查询需求,需要具有针对性的优化下面内容就给大家分别介绍下。


1. 启用压缩

         压缩可以使磁盘上的数据量变小,例如,文本文件格式能够压缩40%甚至更高的比例,这样可以通过降低I/O来提高查询速度。除非产生的数据用于外部系统,或者存在格式兼容性问题,建议总是启用压缩。压缩与解压缩会消耗CPU资源,但是Hive产生的作业往往是IO密集型的。因此CPU开销通常不是问题。


1.1 查询出可用的编解码器:


set io.compression.codecs;

结果如下:


io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,          org.apache.hadoop.io.compress.DefaultCodec,          org.apache.hadoop.io.compress.BZip2Codec,          org.apache.hadoop.io.compress.SnappyCodec

1.2 启用中间数据压缩。

         一个复杂的Hive查询再提交后,通常被转化成一系列中间阶段的MapReduce作业,Hive引擎将这些作业串联起来完成整个查询。可以将这些中间数据进行压缩。这里所说的中间数据指的是上一个MR作业的输出,这个输出将会被下一个MR作业作为输入数据使用。


 set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK;

1.3 启用结果压缩

当Hive输出接入到表中时,输出内容同样可以进行压缩。


 set hive.exec.compress.output=true; set mapreduce.output.fileoutputformat.compress=true; set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec; set mapreduce.output.fileoutputformat.compress.type=BLOCK;

2. 优化连接查询

可以通过配置Map连接和倾斜连接的相关属性提升连接查询的性能。


2.1 自动Map连接

         当连接一个大表和一个小表时,自动Map连接是一个非常有用特性。如果启动该特性,小表将保存在每个节点的本地缓存中,并在Map节点与大表进行连接。开启自动Map连接提供了两个好处。首先,将小标装进缓存将节省每个数据节点上的读取时间。其次,它避免了Hive查询中的倾斜连接,因为每个数据块的连接操作已经在Map阶段完成了。


set hive.auto.convert.join=true;set hive.auto.convert.join.noconditionaltask=true;set hive.auto.convert.join.noconditionaltask.size=100000000;set hive.auto.convert.join.use.nonstaged=true;

其中:


hive.auto.convert.join:是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。


hive.auto.convert.join.noconditionaltask:是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。假设参与连接的表(或分区)有N个,如果打开这个 参数,并且有N-1个表(或分区)的大小总和小于hive.auto.convert.join.noconditionaltask.size参数指定的值,那么会直接将连接转为Map连接。


hive.auto.convert.join.noconditionaltask.size:如果hive.auto.convert.join.noconditionaltask是关闭的,则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个 的总大小小于这个参数的值,则直接将连接转为Map连接。默认值为10MB。


hive.auto.convert.join.use.nonstaged:对于条件连接,如果从一个小的输入流可以直接应用于join操作而不需要过滤或者投影,那么不需要通过MapReduce的本地任务在 分布式缓存中预存。当前该参数在vectorization或tez执行引擎中不工作。


2.2 倾斜连接

         两个大表连接时,会先基于连接键分别对两个表进行排序,然后连接它们。Mapper将特定键值的所有行发送给同一个Reducer。例如:表A的id列有1,2,3,4四个值,表B的id有1,2,3三个值,查询语句如下:


select A.id from A join B on A.id = B.id

         一系列Mapper读取表中的数据并基于键发送给Reducer。如id=1行进入Reducer R1,id = 2的行进入Reducer R2的行等。这些Reducer产生A B的交集并输出。Reducer R4只从A获取行,不产生查询结果。


         现在假设id=1的数据行是高度倾斜的,则R2和R3会很快完成,而R1需要很长时间,将成为整个查询的瓶颈。配置倾斜连接的相关属性可以有效优化倾斜连接。


set hive.optimize.skewjoin=true;set hive.skewjoin.key=1000000;set hive.skewjoin.mapjoin.map.tasks=10000;set hive.skewjoin.mapjoin.min.split=33554432;

其中:


hive.optimize.skewjoin:是否为连接表中的倾斜键创建单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive为倾斜键和其他键值生成各自的查询计划。


hive.skewjoin.key:决定如何确定连接中的倾斜键。在连接操作中,如果同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜连接键。


hive.skewjoin.mapjoin.map.tasks:指定倾斜连接中,用于Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.min.split一起使用,执行细粒度的控制。


hive.skewjoin.mapjoin.min.split:通过指定最小split的大小,确定Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.map.tasks一起使用,执行细粒度的控制。


2.3 桶Map连接

如果连接中使用的表是特定列分桶的,可以开启桶Map连接提升性能。


set hive.optimize.bucketmapjoin=true;set hive.optimize.bucketmapjoin.sortedmerge=true;

其中:


hive.optimize.bucketmapjoin:是否尝试桶Map连接。


hive.optimize.bucketmapjoin.sortedmerge:是否尝试在Map连接中使用归并排序。


3. 避免使用Order by 全局排序

         Hive中使用order by 子句实现全局排序,order by只用一个reduce产生结果,对于大数据集,这种做法效率很低。如果不需要全局有序,则可以使用sort by子句,该子句为每个reduce生成一个排好序的文件。如果需要控制一个特定数据行流向哪个reducer,可以使用用distribute by 子句。如:


select id,name,salary, dept from employee distribute by dept sort by id asc, name desc;

         属于一个dept的数据会分配到一个reducer进行处理,同一个dept的所有记录会按照id,name列排序。最终的结果集是全局有序的。


4. 启用Tex或者Spark执行引擎。


set hive.execution.engine=tex;或者set hive.execution.engine=spark;

5. 优化Limit操作

         默认时limit操作仍然会执行整个查询,然后返回限定的行数。在有些情况下这种处理方式很浪费,因此可以通过设置下面的属性避免此行为。


set hive.limit.optimize.enable=true;set hive.limit.row.max.size=true;set hive.limit.optimize.limit.file=10;set hive.limit.optimize.fetch.max=50000;

其中:


hive.limit.optimize.enable:是否启用limit优化。当使用limit语句时,对源数据进行抽样。


hive.limit.row.max.size:在使用limit做数据的子集查询时保证的最大行数据量。


hive.limit.optimize.limit.file:在使用limit做数据子集查询时,采样的最大文件数。


hive.limit.optimize.fetch.max:使用简单limit数据抽样时,允许的最大行数。


6. 启用并行执行

         每条Hive SQL语句都被转化成一个或者多个阶段执行,可能是一个MapReduce阶段,采样阶段,限制阶段等。默认时,Hive在任意时刻只能执行其中一个阶段。如果组成一个特定作业的多个执行阶段是彼此独立的,那么它们可以并行执行,从而整个作业得以更快完成。设置下面的属性启用并执行。


set hive.exec.parallel=true;set hive.exec.parallel.thread.number=8;

其中:


hive.exec.parallel:是否并行执行作业。


hive.exec.parallel.thread.number:最多可以并行执行的作业数。


7. 启用MapReduce严格模式

         Hive提供了一个严格模式,可以防止用户执行那些可能产生负面影响的查询。通过设置下面的属性启用MapReduce严格模式。


set hive.mapred.mode=strict

严格模式禁止3种类型的查询


对于分区表,where子句中不包含分区字段过滤条件的查询不允许执行。


对于使用了order by子句的查询,要求必须使用limit子句,否则不允许执行。


限制笛卡尔积查询。


8. 使用单一Reducer执行多个Group By

         通过为group by操作开启单一reduce任务属性,可以将一个查询中的多个group by操作联合发送给单一MapReduce作业。


set hive.multigroupby.singlereducer=true;

9. 控制并行Reduce任务

         Hive通过将查询任务分成一个或者多个MapReduce任务达到并行的目的。确定最佳的mapper个数和reducer个数取决于多个变量,例如输入的数据量以及对这些数据执行的操作类型等。如果有太多的mapper或者reducer任务,会导致启动、调度和运行作业过程产生过多的开销,而设置的数量太少,那么就可能没有重分利用好集群内在的并发性。对于一个Hive查询,可以设置下面的属性来控制并行reducer任务的个数。


set hive.exec.reducers.bytes.per.reducer=256000000;set hive.exec.reducers.max=1009;

其中:


hive.exec.reducers.bytes.per.reducer:每个reducer的字节数,默认值为256MB。Hive是按照输入的数据量大小来确定reducer个数的。例如,如果输入的数据是1GB,将 使用4个reducer。


hive.exec.reducers.max:将会使用的最大reducer个数。


10. 启用向量化

         向量化特性在Hive 0.13.1版本中被首次引入。通过查询执行向量化,使Hive从单行处理数据改为批量处理方式,具体来说是一次1024行而不是原来的每次只处理一行,这大大提升了指令流水线和缓存的利用率,从而提高了扫描、聚合、过滤和链接等操作的性能。可以设置下面的属性启用查询执行向量化。


set hive.vectorized.execution.enabled=true;set hive.vectorized.execution.reduce.enabled=true;set hive.vectorized.execution.reduce.groupby.enabled=true;

其中:


hive.vectorized.execution.enabled:如果该标志设置为true,则开启查询执行的向量模式,默认值为false。


hive.vectorized.execution.reduce.enabled:如果该标志设置为true,则开启查询执行reduce端的向量模式,默认值为true。


hive.vectorized.execution.reduce.groupby.enabled:如果该标志设置为true,则开启查询执行reduce端group by操作的向量模式,默认值为true。


11. 启用基于成本的优化器

         Hive 0.14 版本开始提供基于成本优化器(CBO)特性。使用过Oracle数据库的同学对CBO一定不会陌生。与Oracle类似,Hive的CBO也可以根据查询成本指定执行计划,例如确定表链接的顺序、以何种方式执行链接、使用的并行度等。设置下面的属性启用基于成本优化器。


set hive.cbo.enable=true;set hive.compute.query.using.stats=true;set hive.stats.fetch.partition.stats=true;set hive.stats.fetch.column.stats=true;

其中:


hive.cbo.enable:控制是否启用基于成本的优化器,默认值是true。Hive的CBO使用Apache Calcite框架实现。


hive.compute.query.using.stats:该属性的默认值为false。如果设置为true,Hive在执行某些查询时,例如select count(1),只利用元数据存储中保存的状态信息返回结果。为了收集基本状态信息,需要将hive.stats.autogather属性配置为true。为了收集更多的状态信息,需要运行analyzetable查询命令。


hive.stats.fetch.partition.stats:该属性的默认值为true。操作树中所标识的统计信息,需要分区级别的基本统计,如每个分区的行数、数据量大小和文件大小等。分区 统计信息从元数据存储中获取。如果存在很多分区,要为每个分区收集统计信息可能会消耗大量的资源。这个标志可被用于禁止从元数据存储中获取分区统计。当 该标志设置为false时,Hive从文件系统获取文件大小,并根据表结构估算行数。


hive.stats.fetch.column.stats:该属性的默认值为false。操作树中所标识的统计信息,需要列统计。列统计信息从元数据存储中获取。如果存在很多列,要为每个列收 集统计信息可能会消耗大量的资源。这个标志可被用于禁止从元数据存储中获取列统计。


可以使用HQL的analyze table语句收集一个表中所有列相关的统计信息,例如下面的语句收集sales_order_face表的统计信息。


analyze table sales_order_fact compute statistics for climuns;analyze table sales_order_fact compute statistics for columns order_number,customer_sk;

12. 使用合适的存储格式

         Hive支持多种数据格式,如textFile、sequenceFile、RCFFile、ORCFile等。在合适的场景下使用合适的存储格式,有助于提升查询性能。


下面是选择存储格式建议的场景:


如果数据有参数的分隔符,那么可以选择TEXTFILE格式。


如果数据所在文件比块尺寸小、可以选择SEQUCEFILE格式。


如果想执行数据分析,并高效地存储数据,可以选择RCFFILE。


如果希望减少数据所需要的存储空间并提升性能,可以选择ORCFILE。


本文作者:TMH_ITBOY 来源:简书
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读