在大数据系统中,我们往往无法直接对在线系统中的数据直接进行检索和计算。
在线系统所使用关系型数据库、缓存数据库存储数据的方式都非常不同,很多存储系统并不适合分析型(OLAP)的查询,也不允许分析查询影响到在线业务的稳定性。
从数仓建设的角度思考,数据仓库需要依赖于稳定和规范的数据源,数据需要经过采集加工后才能真正被数仓所使用。推动数据同步服务的平台化,才有可能从源头规范数据的产出。数据同步服务不像数据挖掘一样可以直接产生价值,但它更像是连接在线系统和离线系统的高速公路,好的同步工具可以很大程度上提升数据开发的效率。
本文主要介绍知乎在数据同步这方面的建设,工具选型和平台化的实践。
由于在线业务的数据库在知乎内部还是以MySQL为主,在数据同步的数据源方面主要考虑MySQL和Hive的互相同步,后续可以考虑支持HBase。
早期数据同步使用Oozie + Sqoop来完成,基本满足业务需求。但是随着数据同步任务的不断变多,出现了很多重复同步的例子,对同步任务的负载管理也是空白。凌晨同步数据高峰导致MySQL不断报警,DBA苦不堪言。
对于业务来说,哪些表已经被同步了,哪些表还没有也是一个黑盒子,依赖其他业务方的数据都只能靠口头的约定。为了解决这些问题,决定对数据同步做一个统一的平台,简化同步任务的配置,调度平衡负载,管理元信息等等。
到现在为止,数据同步平台支撑了上千张表的同步,每天同步的数据量超过10TB。
数据同步工具市面上有很多解决方案,面向批的主要有Apache Sqoop和阿里开源的DataX,其他商业的数据同步工具不在本文讨论范围。下面主要对比这两种数据同步工具:
1、Sqoop
Pros:
Cons:
支持的数据源不算太丰富(比如ES),扩展难度大;
不支持限速,容易对MySQL造成压力。
2、DataX
Pros:
Cons:
考虑到同步本身要消耗不少的计算和带宽资源,Sqoop可以更好的利用Hadoop集群的资源,而且和Hive适配的更好,最终选择了Sqoop作为数据同步的工具。
平台化的目标是构建一个相对通用的数据同步平台,更好的支持新业务的接入,和公司内部的系统集成,满足业务需求。平台初期设计的目标有以下几个:
简单的任务配置界面,方便新的任务接入;
监控和报警;
屏蔽MySQL DDL造成的影响;
可扩展新数据源。
整体系统架构如下图:
API Server用于提供用户界面和RESTFul API;
数据源中心存储数据源信息,并从真实的数据源定期更新,保持比较新的数据;
Scheduler负责规划任务的执行资源,保护MySQL集群避免负载过高;
Worker真实的执行任务,分布在多个节点上。
1、简化任务接入
平台不应该要求每个用户都理解底层数据同步的原理,对用户而言,应该是描述数据源 (Source) 和目标存储 (Sink),还有同步周期等配置。所有提供的同步任务应该经过审核,防止未经许可的数据被同步,或者同步配置不合理,增加平台负担。最后暴露给用户的UI大概如下图:
2、增量同步
对于数据量非常大的数据源,如果每次同步都是全量,对于MySQL的压力会特别大,同步需要的时间也会很长。因此需要一种可以每次只同步新增数据的机制,减少对于MySQL端的压力。但是增量同步不是没有代价的,它要求业务在设计业务逻辑和表结构的时候,满足下面任意条件:
如果满足上面条件,数据量比较大的表就可以采用增量同步的方式拉取。小数据量的表不需要考虑增量同步,因为数据和合并也需要时间,如果收益不大就不应该引入额外的复杂性。一个经验值是行数<= 2000w的都属于数据量比较小的表,具体还取决于存储的数据内容(比如有很多 Text 类型的字段)。
3、处理Schema变更
做数据同步永远回避不掉的一个问题就是Schema的变更,对MySQL来说,Schema变更就是数据库的DDL操作。数据同步平台应该尽可能屏蔽MySQL DDL对同步任务的影响,并且对兼容的变更,及时变更推送到目标存储。
数据同步平台会通过数据源中心定时的扫描每个同步任务上游的数据源,保存当前Schema的快照,如果发现Schema发生变化,就通知下游做出一样的变更。绝大部分的DDL还是增加字段,对于这种情况数据同步平台可以很好屏蔽变更对数仓的影响。对于删除字段的操作原则上禁止的,如果一定要做,需要走变更流程,通知到依赖该表的业务方,进行Schema同步的调整。
4、和调度平台的集成
MySQL的数据通常会作为后续ETL的数据源,位于整个数据链路的最顶端。知乎内部自研了离线任务调度器,根据数据的依赖关系自动解析任务的依赖,按照合理的顺序启动ETL任务。
数据同步平台和调度平台打通后,可以在每个同步任务结束后,通知调度器启动下游的后继任务,而不用依赖平台和用户口头约定启动时间。如果数据同步出现延时,调度器也可以很好的屏蔽这个问题。待数据同步恢复后,数据链路也随之恢复。
5、监控和报警
根据USE原则,大概整理出下面几个需要监控的指标:
MySQL机器的负载,IOPS,出入带宽;
调度队列长度,Yarn提交队列长度;
任务执行错误数。
报警更多是针对队列饱和度和同步错误进行的。
1、资源管理
当同步任务越来越多时,单纯的按照任务启动时间来触发同步任务已经不能满足需求。数据同步应该保证对于线上业务没有影响,在此基础上速度越快越好。这里本质上是让Sqoop充分又不过度利用MySQL的IOPS,快速拉取数据同时避免资源过度竞争。
为了避免数据同步对线上服务的影响,对于需要数据同步的MySQL单独建立一个从节点,隔离线上流量,只提供给数据同步和业务离线查询使用。
除此之外,需要一个调度策略来决定一个任务何时执行。由于任务的总数量并不多,但是每个任务可能会执行非常长的时间,对调度器的压力并不大,最终决定采用类似YARN的一个中央式的资源调度器,调度器的状态都持久化在数据库中,方便重启或者故障恢复。
最终架构图如下:
最终任务的调度流程如下:
每个MySQL实例是调度器的一个队列,根据同步的元信息决定该任务属于哪个队列;
根据要同步数据量预估资源消耗,向调度器申请该队列对应大小的资源;
调度器将任务提交到执行队列,没有意外的话会立刻开始执行;
Monitor定时向调度器汇报MySQL节点的负载,如果负载过高就停止向该队列提交新的任务;
任务结束后向调度器释放资源。
从早期依靠Crontab调度任务到引入调度器,MySQL集群的资源被更充分的利用。在数据同步高峰期基本不会出现负载空置的情况,任务的平均执行时间只有原先的一半。对DBA来说,MySQL集群的负载报警也大幅减少。
2、存储格式
Hive默认的格式是Textfile,这是一种类似CSV的存储方式,所有数据以文本的形式存储,但是对于OLAP查询来说压缩比太低,查询性能不好。通常我们会选择一些列式存储提高存储和检索的效率。Hive中比较成熟的列式存储格式有Parquet和ORC。这两个存储的查询性能相差不大,但是ORC和Hive集成更好而且对于非嵌套数据结构查询性能是优于Parquet的。但是知乎内部因为也用了Impala,早期的Impala版本不支持ORC格式的文件,为了兼容Impala最终选择了Parquet作为默认的存储格式。
3、针对不同的数据源选择合适的并发数
Sqoop是基于MapReduce实现的,提交任务前先会生成MapReduce代码,然后提交到Hadoop 集群。Job整体的并发度就取决于 Mapper 的个数。Sqoop默认的并发数是4,对于数据量比较大的表的同步显然是不够的,对于数据量比较小的任务又太多了,这个参数一定要在运行时根据数据源的元信息去动态决定。
4、优化Distributed Cache避免任务启动对HDFS的压力
在平台上线后,随着任务越来越多,发现如果HDFS的性能出现抖动,对同步任务整体的执行时间影响非常大,导致夜间的很多后继任务受到影响。
开始推测是数据写入HDFS性能慢导致同步出现延时,但是任务大多数会卡在提交阶段。随着进一步排查,发现MapReduce为了解决不同作业依赖问题,引入了Distributed Cache机制可以将Job依赖的lib上传到HDFS,然后再启动作业。
Sqoop也使用了类似的机制,会依赖Hive的相关lib,这些依赖加起来有好几十个文件,总大小接近150MB,虽然对于HDFS来说是很小数字,但是当同步任务非常多的时候,集群一点点的性能抖动都会导致调度器的吞吐大幅度下降,最终同步的产出会有严重延时。
最后的解决方法是将Sqoop安装到集群中,然后通过Sqoop的参数--skip-distcache避免在任务提交阶段上传依赖的jar。
Distributed Cache:
https://community.hortonworks.com/questions/79556/what-is-distributed-cache-in-hadoop.html
5、关闭推测执行(Speculative Execution)
所谓推测执行是这样一种机制:在集群环境下运行MapReduce,一个job下的多个task执行速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,这些任务将会成为整个job的短板。推测执行会对运行慢的task启动备份任务,然后以先运行完成的task的结果为准,kill掉另外一个task。
这个策略可以提升job的稳定性,在一些极端情况下加快job的执行速度。
Sqoop默认的分片策略是按照数据库的主键和Mapper数量来决定每个分片拉取的数据量。如果主键不是单调递增或者递增的步长有大幅波动,分片就会出现数据倾斜。
对于一个数据量较大的表来说,适度的数据倾斜是一定会存在的情况,当Mapper结束时间不均而触发推测执行机制时,MySQL的数据被重复且并发的读取,占用了大量io资源,也会影响到其他同步的任务。
在一个Hadoop集群中,我们仍然认为一个节点不可用导致整个MapReduce失败仍然是小概率事件,对这种错误,在调度器上增加重试就可以很好的解决问题而不是依赖推测执行机制。
数据同步发展到比较多的任务后,新增的同步任务越来越多,删除的速度远远跟不上新增的速度,总体来说同步的压力会越来越大,需要一个更好的机制去发现无用的同步任务并通知业务删除,减轻平台的压力。
另外就是数据源的支持不够,Hive和HBase、ElasticSearch互通已经成了一个呼声很强烈的需求。Hive虽然可以通过挂外部表用SQL的方式写入数据,但是效率不高有很难控制并发,很容易影响到线上集群,需要有更好的实现方案才能在生产环境真正的运行起来。
还有就是这里没有谈到的一个话题就是流式数据如何做同步,一个典型的场景就是Kafka的日志实时落地然后实时进行OLAP的查询,或者通过MySQL binlog实时更新Kudu或者ElasticSearch。
本文作者:lfyzjck 来源:DBAplus社群
CIO之家 www.ciozj.com 微信公众号:imciow