返回到文章

采纳

编辑于

kafka启动报错Attempt to heartbeat failed since group is rebalancing想知道怎么解决

kafka

// 生产者

public class ProducerThread implements Runnable {
    private final Producer<String, String> kafkaProducer;
    private final String topic;
    private static Logger logger = Logger.getLogger(ProducerThread .class);

    public ProducerThread(String topic) {
        Properties properties = buildkafkaProperty();
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String, String>(properties);
    }

    private Properties buildkafkaProperty() {

        // 加载配置文件
        // File file = new File("kafkaclient.properties");
        Properties p = new Properties();
        FileInputStream f = null;
        // InputStream in = null;

        String url = "kafka1.properties";

        try {
           f=new FileInputStream(url);
            p.load(f);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                f.close();
                // in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        };

        Properties properties = new Properties();
        // Kafka服务器的主机名和端口号
        properties.put("bootstrap.servers", p.getProperty("BOOTSTRAP_SERVERS"));
        // 客户端的ID
        properties.put("client.id", p.getProperty("CLIENT_ID"));
        // 消息的key和value都是字节数组,为了将Java对象转化为字节数组,可以配置"key.serializer"和"value.serializer"两个序列化器,完成转化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties
                .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("batch.size", 65536);

        return properties;
    }

    // 解析文件

    public void run() {

        while (true) {

            String filePath = "D:/Documents/WeChat Files/apan9410/Files/ELK日志信息/ELK日志信息";
            File files = new File(filePath);
            File[] listFiles = files.listFiles();
            Map<String, String> map = new HashMap<>();
            final String string = null;
            for (File file : listFiles) {

                if (file.isFile()
                        && (file.getName().endsWith(".txt") || file.getName().endsWith(".data"))) {
                    FileInputStream is;
                    try {
                        is = new FileInputStream(file);
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader in = new BufferedReader(isr);
                        String  line = null;

                        while ((line = in.readLine()) != null) {
                            // string = line;

                            kafkaProducer.send(new ProducerRecord<String, String>(topic, line),
                                    new Callback() {// callback
                                        // 回调函数,负责通知消息
                                        public void onCompletion(RecordMetadata recordMetadata,
                                                Exception e) {
                                            if (e != null) {
                                                e.printStackTrace();
                                            }
                                            /*logger.info("主题:" + recordMetadata.topic()
                                                    + ", 偏移量:" + recordMetadata.offset()
                                                    );*/
                                        }
                                    });
                        }

                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }
            }
        }

    }
}

//消费者

public finalumerThread2 implementspublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLoggerc final classsumerThread2 implements2 implementspublic final class ConsumerThread2 implements Runnable {

    private static Logger loggerpublic final class ConsumerThread2 implements Runnable {

    private static Loggerpublic final class ConsumerThread2 implements Runnable {

    privatepublic final class ConsumerThread2 implements Runnable {

    privatepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatus = true;

    static DatagramSocket datagramSocket = null;
    static String KAFKA_TOPIC;
    static LinkedHashMap<String, Integer> collectConfigMap = null;

    private static KafkaConsumer<String, StringfinalConsumerThread2 implements Runnable    privateic Loggerr.getLogger2.class);

    privatetile boolean threadStatus = truepublic final class ConsumerThread2 implements Runnable {

    private staticpublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger final classpublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatuslic finalnsumerThread2 implements

    privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatuspublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatusinal classhread2 implementse {

    privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatus = true;

    staticinal classlass ConsumerThread2 implements Runnablevate staticpublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    privateinal classhread2 implementsimplementsts Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile booleaninal class2 implements Runnable {

    privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLoggerinal class2 implements   privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatusinal classnts Runnable {

    private static Loggerpublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.classinal classrThread2 implementspublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2publicinal classerThread2 implementspublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile booleanpublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatus = true;

    static DatagramSocket datagramSocket = null;
    static String KAFKA_TOPIC final classThread2 implementspublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile booleanpublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile finalss ConsumerThread2ments Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLoggerpublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private staticpublic final class ConsumerThread2 implements Runnable {

    private static Logger loggerpublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatus = true;

    static DatagramSocket datagramSocket = nullpublic final class ConsumerThread2 implementspublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    privatepublic final class ConsumerThread2 implementspublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean finalonsumerThread2 implementsThread2 implementspublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLoggerpublic final class ConsumerThread2 implements Runnable {

    privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2nal classpublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatusblic finalss ConsumerThread2umerThread2 implements2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    privatepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnablepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2 finalmerThread2 implementsmplements Runnable {

    privatepublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.classc finalsumerThread2 implementspublic final class ConsumerThread2 implements Runnable {

    private static Logger logger = Logger.getLogger(ConsumerThread2.class);

    private volatile boolean threadStatus = true;

    static DatagramSocket datagramSocket = null;
    static String KAFKA_TOPIC;
    static LinkedHashMap<String, Integer> collectConfigMap = null;

    private static KafkaConsumer<String, String> consumer;
    private static AttackMapService attackMapService;



    private final String topic;

    public ConsumerThread2(String topic) {
        initConsumer();
        logger.info("消费者启动");
        this.topic = topic;
        // list.add(consumer);
    }

    @Override
    public void run() {
         //consumer.wakeup();
        try {
            consumer.subscribe(Arrays.asList(topic));
            while (isThreadStatus()) {

                // 消费消息,这里是获取消息的offset、key、value

                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {

                    logger.info("当前线程:" + Thread.currentThread().getName() + "," + "偏移量:"
                            + record.offset() + "," + "主题:" + record.topic() + "," + "," + "获取的消息:"
                            + record.value());
                    // this.completeAndForward(record, datagramPacket);
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }


    public boolean isThreadStatus() {
        return threadStatus;
    }

    public void setThreadStatus(boolean b) {
        // TODO Auto-generated method stub
        threadStatus = b;
    }

    public void initConsumer() {


        String url = "kafka1.properties";
        // collectConfigMap = TxtUtils.getCollectConfigMap();
        // 加载配置文件
        Properties p = new Properties();
        Properties properties = new Properties();
        FileInputStream f = null;

        try {
             f = new FileInputStream(filePath + "kafkaclient.properties");
            //f = new FileInputStream(url);
            p.load(f);
            // KAFKA_TOPIC = p.getProperty("C_TOPIC");
            // Broker的地址
            properties.put("bootstrap.servers", p.getProperty("BOOTSTRAP_SERVERS"));
            // 所属Consumer Group的id
            properties.put("group.id", p.getProperty("GROUP_ID"));
            // 自动提交offset
            properties.put("enable.auto.commit", p.getProperty("ENABLE_AUTO_COMMIT"));
            // 自动提交offset的时间间隔
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("session.timeout.ms", "60000");
            //消息发送的最长等待时间.需大于session.timeout.ms这个时间
            properties.put("request.timeout.ms", "70000");
            //一次从kafka中poll出来的数据条数
            //max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
            properties.put("max.poll.records","100");
            //server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
            properties.put("fetch.min.bytes", "1");
            //若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间
            //properties.put("fetch.wait.max.ms", "1000");
            // properties.put("heartbeat.interval.ms","40000");
            // p.getProperty("HEARTBEAT.INTERVAL.MS"));
            // 拉取的最大条数
            // 消息的key和value都是字节数组,为了将Java对象转化为字节数组,可以配置"key.serializer"和"value.serializer"两个序列化器,完成转化
            properties.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                f.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        consumer = new KafkaConsumer<>(properties);

    }


     public void shutdown() {

             consumer.wakeup();
          }
}

//消费组

public class ConsumerGroup {
    private static List<ConsumerThread2> consumerThreadList = new ArrayList<ConsumerThread2>();
    static String KAFKA_TOPIC;
    static ExecutorService executor = Executors.newFixedThreadPool(6);

    // 获取线程集合
    public  void getConsumerThreadList() {
        String url = "kafka1.properties";
        // collectConfigMap = TxtUtils.getCollectConfigMap();
        // 加载配置文件
        Properties p = new Properties();
        Properties properties = new Properties();
        FileInputStream f = null;

        try {
             f = new FileInputStream(filePath + "kafkaclient.properties");
            //f = new FileInputStream(url);
            p.load(f);
            KAFKA_TOPIC = p.getProperty("C_TOPIC");
            String[] topics = KAFKA_TOPIC.split(",");
            for (String topic : topics) {
                ConsumerThread2 consumerThread = new ConsumerThread2(topic);
                consumerThreadList.add(consumerThread);
                executor.submit(consumerThread);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }



    public void shutdown() {
        try {

            for (ConsumerThread2 consumer : consumerThreadList) {
                consumer.shutdown();
            }

            if (executor != null) {
                executor.shutdown(); // 关闭线程
            }
            if (!executor.awaitTermination(100, TimeUnit.SECONDS)) {
                System.out.println("Timeout");
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

}

//主方法

public class TestMain {

    public static void main(String args[]) {

        /*
         * KafkaConsumerService service = new KafkaConsumerService();
         * service.startProducer();
         */
        ProducerThread producerThread1 = new ProducerThread("s1");
        Thread thread1 = new Thread(producerThread1);
        thread1.start();

        Thread thread2 = new Thread(new ProducerThread("s2"));
        thread2.start();

        Thread thread3 = new Thread(new ProducerThread("s3"));
        thread3.start();

        Thread thread4 = new Thread(new ProducerThread("s4"));
        thread4.start();

        Thread thread5 = new Thread(new ProducerThread("s5"));
        thread5.start();

        Thread thread6 = new Thread(new ProducerThread("s6"));
        thread6.start();

        /*try {
            Thread.sleep(30000);
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }*/
        ConsumerGroup group = new ConsumerGroup();

        try {
            group.getConsumerThreadList();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdown();
        }

    }
}