kafka配置详解

Kafka 的配置项分为 Broker级别的配置,Topic级别的配置, producer代码级别配置,consumer代码级别的配置。
其中部分broker和全部的topic级别的配置都支持用kafka命令动态配置。

关于安装kafka机器配置选择

根据实际生产经验:
三台单独 2C4G zk(实际配置的zk 的-MX2GB ), 三个 4C8G kafka broker(实际配置的kafka的 -MX4GB -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent)

  • Messages in /sec : 600/s(5千万/天)的message输入, 每条消息0.7KB,使用json字符串做序列化。
  • Bytes in /sec : 524k/s (43GB/天)
  • Bytes out /sec : 2.1m/s (180GB/天)

实际负载情况

  • system load: 最高1.3 最低 0.6
  • CPU.busy:最高 53%, 最低 5%
  • disk.io : 最高 19M/s 最低 1M/s
  • fsopen.file: 1800左右
  • 内存空闲(操作系统级别): 60%, 因为kafka 配置的 -MX4GB


经过压测使用protobuf序列化方式 可以到40W/S的写入和 3个consumer消费,整个集群的cpu大概60%。

broker级别的配置放在每个kafka节点的 config/server.properties中

详见官方apache文档
其中部分的broker级别的配置支持用命令配置的形式,不用重启broker生效。read-only类型的配置必须要重启broker生效,per-broker和cluster-wide类型的配置Maybe不用重启

修改配置

注意这个命令执行结束不会修改 server.properties的内容,而是会生成一个DYNAMIC_BROKER_CONFIG的配置项,DYNAMIC_BROKER_CONFIG会覆盖server.properties配置
注意: 即使我们手动重启了broker,这个DYNAMIC_BROKER_CONFIG仍然会生效,除非手动删除

 #修改配置
 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2

查看配置

 #查看配置生效情况
 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
 # 显示的结果
 Configs for broker 2 are:
   log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, STATIC_BROKER_CONFIG:log.cleaner.threads=1, DEFAULT_CONFIG:log.cleaner.threads=1}

删除配置

 #删除动态配置,启用配置文件中的server.properties配置
 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2

cluster-wide级别配置的查看


 bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe

其中必须要的配置如下三个

  • broker.id
  • log.dirs
  • zookeeper.connect
配置名字 类型 举例 描述
broker.id int 0 kafka broker的id
log.dirs String /opt/data/kafkaData kafka的数据目录
zookeeper.connect String 192.168.0.145:2181,192.168.0.146:2181,192.168.0.147:2181 本台机器的IP
zookeeper.connection.timeout.ms int 30000 zk连接超时
host.name String 192.168.0.145 本台机器的IP
port 数字 9092 端口
num.network.threads 数字 4 kafka接受和发送消息的线程个数,跟机器cpu核数有关系
queued.max.requests 数字 500 当上面4个线程都被block的情况下,允许缓存多少请求
num.io.threads 数字 8 io线程个数包含磁盘IO
socket.send.buffer.bytes 数字 102400 linux os的SO_SNDBUF配置参数,如果-1用OS默认配置。
socket.receive.buffer.bytes 数字 102400 反之
socket.request.max.bytes 数字 104857600 消息最大多少字节
socket.receive.buffer.bytes 数字 102400 反之
num.partitions 数字 8 默认partition个数
default.replication.factor 数字 3 数据备份集
min.insync.replicas 数字 2 当producer设置ack模式为all(-1)的时候,kafka必须当至少有2个节点写成功以后才会幡成功,否则producer会收到Exception
num.recovery.threads.per.data.dir 在kafka启动,停止,refresh磁盘的时候一个目录对应的线程个数 1 数据备份集
auto.create.topics.enable boolean true 允许自动创建topic
offsets.topic.replication.factor 数字 3 自动创建topic的时候,当可用节点个数小于这个数字时候,会创建失败直到有充足的节点可用
transaction.state.log.replication.factor 数字 1 transaction topic的复制集
transaction.state.log.min.isr 数字 1 覆盖min.insync.replicas 的配置
log.cleanup.policy 数组 [compact, delete] 或者 delete 当retention时间或者大小超过清理阈值的时候,出发的操作
log.cleaner.enable boolean 是否开启日志压缩 当log.cleanup.policy中有compact的时候合格值必须要设置true
log.cleaner.min.cleanable.ratio double 0.5 脏log占总log的多少的时候开始清理
log.retention.check.interval.ms 数字 300000 log clean检查的interval, 默认5分钟
log.retention.hours 数字 168 数据最久保存7天
log.retention.bytes 数字 -1 或者 1024000000 数据最大保存多大
log.flush.interval.messages 数字 1000 每个partition的消息刷新磁盘的阈值,默认值是9223372036854775807,所以一定要设置
log.flush.interval.ms 数字 60000 刷新磁盘的时间阈值
log.cleaner.io.max.bytes.per.second 数字 60000 压缩的时候磁盘IO平均值阈值
log.segment.bytes 数字 1073741824 kafka一个topic有多个partition组成,一个partition一个segment文件存储,当达到log.segment.bytes或者log.roll.hours(log.roll.ms)阈值的时候会新建一个新的segment文件
log.roll.hours 数字 24*7 kafka一个topic有多个partition组成,一个partition一个segment文件存储,当达到log.segment.bytes或者log.roll.hours(log.roll.ms)阈值的时候会新建一个新的segment文件
log.roll.ms 数字 默认值是null,会被log.roll.hours覆盖 kafka一个topic有多个partition组成,一个partition一个segment文件存储,当达到log.segment.bytes或者log.roll.hours(log.roll.ms)阈值的时候会新建一个新的segment文件
group.initial.rebalance.delay.ms int 6000 更多的group JVM消费进程加入进来的时候,距离上次re balance的时间间隔
delete.topic.enable boolean true 允许用 kafka-topics.sh --delete 删除topic
group.max.session.timeout.ms int 300000 默认值 允许group操作的超时时间,设置的大,可以处理更多的message
group.min.session.timeout.ms int 6000 默认值 允许group操作的超时时间最小值,设的小,可以快速失败,当kafka有问题的时候
log.index.interval.bytes int 4096 默认值 拿日志的时候,最多扫描临近的log的大小,越大会加快消费速度,但是耗费broker的内存。
broker.id=0
log.dirs=/opt/data/kafkaData
zookeeper.connect=192.168.0.145:2181,192.168.0.146:2181,192.168.0.147:2181
zookeeper.connection.timeout.ms=30000
host.name=192.168.0.145
port=9092
num.network.threads=4
queued.max.requests=500
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=102400
num.partitions=8
default.replication.factor=3
min.insync.replicas=2
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.cleanup.policy=[compact, delete]
log.cleaner.enable=true 
log.cleaner.min.cleanable.ratio=0.5
log.retention.check.interval.ms=300000
log.retention.hours=168
log.retention.bytes=1024000000
log.flush.interval.messages=1000
log.flush.interval.ms=60000
log.cleaner.io.max.bytes.per.second=60000
log.segment.bytes=1073741824
log.roll.hours=24*7
group.initial.rebalance.delay.ms=6000
delete.topic.enable=true
group.max.session.timeout.ms=300000
group.min.session.timeout.ms=6000
log.index.interval.bytes=4096

topic级别配置,使用方法是在创建topic的时候指定的

官方apache配置文档


创建topic的时候指定config,这个时候会覆盖broker级别的配置

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
    --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1


修改已经存在topic的配置

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
    --alter --add-config max.message.bytes=128000


查看已存在的topic配置是否设置成功

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe


移除topic级别的配置,使用broker的默认配置

bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic
    --alter --delete-config max.message.bytes

#默认deletelist类型,
cleanup.policy [compact, delete]

#默认值producercompression.type

# 
delete.retention.ms

# 
file.delete.delay.ms

#每个partition内存cache多少messagerefresh一次磁盘,如果设置1代表每个partition没收到一个message就写一次磁盘
flush.messages    
# 磁盘刷新阈值: flush.messages 或者 flush.ms
flush.ms

#kafka每个partitionsegment文件每隔
index.interval.bytes

topic级别的配置主要是一些可以覆盖broker级别配置的配置:比如

  • 清理/压缩策略
  • 日志保留周期(大小和时间)
  • 消息flush阈值(条数和时间)
  • 索引interval

producer级别的配置

producer级别配置

  • key, value的序列化反序列化策略
  • ack策略:
配置名字 类型 举例 描述
key.serializer,value.serializer class JsonSerializer.class key, value的序列化反序列化策略
ack策略 String [all, -1, 0, 1] 0 代表producer只是把message发送到本机的 SOCKET的buffer中就当做已经发送成功了。而且send接口返回的offset永远是-1, 缺点:如果本机操作系统重启,消息会丢失; ACK=1,producer只是等待 leader写入leader日志以后,leader返回成功producer就认为已经发送成功,缺点:如果leader机器出现重启,会丢失。 all/-1 至少一个replication副本同步成功以后才会返回成功。安全性最高
bootstrap.servers String 192.168.0.145:9092,192.168.0.146:9092

consumer配置

consumer级别配置

  • fetch.min.bytes 默认值1byte就是只要kafka有数据,consumer就可以消费数据,如果设置的过大,那么即使kafka有message,但是如果达不到这个min.bytes的话,server会一直等待够了再给consumer,因此会出现消费延迟。
  • group.id 很重要,也很简单了不说了
  • heartbeat.interval.ms 当有新的consumer加进去的时候,reblance时间,时间范围应该在 [session.timeout.ms/3, session.timeout.ms]
  • auto.offset.reset & enable.auto.commit
  • isolation.level 目前支持 [read_committed, read_uncommitted],
  • max.poll.interval.ms & max.poll.records
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
慷慨打赏