kafka命令大全

原创
半兽人 发表于: 2016-10-25   最后更新时间: 2023-09-19 13:28:02  
{{totalSubscript}} 订阅, 147,008 游览

整理kafka相关的常用命令。

管理

## 创建topic(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test

### kafka版本 >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

## 分区扩容
### kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

### kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2

## 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

查询

## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181

## 查询集群描述(新)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe

## topic列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

## topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

## 消费者列表查询(存储在zk中的)
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

## 消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

## 消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

发送和消费

## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

## 消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

## 生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

## 消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

## 消费者(最新)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties


## kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName

## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10

切换leader

# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

kafka自带压测命令

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

kafka持续发送消息

持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 100000

zookeeper-shell.sh

如果kafka集群的zk配置了chroot路径,那么需要加上/path

bin/zookeeper-shell.sh localhost:2181[/path]
ls /brokers/ids
get /brokers/ids/0

重置消费者offset

例如,要将消费者组的offset重置为最新的offset:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

 TOPIC                          PARTITION  NEW-OFFSET
topic1                         0          0

详情参考:https://www.orchome.com/35

迁移分区

  1. 创建规则json

    cat > increase-replication-factor.json <<EOF
    {"version":1, "partitions":[
    {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
    }
    EOF
    
  2. 执行

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
    
  3. 验证

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
    

删除消费者组

https://www.orchome.com/10257

更新于 2023-09-19

没干嘛 2年前

大神你好,我想问一下,我用kafka自带压测命令,结果给我报这个错,是为什么呢,我的生产消费都没有问题

1 records sent, 0.0 records/sec (0.00 MB/sec), 60001.0 ms avg latency, 60001.0 max latency.
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

半兽人 -> 没干嘛 2年前

2种情况
1、压测过大,有些消息发送排队超过了60000 ms
2、跟压测无关,如果你timeout一开始就是失败的,那说明你当下发送指令的这个集群请求地址本身就不通,telnet一下全部节点,看看网络。

没干嘛 -> 半兽人 2年前

我全部测试过了,端口都是通的,请问问题出在哪里呢。
具体问题详情转到了:kafka自带压力测试报org.apache.kafka.common.errors.TimeoutException

秦山木槿花 4年前

我还是下午问你的那个小白,扳手哥
你让我看那个迁移分区对应的消费组lag情况,我想了下
可是我迁移的这个分区,ISR 中已经出现了我的目标地址(broker_id)了,我个人觉得这个迁移的消费者组id对应的lag已经为0了(我也找不到。。这个消费者组id。。。)
我看了源码的注释
这是分区迁移的流程
我认为我现在已经到了第八步,在第九步或者第10步卡住了

  * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
   * may go through the following transition.
   * AR                 leader/isr
   * {1,2,3}            1/{1,2,3}           (initial state)
   * {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
   * {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
   * {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
   * {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
   * {4,5,6}            4/{4,5,6}           (step 10)
   *
愚妄 4年前

在此目录下运行命令
/opt/kafka_2.12-2.2.0/

1.启动内置的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

2.启动kafka服务
bin/kafka-server-start.sh config/server.properties

3.创建topic话题
bin/kafka-topics.sh --create --topic test --bootstrap-server 192.168.218.128:9092 -partitions 3 -replication-factor 1

查看所有topic话题
bin/kafka-topics.sh --list --bootstrap-server 192.168.218.128:9092

查看指定话题的详情
bin/kafka-topics.sh --bootstrap-server 192.168.230.128:9092 --describe --topic test

3.创建生产者
bin/kafka-console-producer.sh --broker-list 192.168.218.128:9092 --topic test

4.创建消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.218.128:9092 --topic test

愚妄 -> 愚妄 4年前

这个是最基本的吗?

半兽人 -> 愚妄 4年前

是的。

小夕夕 5年前

博主,您好:
想请教个kafka副本扩容问题:(2个broker,2个分区,1个副本)
今天创建了一个topic,指定了2个分区,1个副本,后来想把副 本修改为2个,按照操作步骤执行:
1、创建json文件

{
"partitions":
[
{
"topic":"queue",
"partition": 0,
"replicas": [1,2]
},
{
"topic": "queue",
"partition": 1,
"replicas": [2,1]
}
],
"version":1
}

2、./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test.json --execute
3、./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test.json --verify
以上操作,终端都没提示错误,都正常执行
但是当我停掉其中一台kafka后,消费者就消费不到数据了,这种情况我不知道如何排查了,希望博主能帮我看看

半兽人 -> 小夕夕 5年前

__consumer_offsets 这个消费者主题也要扩成2个副本。

小夕夕 -> 半兽人 5年前

您的意思是保存偏移量的这个topic也要2副本,是吧? 我刚刚看了下,我的两个broker里,一个保存的是奇数的分区,一个保存的是偶数的分区。
那我现在要对偏移量进行增加副本操作了?

小夕夕 -> 小夕夕 5年前

我试着已经将__consumer_offsets 扩成2个副本了, 第2台停掉后,能正常消费。但是第1台停掉后,就不正常消费了。

小夕夕 -> 小夕夕 5年前

错误是这个:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition

小夕夕 -> 小夕夕 5年前

有一个奇怪的现象是: 我刚刚扩容的这个__consumer_offsets 副本,leader显示都是为1

半兽人 -> 小夕夕 5年前

到问题专题里面把你的主题状态贴一下给我看看吧,咱们换个地方。

w 5年前

还有大大 问一下 我用java写producer的时候 比如有多个topic 但是只有其中一个topic能用 其他都会报错并且会在linux下产生多个sonsumer-console进程 然后consumer 直接不能用

w -> w 5年前

打错 是console-consumer 进程

半兽人 -> w 5年前

没看懂额。。

w 5年前
./kafka-consumer-groups.sh --list --new-consumer --bootstrap-server localhost:9092 
Exception in thread "main" joptsimple.UnrecognizedOptionException: new-consumer is not a recognized option
    at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
    at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
    at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
    at joptsimple.OptionParser.parse(OptionParser.java:396)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupCommandOptions.<init>(ConsumerGroupCommand.scala:725)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:42)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
半兽人 -> w 5年前

你什么版本,把--new-consumer去掉试试。

小夕夕 6年前

 bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
执行此命令,有报错:new-consumer is not a recognized option

没遇到过这个情况

Explore 6年前

您好,有没有用Java API创建,查找topic的文章?

無名 -> Explore 6年前

topic你要通过api创建,还是查询。如果是查询可以参考KafaOffsetMonitor源码。

Explore -> 無名 6年前

做的是一个先查询,如果没有这个topic,则去创建topic。
我用网上查找的TopicCommand.main(options);报错运行不出来。。。

無名 -> Explore 6年前

会自动创建,为啥要自己创建呢?

Explore -> 無名 6年前

啊?用send命令时候,producer.send(topic, key, value),如果kafka没有这个topic的时候,会自动创建吗?

半兽人 -> Explore 6年前

对呀。

Explore -> 半兽人 6年前

奥奥,非常感谢。。。学到了学到了,谢谢大佬。。。。
大佬有公众号什么的吗?

Explore -> 半兽人 6年前

嗯嗯,好的,谢谢,辛苦您了。

Nirvana 6年前

我这kafka内存占用率下不来,什么情况,消息也过了失效的时间。

半兽人 -> Nirvana 6年前

咦,我记得回答过你的问题。
kafka是基于jvm的,充分利用当前内存,当有别的进程启动的时候,会释放这部分内存。

0.0 7年前

.sh在命令台运行不了啊,不是只能运行.bat文件吗

半兽人 -> 0.0 7年前

跟你的系统有关。linux是sh,windowns是bat

0.0 -> 半兽人 7年前

对不起,我傻逼了

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章