返回到文章

采纳

编辑于

kafka高级消费者API读取消息不完整

kafka

在实现kafka消费者的时候出现跳着消费数据的情况
比如,生产者生产的数据是0,1,2,3,4,5
java实现的消费者只消费到了2,5两个数字

java实现的消费者代码如下:

//声明一个线程池,用于消费各个partition
        ExecutorService executor=Executors.newFixedThreadPool(threadCount);
        //获取对应topic的消息队列
        final KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            System.out.println("开始消费消息...");
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    //有信息则消费,无信息将会阻塞
                    while (it.hasNext()){
                        try {
                            System.out.println("收到消息" + new String(it.next().message()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return;
                        }
                    }