返回到文章

采纳

编辑于

kafka启动报错Attempt to heartbeat failed since group is rebalancing想知道怎么解决

kafka

// 生产者

public class ProducerThread implements Runnable {
    private final Producer<String, String> kafkaProducer;
    private final String topic;
    private static Logger logger = Logger.getLogger(ProducerThread .class);

    public ProducerThread(String topic) {
        Properties properties = buildkafkaProperty();
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String, String>(properties);
    }

    private Properties buildkafkaProperty() {

        // 加载配置文件
        // File file = new File("kafkaclient.properties");
        Properties p = new Properties();
        FileInputStream f = null;
        // InputStream in = null;

        String url = "kafka1.properties";

        try {
           f=new FileInputStream(url);
            p.load(f);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                f.close();
                // in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        };

        Properties properties = new Properties();
        // Kafka服务器的主机名和端口号
        properties.put("bootstrap.servers", p.getProperty("BOOTSTRAP_SERVERS"));
        // 客户端的ID
        properties.put("client.id", p.getProperty("CLIENT_ID"));
        // 消息的key和value都是字节数组,为了将Java对象转化为字节数组,可以配置"key.serializer"和"value.serializer"两个序列化器,完成转化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties
                .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("batch.size", 65536);

        return properties;
    }

    // 解析文件

    public void run() {

        while (true) {

            String filePath = "D:/Documents/WeChat Files/apan9410/Files/ELK日志信息/ELK日志信息";
            File files = new File(filePath);
            File[] listFiles = files.listFiles();
            Map<String, String> map = new HashMap<>();
            final String string = null;
            for (File file : listFiles) {

                if (file.isFile()
                        && (file.getName().endsWith(".txt") || file.getName().endsWith(".data"))) {
                    FileInputStream is;
                    try {
                        is = new FileInputStream(file);
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader in = new BufferedReader(isr);
                        String  line = null;

                        while ((line = in.readLine()) != null) {
                            // string = line;

                            kafkaProducer.send(new ProducerRecord<String, String>(topic, line),
                                    new Callback() {// callback
                                        // 回调函数,负责通知消息
                                        public void onCompletion(RecordMetadata recordMetadata,
                                                Exception e) {
                                            if (e != null) {
                                                e.printStackTrace();
                                            }
                                            /*logger.info("主题:" + recordMetadata.topic()
                                                    + ", 偏移量:" + recordMetadata.offset()
                                                    );*/
                                        }
                                    });
                        }

                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }
            }
        }

    }
}

//消费者

public final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatus = true;

    static DatagramSocket datagramSocket = null;
    static String KAFKA_TOPIC;
    static LinkedHashMap<String, Integer> collectConfigMap = null;

    private static KafkaConsumer<String, String> consumer;
    private static AttackMapService attackMapService;



    private final String topic;

    public ConsumerThread2(String topic) {
        initConsumer();
        logger.info("消费者启动");
        this.topic = topic;
        // list.add(consumer);
    }

    @Override
    public void run() {
         //consumer.wakeup();
        try {
            consumer.subscribe(Arrays.asList(topic));
            while (isThreadStatus()) {

                // 消费消息,这里是获取消息的offset、key、value

                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {

                    logger.info("当前线程:" + Thread.currentThread().getName() + "," + "偏移量:"
                            + record.offset() + "," + "主题:" + record.topic() + "," + "," + "获取的消息:"
                            + record.value());
                    // this.completeAndForward(record, datagramPacket);
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }


    public boolean isThreadStatus() {
        return threadStatus;
    }

    public void setThreadStatus(boolean b) {
        // TODO Auto-generated method stub
        threadStatus = b;
    }

    public void initConsumer() {


        String url = "kafka1.properties";
        // collectConfigMap = TxtUtils.getCollectConfigMap();
        // 加载配置文件
        Properties p = new Properties();
        Properties properties = new Properties();
        FileInputStream f = null;

        try {
             f = new FileInputStream(filePath + "kafkaclient.properties");
            //f = new FileInputStream(url);
            p.load(f);
            // KAFKA_TOPIC = p.getProperty("C_TOPIC");
            // Broker的地址
            properties.put("bootstrap.servers", p.getProperty("BOOTSTRAP_SERVERS"));
            // 所属Consumer Group的id
            properties.put("group.id", p.getProperty("GROUP_ID"));
            // 自动提交offset
            properties.put("enable.auto.commit", p.getProperty("ENABLE_AUTO_COMMIT"));
            // 自动提交offset的时间间隔
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("session.timeout.ms", "60000");
            //消息发送的最长等待时间.需大于session.timeout.ms这个时间
            properties.put("request.timeout.ms", "70000");
            //一次从kafka中poll出来的数据条数
            //max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
            properties.put("max.poll.records","100");
            //server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
            properties.put("fetch.min.bytes", "1");
            //若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间
            //properties.put("fetch.wait.max.ms", "1000");
            // properties.put("heartbeat.interval.ms","40000");
            // p.getProperty("HEARTBEAT.INTERVAL.MS"));
            // 拉取的最大条数
            // 消息的key和value都是字节数组,为了将Java对象转化为字节数组,可以配置"key.serializer"和"value.serializer"两个序列化器,完成转化
            properties.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                f.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        consumer = new KafkaConsumer<>(properties);

    }


     public void shutdown() {

             consumer.wakeup();
          }
}

//消费组

public class ConsumerGroup {
    private static List<ConsumerThread2> consumerThreadList = new ArrayList<ConsumerThread2>();
    static String KAFKA_TOPIC;
    static ExecutorService executor = Executors.newFixedThreadPool(6);

    // 获取线程集合
    public  void getConsumerThreadList() {
        String url = "kafka1.properties";
        // collectConfigMap = TxtUtils.getCollectConfigMap();
        // 加载配置文件
        Properties p = new Properties();
        Properties properties = new Properties();
        FileInputStream f = null;

        try {
             f = new FileInputStream(filePath + "kafkaclient.properties");
            //f = new FileInputStream(url);
            p.load(f);
            KAFKA_TOPIC = p.getProperty("C_TOPIC");
            String[] topics = KAFKA_TOPIC.split(",");
            for (String topic : topics) {
                ConsumerThread2 consumerThread = new ConsumerThread2(topic);
                consumerThreadList.add(consumerThread);
                executor.submit(consumerThread);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }



    public void shutdown() {
        try {

            for (ConsumerThread2 consumer : consumerThreadList) {
                consumer.shutdown();
            }

            if (executor != null) {
                executor.shutdown(); // 关闭线程
            }
            if (!executor.awaitTermination(100, TimeUnit.SECONDS)) {
                System.out.println("Timeout");
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

}

//主方法

public class TestMain {

    public static void main(String args[]) {

        /*
         * KafkaConsumerService service = new KafkaConsumerService();
         * service.startProducer();
         */
        ProducerThread producerThread1 = new ProducerThread("s1");
        Thread thread1 = new Thread(producerThread1);
        thread1.start();

        Thread thread2 = new Thread(new ProducerThread("s2"));
        thread2.start();

        Thread thread3 = new Thread(new ProducerThread("s3"));
        thread3.start();

        Thread thread4 = new Thread(new ProducerThread("s4"));
        thread4.start();

        Thread thread5 = new Thread(new ProducerThread("s5"));
        thread5.start();

        Thread thread6 = new Thread(new ProducerThread("s6"));
        thread6.start();

        /*try {
            Thread.sleep(30000);
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }*/
        ConsumerGroup group = new ConsumerGroup();

        try {
            group.getConsumerThreadList();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdown();
        }

    }
}