Kafka
概念
分布式消息订阅系统
体系架构
MessageServer
集群中的一个消息节点
Broker
真正处理消息的
在一个 MessageServer中可能存在多个Broker
Topic
主题 频道 广播
逻辑上概念 可以基于分布式环境存储的消息
可能运行在多个Topic中
内部由多个分区组成 分区运行在MessageServer之上 由对应的Broker处理
发布消息流程: 生产者产生消息-> 消息保存到Topic中 -> Topic建立分区 ->运行在MessageServer上 -> Broker负责处理
消息消费流程: 通过Topic上发布消息的顺序/位置(offset)来读取消息
消费者组: 同一个消费者组中的消费者只能消费一次消息。ABC不在一个消费组中ABC三个都能都到消息。在同一个消费组中的是时候 ABC三个只能依次都到消息。A收到过了下次可能就是B下次可能就是C
元信息
- topic信息
- 分区信息
- 消费者信息
- 生产者信息
保存于zookeeper
高性能原理
- 基于磁盘顺序写保证写数据的高性能,追加写(正常是随机写)
流程: 生产者-> kafka -> oscache->sync到磁盘上 - kafka采用服务端日志分段存储
通过分区创建目录存储数据。数据文件就叫做日志文件,一条数据叫做一个Message, 数据分段存储在多个文件中避免单文件过大 - kafka 采用多服务端冗余副本
副本关系:
leader partition:
客户端写数据 消费者读数据操作的都是lead partition
会维护一个ISR in-sync-replica 副本列表 写数据的时候会往副本列表中的所有副本写入数据 所有都成功了才会认为写入成功,如果有个副本写入不成功可能会将副本移出ISR中
follwer partition:
定期去lead partition同步数据 - kafka采用稀疏索引方式,二分查找方式
消息中存在了两个位置
offset: 消息之间的相对便宜位置 偏移量, 每当数据加入4kb大小后offset+1
position: 物理位置 绝对位置 - 服务端0拷贝
非0拷贝读取数据流程 磁盘获取数据到osCache中-> 复制数据到kafka中->复制数据到socket cache中 -> 发送到网卡
0拷贝流程 sendfile: 由oscache直接发送到网卡中
需求分析
28法则
0:00 - 8:00 20%的数据量
剩下的16个小时有 80%的数据 高峰期有20%的数据是在3小时内产生的。
生产者参数
- buffer.memory: 设置发送消息的缓冲区, 默认值 32M(33554432)
- compression.type 默认 none 不压缩,可以使用lz4 对生产者数据压缩, 但是会压缩需要计算所以导致cpu上升
- batch.size: 设置batch的大小, 如果b atch太小, 会导致频繁网络请求,吞吐量下降;如果batch 太大会导致一条消息需要等待很久才能被发送出去,过多数据缓冲在内存中会让内存缓冲区有很大的压力, 默认值:16384 = 16kb, 也就是一个batch 满了16kb 就发送出去。 需要配合linger.ms 默认值0 就是有消息就发送(那缓冲区就没意义了), 一般设置100ms 用于保证数据不会断档,
分区
- 没有设置key
消息会轮询发送到不同的分区中 - 设置了key
kafka自带的分区器(partitioner),会根据key计算出来一个hash值,这个hash值对应某个分区, 如果key相同hash必然相同,就会发送到同一个分区中 - 自定义分区
自定义分区, 通过代码将指定的消息送到指定的分区中。
消息重试问题
- 消息重复
可能因为网络抖动导致 异常但是确实成功了 - 消息乱序
因为消息异步发送的, 所以如果消息重试的情况下 可能会将后面的消息先发出去, 理论上 消息1,消息2,消息3的顺序出去的,但是重试了可能会出现 消息2,消息3,消息1
解决办法
props.put(“max.in.fligiht.requests.per.connection”, 1);
用于保证producer同一时间只能发送一条数据
ack参数
用于区分发送成功的条件
request.required.acks
- acks = 0
只要请求发送出去了,就算是发送结束,不关心是否写成功
性能好,在不考虑部分数据丢失的情况下(日志处理) 可采用 - acks = 1 (默认)
发送一条消息,当leader partition 写入成功以后,才算写入成功 - acks = -1
需要ISR 列表里面所有的副本都写入完了 才算写入成功(严格模式)
ISR参数
min.insync.relicas
默认值1, 用于表明限定在ISR 列表里面 至少有几个副本, 如果设定为2,如果ISR中只有一个副本的情况下,往该分区插入数据就会报错。
消费者参数
消费者偏移量维护方式
老版本: 保存在zookeeper 存在高并发
新版本: 在kafka内部的 topic:__consume_offsets
key: groupid+topic+分区号
value: offset
分组
groupid 一个分区只能被一个消费者组里面的消费者使用
单条消息最大字节数
a.fetch.max.bytes
获取一条消息最大的字节数, 默认1M
- Producer 发送数据一条小消息最大多大
- Broker 存数数据一条消息最大能接受多大
- Fetch 获取一条消息最大多大
LEO&HW
LEO
LOG END OFFSET 分区里最新偏移量+1
与偏移量offset有关系
用于更新HW
HW
高水位
如果follower partition和leader partition 的LEO 同步了,此时HW就可以更新
HW 之前的数据对消费者是可见,消息才属于Commit状态
kafka消息不重复消费
重复消息发生的原因: 重试机制。(网络波动、)
- 保存并查询
消息设置唯一key 消费消息时候查询key是否消费过 (不容易) - 幂等
将消费的业务逻辑设计成幂等的操作
常见的可通过数据库唯一约束来处理 - 设置前置条件
拆包粘包
当使用 NIO ,由于 buffer 大小不匹配问题,必然会碰到粘包或拆包的问题
发送 “hello word” 和 “flink kafka”
收到
1 | “hello word” |
一个消息数据被拆开 - 拆包
收到
1 | “hello wordflink” |
一个包中粘带了另外一个包的内容 - 粘包
kafka 的解决方案是相当于在原有 data 基础上增加 header,header 只包含 data size,很朴素和通用方案。两个数据包变成 10”hello word”,11”flink kafka”。此时读取方式变为先读取4字节size
,然后再开辟 size 大小的 buffer 存data,死循环直到 data buffer 读满
分布式kafka启动命令
先启动zookeeper
1 | zkServer.sh start |
启动kafka
1 | ./bin/kafka-server-start.sh -daemon config/server.properties |
查询当前存在的topic
1 | ./bin/kafka-topics.sh --list --zookeeper hadoop01:2181 |
创建一个topic
1 | ./bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic kafkatest001 |
创建一个生产者
1 | ./bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic kafkatest001 |
创建一个消费者
1 | ./bin/kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --zookeeper hadoop01:2181 --topic kafkatest001 |
MaxWell
1 | maxwell --user='maxwell' --password='111111' --host='hadoop01' --producer=kafka --kafka.bootstrap.servers=hadoop01:9092 --kafka_topic=testTopic |