代码如下:
KafkaConsumer<String, String> consumer;
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.223.34.66:19093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1500");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项:latest, earliest, none
properties.put("auto.offset.reset", "earliest");
// 批量一次最大拉取数据量
properties.put("max.poll.records", "30");
// 心跳
properties.put("heartbeat.interval.ms", "10000");
// 处理逻辑最大时间
properties.put("max.poll.interval.ms", "60000");
// 请求响应的最长等待时间
properties.put("request.timeout.ms", "65000");
// session超时时间
properties.put("session.timeout.ms", "30000");
//security.protocol
properties.put("security.protocol","SASL_PLAINTEXT");
// sasl.mechanism
properties.put("sasl.mechanism", "PLAIN");
String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ "kafkatest" + "\" password=\"" + "5svs" + "\";";
// sasl.jaas.config
properties.put("sasl.jaas.config", loginInfo);
// 使用配置初始化 Kafka 消费者
consumer = new KafkaConsumer<>(properties);
try {
// 订阅 Topic
consumer.subscribe(Collections.singletonList("PersonInfo_323d5010195e11e93e80d17d1396110c"));
// 轮询
while (true) {
// 消费消息
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
System.out.printf("kafka lastOffSet:%s\n ", lastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.close();
}
具体报错日志如下:
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [ Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:724)] - [ Completed connection to node -1. Fetching API versions. ]
2021-04-20 20:44:42 - [TRACE] - [org.apache.kafka.clients.NetworkClient.leastLoadedNode(NetworkClient.java:543)] - [ Found least loaded node 122.225.193.235:19093 (id: -1 rack: null) ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [ Set SASL client state to FAILED ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:403)] - [ Unexpected error from 122.225.193.235/122.225.193.235; closing connection ]
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [GSSAPI]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:704)] - [ Node -1 disconnected. ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:585)] - [ Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials. ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:482)] - [ Cancelled FIND_COORDINATOR request {api_key=10,api_version=1,correlation_id=0,client_id=consumer-1} with correlation id 0 due to node -1 being disconnected ]
大佬帮忙指点下,哪里有问题?
代码如下:
KafkaConsumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.223.34.66:19093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1500");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项:latest, earliest, none
properties.put("auto.offset.reset", "earliest");
// 批量一次最大拉取数据量
properties.put("max.poll.records", "30");
// 心跳
properties.put("heartbeat.interval.ms", "10000");
// 处理逻辑最大时间
properties.put("max.poll.interval.ms", "60000");
// 请求响应的最长等待时间
properties.put("request.timeout.ms", "65000");
// session超时时间
properties.put("session.timeout.ms", "30000");
//security.protocol
properties.put("security.protocol","SASL_PLAINTEXT");
// sasl.mechanism
properties.put("sasl.mechanism", "PLAIN");
String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ "kafkatest" + "\" password=\"" + "5svs" + "\";";
// sasl.jaas.config
properties.put("sasl.jaas.config", loginInfo);
// 使用配置初始化 Kafka 消费者
consumer = new KafkaConsumer<>(properties);
try {
// 订阅 Topic
consumer.subscribe(Collections.singletonList("PersonInfo_323d5010195e11e93e80d17d1396110c"));
// 轮询
while (true) {
// 消费消息
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
System.out.printf("kafka lastOffSet:%s\n ", lastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.close();
}
具体报错日志如下:
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [ Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:724)] - [ Completed connection to node -1. Fetching API versions. ]
2021-04-20 20:44:42 - [TRACE] - [org.apache.kafka.clients.NetworkClient.leastLoadedNode(NetworkClient.java:543)] - [ Found least loaded node 122.225.193.235:19093 (id: -1 rack: null) ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [ Set SASL client state to FAILED ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:403)] - [ Unexpected error from 122.225.193.235/122.225.193.235; closing connection ]
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [GSSAPI]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:704)] - [ Node -1 disconnected. ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:585)] - [ Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials. ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:482)] - [ Cancelled FIND_COORDINATOR request {api_key=10,api_version=1,correlation_id=0,client_id=consumer-1} with correlation id 0 due to node -1 being disconnected ]
大佬帮忙指点下,哪里有问题?
代码如下:
KafkaConsumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.223.34.66:19093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1500");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项:latest, earliest, none
properties.put("auto.offset.reset", "earliest");
// 批量一次最大拉取数据量
properties.put("max.poll.records", "30");
// 心跳
properties.put("heartbeat.interval.ms", "10000");
// 处理逻辑最大时间
properties.put("max.poll.interval.ms", "60000");
// 请求响应的最长等待时间
properties.put("request.timeout.ms", "65000");
// session超时时间
properties.put("session.timeout.ms", "30000");
//security.protocol
properties.put("security.protocol","SASL_PLAINTEXT");
// sasl.mechanism
properties.put("sasl.mechanism", "PLAIN");
String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+ "kafkatest" + "\" password=\"" + "5svs" + "\";";
// sasl.jaas.config
properties.put("sasl.jaas.config", loginInfo);
// 使用配置初始化 Kafka 消费者
consumer = new KafkaConsumer<>(properties);
try {
// 订阅 Topic
consumer.subscribe(Collections.singletonList("PersonInfo_323d5010195e11e93e80d17d1396110c"));
// 轮询
while (true) {
// 消费消息
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
System.out.printf("kafka lastOffSet:%s\n ", lastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.close();
}
具体报错日志如下:
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [ Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:724)] - [ Completed connection to node -1. Fetching API versions. ]
2021-04-20 20:44:42 - [TRACE] - [org.apache.kafka.clients.NetworkClient.leastLoadedNode(NetworkClient.java:543)] - [ Found least loaded node 122.225.193.235:19093 (id: -1 rack: null) ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [ Set SASL client state to FAILED ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:403)] - [ Unexpected error from 122.225.193.235/122.225.193.235; closing connection ]
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [GSSAPI]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:704)] - [ Node -1 disconnected. ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:585)] - [ Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials. ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:482)] - [ Cancelled FIND_COORDINATOR request {api_key=10,api_version=1,correlation_id=0,client_id=consumer-1} with correlation id 0 due to node -1 being disconnected ]
大佬帮忙指点下,哪里有问题?