
Edited 4 years ago

My local Kafka consumer code cannot receive messages from the Kafka cluster running in a Windows VM. It does not report any error either; it just keeps polling in a loop. What could be the cause?

Kafka version: kafka_2.12-2.5.0
ZooKeeper version: zookeeper-3.6.3

The Kafka cluster in the Windows VM was set up roughly following this blog post: https://blog.csdn.net/weixin_38040473/article/details/106716439. Consuming with the Kafka console client inside the VM works fine, and my local code can also produce messages to the cluster successfully.

In the broker config file server.properties, only the following properties were changed; everything else was left at its default:

broker.id=0
listeners=PLAINTEXT://192.168.2.99:9092
port=9092
host.name=192.168.2.99
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
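(Only broker 0's config is shown above. For reference, in a three-broker setup on one host each broker needs its own server.properties with a distinct `broker.id`, listener port, and log directory; the values below are assumptions inferred from the ports in `bootstrap.servers`, not the actual files:)

```
# broker 1 (assumed, by analogy with broker 0 above)
broker.id=1
listeners=PLAINTEXT://192.168.2.99:9093
port=9093
host.name=192.168.2.99
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

# broker 2 (assumed)
broker.id=2
listeners=PLAINTEXT://192.168.2.99:9094
port=9094
```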
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyKafkaFilterConsumer {
    public static void main(String[] args) {
        // 1. Define the configuration properties
        Properties properties = new Properties();
        // Broker connection addresses for the consumer
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.99:9092,192.168.2.99:9093,192.168.2.99:9094");
        // Key deserializer, matching the producer's key serializer
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer, matching the producer's value serializer
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit; offsets are committed manually below
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Consumer group ID; consumers with the same group name belong to the same consumer group
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "583-con");
        // 2. Create the consumer
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 3. Subscribe to one or more topics
        consumer.subscribe(Arrays.asList("testkkk"));
        while (true) {
            // Poll for messages with a 100-second timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records) {
                // Print the key fields of each message
                System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());
            }
            consumer.commitAsync();
        }
    }
}
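One detail worth checking against the log output below: the config dump shows `auto.offset.reset = latest`, and the consumer logs "Found no committed offset for partition testkkk-0" followed by "Resetting offset for partition testkkk-0 to offset 7". With no committed offset for the group and the default `latest` policy, a new consumer starts at the end of the log, so any messages produced before the consumer joined are never returned. A minimal sketch of the relevant setting follows; the property keys are the standard Kafka client config strings, but whether this is the actual cause in this case is an assumption:

```java
import java.util.Properties;

public class OffsetResetSketch {
    // Builds consumer properties that make a brand-new consumer group start
    // from the earliest retained offset instead of the log end ("latest",
    // which is the default).
    static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "192.168.2.99:9092,192.168.2.99:9093,192.168.2.99:9094");
        props.setProperty("group.id", "583-con");
        // With enable.auto.commit=false and no successful manual commit yet,
        // the group has no committed offset, so this fallback decides where
        // consumption starts.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("auto.offset.reset"));
    }
}
```

Alternatively, keep `latest` and produce new messages only after the consumer has joined the group; the log below shows the fetch position sitting at offset 7 with `highWaterMark=7`, which is consistent with the consumer waiting at the end of an already-written log.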

Log output (note: no exception is thrown; the consumer simply keeps polling)

"D:\javaPro\idea\IntelliJ IDEA 2019.3\jbr\bin\java.exe" -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:60504,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "D:\javaPro\ideaPro\新建文件夹\kkT\target\classes;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.4.0\spring-boot-starter-web-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter\2.4.0\spring-boot-starter-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot\2.4.0\spring-boot-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.4.0\spring-boot-autoconfigure-2.4.0.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.4.0\spring-boot-starter-logging-2.4.0.jar;C:\Users\微凉\.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\微凉\.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\微凉\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.13.3\log4j-to-slf4j-2.13.3.jar;C:\Users\微凉\.m2\repository\org\apache\logging\log4j\log4j-api\2.13.3\log4j-api-2.13.3.jar;C:\Users\微凉\.m2\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;C:\Users\微凉\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\微凉\.m2\repository\org\yaml\snakeyaml\1.27\snakeyaml-1.27.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.4.0\spring-boot-starter-json-2.4.0.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.11.3\jackson-databind-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.11.3\jackson-annotations-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.11.3\jackson-core-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.11.3\jackson-datatype-jdk8-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\da
tatype\jackson-datatype-jsr310\2.11.3\jackson-datatype-jsr310-2.11.3.jar;C:\Users\微凉\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.11.3\jackson-module-parameter-names-2.11.3.jar;C:\Users\微凉\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.4.0\spring-boot-starter-tomcat-2.4.0.jar;C:\Users\微凉\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.39\tomcat-embed-core-9.0.39.jar;C:\Users\微凉\.m2\repository\org\glassfish\jakarta.el\3.0.3\jakarta.el-3.0.3.jar;C:\Users\微凉\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.39\tomcat-embed-websocket-9.0.39.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-web\5.3.1\spring-web-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-beans\5.3.1\spring-beans-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-webmvc\5.3.1\spring-webmvc-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-aop\5.3.1\spring-aop-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-context\5.3.1\spring-context-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-expression\5.3.1\spring-expression-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-core\5.3.1\spring-core-5.3.1.jar;C:\Users\微凉\.m2\repository\org\springframework\spring-jcl\5.3.1\spring-jcl-5.3.1.jar;C:\Users\微凉\.m2\repository\org\apache\kafka\kafka-clients\2.4.1\kafka-clients-2.4.1.jar;C:\Users\微凉\.m2\repository\com\github\luben\zstd-jni\1.4.3-1\zstd-jni-1.4.3-1.jar;C:\Users\微凉\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\微凉\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;C:\Users\微凉\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;D:\javaPro\idea\IntelliJ IDEA 2019.3\lib\idea_rt.jar" cn.xp.consumer.MyKafkaFilterConsumer
Connected to the target VM, address: '127.0.0.1:0', transport: 'socket'
16:05:57.174 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [192.168.2.99:9092, 192.168.2.99:9093, 192.168.2.99:9094]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 583-con
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

16:05:57.179 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initializing the Kafka consumer
16:05:57.861 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1
16:05:57.862 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b
16:05:57.862 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1620461157856
16:05:57.864 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Kafka consumer initialized
16:05:57.865 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Subscribed to topic(s): testkkk
16:05:57.866 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending FindCoordinator request to broker 192.168.2.99:9093 (id: -2 rack: null)
16:05:57.996 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9093 (id: -2 rack: null) using address /192.168.2.99
16:05:58.004 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
16:05:58.004 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node -2. Fetching API versions.
16:05:58.004 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node -2.
16:05:58.126 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node -2: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.128 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='testkkk')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 192.168.2.99:9093 (id: -2 rack: null)
16:05:58.192 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Updating last seen epoch from null to 0 for partition testkkk-0
16:05:58.194 [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Cluster ID: w_bDBUtGR4-0LpMJ3e4RvA
16:05:58.194 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='w_bDBUtGR4-0LpMJ3e4RvA', nodes=[192.168.2.99:9092 (id: 0 rack: null), 192.168.2.99:9093 (id: 1 rack: null), 192.168.2.99:9094 (id: 2 rack: null)], partitions=[PartitionInfoAndEpoch{partitionInfo=Partition(topic = testkkk, partition = 0, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), epoch=0}], controller=192.168.2.99:9092 (id: 0 rack: null)}
16:05:58.195 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Received FindCoordinator response ClientResponse(receivedTimeMs=1620461158194, latencyMs=202, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-583-con-1, correlationId=0), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=0, host='192.168.2.99', port=9092))
16:05:58.195 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Discovered group coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.195 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9092 (id: 2147483647 rack: null) using address /192.168.2.99
16:05:58.197 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Executing onJoinPrepare with generation -1 and memberId 
16:05:58.197 [kafka-coordinator-heartbeat-thread | 583-con] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Heartbeat thread started
16:05:58.197 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Disabling heartbeat thread
16:05:58.197 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] (Re-)joining group
16:05:58.198 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Joining group with current subscription: [testkkk]
16:05:58.199 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending JoinGroup (JoinGroupRequestData(groupId='583-con', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.201 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
16:05:58.201 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node 2147483647. Fetching API versions.
16:05:58.201 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node 2147483647.
16:05:58.205 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node 2147483647: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Disabling heartbeat thread
16:05:58.209 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] (Re-)joining group
16:05:58.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Joining group with current subscription: [testkkk]
16:05:58.210 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending JoinGroup (JoinGroupRequestData(groupId='583-con', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.222 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolName='range', leader='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', members=[JoinGroupResponseMember(memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])
16:05:58.222 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Performing assignment using strategy range with subscriptions {consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@18bc345}
16:05:58.224 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Finished assignment for group at generation 1: {consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2=Assignment(partitions=[testkkk-0])}
16:05:58.225 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending leader SyncGroup to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null) at generation Generation{generationId=1, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', protocol='range'}: SyncGroupRequestData(groupId='583-con', generationId=1, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, assignments=[SyncGroupRequestAssignment(memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])
16:05:58.238 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Successfully joined group with generation 1
16:05:58.238 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Enabling heartbeat thread
16:05:58.239 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Executing onJoinComplete with generation 1 and memberId consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2
16:05:58.241 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Adding newly assigned partitions: testkkk-0
16:05:58.247 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Fetching committed offsets for partitions: [testkkk-0]
16:05:58.251 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Found no committed offset for partition testkkk-0
16:05:58.253 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={testkkk-0={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[0]}}, isolationLevel=READ_UNCOMMITTED) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.256 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9094 (id: 2 rack: null) using address /192.168.2.99
16:05:58.258 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
16:05:58.258 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node 2. Fetching API versions.
16:05:58.258 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node 2.
16:05:58.324 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node 2: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.330 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Handling ListOffsetResponse response for testkkk-0. Fetched offset 7, timestamp -1
16:05:58.331 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Not replacing existing epoch 0 with new epoch 0 for partition testkkk-0
16:05:58.331 [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-583-con-1, groupId=583-con] Resetting offset for partition testkkk-0 to offset 7.
16:05:58.334 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.334 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s).
16:05:58.335 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED FullFetchRequest(testkkk-0) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.848 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent a full fetch response that created a new incremental fetch session 1123350265 with 1 response partition(s)
16:05:58.849 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Fetch READ_UNCOMMITTED at offset 7 for partition testkkk-0 returned fetch data (error=NONE, highWaterMark=7, lastStableOffset = 7, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)
16:05:58.851 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.852 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=1) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:05:58.852 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent an incremental fetch response for session 1123350265 with 0 response partition(s), 1 implied partition(s)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=2) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent an incremental fetch response for session 1123350265 with 0 response partition(s), 1 implied partition(s)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=3) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
Disconnected from the target VM, address: '127.0.0.1:0', transport: 'socket'

    ``` public class MyKafkaFilterConsumer {
    public static void main(String[] args){
     // 1.使用Properties定义配置属性
     Properties properties = new Properties();
     // 设置消费者Broker服务器的连接地址
     properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.99:9092,192.168.2.99:9093,192.168.2.99:9094");
     // 设置key反序列化的程序,与生成者对应
     properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     // 设置value反序列化的程序,与生产者对应
     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     // 设置消费者组ID,即组名称,值可自定义,组名称相同的消费者进程属于同一个消费者组
     properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"583-con");
     // 2, 定义消费者对象
     Consumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
     // 3 设置消费者读取的主题名称,可以设置多个
     consumer.subscribe(Arrays.asList("testkkk"));
     while(true){
         // 拉取消息,并设置超时时间为10秒
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
         for(ConsumerRecord<String,String> record : records){
             // 打印消息关键消息
            // System.out.println("kafka-filter-key: "+record.key()+", kafka-filter-value: "+record.value()+", kafka-filter-partition: "+record.partition()+",kafka-filter-offset: "+record.offset());
             System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());
         }
         consumer.commitAsync();
     }
    
    }
    }
    代码块
    ```
  2. 贴上报错信息
    "D:\javaPro\idea\IntelliJ IDEA 2019.3\jbr\bin\java.exe" -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:60504,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "D:\javaPro\ideaPro\新建文件夹\kkT\target\classes;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot-starter-web\2.4.0\spring-boot-starter-web-2.4.0.jar;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot-starter\2.4.0\spring-boot-starter-2.4.0.jar;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot\2.4.0\spring-boot-2.4.0.jar;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.4.0\spring-boot-autoconfigure-2.4.0.jar;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.4.0\spring-boot-starter-logging-2.4.0.jar;C:\Users\微凉.m2\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;C:\Users\微凉.m2\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;C:\Users\微凉.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.13.3\log4j-to-slf4j-2.13.3.jar;C:\Users\微凉.m2\repository\org\apache\logging\log4j\log4j-api\2.13.3\log4j-api-2.13.3.jar;C:\Users\微凉.m2\repository\org\slf4j\jul-to-slf4j\1.7.30\jul-to-slf4j-1.7.30.jar;C:\Users\微凉.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;C:\Users\微凉.m2\repository\org\yaml\snakeyaml\1.27\snakeyaml-1.27.jar;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot-starter-json\2.4.0\spring-boot-starter-json-2.4.0.jar;C:\Users\微凉.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.11.3\jackson-databind-2.11.3.jar;C:\Users\微凉.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.11.3\jackson-annotations-2.11.3.jar;C:\Users\微凉.m2\repository\com\fasterxml\jackson\core\jackson-core\2.11.3\jackson-core-2.11.3.jar;C:\Users\微凉.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.11.3\jackson-datatype-jdk8-2.11.3.jar;C:\Users\微凉.m2\repository\com\fasterxml\jackson\datatype\jackson
-datatype-jsr310\2.11.3\jackson-datatype-jsr310-2.11.3.jar;C:\Users\微凉.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.11.3\jackson-module-parameter-names-2.11.3.jar;C:\Users\微凉.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.4.0\spring-boot-starter-tomcat-2.4.0.jar;C:\Users\微凉.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.39\tomcat-embed-core-9.0.39.jar;C:\Users\微凉.m2\repository\org\glassfish\jakarta.el\3.0.3\jakarta.el-3.0.3.jar;C:\Users\微凉.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.39\tomcat-embed-websocket-9.0.39.jar;C:\Users\微凉.m2\repository\org\springframework\spring-web\5.3.1\spring-web-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-beans\5.3.1\spring-beans-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-webmvc\5.3.1\spring-webmvc-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-aop\5.3.1\spring-aop-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-context\5.3.1\spring-context-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-expression\5.3.1\spring-expression-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-core\5.3.1\spring-core-5.3.1.jar;C:\Users\微凉.m2\repository\org\springframework\spring-jcl\5.3.1\spring-jcl-5.3.1.jar;C:\Users\微凉.m2\repository\org\apache\kafka\kafka-clients\2.4.1\kafka-clients-2.4.1.jar;C:\Users\微凉.m2\repository\com\github\luben\zstd-jni\1.4.3-1\zstd-jni-1.4.3-1.jar;C:\Users\微凉.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\微凉.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;C:\Users\微凉.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;D:\javaPro\idea\IntelliJ IDEA 2019.3\lib\idea_rt.jar" cn.xp.consumer.MyKafkaFilterConsumer
    Connected to the target VM, address: '127.0.0.1:0', transport: 'socket'
    16:05:57.174 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [192.168.2.99:9092, 192.168.2.99:9093, 192.168.2.99:9094]
    check.crcs = true
    client.dns.lookup = default
    client.id =
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 583-con
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

16:05:57.179 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initializing the Kafka consumer
16:05:57.861 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1
16:05:57.862 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b
16:05:57.862 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1620461157856
16:05:57.864 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Kafka consumer initialized
16:05:57.865 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-583-con-1, groupId=583-con] Subscribed to topic(s): testkkk
16:05:57.866 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending FindCoordinator request to broker 192.168.2.99:9093 (id: -2 rack: null)
16:05:57.996 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9093 (id: -2 rack: null) using address /192.168.2.99
16:05:58.004 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
16:05:58.004 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node -2. Fetching API versions.
16:05:58.004 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node -2.
16:05:58.126 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node -2: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.128 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='testkkk')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 192.168.2.99:9093 (id: -2 rack: null)
16:05:58.192 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Updating last seen epoch from null to 0 for partition testkkk-0
16:05:58.194 [main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Cluster ID: w_bDBUtGR4-0LpMJ3e4RvA
16:05:58.194 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='w_bDBUtGR4-0LpMJ3e4RvA', nodes=[192.168.2.99:9092 (id: 0 rack: null), 192.168.2.99:9093 (id: 1 rack: null), 192.168.2.99:9094 (id: 2 rack: null)], partitions=[PartitionInfoAndEpoch{partitionInfo=Partition(topic = testkkk, partition = 0, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), epoch=0}], controller=192.168.2.99:9092 (id: 0 rack: null)}
16:05:58.195 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Received FindCoordinator response ClientResponse(receivedTimeMs=1620461158194, latencyMs=202, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-583-con-1, correlationId=0), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=0, host='192.168.2.99', port=9092))
16:05:58.195 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Discovered group coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.195 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9092 (id: 2147483647 rack: null) using address /192.168.2.99
16:05:58.197 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Executing onJoinPrepare with generation -1 and memberId
16:05:58.197 [kafka-coordinator-heartbeat-thread | 583-con] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Heartbeat thread started
16:05:58.197 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Disabling heartbeat thread
16:05:58.197 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] (Re-)joining group
16:05:58.198 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Joining group with current subscription: [testkkk]
16:05:58.199 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending JoinGroup (JoinGroupRequestData(groupId='583-con', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.201 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
16:05:58.201 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node 2147483647. Fetching API versions.
16:05:58.201 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node 2147483647.
16:05:58.205 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node 2147483647: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Disabling heartbeat thread
16:05:58.209 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] (Re-)joining group
16:05:58.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Joining group with current subscription: [testkkk]
16:05:58.210 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending JoinGroup (JoinGroupRequestData(groupId='583-con', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null)
16:05:58.222 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolName='range', leader='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', members=[JoinGroupResponseMember(memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, -1, -1, -1, -1, 0, 0, 0, 0])])
16:05:58.222 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Performing assignment using strategy range with subscriptions {consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@18bc345}
16:05:58.224 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Finished assignment for group at generation 1: {consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2=Assignment(partitions=[testkkk-0])}
16:05:58.225 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending leader SyncGroup to coordinator 192.168.2.99:9092 (id: 2147483647 rack: null) at generation Generation{generationId=1, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', protocol='range'}: SyncGroupRequestData(groupId='583-con', generationId=1, memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', groupInstanceId=null, assignments=[SyncGroupRequestAssignment(memberId='consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 101, 115, 116, 107, 107, 107, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])
16:05:58.238 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Successfully joined group with generation 1
16:05:58.238 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Enabling heartbeat thread
16:05:58.239 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Executing onJoinComplete with generation 1 and memberId consumer-583-con-1-d6828726-12a3-444d-8f25-f9319052eef2
16:05:58.241 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Adding newly assigned partitions: testkkk-0
16:05:58.247 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Fetching committed offsets for partitions: [testkkk-0]
16:05:58.251 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-583-con-1, groupId=583-con] Found no committed offset for partition testkkk-0
16:05:58.253 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={testkkk-0={timestamp: -1, maxNumOffsets: 1, currentLeaderEpoch: Optional[0]}}, isolationLevel=READ_UNCOMMITTED) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.256 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating connection to node 192.168.2.99:9094 (id: 2 rack: null) using address /192.168.2.99
16:05:58.258 [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-583-con-1, groupId=583-con] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
16:05:58.258 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Completed connection to node 2. Fetching API versions.
16:05:58.258 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Initiating API versions fetch from node 2.
16:05:58.324 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-583-con-1, groupId=583-con] Recorded API versions for node 2: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0 to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to 2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 1], CreatePartitions(37): 0 to 2 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1], DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], 
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0])
16:05:58.330 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Handling ListOffsetResponse response for testkkk-0. Fetched offset 7, timestamp -1
16:05:58.331 [main] DEBUG org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-583-con-1, groupId=583-con] Not replacing existing epoch 0 with new epoch 0 for partition testkkk-0
16:05:58.331 [main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-583-con-1, groupId=583-con] Resetting offset for partition testkkk-0 to offset 7.
16:05:58.334 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.334 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s).
16:05:58.335 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED FullFetchRequest(testkkk-0) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.848 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent a full fetch response that created a new incremental fetch session 1123350265 with 1 response partition(s)
16:05:58.849 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Fetch READ_UNCOMMITTED at offset 7 for partition testkkk-0 returned fetch data (error=NONE, highWaterMark=7, lastStableOffset = 7, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)
16:05:58.851 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:58.852 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=1) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:05:58.852 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent an incremental fetch response for session 1123350265 with 0 response partition(s), 1 implied partition(s)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=2) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:05:59.703 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Node 2 sent an incremental fetch response for session 1123350265 with 0 response partition(s), 1 implied partition(s)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Added READ_UNCOMMITTED fetch request for partition testkkk-0 at position FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.2.99:9094 (id: 2 rack: null), epoch=0}} to node 192.168.2.99:9094 (id: 2 rack: null)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-583-con-1, groupId=583-con] Built incremental fetch (sessionId=1123350265, epoch=3) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
16:06:00.209 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-583-con-1, groupId=583-con] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(testkkk-0)) to broker 192.168.2.99:9094 (id: 2 rack: null)
Disconnected from the target VM, address: '127.0.0.1:0', transport: 'socket'

  1. 已经尝试过哪些方法仍然没解决,操作步骤等
  2. 本网编辑器采用markdown风格,请提问的同学注意排版