目录
  1. 1. Kafka
    1. 1.1. 概念
    2. 1.2. 体系架构
      1. 1.2.1. MessageServer
        1. 1.2.1.1. Broker
      2. 1.2.2. Topic
      3. 1.2.3. 元信息
    3. 1.3. 高性能原理
    4. 1.4. 需求分析
    5. 1.5. 生产者参数
      1. 1.5.1. 分区
      2. 1.5.2. 消息重试问题
        1. 1.5.2.1. 解决办法
      3. 1.5.3. ack参数
      4. 1.5.4. ISR参数
    6. 1.6. 消费者参数
      1. 1.6.1. 消费者偏移量维护方式
      2. 1.6.2. 分组
      3. 1.6.3. 单条消息最大字节数
    7. 1.7. LEO&HW
      1. 1.7.1. LEO
      2. 1.7.2. HW
      3. 1.7.3. kafka消息不重复消费
    8. 1.8. 拆包粘包
    9. 1.9. 分布式kafka启动命令
Kafka相关

Kafka

概念

分布式消息订阅系统

体系架构

MessageServer

集群中的一个消息节点

Broker

真正处理消息的
在一个 MessageServer中可能存在多个Broker

Topic

主题 频道 广播
逻辑上概念 可以基于分布式环境存储的消息
可能运行在多个Topic中
内部由多个分区组成 分区运行在MessageServer之上 由对应的Broker处理

发布消息流程: 生产者产生消息-> 消息保存到Topic中 -> Topic建立分区 ->运行在MessageServer上 -> Broker负责处理
消息消费流程: 通过Topic上发布消息的顺序/位置(offset)来读取消息
  消费者组: 同一个消费者组中的消费者只能消费一次消息。ABC不在一个消费组中ABC三个都能都到消息。在同一个消费组中的是时候 ABC三个只能依次都到消息。A收到过了下次可能就是B下次可能就是C

元信息

  1. topic信息
  2. 分区信息
  3. 消费者信息
  4. 生产者信息
    保存于zookeeper

高性能原理

  1. 基于磁盘顺序写保证写数据的高性能,追加写(正常是随机写)
    流程: 生产者-> kafka -> oscache->sync到磁盘上
  2. kafka采用服务端日志分段存储
    通过分区创建目录存储数据。数据文件就叫做日志文件,一条数据叫做一个Message, 数据分段存储在多个文件中避免单文件过大
  3. kafka 采用多服务端冗余副本
    副本关系:
    leader partition:
      客户端写数据 消费者读数据操作的都是lead partition
      会维护一个ISR in-sync-replica 副本列表 写数据的时候会往副本列表中的所有副本写入数据 所有都成功了才会认为写入成功,如果有个副本写入不成功可能会将副本移出ISR中
    follwer partition:   
      定期去lead partition同步数据
  4. kafka采用稀疏索引方式,二分查找方式
      消息中存在了两个位置
      offset: 消息之间的相对便宜位置 偏移量, 每当数据加入4kb大小后offset+1
      position: 物理位置 绝对位置
  5. 服务端0拷贝
      非0拷贝读取数据流程 磁盘获取数据到osCache中-> 复制数据到kafka中->复制数据到socket cache中 -> 发送到网卡
      0拷贝流程 sendfile: 由oscache直接发送到网卡中

需求分析

28法则
  0:00 - 8:00 20%的数据量
  剩下的16个小时有 80%的数据 高峰期有20%的数据是在3小时内产生的。

生产者参数

  1. buffer.memory: 设置发送消息的缓冲区, 默认值 32M(33554432)
  2. compression.type 默认 none 不压缩,可以使用lz4 对生产者数据压缩, 但是会压缩需要计算所以导致cpu上升
  3. batch.size: 设置batch的大小, 如果b atch太小, 会导致频繁网络请求,吞吐量下降;如果batch 太大会导致一条消息需要等待很久才能被发送出去,过多数据缓冲在内存中会让内存缓冲区有很大的压力, 默认值:16384 = 16kb, 也就是一个batch 满了16kb 就发送出去。 需要配合linger.ms 默认值0 就是有消息就发送(那缓冲区就没意义了), 一般设置100ms 用于保证数据不会断档,

分区

  1. 没有设置key
    消息会轮询发送到不同的分区中
  2. 设置了key
    kafka自带的分区器(partitioner),会根据key计算出来一个hash值,这个hash值对应某个分区, 如果key相同hash必然相同,就会发送到同一个分区中
  3. 自定义分区
    自定义分区, 通过代码将指定的消息送到指定的分区中。

消息重试问题

  1. 消息重复
    可能因为网络抖动导致 异常但是确实成功了
  2. 消息乱序
    因为消息异步发送的, 所以如果消息重试的情况下 可能会将后面的消息先发出去, 理论上 消息1,消息2,消息3的顺序出去的,但是重试了可能会出现 消息2,消息3,消息1

解决办法

props.put(“max.in.fligiht.requests.per.connection”, 1);
用于保证producer同一时间只能发送一条数据

ack参数

用于区分发送成功的条件
request.required.acks

  1. acks = 0
    只要请求发送出去了,就算是发送结束,不关心是否写成功
    性能好,在不考虑部分数据丢失的情况下(日志处理) 可采用
  2. acks = 1 (默认)
    发送一条消息,当leader partition 写入成功以后,才算写入成功
  3. acks = -1
    需要ISR 列表里面所有的副本都写入完了 才算写入成功(严格模式)

ISR参数

min.insync.relicas
默认值1, 用于表明限定在ISR 列表里面 至少有几个副本, 如果设定为2,如果ISR中只有一个副本的情况下,往该分区插入数据就会报错。

消费者参数

消费者偏移量维护方式

老版本: 保存在zookeeper 存在高并发
新版本: 在kafka内部的 topic:__consume_offsets
key: groupid+topic+分区号
value: offset

分组

groupid 一个分区只能被一个消费者组里面的消费者使用

单条消息最大字节数

a.fetch.max.bytes
获取一条消息最大的字节数, 默认1M

  1. Producer 发送数据一条小消息最大多大
  2. Broker 存数数据一条消息最大能接受多大
  3. Fetch 获取一条消息最大多大

LEO&HW

LEO

LOG END OFFSET 分区里最新偏移量+1
与偏移量offset有关系

用于更新HW

HW

高水位
如果follower partition和leader partition 的LEO 同步了,此时HW就可以更新
HW 之前的数据对消费者是可见,消息才属于Commit状态

kafka消息不重复消费

重复消息发生的原因: 重试机制。(网络波动、)

  1. 保存并查询
    消息设置唯一key 消费消息时候查询key是否消费过 (不容易)
  2. 幂等
    将消费的业务逻辑设计成幂等的操作
    常见的可通过数据库唯一约束来处理
  3. 设置前置条件

拆包粘包

当使用 NIO ,由于 buffer 大小不匹配问题,必然会碰到粘包或拆包的问题
发送 “hello word” 和 “flink kafka”
收到

1
2
3
“hello word”
“flink “
“kafka”

一个消息数据被拆开 - 拆包
收到

1
2
“hello wordflink”
“kafka”

一个包中粘带了另外一个包的内容 - 粘包

kafka 的解决方案是相当于在原有 data 基础上增加 header,header 只包含 data size,很朴素和通用方案。两个数据包变成 10”hello word”,11”flink kafka”。此时读取方式变为先读取4字节size,然后再开辟 size 大小的 buffer 存data,死循环直到 data buffer 读满

分布式kafka启动命令

先启动zookeeper

1
zkServer.sh start

启动kafka

1
./bin/kafka-server-start.sh -daemon config/server.properties  

查询当前存在的topic

1
./bin/kafka-topics.sh --list --zookeeper hadoop01:2181  

创建一个topic

1
./bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic kafkatest001  

创建一个生产者

1
./bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic kafkatest001

创建一个消费者

1
./bin/kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --zookeeper hadoop01:2181 --topic kafkatest001  

MaxWell

1
maxwell --user='maxwell' --password='111111' --host='hadoop01' --producer=kafka --kafka.bootstrap.servers=hadoop01:9092 --kafka_topic=testTopic
文章作者: Fibonacci
文章链接: http://sovwcwsfm.com/blog/page/kafka.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Blog