// 生产者
public class ProducerThread implements Runnable {
private final Producer<String, String> kafkaProducer;
private final String topic;
private static Logger logger = Logger.getLogger(ProducerThread .class);
public ProducerThread(String topic) {
Properties properties = buildkafkaProperty();
this.topic = topic;
this.kafkaProducer = new KafkaProducer<String, String>(properties);
}
private Properties buildkafkaProperty() {
// 加载配置文件
// File file = new File("kafkaclient.properties");
Properties p = new Properties();
FileInputStream f = null;
// InputStream in = null;
String url = "kafka1.properties";
try {
f=new FileInputStream(url);
p.load(f);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
f.close();
// in.close();
} catch (IOException e) {
e.printStackTrace();
}
};
Properties properties = new Properties();
// Kafka服务器的主机名和端口号
properties.put("bootstrap.servers", p.getProperty("BOOTSTRAP_SERVERS"));
// 客户端的ID
properties.put("client.id", p.getProperty("CLIENT_ID"));
// 消息的key和value都是字节数组,为了将Java对象转化为字节数组,可以配置"key.serializer"和"value.serializer"两个序列化器,完成转化
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties
.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("batch.size", 65536);
return properties;
}
// 解析文件
public void run() {
while (true) {
String filePath = "D:/Documents/WeChat Files/apan9410/Files/ELK日志信息/ELK日志信息";
File files = new File(filePath);
File[] listFiles = files.listFiles();
Map<String, String> map = new HashMap<>();
final String string = null;
for (File file : listFiles) {
if (file.isFile()
&& (file.getName().endsWith(".txt") || file.getName().endsWith(".data"))) {
FileInputStream is;
try {
is = new FileInputStream(file);
InputStreamReader isr = new InputStreamReader(is);
BufferedReader in = new BufferedReader(isr);
String line = null;
while ((line = in.readLine()) != null) {
// string = line;
kafkaProducer.send(new ProducerRecord<String, String>(topic, line),
new Callback() {// callback
// 回调函数,负责通知消息
public void onCompletion(RecordMetadata recordMetadata,
Exception e) {
if (e != null) {
e.printStackTrace();
}
/*logger.info("主题:" + recordMetadata.topic()
+ ", 偏移量:" + recordMetadata.offset()
);*/
}
});
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
}
//消费者
public final class ConsumerThread2 implements Runnable {
private static Logger logger = Logger.getLogger(ConsumerThread2.class);
private volatile boolean threadStatus = true;
static DatagramSocket datagramSocket = null;
static String KAFKA_TOPIC;
static LinkedHashMap<String, Integer> collectConfigMap = null;
private static KafkaConsumer<String, String> consumer;
private static AttackMapService attackMapService;
private final String topic;
public ConsumerThread2(String topic) {
initConsumer();
logger.info("消费者启动");
this.topic = topic;
// list.add(consumer);
}
@Override
public void run() {
//consumer.wakeup();
try {
consumer.subscribe(Arrays.asList(topic));
while (isThreadStatus()) {
// 消费消息,这里是获取消息的offset、key、value
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
logger.info("当前线程:" + Thread.currentThread().getName() + "," + "偏移量:"
+ record.offset() + "," + "主题:" + record.topic() + "," + "," + "获取的消息:"
+ record.value());
// this.completeAndForward(record, datagramPacket);
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public boolean isThreadStatus() {
return threadStatus;
}
public void setThreadStatus(boolean b) {
// TODO Auto-generated method stub
threadStatus = b;
}
public void initConsumer() {
String url = "kafka1.properties";
// collectConfigMap = TxtUtils.getCollectConfigMap();
// 加载配置文件
Properties p = new Properties();
Properties properties = new Properties();
FileInputStream f = null;
try {
f = new FileInputStream(filePath + "kafkaclient.properties");
//f = new FileInputStream(url);
p.load(f);
// KAFKA_TOPIC = p.getProperty("C_TOPIC");
// Broker的地址
properties.put("bootstrap.servers", p.getProperty("BOOTSTRAP_SERVERS"));
// 所属Consumer Group的id
properties.put("group.id", p.getProperty("GROUP_ID"));
// 自动提交offset
properties.put("enable.auto.commit", p.getProperty("ENABLE_AUTO_COMMIT"));
// 自动提交offset的时间间隔
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "60000");
//消息发送的最长等待时间.需大于session.timeout.ms这个时间
properties.put("request.timeout.ms", "70000");
//一次从kafka中poll出来的数据条数
//max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
properties.put("max.poll.records","100");
//server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
properties.put("fetch.min.bytes", "1");
//若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间
//properties.put("fetch.wait.max.ms", "1000");
// properties.put("heartbeat.interval.ms","40000");
// p.getProperty("HEARTBEAT.INTERVAL.MS"));
// 拉取的最大条数
// 消息的key和value都是字节数组,为了将Java对象转化为字节数组,可以配置"key.serializer"和"value.serializer"两个序列化器,完成转化
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
f.close();
} catch (IOException e) {
e.printStackTrace();
}
}
consumer = new KafkaConsumer<>(properties);
}
public void shutdown() {
consumer.wakeup();
}
}
//消费组
public class ConsumerGroup {
private static List<ConsumerThread2> consumerThreadList = new ArrayList<ConsumerThread2>();
static String KAFKA_TOPIC;
static ExecutorService executor = Executors.newFixedThreadPool(6);
// 获取线程集合
public void getConsumerThreadList() {
String url = "kafka1.properties";
// collectConfigMap = TxtUtils.getCollectConfigMap();
// 加载配置文件
Properties p = new Properties();
Properties properties = new Properties();
FileInputStream f = null;
try {
f = new FileInputStream(filePath + "kafkaclient.properties");
//f = new FileInputStream(url);
p.load(f);
KAFKA_TOPIC = p.getProperty("C_TOPIC");
String[] topics = KAFKA_TOPIC.split(",");
for (String topic : topics) {
ConsumerThread2 consumerThread = new ConsumerThread2(topic);
consumerThreadList.add(consumerThread);
executor.submit(consumerThread);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void shutdown() {
try {
for (ConsumerThread2 consumer : consumerThreadList) {
consumer.shutdown();
}
if (executor != null) {
executor.shutdown(); // 关闭线程
}
if (!executor.awaitTermination(100, TimeUnit.SECONDS)) {
System.out.println("Timeout");
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
//主方法
public class TestMain {
public static void main(String args[]) {
/*
* KafkaConsumerService service = new KafkaConsumerService();
* service.startProducer();
*/
ProducerThread producerThread1 = new ProducerThread("s1");
Thread thread1 = new Thread(producerThread1);
thread1.start();
Thread thread2 = new Thread(new ProducerThread("s2"));
thread2.start();
Thread thread3 = new Thread(new ProducerThread("s3"));
thread3.start();
Thread thread4 = new Thread(new ProducerThread("s4"));
thread4.start();
Thread thread5 = new Thread(new ProducerThread("s5"));
thread5.start();
Thread thread6 = new Thread(new ProducerThread("s6"));
thread6.start();
/*try {
Thread.sleep(30000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}*/
ConsumerGroup group = new ConsumerGroup();
try {
group.getConsumerThreadList();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdown();
}
}
}