kafka对topic,group等操作命令

如何使用kafka shell 管理kafka, 上面几篇写了如何搭建和使用api方式操作kafka。下面讲解使用shell控制台来对 topic增删改查操作, 对消费组进行消费延迟查看, 重置kafka 消费组的offset等操作。
下面所有操作都是基于 kafka 1.1.x 。

启动kafka

./zookeeper-server-start.sh -daemon ../config/zookeeper.properties 
./kafka-server-start.sh -daemon ../config/server.properties

topic 级别操作

创建topic

具体其他config见 kafka创建topic的config

./kafka-topics.sh --zookeeper ZK_HOST:2181 --create --topic TOPIC_NAME --partitions 15   --replication-factor 2 --config max.message.bytes=640000 --config flush.messages=1 --config retention.ms=86400000

删除topic

kafka删除topic的config kafka只是在zk上给topic打一个delete标记, 数据不会删除。 直到下一次触发磁盘整理或者重启的时候才会删除数据真正删除topic。

./kafka-topics.sh --delete --zookeeper ZK_HOST:2181 --topic TOPIC_NAME

修改topic

具体其他见 kafka修改topic

./kafka-topics.sh --alter --topic  TOPIC_NAME     --zookeeper ZK_HOST:2181 --partitions 9   --config max.message.bytes=640000 --config retention.bytes=8000000000 --config retention.ms=86400000

查询topic

kafka查看topic的 各种config信息和partition信息
列出所有topic

./kafka-topics.sh --list --zookeeper ZK_HOST:2181


列出所有topic并显示所有topic配置信息

./kafka-topics.sh --describe --zookeeper BK_HOST:2181


显示指定topic的配置信息

./kafka-topics.sh --describe --zookeeper ZK_HOST:2181 --topic   TOPIC_NAME

更新负载因子个数

# kafka不支持这种方式修改负载因子。
./kafka-topics.sh --alter --topic  point-topic-dev1  --zookeeper 172.16.217.67:2181    --replication-factor 2
Option "[replication-factor]" can't be used with option"[alter]"

使用如下方法

{"version":1,
  "partitions":[
     {"topic":"topic-dev1","partition":0,"replicas":[1,2]},
     {"topic":"topic-dev1","partition":1,"replicas":[2,3]},
     {"topic":"topic-dev1","partition":2,"replicas":[1,2]},
     {"topic":"topic-dev1","partition":3,"replicas":[1,2]},
     {"topic":"topic-dev1","partition":4,"replicas":[2,3]},
     {"topic":"topic-dev1","partition":5,"replicas":[1,3]},
     {"topic":"topic-dev1","partition":6,"replicas":[2,3]},
     {"topic":"topic-dev1","partition":7,"replicas":[1,3]},
     {"topic":"topic-dev1","partition":8,"replicas":[1,3]}
]}

./kafka-reassign-partitions.sh -zookeeper zk_host:2181 --reassignment-json-file /Users/wanli.zhou/Desktop/p.json --execute

group级别操作

查看group信息


列出所有group

./kafka-consumer-groups.sh --bootstrap-server ZK_HOST:9092   --list

查看特定组的详情(包含延迟)

 ./kafka-consumer-groups.sh --bootstrap-server BK_HOST:9092 --describe --group GROUP_NAME

 TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
TOPIC_NAME 7          5               5               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 3          5               5               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 6          5               5               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 2          4               4               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 1          5               5               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 5          4               4               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 8          4               4               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 4          4               4               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1
TOPIC_NAME 0          4               4               0               consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1

查看一个group所有连接的客户端

./kafka-consumer-groups.sh --bootstrap-server BK_HOST:9092 --describe --group wanli-point-test-g --members   --verbose

  CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
consumer-1-66c5955d-95e6-4440-9bb6-b483c5802e13 /127.0.0.1      consumer-1      9               point-topic-test(0,1,2,3,4,5,6,7,8)

重置一个group的offset


重置offset,必须把所有group的程序关闭,否则下面操作没用
有 --to-current, --to-datetime , --to-earliest ,--to-latest ,--to-offset

./kafka-consumer-groups.sh --bootstrap-server BK_HOST:9092 --group console-consumer-30171 --topic point-topic-test --reset-offsets --to-earliest -–execute
./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-offset 3 -–execute
 ./kafka-consumer-groups.sh --bootstrap-server snn:6667 --group offsettest --topic offset-test --reset-offsets --to-latest --execute

查看一个groupd的状态

./kafka-consumer-groups.sh --bootstrap-server BK_HOST:9092  --describe --group GROUP_NAME  --state --timeout 6000000

COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
BK_HOST:9092 (3)    range                     Stable               1

其他

查看每个message的过期时间

./kafka-console-consumer.sh  --consumer-property exclude.internal.topics=false \
                                                            --bootstrap-server BK_HOST:9092 \
                              --from-beginning \
                              --topic TOPIC_NAME \
                              --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

console producer

sh  ./kafka-console-producer.sh --broker-list  BK_HOST:9092 --topic TOPIC_NAME --property "parse.key=true" --property "key.separator=:"

console consumer

可以跟如下参数: --from-beginning , --max-messages 10 等

./kafka-console-consumer.sh --bootstrap-server BK_HOST:9092 --topic TOPIC_NAME      --group g-5 --partition 1 --max-messages 10
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
慷慨打赏