首页  ·  知识 ·  大数据
一文带你搞清楚什么是“数据倾斜”
SAMshare  CIO之家的朋友们  实践应用  编辑:kaiserin   图片来源:网络
数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被partition分配到一个分区里,造成了"一个人累死,其他人闲死"的情况,这违背了并行计算的初衷,整体的效率是十分低下的

? 什么是数据倾斜

我们在用hive取数的时候,有的时候只是跑一个简单的join语句,但是却跑了很长的时间,有的时候我们会觉得是集群资源不够导致的,但是很大情况下就是出现了"数据倾斜"的情况。

在了解数据倾斜之前,我们应该有一个常识,就是现实生活中的数据分布是不均匀的,俗话说"28定理",80%的财富集中在20%的人手中之类的故事相信大家都看得不少。所以,在我们日常处理的现实数据中,也是符合这种数据分布的,数据倾斜一般有两种情况:

  • 变量值很少: 单个变量值的占比极大,常见的字段如性别、学历、年龄等。

  • 变量值很多: 单个变量值的占比极小,常见的字段如收入、订单金额之类的。

数据倾斜,在MapReduce编程模型中十分常见,就是大量的相同key被partition分配到一个分区里,造成了"一个人累死,其他人闲死"的情况,这违背了并行计算的初衷,整体的效率是十分低下的。

? 数据倾斜的原因

当我们看任务进度长时间维持在99%(或100%),查看任务监控页面就会发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,这就是数据倾斜的直接表现。

而导致这个的原因,大致可以分为下面几点:

  • key分布不均匀

  • 业务数据本身的特性

  • 建表时考虑不周

  • 某些SQL语句本身就有数据倾斜

具体可以体现在下面的常见操作:

image.png

备注:图片文字内容来自网络

? Hadoop计算框架的特点

在了解如何避免数据倾斜之前,我们先来看看Hadoop框架的特性:

  • 大数据量不是大问题,数据倾斜才是大问题;

  • jobs数比较多的作业效率相对比较低,比如即使有几百万的表,如果多次关联多次汇总,产生十几个jobs,耗时很长。原因是map reduce作业初始化的时间是比较长的;

  • sum,count,max,min等UDAF(User Defined Aggregate Function:自定义函数),不怕数据倾斜问题,hadoop在map端的汇总并优化,使数据倾斜不成问题;

  • count(distinct),在数据量大的情况下,效率较低,如果是多count(distinct)效率更低,因为count(distinct)是按group by字段分组,按distinct字段排序,一般这种分布式是很倾斜的,比如男uv,女uv,淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。

?? 优化的常用手段

说的是优化手段,但更多的是"踩坑"的经验之谈。

?? 优化之道:

  • 首先要了解数据分布,自己动手解决数据倾斜问题是个不错的选择;

  • 增加jvm(Java Virtual Machine:Java虚拟机)内存,这适用于变量值非常少的情况,这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率;

  • 增加reduce的个数,这适用于变量值非常多的情况,这种情况下最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作;

  • 重新设计key,有一种方案是在map阶段时给key加上一个随机数,有了随机数的key就不会被大量的分配到同一节点(小几率),待到reduce后再把随机数去掉即可;

  • 使用combiner合并。combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做是local reduce,然后再交给reduce来处理,减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率);(hive.map.aggr=true)

  • 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够);

  • 数据量较大的情况下,慎用count(distinct),count(distinct)容易产生倾斜问题;

  • hive.groupby.skewindata=true有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

??  SQL语句调节:

  • 如何Join:关于驱动表的选取,选用join key分布最均匀的表作为驱动表; 做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。

  • 大小表Join:使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce。

  • 大表Join大表:把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

  • count distinct大量相同特殊值:count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

  • group by维度过小:采用sum() group by的方式来替换count(distinct)完成计算。

  • 特殊情况特殊处理:在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

看完上面的经验总结还是有点懵逼?说实话我也是的,太多的信息量冲击,不过我们可以收藏起来以后继续看多几次,加深印象。

接下来,我们将从一些具体的案例来讲讲SQL语句优化的技巧,非常常用,对我们日常写SQL很有帮助。

优化案例

场景1:RAC常用(real application clusters的缩写,译为“实时应用集群”)

有一张user表,为卖家每天收入表,user_id,dt(日期)为key,属性有主营类目(cat),指标有交易金额,交易比数。每天要取每个user的主营类目在过去10天的总收入,总比数。

常规做法:取每个user_id最近一天的主营类目,存入临时表t1,汇总过去10天的总交易金额,交易比数,存入临时表t2,连接t1,t2,得到最终的结果。

优化做法:

SELECT user_id  , substr(MAX(concat(dt, cat)), 9) AS main_cat  , SUM(qty), SUM(amt)FROM usersWHERE dt BETWEEN 20101201 AND 20101210GROUP BY user_id;


场景2:空值产生的数据倾斜(最常见的现象)

日志中,常会有信息丢失的问题,比如全网日志中的 user_id,如果取其中的 user_id 和 bmw_users 关联,会碰到数据倾斜的问题。

解决方式1:user_id为空的不参与关联

SELECT *FROM log a  JOIN bmw_users b  ON a.user_id IS NOT NULL
      AND a.user_id = b.user_idUNION ALLSELECT *FROM log aWHERE a.user_id IS NULL;


解决方式2:赋与空值分新的key值

SELECT *FROM log a  LEFT JOIN bmw_users b ON CASE
          WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand())
          ELSE a.user_id      END = b.user_id;


结论:方法2比方法1效率更好,不但io少了,而且作业数也少了。解决方法1 log表被读取了两次,jobs是2。这个优化适合无效 id (比如 -99 , ’’, null 等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

场景3:不同数据类型关联产生数据倾斜

一张表 s8_log,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8_log 中有字符串商品 id,也有数字的商品 id。字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。

解决方式:把数字类型转换成字符串类型

SELECT *FROM s8_log a  LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);


场景4:多表 union all 会优化成一个 job

推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。

SELECT *FROM effect a  JOIN (
      SELECT auction_id AS auction_id      FROM auctions      UNION ALL
      SELECT auction_string_id AS auction_id      FROM auctions  ) b  ON a.auction_id = b.auction_id;


结论:这样子比分别过滤数字 id,字符串 id ,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。

场景5:消灭子查询内的 group by

原写法:

SELECT *FROM (
  SELECT *
  FROM t1  GROUP BY c1, c2, c3  UNION ALL
  SELECT *
  FROM t2  GROUP BY c1, c2, c3) t3GROUP BY c1, c2, c3;


优化写法:

SELECT *FROM (
  SELECT *
  FROM t1  UNION ALL
  SELECT *
  FROM t2) t3GROUP BY c1, c2, c3;


结论:从业务逻辑上说,子查询内的 group by 功能与外层的 group by 重复,除非子查询内有 count(distinct)。经过测试,并未出现 union all 的 hive bug,数据是一致的。MR 的作业数由3减少到1。t1 相当于一个目录,t2 相当于一个目录,对map reduce程序来说,t1,t2 可以做为 map reduce 作业的 mutli inputs。这可以通过一个 map reduce 来解决这个问题。Hadoop的计算框架,不怕数据多,怕作业数多。

场景6:消灭子查询内的count(distinct),max,min

原写法:

SELECT c1, c2, c3, sum(pv)FROM (
    SELECT c1, c2, c3, COUNT(c4)
    FROM t1    GROUP BY c1, c2, c3    UNION ALL
    SELECT c1, c2, c3, COUNT(DISTINCT c4)
    FROM t2    GROUP BY c1, c2, c3) t3GROUP BY c1, c2, c3;


这种我们不能直接union 再groupby,因为其中有一个表的操作用到了去重,这种情况,我们可以通过建立临时表来消灭这种数据倾斜问题。

优化写法:

INSERT INTO t4SELECT c1, c2, c3, COUNT(DISTINCT c4)FROM t2GROUP BY c1, c2, c3;SELECT c1, c2, c3, SUM(pv)FROM (
    SELECT c1, c2, c3, COUNT(c4)
    FROM t1    UNION ALL
    SELECT *
    FROM t4) t3GROUP BY c1, c2, c3;


场景7:两张大表join

有两张表,一张是用户访问日志表log,一张是用户表users,其中log表上T,user表也上G,如何每日做到快速连接呢?

解决方法:

SELECT *FROM log a    LEFT JOIN (
        SELECT d.*
        FROM (
            SELECT DISTINCT memberid            FROM log        ) c            JOIN users d ON c.memberid = d.memberid    ) x    ON a.memberid = b.memberid;


上面代码的意思,就是我们可以通过缩小主键的范围来达到减少表的连接操作,比如说限值某段时间,这样子,memberid就会有所减少了,而不是全量数据。

场景8:reduce的时间过长

还是场景7的例子,假设一个memberid对应的log里有很多数据,那么最后合并的时候,也是十分耗时的,所以,这里需要找到一个方法来解决这种reduce分配不均的问题。

解决方法:

SELECT *FROM log a    LEFT JOIN (
        SELECT memberid, number        FROM users d            JOIN num e    ) b    ON a.memberid = b.memberid        AND mod(a.pvtime, 30) + 1 = b.number;


解释一下,上面的num是一张1列30行的表,对应1-30的正整数,把users表膨胀成N份(基于倾斜程度做一个合适的选择),然后把log数据根据memberid和pvtime分到不同的reduce里去,这样可以保证每个reduce分配到的数据可以相对均匀。

场景9:过多的where条件

有的时候,我们会写超级多的where条件来限制查询,其实这样子是非常低效的,主要原因是因为这个and条件hive在生成执行计划时产生了一个嵌套层次很多的算子。

解决方案:

1)把筛选条件对应的值写入一张小表,再一次性join到主表;

2)或者写个udf(user-defined function,用户定义函数),把这些预设值读取进去,udf来完成这个and数据过滤操作。

场景10:分组结果很多,但是你只需要topK

原写法:

SELECT mid, url, COUNT(1) AS cntFROM (
    SELECT *
    FROM r_atpanel_log    WHERE pt = '20190610'
        AND pagetype = 'normal') subqGROUP BY mid, urlORDER BY cnt DESCLIMIT 15;

优化写法:

SELECT *FROM (
    SELECT mid, url, COUNT(1) AS cnt    FROM (
        SELECT *
        FROM r_atpanel_log        WHERE pt = '20190610'
            AND pagetype = 'normal'
    ) subq    GROUP BY mid, url) subq2WHERE cnt > 100ORDER BY cnt DESCLIMIT 15;


可以看出,我们先过滤掉无关的内容,再进行排序,这样子快很多。

? References

  • 百度百科

  • Hive优化案例(很好):https://blog.csdn.net/u011500419/article/details/90266428

  • 数据倾斜是什么以及造成的原因?:https://blog.csdn.net/wyz0516071128/article/details/80997158


本文作者:SAMshare 来源:CIO之家的朋友们
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读
也许感兴趣的
我们推荐的
主题最新
看看其它的