接上一步,kafka Raft安装和启动(单节点)已经搭建好之后。
之前,我们只是单一的运行一个kafka Raft模式的broker,没什么意思。对于Kafka,一个broker仅仅只是一个集群的大小,所有让我们多设几个broker。
首先为每个broker创建一个配置文件:
> cp config/kraft/server.properties config/kraft/server-1.properties
> cp config/kraft/server.properties config/kraft/server-2.properties
> cp config/kraft/server.properties config/kraft/server-3.properties
现在编辑这些新建的文件,设置以下属性:
config/kraft/server-1.properties:
node.id=1
controller.quorum.voters=1@localhost:9095,2@localhost:9097,3@localhost:9099
listeners=PLAINTEXT://:9094,CONTROLLER://:9095
advertised.listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kraft-combined-logs-1
config/kraft/server-2.properties:
node.id=2
controller.quorum.voters=1@localhost:9095,2@localhost:9097,3@localhost:9099
listeners=PLAINTEXT://:9096,CONTROLLER://:9097
advertised.listeners=PLAINTEXT://localhost:9096
log.dirs=/tmp/kraft-combined-logs-2
config/kraft/server-3.properties:
node.id=3
controller.quorum.voters=1@localhost:9095,2@localhost:9097,3@localhost:9099
listeners=PLAINTEXT://:9098,CONTROLLER://:9099
advertised.listeners=PLAINTEXT://localhost:9098
log.dirs=/tmp/kraft-combined-logs-3
node.id
是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。
生成集群ID
bin/kafka-storage.sh random-uuid
Cba3BkapTFWyEKbpUkNJ_w #获取返回的集群ID
依次格式化存储目录:
> bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server-1.properties
Formatting /tmp/kraft-combined-logs-1
> bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server-2.properties
Formatting /tmp/kraft-combined-logs-2
> bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server-3.properties
Formatting /tmp/kraft-combined-logs-3
启动这3个kafka节点:
> bin/kafka-server-start.sh config/kraft/server-1.properties &
...
> bin/kafka-server-start.sh config/kraft/server-2.properties &
...
> bin/kafka-server-start.sh config/kraft/server-3.properties &
...
现在,我们创建一个新topic,把备份设置为:3
> bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”
> bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic my-replicated-topic
Topic: my-replicated-topic TopicId: fPxYIYp1RDeT1lzIjM9iGQ PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
输出解释:第一行是所有分区的摘要,其次,每一行提供一个分区信息,因为我们只有一个分区,所以只有一行。
leader
都是随机选择的。我们运行这个命令,看看一开始我们创建的那个单节点:
> bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
这并不奇怪,刚才创建的主题没有Replicas(副本),并且在服务器“1”上,我们创建它的时候,集群中只有一个节点,所以是“1”。
让我们来发布一些信息在新的topic上:
> bin/kafka-console-producer.sh --broker-list localhost:9094 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在,消费这些消息。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --from-beginning --topic my-replicated-topic --consumer.config config/consumer.properties
...
my test message 1
my test message 2
^C
我们要测试集群的容错,kill掉Leader,Broker1作为当前的leader,也就是kill掉Broker1。
> ps -ef | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564
在Windows上使用:
> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f
备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。
> bin/kafka-topics.sh --bootstrap-server localhost:9096 --describe --topic my-replicated-topic
Topic: my-replicated-topic TopicId: fPxYIYp1RDeT1lzIjM9iGQ PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3
但是,消息仍然没丢:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9096 --from-beginning --topic my-replicated-topic --consumer.config config/consumer.properties
...
my test message 1
my test message 2
^C