按照orcHome网上的教程写了消费者程序,但是不能消费消息,但是用Kafka命令能消费消息,以下是我的相关配置。
kafka版本是kafka_2.11-1.0.1
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.165:9092,192.168.201.168:9092");//服务器地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");//消息者组
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//是否自动提交偏移量
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//指定的时间内发送心跳给群组的时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"10000");//送心跳的频率一般设置成session.timeout.ms值的3分之一。
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");//消息者从服务器获取记录的最小字节数。
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");//等到有足够的数据时才返回给消费者,看和fetch.min.bytes参数哪个先满足。
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//从头条开始处理消费
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(props);
public void run() {
String topic=Global.getConfig("get_topic");
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("tostring="+record.toString()+"|key="+record.key()+"|value="+record.value());
}
consumer.commitSync();
}
}
我用Debug跟踪了一下,发现卡在AbstractCoordinator类中ensureCoordinatorReady方法这块了,老是进行循环,打印log.debug("Coordinator discovery failed, refreshing metadata")
。
protectedprotected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMsprotected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
longprotected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
long remainingMs = timeoutMs;
while (coordinatorUnknown()) {
RequestFuture<Void> future = lookupCoordinator();
client.poll(future, remainingMs);
ifremainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break *logbug("Coordinatord, refreshing clientadataUpdate(remainingMs);
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator } else throwxception();
} elseion();
} elseordinator != nullnectionFailed(coordinatorailed(coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we foundelse throwption();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed
} else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailedelse throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we } else throwfuture } else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection
} else throwception();
} else
} else
throw future.exception();
} else if (coordinator != null && client
} else throw
} else
throw future.exception();
} else if (coordinator != null && client } else throw
} else
throw future.exception();
} else if (coordinator != null } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but
} else throwuture.exception } elsetor != null && client.connectionFailed(coordinator)) {
// we found the coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw.exception
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator
} else } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery} else throw future.exceptionn();
} else null && clientnnectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so
} else
throw future.exception();
} else if (coordinator != null
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed } else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found
} else
throw throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection } else throwre.exception();
} else if (coordinator != null
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery } else throwre.exception();
} else ifrdinator != nullent.connectionFailed)) {
// we
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed
} else
throw future.exception();
} elsese if
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the
} else } else else
throwxception();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it } else
throw future.exception();
} elseelse(coordinatoror != nullnull && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it
} else } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the } else
throw throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff
} else } else throw futureception();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed } else throwre.exception
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has } else throweption();
} elseon();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator
} else
throwrow future
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection } else throwtion();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we } else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we
} else
throw future.exception();
} else if (coordinator != null } else
} else
throw future.exception();
} else if (coordinator != null } else throw
} else
throw future.exception();
} else if (coordinator != null
} else
throw future.exception();
} else if (coordinator != null && client } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the
} else } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator
} else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the } else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the
} else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark else
throw future();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found
} else } else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator } else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we } else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found
} else
throw throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } elsee
throw throw throwelse
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator } elserow futurexception();
} else
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection
} else
throw future.exception
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we
} else
throw throw throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so
} elselse
throw throw throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we
} else } else
throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we} else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff } else
throw futuren();
} elserdinator != null!= null& clientionFailed(coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying
} else
throw future.exception();
} else if (coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but
} else
throw future.exception();
} else if (coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff } else throw future.exception();
} elsedinator != nullnnectionFailed(coordinatorFailed(coordinator
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found
} else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found } else throw
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed
} else
throw future.exceptionon();
} else if (coordinatorr != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the
} else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
time.sleep(retryBackoffMs);
}
remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
if (remainingMs <= 0)
break;
}
return !coordinatorUnknown();
}
睡觉前对这个问题不死心,打开电脑又查了查,终于解决了问题。主要是因为我服务器之前装过Kafka所以产生了脏数据。
以下是解决过程:
1、进入zookeeper 运行
zkCli.sh
。2、运行
ls /brokers/topics
查看主题3、然后运行
rmr /brokers/topics/consumer_offsets
删除consumer_offsets_
主题4、然后重启kafka集群就好了。
赞。
我也是这个问题,最后同样的方式解决了。才看到这条,相见恨晚
我也是这个问题,最后同样的方式解决了。感谢楼主
有没有报错?
控制台打印这个,也没啥错误啊,消息是我用命令发送的。/usr/local/kafka/bin/kafka-console-
producer.sh --broker-list 192.168.202.165:9092 --topic test 2018-04-02 22:03:08,170 INFO [main] org.springframework.boot.StartupInfoLogger[logStarted 57]: Started Application in 15.684 seconds (JVM running for 16.758) 2018-04-02 22:03:08,666 INFO [pool-2-thread-1] org.apache.kafka.clients.Metadata[update 265]: Cluster ID: 380dL-1DRV2PUuB1NLMSbw
调试了好几次小时了,没有解决真是郁闷。。但是我用以下这个jar包就能消费消息,感觉还是代码哪块没写好。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.0</artifactId> <version>0.8.0-beta1</version> </dependency>
在老项目里用这个代码就能消息到消息。
String topic = Global.getConfig("topic_task_history");//接收机器返回任务状态的topic Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer .createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) { MessageAndMetadata<byte[], byte[]> msg = iterator.next(); String message; try { message = new String(msg.message(),"UTF-8"); System.out.println("message="+message); } catch (UnsupportedEncodingException e) { logger.error("监控历史消息|异常",e); } }
改了参数,直接运行试试。
importimport org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "172.30.34.4:9092"); props.put("group.id", "myself"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("TRADE-NOTIFY")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s \r\n", record.offset(), record.key(), record.value()); } } }
刚试了一下,用这个测试类还是不行。难道是我Kafka服务器配置有问题?
没错误,实在是不知道怎么帮你
我也挺郁闷的。。刚重新装了Kafka了也不行。这个日志信息是啥意思呢?log.debug("Coordinator discovery failed, refreshing metadata")
你telnet下你的服务器。9092端口的
都是通的。。
确保topic中有消息。
你的答案