返回到文章

采纳

编辑于 3年前

kafka Raft集群模式安装和启动(去zk)

kafka
kafka
实战

kafka Raft集群模式安装和启动(去zk)

接上一步,kafka Raft安装和启动(单节点)已经搭建好之后。

设置多个broker集群

之前,我们只是单一的运行一个kafka Raft模式的broker,没什么意思。对于Kafka,一个broker仅仅只是一个集群的大小,所有让我们多设几个broker。

首先为每个broker创建一个配置文件:

> cp config/kraft/server.properties config/kraft/server-1.properties
> cp config/kraft/server.properties config/kraft/server-2.properties 
> cp config/kraft/server.properties config/kraft/server-3.properties

现在编辑这些新建的文件,设置以下属性:

config/kraft/server-1.properties: 
    node.id=1
    controller.quorum.voters=1@localhost:9095,2@localhost:9097,3@localhost:9099
    listeners=PLAINTEXT://:9094,CONTROLLER://:9095
    advertised.listeners=PLAINTEXT://localhost:9094
    log.dirs=/tmp/kraft-combined-logs-1

config/kraft/server-2.properties: 
    node.id=2
    controller.quorum.voters=1@localhost:9095,2@localhost:9097,3@localhost:9099
    listeners=PLAINTEXT://:9096,CONTROLLER://:9097
    advertised.listeners=PLAINTEXT://localhost:9096
    log.dirs=/tmp/kraft-combined-logs-2

config/kraft/server-3.properties:
    node.id=3
    controller.quorum.voters=1@localhost:9095,2@localhost:9097,3@localhost:9099
    listeners=PLAINTEXT://:9098,CONTROLLER://:9099
    advertised.listeners=PLAINTEXT://localhost:9098
    log.dirs=/tmp/kraft-combined-logs-3

node.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。

生成集群id,并格式化存储目录

生成集群ID

bin/kafka-storage.sh random-uuid

Cba3BkapTFWyEKbpUkNJ_w #获取返回的集群ID

依次格式化存储目录:

> bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server-1.properties
Formatting /tmp/kraft-combined-logs-1

> bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server-2.properties
Formatting /tmp/kraft-combined-logs-2

> bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server-3.properties
Formatting /tmp/kraft-combined-logs-3

启动这3个kafka节点:

> bin/kafka-server-start.sh config/kraft/server-1.properties &
...
> bin/kafka-server-start.sh config/kraft/server-2.properties &
...
> bin/kafka-server-start.sh config/kraft/server-3.properties &
...

现在,我们创建一个新topic,把备份设置为:3

> bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”

> bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic my-replicated-topic
Topic: my-replicated-topic      TopicId: fPxYIYp1RDeT1lzIjM9iGQ PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

输出解释:第一行是所有分区的摘要,其次,每一行提供一个分区信息,因为我们只有一个分区,所以只有一行。

  • "Leader":该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
  • "Replicas":备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
  • "Isr":“同步备份”的节点列表,也就是活着的节点并且正在同步leader的数据。

我们运行这个命令,看看一开始我们创建的那个单节点:

> bin/kafka-topics.sh --bootstrap-server localhost:9094 --describe --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 1    Replicas: 1    Isr: 1

这并不奇怪,刚才创建的主题没有Replicas(副本),并且在服务器“1”上,我们创建它的时候,集群中只有一个节点,所以是“1”。

让我们来发布一些信息在新的topic上:

> bin/kafka-console-producer.sh --broker-list localhost:9094 --topic my-replicated-topic
 ...
my test message 1
my test message 2
^C

现在,消费这些消息。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --from-beginning --topic my-replicated-topic --consumer.config config/consumer.properties
 ...
my test message 1
my test message 2
^C

我们要测试集群的容错,kill掉Leader,Broker1作为当前的leader,也就是kill掉Broker1。

> ps -ef | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... 
> kill -9 7564

在Windows上使用:

> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。

> bin/kafka-topics.sh --bootstrap-server localhost:9096 --describe --topic my-replicated-topic
Topic: my-replicated-topic      TopicId: fPxYIYp1RDeT1lzIjM9iGQ PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3

但是,消息仍然没丢:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9096 --from-beginning --topic my-replicated-topic --consumer.config config/consumer.properties
...
my test message 1
my test message 2
^C

相关