返回到文章

采纳

业务系统连接不上kafka服务处理

kafka

生产者在send的时候,有非阻塞和阻塞两种。

现在有业务需要,使用阻塞模式。需要根据send之后的响应进行其他的业务处理,或者接收到send的异常之后,将一些数据存在定时任务,尝试再往下走。

业务过程:

1 业务逻辑 ---> 2 kafka的send(),并且调用get() ----> 3 业务逻辑(根据第2步的响应进行处理)。

在第二步中,有个疑问点:

a: 业务服务器 跟 kafka服务器因为网络原因不连通,那么在send的时候,要如何捕获超时,如何设置不多次重试?

我封装的代码如下


import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.core.domain.Reply;

/**
 * 生产者接口
 * @author 
 *
 */
public interface ProducerHandler {

    /**
     * 发送消息
     * @param topic
     * @param value
     * @return
     */
    public void send(Message message) throws Exception;

    public void send(Message message,SendCallback callback) throws Exception;

    /**
     * 同步发送,等待响应
     * @param message
     */
    public Reply sendSync(Message message);
}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io;
import javaInputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrentimport java.io.IOException;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3importjava.ioream;
importl.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apachet java.io;
import javaInputStream;
importutilperties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureon;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.utilimportjava.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
importt java.io.IOException;
importInputStreama.utilrties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producerimportt java.ioxception;
import javaInputStream;
import javal.Properties;
import java.util.Random;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimportimportimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorServiceimport java.io.IOException;
import java.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorServiceimport java.io.IOException;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.utilimport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafkaimport java.iotion;
import javaeam;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentrt java.io.IOExceptionimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javart javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

importimport java.iotion;
importeam;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import orgt java.io.IOException;
import java.io.InputStreamava.util.Propertiesort javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

importt java.io.IOExceptiontion;
import java.ioeam;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executorsimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafkaimportt java.io;
import javaInputStreammport java.utilperties;
import.utilmport java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentimport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clientsimportt javaxception;
import javaam;
import javaProperties java.util.Random;
import javaent.Executors;
importutilcurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimportimportimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apacheimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtilst java.iotion;
import java.iomport java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3import java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import orgt java.iotion;
import javamport java.util;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
importt java.iotion;
import javamport javava.util.Properties;
importjavaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apacheimport javation;
import java.io.InputStream;
importjava.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apacheimport java.io.IOExceptiontion;
import java.io;
import javaa.utilrties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commonst javation;
import javaeam;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executorsimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomrt javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
importt java.iotion;
import javaeam;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util java.iotion;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apachet java.iotion;
import java.ioimportimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtilst java.io.IOException;
import javaimportl.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtilsimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

importimportt java.iotion;
import java.io.InputStreamimport java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importt javation;
importmport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.utilimport java.ioimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
importt java.io.IOException;
importort javatStream;
importerties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.utilt javation;
importmport javarties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
importt javaIOExceptiontion;
import java.iomport javaties;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
importt javation;
importmport javaes;
import java.util.Random;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentt javation;
importmport java;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.utilimportIOExceptionort javamport java.util.Propertiesjavaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport javation;
importmport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futuret javaxception;
import;
importInputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureon;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentjava.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commonst javaxception;
import java.io.InputStream;
import java.util.Propertiesrties;
import;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrenton;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import javajavaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimportjava.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clientst javaxception;
importam;
importProperties java.utilm;
importva.utilent.Executors.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureon;
importerties;
import java.util.Random;
import javaimport java.io.IOException;
import java.ioimport java.io.IOException;
import java.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

importt javaxception;
import;
import.ioam;
import javaroperties;
importutil.Random;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javat javaxception;
importInputStreama.utilrties;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentimportjavaream;
importl.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3t java.io.IOExceptiona.iom;
importl.Propertiesperties;
import.utilmport java.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.utilimport java.io.IOExceptiontStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clientst javaxception;
importInputStreamam;
importa.util.Properties;
importava.util.Randomt javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.utiljava.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import java.io.InputStreamimport java.io.IOException;
importimport java.io.IOException;
import java.ioimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importt javaxception;
import.io javaroperties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executorson;
importInputStreamimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStreamimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import orgimport javaioxception;
importInputStreamt javaies;
importandom;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimportjavaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Futureimportio;
importInputStreamimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport javaInputStreamimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStreamimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import javaimportport javaputStream;
importimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3import javaio.IOException;
importa.ioInputStream;
import javal.Propertiesperties;
import.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import orgimporton;
importInputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafkat java.io.IOException;
importInputStream javarties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertieson;
importimport java.io.IOException;
import java.io.InputStreamimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrentimportport java.ioimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import orgt javaxception;
importInputStreamt javaies;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomon;
importimport java.io.IOException;
import java.ioimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importport javaimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import java.ioimport java.io.IOException;
import java.io.InputStreamimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javat javaxception;
import;
importa.io.InputStream;
importroperties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executorsjavaream;
importl.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apacheimport java.io.IOException;
importa.ioInputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.utilimport javaInputStreamimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport javaio
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clientst javaption;
importutStream;
importutil
importil.Random;
importa.utils;
importutiluture;
importoncurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producerioimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.IOExceptionimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apacheimport java.iomport javaream;
importutil
importil.Randoma.util.concurrent.Executors;
importutil.concurrentuture;
importoncurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducerioimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import javaimport javaeption;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

importimportva.iotion;
importtStream;
importutil.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import orgon;
importm;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ioon;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadataimport java.io.IOExceptionn;
importa.utilerties;
import.utilmport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
importimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import orgimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.coret java.io.IOExceptioneption;
import javaeam;
importutil
import.Randomurrent.Executorsimportutilent.Futureort javaurrent.ScheduledExecutorService;

importpache.commonsls;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrentrt java.ioception;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apachet javaption;
importt javaties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafkaimport javaxception;
importream;
importProperties javaandom;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producerva.iotion;
importputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javaort javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import javaimportimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importort javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importrt javaimport java.io.IOException;
import java.io.InputStream;
import java.utilimportimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaort javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Propertiesport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilort javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importrt javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
importimportimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilort java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import orgimport java.ioion;
import javaimportutilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.utilio.IOExceptionimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.utilimport.ioion;
import.InputStreameam;
importoperties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomimport java.ioception;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utila.io.IOExceptionimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import orga.io.IOException;
import javal.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

importimport java.io.IOExceptionimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomo.IOExceptionException;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors.ioa.ioream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import javaimporttion;
importtStream;
import;
importava.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import orgimport javan;
importInputStreamrt javaerties;
import javandom;
importmport javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import javaimport javation;
import;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
importimport javation;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport javaeption;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javaimport javaxception;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorServiceva.ion;
import.ioutStream;
importroperties;
importtil.RandomRandom;
import javaoncurrent.Executorsrt javaimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
importva.io.IOExceptionort javatStream;
importtil.Propertiesimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrentva.ion;
importputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import orgva.ioon;
importputStream;
importPropertiesort javaandom;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrentva.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.utilimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.utilimport java.io.IOExceptionion;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executorsva.ioion;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javaimport java.ioimportream;
importl.Propertiesrties;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javaio.IOExceptionimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorServiceava.io.IOException;
importva.io.InputStream;
importimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Randomjava.ioimport java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.oristartech.kafka.core.domain.Message;
import com.oristartech.kafka.core.domain.Reply;
import com.oristartech.kafka.core.exception.ProducerException;

/**
 * 生产者接口实现抽象类,引入jar的项目在使用消息的时候,继承该类
 * 
 * @author 
 *
 */
public abstract class AbstractProducer implements ProducerHandler {

    private static final Logger logger = LoggerFactory.getLogger(AbstractProducer.class);

    static KafkaProducer<String LoggerFactory.getLogger(AbstractProducer.class);

    static KafkaProducer<String, String> kafkaProducer = null;
    static KafkaProducer<String, String> kafkaProducer_sync = null;

    // ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);//线程池
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
    static   static String);
    static String key = "e24d66f5bbec";
    Random random = new Random();

    /**
     * 同步
     */
    protected static Properties props_sync = new Properties();
    static {
        Properties properties = new Properties();
        // 使用ClassLoader加载properties配置文件生成对应的输入流
        InputStream in = AbstractProducer.class.getClassLoader()
                .getResourceAsStream("config/message-producer.properties");
        // 使用properties对象加载输入流
        try {
            properties.load(in);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            logger.error("找不到config/message-producer.properties文件");
            e.printStackTrace();
        }

        String bootstrap_servers = properties.getProperty("bootstrap.servers");
        if (StringUtils.isEmpty(bootstrap_servers)) {
            logger.error("找不到bootstrap.servers属性");
        }

        String producer_acks = properties.getProperty("producer.acks");
        if (StringUtils.isEmpty(producer_acks)) {
            producer_acks = "all";
        }
        String producer_retries = properties.getProperty("producer.retries");
        if (StringUtils.isEmpty(producer_retries)) {
            //producer_retries = "0";
            producer_retries = ""+3;
        }
        String producer_batch_size = properties.getProperty("producer.batch.size");
        if (StringUtils.isEmpty(producer_batch_size)) {
            producer_batch_size = "16384";
        }
        String producer_linger_ms = properties.getProperty("producer.linger.ms");
        if (StringUtils.isEmpty(producer_linger_ms)) {
            producer_linger_ms = "1";
        }
        String producer_buffer_memory = properties.getProperty("producer.buffer.memory");
        if (StringUtils.isEmpty(producer_buffer_memory)) {
            producer_buffer_memory = "33554432";
        }

        String producer_metadata_fetch_timeout_ms = properties.getProperty("metadata.fetch.timeout.ms");
        if (StringUtils.isEmpty(producer_metadata_fetch_timeout_ms)) {
            producer_metadata_fetch_timeout_ms = "" + (60 * 5 * 1000);
        }



        //非阻塞
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap_servers);// 服务器ip:端口号,集群用逗号分隔
        //尝试50秒时间,发送不出去就当失败
        props.put("max.block.ms", 50000);// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时
        props.put("acks", producer_acks); // “所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
        props.put("retries", Integer.valueOf(producer_retries)); // 如果请求失败,生产者也会自动重试,即使设置成0
        props.put("batch.size", Integer.valueOf(producer_batch_size));
        props.put("linger.ms", Integer.valueOf(producer_linger_ms)); // 默认立即发送,这里这是延时毫秒数
        props.put("buffer.memory", Integer.valueOf(producer_buffer_memory)); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数
        props.put("metadata.fetch.timeout.ms", Integer.valueOf(producer_metadata_fetch_timeout_ms));// 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。

        kafkaProducer = new KafkaProducer<String, String>(props);

        //阻塞
        props_sync.put("bootstrap.servers", bootstrap_servers);// 服务器ip:端口号,集群用逗号分隔
        props_sync.put("max.block.ms", 5000);// 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时
        props_sync.put("request.timeout.ms", 15000);// 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。
        //props.put("replica.lag.time.max.ms", 18000);//
        props_sync.put("acks", producer_acks); // “所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
        props_sync.put("retries", Integer.valueOf(producer_retries)); // 如果请求失败,生产者也会自动重试,即使设置成0
        props_sync.put("batch.size", Integer.valueOf(producer_batch_size));
        props_sync.put("linger.ms", Integer.valueOf(producer_linger_ms)); // 默认立即发送,这里这是延时毫秒数
        props_sync.put("buffer.memory", Integer.valueOf(producer_buffer_memory)); // 生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
        props_sync.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_sync.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_sync.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数

        props_sync.put("metadata.fetch.timeout.ms", Integer.valueOf(producer_metadata_fetch_timeout_ms));// 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。


    }

    //非阻塞
    @Override
    public void send(Message message) throws Exception {
        send(message, null);
    }

    @Override
    public void send(Message message, final SendCallback callback) throws Exception {
         if(null == message) {
             logger.error("消息对象为空");
             return;
         }

         String topic = message.getTopic();
         if(StringUtils.isEmpty(topic)) {
             throw new ProducerException("主题(topic)不允许为空");
         }

         String key = message.getKey();
         if(StringUtils.isEmpty(key)) {
             throw new ProducerException("消息key不允许为空");
         }

         String value = JSONObject.toJSONString(message);


         if(kafkaProducer == null) {
             logger.error("kafkaProducer 为null...........");
             return;
         }

         ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic,key,value);
         long startTime = System.currentTimeMillis();

         kafkaProducer.send(record);
         long endTime = System.currentTimeMillis();
         logger.info("响应时间[" + ((endTime - startTime) / 1000.0) + "秒]" );

    }

    //同步、阻塞
    @Override
    public Reply sendSync(Message message) {
         kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
         Reply reply = new Reply();
         if(null == message) {
             logger.error("要发送的消息对象为空...............");
             reply.setStatus(0);
             reply.setMsg("要发送的消息对象不允许为空。");
             return reply;
         }

         String topic = message.getTopic();
         if(StringUtils.isEmpty(topic)) {
             logger.error("要发送的消息主题为空...............");
             reply.setStatus(0);
             reply.setMsg("要发送的消息主题不允许为空。");
             return reply;
         }
         String key = message.getKey();
         if(StringUtils.isEmpty(key)) {
             logger.error("要发送的消息key为空...............");
             reply.setStatus(0);
             reply.setMsg("要发送的消息key不允许为空。");
             return reply;
         }

         String value = JSONObject.toJSONString(message);
         System.out.println(value);
         if(kafkaProducer == null) {
             logger.error("kafkaProducer 为null...........");
             reply.setStatus(0);
             reply.setMsg("生产者Producer为空");
             return reply;
         }
         long startTime = System.currentTimeMillis();
        //这里通过Future.get()方法,阻塞当前线程,等待Kafka服务端的ACK响应
         try {
             reply.setStatus(1);
             ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, key,value);
             Future<RecordMetadata> future = kafkaProducer_sync.send(record);
             RecordMetadata recordMetadata = future.get();
             if (recordMetadata == null) {
                 reply.setStatus(0);
                 reply.setMsg("发送失败:recordMetadata为空");
                 return reply;
             }
         } catch (Exception e) {
             reply.setStatus(0);
             if(e instanceof java.util.concurrent.ExecutionException) {
                 if(e.getCause() != null && e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
                     reply.setMsg("发送超时:"+e.getMessage());
                     kafkaProducer_sync.close();
                 }else {
                     reply.setMsg("发送失败");
                 }
             }else {
                 reply.setMsg("发送失败");
             }
             e.printStackTrace();
             return reply;
         }finally {
             //kafkaProducer_sync.close();
        }
         long endTime = System.currentTimeMillis();
         reply.setMsg("发送成功! 响应时间["+ ((endTime - startTime) / 1000.0) + "秒]");
          logger.info("sendSync 响应时间[" + ((endTime - startTime) / 1000.0) + "秒]");
          return reply;
    }

}

其中,在 sendSync 中 ,是否有必要在入口第一步的时候,每次都初始化一次:

kafkaProducer_sync = new KafkaProducer(props_sync);

其次是, kafkaProducer_sync.close(); 在什么时机下使用才合适?