首页  ·  知识 ·  大数据
详解Kafka生产者和消费者的工作原理
CIO之家的朋友  简书  消息队列  编辑:吃饭睡觉打豆豆   图片来源:网络
每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任

主题和日志

对于每个主题,Kafka群集都会维护一个分区日志,如下所示:

image.png

每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。分区中的每个记录均分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)

分区的设计结构

  • 提供了负载均衡的能力,实现了系统的高伸缩性。

  • 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。

  • 可以通过添加新的节点机器来增加整体系统的吞吐量。

  • Kafka分区的设计逻辑和ES分片的设计逻辑是相同的。

生产者分区策略

生产者分区策略是 决定生产者将消息发送到哪个分区的算法, 主要有以下几种:

  • 轮询策略:Round-robin 策略,即顺序分配, 轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略。(默认、常用)

  • 随机策略: Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上。

  • 消息键保序策略:key-ordering 策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的

kafka消息的有序性,是采用消息键保序策略来实现的。 一个topic,一个partition(分割),一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue。

  • 通过指定key的方式,具有相同key的消息会分发到同一个partition

  • partition会内部对其进行排序,保证其有序性。

image.png

Kafka的消息压缩机制

kafka发送进行消息压缩有两个地方,分别是生产端压缩和Broker端压缩。

  • 一般情况下压缩机制:在生产者端解压、Broker端保持、消费者端解压

  • Kafka 支持 4 种压缩算法:GZIP、Snappy 、LZ4,从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。

  • 压缩机制本质上以消费者端CPU性能换取节省网络传输带宽以及Kafka Broker端的磁盘占用。

生产者端压缩 生产者压缩通常采用的GZIP算法这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。 配置参数:

<!-- 定义producer的参数 -->

    <bean id="producerProperties" class="java.util.HashMap">

        <constructor-arg>

            <map>

                <entry key="bootstrap.servers" value="127.0.0.1:9092"/>

                <!-- acks表示所有需同步返回确认的节点数,all或者?1表示分区全部备份节点均需响应,可靠性最

                高,但吞吐量会相对降低;

                1表示只需分区leader节点响应;

                0表示无需等待服务端响应;

                大部分业务建议配置1,风控或安全建议配置0 -->

                <entry key="acks" value="1"/>

                <!-- retries表示重试次数,如果配置重试请保证消费端具有业务上幂等,根据业务需求配置  -->

                <entry key="retries" value="1"/>

                <!-- 发送消息请求的超时时间,规范2000 -->

                <entry key="request.timeout.ms" value="2000"/>

                <!-- 如果发送方buffer满或者获取不到元数据时最大阻塞时间,规范2000 -->

                <entry key="max.block.ms" value="2000"/>

                <!--开启GZIP压缩 -->

                <entry key="compression.type" value="gzip"/>

                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>

                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>

            </map>

        </constructor-arg>

    </bean>

Broker压缩 大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但以下情况会引发Broker压缩

Broker端和Producer端采用了不同的压缩算法

Broker端发生了消息格式转换(如过集群中同时保存多种版本的消息格式。为了兼容老版本,Broker会将消息转换为老版本格式,这对性能影响很大,而且会丧失Zero Copy的特性)

消费者端解压 Kafka 会将启用了哪种压缩算法封装进消息集合中,在Consummer中进行解压操作。

消息可靠性

kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性

生产者确认机制 当 Kafka 的若干个 Broker(根据配置策略,可以是一个,也可以是ALL) 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

生产者失败回调机制 生产者不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。producer.send(msg, callback) 采用异步的方式,当发生失败时会调用callback方法。

失败重试机制 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

消费者确认机制 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

副本机制 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

限定Broker选取Leader机制 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

消息幂等性和事务

由于kafka生产者确认机制、失败重试机制的存在,kafka的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性。 所以我们要考虑消息幂等性的设计。 kafka提供了幂等性Producer的方式来保证消息幂等性。使用 ****的方式开启幂等性。

幂等性 Producer 的作用范围:

只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。

只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

Kafka事务 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。 同样使用 的方式开启事务。

探究Kafka消费者的工作原理

消费者组

consumer group是kafka提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID. 组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

consummer group有以下的特性:

consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程(所以消费者可以采用多线程的方式去消费消息)

group.id是一个字符串,唯一标识一个consumer group

consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

消费者位置 消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。 位移提交有自动、手动两种方式进行位移提交。

自动提交:在kafka拉取到数据之后就直接提交,这样很容易丢失数据

手动提交:成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 (存在数据处理失败的可能性), 所以这时我们就需要进行手动提交kafka的offset下标。

<!-- 定义consumer的参数 -->

<bean id="consumerProperties" class="java.util.HashMap">

    <constructor-arg>

        <map>

            <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />

            <!--关闭自动提交,使用spring实现的提交方案-->

            <entry key="enable.auto.commit" value="false" />

            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />

            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />

        </map>

    </constructor-arg>

</bean>

Kafka通过一个内置Topic(__consumer_offsets)来管理消费者位移。

Rebalance机制

rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

Kafka提供了一个角色:coordinator来执行对于consumer group的管理。 Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。

Rebalance 过程分为两步:Join 和 Sync。 Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

image.png

Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

image.png



本文作者:CIO之家的朋友 来源:简书
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读
也许感兴趣的
我们推荐的
主题最新
看看其它的