kafaka相关知识

一、kafka架构

Kafka基础知识

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多生产者、多订阅者,基于zookeeper协 调的分布式日志系统(也可以当做MQ系统),常见可以用于webynginx日志、访问日志,消息服务等等,Linkedin于 2010年贡献给了Apache基金会并成为顶级开源项目。主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。支持KafkaServer间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。同时支持离线数据处理和实时数据处理。支持在线水平扩展

img

kafka是一种发布-订阅模式, 对于消息中间件,消息分推拉两种模式。Kafka只有消息的拉取,没有推送,可以通过轮询实现消息的推送。1.Kafka在一个或多个可以跨越多个数据中心的服务器上作为集群运行。

2.Kafka集群中按照主题分类管理,一个主题可以有多个分区,一个分区可以有多个副本分区。

3.每个记录由一个键,一个值和一个时间戳组成。

Kafka具有四个核心API:

1.ProducerAPI:允许应用程序将记录流发布到一个或多个Kafka主题。2.ConsumerAPI:允许应用程序订阅一个或多个主题并处理为其生成的记录流。3.StreamsAPI:允许应用程序充当流处理器,使用一个或多个主题的输入流,并生成一个或多个输出主题的 输出流,从而有效地将输入流转换为输出流。4.ConnectorAPI:允许构建和运行将Kafka主题连接到现有应用程序或数据系统的可重用生产者或使用者。例如,关系数据库的连接器可能会捕获对表的所有更改。

Kafka优势

1.高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能。2.高性能:单节点支持上千个客户端,并保证零停机和零数据丢失。3.持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。

4.分布式系统,无需停机就可扩展机器。

5.可靠性-kafka是分布式,分区,复制和容错的。

6.客户端状态维护:消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡。

7.支持online和offline的场景。

8.支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。

Kafka应用场景

日志收集:一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer。

消息系统:解耦生产者和消费者、缓存消息等。

用户活动跟踪:用来记录web用户或者APP用户的各种活动,如网页搜索、搜索、点击,用户数据收集然后进行用户行为分析。

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比

如报警和报告;

流式处理:比如Spark Streaming和Storm。

Kafka基本架构

消息和批次

Kafka的数据单元称为消息。可以把消息看成是数据库里的一个“数据行”或一条“记录”,消息由字节数组组成。批次就是一组消息,这些消息属于同一个主题和分区。

模式

消息模式(schema)有许多可用的选项,以便于理解。如JSON和XML,但是它们缺乏强类型处理能力。Kafka的

许多开发者喜欢使用Apache Avro。Avro提供了一种紧凑的序列化格式,模式和消息体分开。当模式发生变化时,不需要重新生成代码,它还支持强类型和模式进化,其版本既向前兼容,也向后兼容。

主题和分区

Kafka的消息通过主题进行分类。主题可比是数据库的表或者文件系统里的文件夹。主题可以被分为若干分区,一

个主题通过分区分布于Kafka集群中,提供了横向扩展的能力。

生产者和消费者

生产者创建消息。消费者消费消息。消息被发布到一个特定的主题上。、

Borker和集群

一个独立的Kafka服务器称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保

存。broker为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘上的消息。单个broker可以轻松处理数千个分区以及每秒百万级的消息量。每个集群都有一个broker是集群控制器。

Kafka核心概念

Producer

生产者创建消息。

Consumer

消费者读取消息

Broker

一个独立的Kafka 服务器被称为broker,是集群的组成部分。

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。

Partition

\1. 主题可以被分为若干个分区,一个分区就是一个提交日志。

\2. 消息以追加的方式写入分区,然后以先入先出的顺序读取。

\3. 无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。

\4. Kafka 通过分区来实现数据冗余和伸缩性。

\5. 在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

Replicas

kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。

Offset

生产者Offset:消息写入的时候,每一个分区都有一个offset,这个offset就是生产者的offset,同时也是这个分区的最新最大的offset。

消费者Offset:某个分区的offset情况,生产者写入的offset是最新最大的值是12,而当Consumer A进行消费时,从0开始消费,一直消费到了9,消费者的offset就记录在9,Consumer B就纪录在了11。

副本

Kafka通过副本保证高可用。副本分为首领副本(Leader)和跟随者副本(Follower)。

AR

分区中的所有副本统称为AR(Assigned Repllicas),AR=ISR+OSR。

ISR

所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中

的一个子集。

OSR

与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。

Kafka的安装和配置

第一步:jdk安装,上传jdk-8u261-linux-x64.rpm到服务器并安装。

rpm -ivh jdk-8u261-linux-x64.rpm

第二步:配置java环境变量

vim /etc/profile

# 生效

source /etc/profile

# 验证

java -version

img

第三步:上传zookeeper安装包并解压。

tar -zxf zookeeper-3.4.14.tar.gz

cd /zookeeper-3.4.14/conf

# 复制zoo_sample.cfg命名为zoo.cfg

cp zoo_sample.cfg zoo.cfg

# 编辑zoo.cfg文件

vim zoo.cfg

dataDir=/usr/local/zookeeper/zookeeper-3.4.14/data

第四步:配置zookeeper环境变量

vim /etc/profile

img

启动命令:zkServer.sh start 查看状态命令:zkServer.sh status

img

第五步:上传kafka_2.12-1.0.2.tgz到服务器并解压

tar -zxf kafka_2.12-1.0.2.tgz

第六步:配置kafka环境变量

vim /etc/profile

img

第七步:修改kafka配置文件,连接zookeeper

# 进入配置文件夹修改server.properties文件

cd config/

vim server.properties

img

img

img

第七步:启动kafka

kafka-server-start.sh -daemon ../config/server.properties

img

消费和主题

# 列出现有的主题

[root@node1 ~]# kafka-topics.sh –list –zookeeper localhost:2181/myKafka

# 创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。

[root@node1 ~]# kafka-topics.sh –zookeeper localhost:2181/myKafka –create –topic topic_1 –partitions 1 –replication-factor 1

# 查看分区信息

[root@node1 ~]# kafka-topics.sh –zookeeper localhost:2181/myKafka –list

# 查看指定主题的详细信息

[root@node1 ~]# kafka-topics.sh –zookeeper localhost:2181/myKafka –describe –topic topic_1

# 删除指定主题

[root@node1 ~]# kafka-topics.sh –zookeeper localhost:2181/myKafka –delete –topic topic_1

# 开启生产者

[root@node1 ~]# kafka-console-producer.sh –topic topic_1 –broker-list localhost:9020

# 开启消费者

[root@node1 ~]# kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_1

# 开启消费者方式二,从头消费,不按照偏移量消费

[root@node1 ~]# kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_1 –from-beginning

Kafka消息接收和发送

img

生产者主要的对象有:KafkaProducer ,ProducerRecord 。

其中KafkaProducer 是用于发送消息的类,ProducerRecord 类用于封装Kafka的消息。

KafkaProducer 的创建需要指定的参数和含义:

img

二、Kafka高级特性

生产者

消息发送

流程图:

img

\1. Producer创建时,会创建一个Sender线程并设置为守护线程。

\2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲

区(该缓冲区也是在Producer创建时创建)。

\3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。

\4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重

试,那么客户端内部会对该消息进行重试。

\5. 落盘到broker成功,返回生产元数据给生产者。

\6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

配置参数:

img

序列化器

img

Kafka中的数据都是字节数组,将消息发送到Kafka之前需要先将数据序列化为字节数组,序列化器的作用就是用于序列化要发送的消息。

分区器

默认分区计算:

\1. 如果record提供了分区号,则使用record提供的分区号

\2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模

\3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。

拦截器

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client

端的定制化控制逻辑。

Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer

确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修

改消息所属的topic和分区,否则会影响目标分区的计算。

onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,

并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不

要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。

close:关闭Interceptor,主要用于执行一些资源清理工作。

原理

img

主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器。

Sender线程:

该线程从消息收集器获取缓存的消息,将其处理为 <Node, List 的形式,表示集群的broker节点。

进一步将<Node, List转化为<Node, Request>形式,此时才可以向服务端发送数据。在发送之前,Sender线程将消息以 Map<NodeId, Deque> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

img

img

消费者

消费组

消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中,消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发。多个从同一个主题消费的消费者可以加入到一个消费组中。消费组中的消费者共享group_id。

消费者四种情况:

\1. 消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。

\2. 消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。

\3. 如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。

\4. 如果消费组有四个消费者,则每个消费者可以分配到一个分区。

\5. 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。

img

心跳机制

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。

Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer

才会发送心跳。broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group中移除,并触发 rebalance。

img

订阅

Topic,Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库。

Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上。优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。

Consumer Group,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

consumer 采用 pull 模式从 broker 中读取数据。

采用 pull 模式,consumer 可自主控制消费消息的速率,可以自己控制消费方式(批量消费/逐条消费),还可以选择不同的提交方式从而实现不同的传输语义。

反序列化

Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能

交给用户程序消费处理。消费者的反序列化器包括key的和value的反序列化器。

key.deserializer

value.deserializer

IntegerDeserializer

StringDeserializer

需要实现 org.apache.kafka.common.serialization.Deserializer 接口。

位移提交

\1. Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为提交位移(Committing Offsets)

\2. Consumer 需要为分配给它的每个分区提交各自的位移数据

3.位移提交的由Consumer端负责的,Kafka只负责保管。

4.位移提交分为自动提交和手动提交

5.位移提交分为同步提交和异步提交

消费位移管理

Kafka中,消费者根据消息的位移顺序消费消息。消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。Kafka提供了消费者API,让消费者可以管理自己的位移。

再均衡

重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。比如一个topic

有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡,是kafka为人诟病最多的一个点。

重平衡的触发条件主要有三个:

\1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。

\2. 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡。

\3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡。

重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。避免重平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障是最常见引发重平衡的地方,需要尽力避免消费者故障,比如合理利用心跳来维持。控制发送心跳的频率,频率越高越不容易被误判。

消费者管理

消费组:consumer group是kafka提供的可扩展且具有容错性的消费者机制。

三个特性:

\1. 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程

\2. group.id是一个字符串,唯一标识一个消费组

\3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。

消费者位移:消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。

kafka提供了5个协议来处理与消费组协调相关的问题:

Heartbeat请求:consumer需要定期给组协调器发送心跳来表明自己还活着

LeaveGroup请求:主动告诉组协调器我要离开消费组

SyncGroup请求:消费组Leader把分配方案告诉组内所有成员

JoinGroup请求:成员请求加入组

DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求

是给管理员使用

组协调器在再均衡的时候主要用到了前面4种请求。

消费组组协调器根据状态机对消费组做不同处理:

\1. Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都

是一个response:UNKNOWN_MEMBER_ID

\2. Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求

\3. PreparingRebalance:组准备开启新的rebalance,等待成员加入

\4. AwaitingSync:正在等待leader consumer将分配方案传给各个成员

\5. Stable:再均衡完成,可以开始消费。

主题

创建主题

kafka-topics.sh –zookeeper localhost:2181/myKafka –create –topic topic_x – partitions 1 –replication-factor 1 kafka-topics.sh –zookeeper localhost:2181/myKafka –create –topic topic_test_02 – partitions 3 –replication-factor 1 –config max.message.bytes=1048576 –config segment.bytes=10485760

查看主题

kafka-topics.sh –zookeeper localhost:2181/myKafka –list

kafka-topics.sh –zookeeper localhost:2181/myKafka –describe –topic topic_x

kafka-topics.sh –zookeeper localhost:2181/myKafka –topics-with-overrides –describe

修改主题

kafka-topics.sh –zookeeper localhost:2181/myKafka –create –topic topic_test_01 – partitions 2 –replication-factor 1

kafka-topics.sh –zookeeper localhost:2181/myKafka –alter –topic topic_test_01 – config max.message.bytes=1048576

kafka-topics.sh –zookeeper localhost:2181/myKafka –describe –topic topic_test_01

kafka-topics.sh –zookeeper localhost:2181/myKafka –alter –topic topic_test_01 – config segment.bytes=10485760

kafka-topics.sh –zookeeper localhost:2181/myKafka –alter –delete-config max.message.bytes –topic topic_test_01

删除主题

kafka-topics.sh –zookeeper localhost:2181/myKafka –delete –topic topic_

增加分区

kafka-topics.sh –zookeeper localhost/myKafka –alter –topic myTop1 –partitions 2

分区副本的分配

副本分配的三个目标:

\1. 均衡的将副本分散各个broker上

\2. 对于某个broker上分配的分区,他的其他副本在其他broker上

\3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架的broker上

不考虑机架信息的情况:

\1. 第一个副本通过轮询方式挑选一个broker,进行分配,该轮询是从broker列表随机轮询

\2. 其余副本是通过增加偏移进行分配

偏移量管理

__consumer_offsets主题中保存了各个消费组的偏移量。早期由zookeeper管理。

KafkaAdminClient应用

除了使用kafka的bin目录脚本工具来管理kafka之外,还可以通过kafkaAdminClient来将kafka的api将管理功能继承到此客户端中方便调用。其内部原理是使用kafka自定义的一套二进制协议来实现。

\1. 创建主题:createTopics(final Collection newTopics, final CreateTopicsOptions options)

\2. 删除主题:deleteTopics(final Collection topicNames, DeleteTopicsOptions options)

\3. 列出所有主题:listTopics(final ListTopicsOptions options)

\4. 查询主题:describeTopics(final Collection topicNames, DescribeTopicsOptions options)

\5. 查询集群信息:describeCluster(DescribeClusterOptions options)

\6. 查询配置信息:describeConfigs(Collection configResources, final DescribeConfigsOptions options)

\7. 修改配置信息:alterConfigs(Map configs, final AlterConfigsOptions options)

\8. 修改副本的日志目录:alterReplicaLogDirs(Map replicaAssignment, final AlterReplicaLogDirsOptions options)

\9. 查询节点的日志目录信息:describeLogDirs(Collection brokers, DescribeLogDirsOptions options)

\10. 增加分区:createPartitions(Map newPartitions, final CreatePartitionsOptions options)

主要操作步骤:

\1. 客户端根据方法的调用创建相应协议请求。

\2. 客户端发送请求到kafka broker。

\3. Kafka broker 处理相应请求并回执,比如CreateTopicRequest对应的是CreateTopicResponse

\4. 客户端接收相应回执并解析处理。

分区

副本机制

Kafka在一定数量上的服务器上对主题分区进行复制,当集群中某个broker宕机后系统可以自动故障转移到其他可用副本上,不会造成数据丢失。

img

同步节点定义:

\1. 节点必须能够维持与zookeeper会话,心跳机制。

\2. 对于Follower副本分区,他复制在leader分区的写入,延迟不能太高。

宕机恢复:

\1. 少部分副本宕机,从follower从选择一个leader,若宕机恢复,清空commit。重新从leader上pull

\2. 全部副本宕机,等待ISR其中一个恢复作为leader 或者选择第一个恢复的副本作为leader,前者等待时间长,后者可能丢失数据。

Leader选举

Kafka中的leader分区选举,通过维护一个动态变化的ISR集合来实现,一旦Leader分区丢掉,则从ISR中随机挑选一个副本来做新的Leader分区,没有使用过半原则。如果ISR中的副本都丢失了,则要么等待ISR中某个副本恢复成为Leader或者选择第一个恢复的副本做为Leader,两种方式利弊如上。

分区重新分配

向已经部署好的kafka集群添加机器,需要从已部署好的kafka节点中复制相应配置文件,然后把里面的broker id修改成全局唯一,最后启动这个节点将他加入到现有的kafka集群中。但是新添加的kafka节点不会自动分配数据,因此无法分担集群负载,因此需要手动将部分分区移动到新添加的kafka节点上,kafka内部自带有相应工具。

自动再均衡

当我们分好区运行一段时间后,broker的宕机重启,会引发leader分区和follower分区的角色转化,最后可能导致leader大部分都集中在少数几台broker上,由于leader负责客户端的读写操作,因此集中leader分区的少数服务器I/O、CPU、内存都会很紧张,针对这种不平衡情况就需要让leader的分区重新恢复到均衡状态,kafka提供了自动再均衡脚本:kafka-preferred-replica-election.sh,该工具会让每个分区的leader副本分配在合适位置,让leader分区和follower分区在服务器之间均衡分配。

修改副本因子

一开始集群较小,因此副本因子较小,现在需要扩容,就要修改副本因子,通过kafka-reassign-partitions.sh修改副本因子。

分区分配策略

在kafka中,每个Topic会包含多个分区,默认情况下一个分区只能被一个消费组下面的一个消费者消费,因此会产生分区分配问题,kafka针对这个问题提供了三种分区分配算法:RangeAssignor、RoundRobinAssignor、StickyAssignor。

RangeAssignor:消费组的成员订阅他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的broker,协调者选择其中一个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者,他是kafka默认采用的分配算法。

RoundRobinAssignor:将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配。如果消费组内,消费者订阅的Topic列表是相同的,那分配结果尽量均衡,如果订阅的topic列表是不同的,那么分配结果不保证均衡,因为某些消费者不参与一些topic分配。

StickyAssignor:虽然RoundRobinAssignor在RangeAssignor上做了一些优化来更均衡的分配分区,但是在一些情况依旧会出现分配偏差,因为以上两种算法没有考虑上一次的分配结果,如果在新一次分配之前考虑上一次的分配结果,尽量少的调整分区分配变动,能节省不少开销,StickyAssignor因此而诞生,他保证分区的分配尽量均衡,并且每一次重新分配的结果尽量与上一次分配结果保持一致。

物理存储

日志存储

Kafka消息是以主题为单位进行归类,各个主题之间又相互独立,互不影响,每个主题可以分一个或多个分区,每个分区各自存在一个记录消息数据的日志文件。

img

偏移量索引文件用于记录消息偏移量与物理地址之间的映射地址之间的映射关系,时间戳索引文件根据时间戳查找对应偏移量。消息内容保存在log日志文件中,新内容追加末尾,采用顺序写的方式。如果需要找到对应偏移量的文件,因为kafka中存在一个ConcurrentSkipListMap保存每个日志分段,可以通过跳跃表方式定位到00000000000000000000.index,再通过二分法在偏移量索引文件中找对应偏移量范围最大内的索引文件,缩小范围之后再顺序找对应偏移量的消息。

img

Kafka提供了两种日志清理策略:

\1. 日志删除,按照一定删除策略,将不满足条件数据进行数据删除。Kafka通过设定日志保留时间节点进行执行日志删除任务,默认七天,超过就删除,删除是通过跳跃表找到待删除的日志分段,在文件加.delete后缀,然后交给一个延迟删除任务来删除这些指定后缀文件。

\2. 日志压缩,针对每个消息的key进行整合,对于有相同key不同的value的值,只保留最后一个版本。日志压缩和key有关,确保每个消息的key不为null。

磁盘存储

零拷贝

Kafka性能非常高,但是他却把数据存储在磁盘中,需要对数据进行落盘,因此kafka是多方面协同的结果,包括宏观架构、分布式partition存储,ISR数据同步、以及各种高效利用磁盘的特性。零拷贝不是不需要拷贝,而且减少不必要的拷贝次数,nginx高性能中也有零拷贝应用。

传统IO:先读取,再发送,经过1-4次复制,第一次将磁盘文件读取到操作系统内核缓存区,第二次copy到application应用程序的缓存,第三次再copy到socket网络发送到缓冲区,最后copy到网络协议栈,由网卡进行网络传输。

Kafka:网络数据持久化到磁盘,磁盘文件通过网络发送,不需要第二和第三个副本。

页缓存

页缓存就是操作系统实现的一种主要磁盘缓存,用来减少对磁盘的IO操作,也就是把磁盘中的数据缓存在内存中,把对磁盘的访问变成对内存的访问,提高效率。

mmap和senfile

\1. linux内核提供实现零拷贝的api

\2. sendfile是将读到内核空间的数据,转到socket buffer,进行网络发送。

\3. mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反应在磁盘文件上。

\4. rocketmq在消费消息使用了mmap,kafka使用了sendfile。

Kafka速度快的原因:

\1. partition顺序读写,充分利用磁盘特性。

\2. producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入。

\3. customer从broker读取消息,采用sendfile,将磁盘文件读到OS内核缓存区,然后直接转到socket buffer进行网络发送。

事务

Kafka的producer发送消息可能是分布式事务,引入2PC,有事务协调者Transaction Coordinator。而且事务管理中日志不可缺,kafka通过内部一个topic来保存事务日志,事务具有commit和abort两种操作,因此也具有对应的两种隔离级别,而且事务ID一定要设置,幂等性也要开启,幂等性的实现通过一个唯一ID。

img

在kafka事务中,一个原子性操作,根据操作类型分为3种情况:

1. 只有producer生产消息,需要事务介入

2. 消费消息和生产消息并存,需要事务介入,最常见模式。

3. 只有consumer消费消息,可以手动commit,意义不大。

控制器

控制器就是一个broker,控制器还负责leader分区的选举。

Kafka集群包含若干个broker,集群上创建的主题,包含若干个分区,每个分区包含若干个副本,副本因子包括leader副本和follower副本,副本又分为ISR同步副本分区和OSR非同步副本分区。

borker选举

\1. kafka使用zookeeper的分布式锁选举控制器,并在节点加入集群或退出集群时通知控制器。

\2. 控制器负责在节点加入或离开集群时进行分区leader选举。

\3. 控制器使用epoch来避免脑裂,脑裂是指两个节点同时认为自己是当前控制器。

可靠性保证和一致性保证

可靠性保证

创建topic主题时可以指定副本因子和分区的副本数,leader负责读写的节点,其他是follower,producer只把消息发送到leader上,follower定期从leader上pull数据,因此就会存在一定的延时性,为了保证可靠性,可以设置ack=all。Follower收到消息后,会像leader发送ack,一旦leader收到ISR中所有replica中的ACK,leader就会commit,那么leader就会像producer发送ack,保存消息可靠性。

一致性保证

水位或者水印表示位置信息,即位移,kafka源码中使用的是高水位HW。

每个分区副本对象都有两个重要属性:LEO和HW

LEO:日志末端位移,记录了该副本日志中下一条消息的位移值。

HW:水位值,即水淹到这里,对于同一个副本对象而言,其HW值不会大于LEO值。

img

上图HW值就是7,表示位移0-7的所有消息已经提交,LEO值是14,指下一条消息来得时候位移。

Follower副本的LEO值就是日志的LEO值,新写入一条消息就更新LEO。

Follower更新HW发生在更新LEO之后,一旦follower向log写入数据,就更新自己的hw值。

和follower更新LEO相同,leader写log时候自动更新自己的LEO值。

Leader的HW值就是分区的HW值,当尝试确定分区HW时,他会选出所有满足条件的副本,比较他们的LEO,并选择最小的LEO值作为HW值。

消息重复的场景及解决方案

消息重复发送在三个阶段:生产者阶段、broke阶段、消费者阶段

根本原因:生产者发送的消息没有收到正确的broke响应,导致producer重试,producer发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或网络中断,然后producer收到一个可恢复的exception重试消息导致消息重复。

生产者发送重复解决方案

\1. 启动kafka幂等性

\2. Ack=0,不重试,但是可能丢失消息。

生产者和broke阶段消息丢失场景

\1. ack=0,不重试,发送完不管结果,发送失败就丢失了。

\2. ack=1 leader crash 生产者发送消息完,等待leader写入成功就返回了,leader分区丢失了,此时follower还没同步,消息丢失。

解决生产者和broke阶段消息丢失

\1. 禁用unclean选举,ack=all

\2. 失败的offset单独记录

消费者数据重复场景及解决方案

数据消费完没有及时提交offset到broke

解决方案:取消自动提交,通过手动提交或者下游做幂等,保证每次一致。

__consumer_offsets

Kafka内部的一个主题。Zookeeper不适合大批量的频繁写入操作,而通过这个主题来实现这个功能。

延时队列

两个follower副本已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,就会导致follower副本一直发送拉取请求,而且总拉取空的结果,导致空转消耗资源。

Kafka通过延迟操作的概念,在处理拉取请求时,先读取一次日志文件,如果收集不到足够多的消息,就会创建一个延时拉取操作以等待拉取到足够数量的消息,当延时拉取操作执行时,会在读取一次日志文件,然后将拉取结果返回给follower副本。Kafka中还有延时数据删除、延时生产等等。

重试队列

Kafka没有重试机制,也没有死信队列,需要自己来实现消息重试功能。

实现逻辑:

\1. 创建一个topic作为重试topic,用去接收等待重试消息。

\2. 普通topic消费者设置待重试消息的下一个重试topic。

\3. 从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序。

\4. 定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic

\5. 同一个消息重试次数过多则不再重试。

ngx_kafka_module安装与配置

1. Install librdkafka

#安装git

yum -y install git

#安装c++环境

yum install gcc-c++

#git librdkafka并编译

git clone https://github.com/edenhill/librdkafka

cd librdkafka

./configure

make

sudo make install

2. Install ngx_kafka_module

git clone https://github.com/brg-liuwei/ngx_kafka_module

# cd /opt/nginx-1.19.8

./configure –add-module=/opt/nginx_extra/ngx_kafka_module

make

sudo make install

# or, use sudo make upgrade instead of sudo make install

3. 配置nginx

http {

# some other configs

kafka;

kafka_broker_list 127.0.0.1:9092; # host:port …

server {

# some other configs

location = /ozdemo/kafka {

# optional directive: kafka_partition [ | auto]

#

# kafka_partition auto; # default value

# kafka_partition 0;

# kafka_partition 1;

kafka_topic topic_ozdemo;

}

}

}

4.重启nginx

./nginx -s reload

5.测试ngx_kafka_module

# 列出现有的主题

kafka-topics.sh –list –zookeeper localhost:2181/kafka

# 创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。

kafka-topics.sh –zookeeper localhost:2181/kafka –create –topic topic_ozdemo –partitions 1 –replication-factor 1

# 查看分区信息

kafka-topics.sh –zookeeper localhost:2181/kafka –list

# 查看指定主题的详细信息

kafka-topics.sh –zookeeper localhost:2181/kafka –describe –topic topic_ozdemo

# 删除指定主题

kafka-topics.sh –zookeeper localhost:2181/kafka –delete –topic topic_ozdemo

kafka-topics.sh –zookeeper localhost:2181/kafka –create –topic topic_ozdemo –partitions 1 –replication-factor 1

# 开启消费者

kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_ozdemo

# 开启消费者方式二,从头消费,不按照偏移量消费

kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_ozdemo –from-beginning

# 测试

curl http://127.0.0.1/ozdemo/kafka -d “message send to kafka topic”

------ 本文结束感谢您的阅读 ------
请我一杯咖啡吧!
itingyu 微信打赏 微信打赏