按照orcHome网上的教程写了消费者程序,但是不能消费消息,但是用Kafka命令能消费消息,以下是我的相关配置。
kafka版本是kafka_2.11-1.0.1
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.165:9092,192.168.201.168:9092");//服务器地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");//消息者组
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//是否自动提交偏移量
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//指定的时间内发送心跳给群组的时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");//送心跳的频率一般设置成session.timeout.ms值的3分之一。
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");//消息者从服务器获取记录的最小字节数。
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");//等到有足够的数据时才返回给消费者,看和fetch.min.bytes参数哪个先满足。
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//从头条开始处理消费
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(props);
public void run() {
String topic=Global.getConfig("get_topic");
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("tostring="+record.toString()+"|key="+record.key()+"|value="+record.value());
}
consumer.commitSync();
}
}
我用Debug跟踪了一下,发现卡在AbstractCoordinator类中ensureCoordinatorReady方法这块了,老是进行循环,打印log.debug("Coordinator discovery failed, refreshing metadata")
。
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
long remainingMs = timeoutMs;
while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator();
client.poll(future, remainingMs);
if (future.failed()) {
if (future.isRetriable()) {
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
*log.debug("Coordinator discovery failed, refreshing metadata");*
client.awaitMetadataUpdate(remainingMs);
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
time.sleep(retryBackoffMs);
}
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
}
return !coordinatorUnknown();
}