主类核心代码
ThreadGroup tg = new ThreadGroup("处理线程组");
while(true){
if (tg.activeCount() < threadNum) {
Thread th = new Thread(tg, new Consumer(topicName,filePath));
th.start();
} else {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Consumer类的核心代码
public void run(){
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", enableAutoCommit);
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", maxPollRecords);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
//Thread ch = new Thread(new CheckThread(tg,topic));
//ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
DoSomeThings Dos = new DoSomeThings();
boolean iResult = Dos.execute(record);
if(iResult){
consumer.commitSync();
}
//fixedThreadPool.execute(new DoSomeThings(record));
}
consumer.close();
}