warn信息
2019-10-24 14:28:06.105 [kafka-producer-network-thread | producer-17] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.178 [kafka-producer-network-thread | producer-3] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
2019-10-24 14:28:06.404 [kafka-producer-network-thread | producer-20] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_APPURLSTATS retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 290 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_URL-10
2019-10-24 14:28:06.405 [kafka-producer-network-thread | producer-17] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.479 [kafka-producer-network-thread | producer-3] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
2019-10-24 14:28:06.704 [kafka-producer-network-thread | producer-20] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_APPURLSTATS retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 290 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_URL-10
2019-10-24 14:28:06.706 [kafka-producer-network-thread | producer-17] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.779 [kafka-producer-network-thread | producer-3] WARN com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
error:
对应生产数据代码块:其中delivery.add
跟生产者的send方法一样,delivery做了一层很简单的封装。
public class DeliveryUtils {
private static final Logger logger = LoggerFactory.getLogger(DeliveryUtils.class);
static AtomicBoolean retringSignal = new AtomicBoolean(false);
public static void sendDataToDelivery(Delivery delivery, String tableName, Map<String, Object> data, String dataSource, String topic) {
delivery.add(tableName, data, new Callback() {
int retryTime = 0;
@Override
public void onSuccess(int i) {
retringSignal.getAndSet(false);
logger.debug("send data to delievery success! tableName:{}, dataLength:{}", tableName, i);
}
@Override
public void onFail(Exception e) {
retringSignal.getAndSet(true);
retryTime++;
printExceptionInfoAndSleep(300, tableName + "\tretryTimes" + retryTime, e);
if(retryTime > 10){
// 多次重试之后改用新delivery对象
DeliveryPool.removeAndPolish(dataSource, tableName, delivery);
Delivery newDelivery = DeliveryPool.getDelivery(dataSource, topic, false);
newDelivery.add(tableName, data, this);
DeliveryPool.removeAndPolish(dataSource, tableName, newDelivery);
}else{
delivery.add(tableName, data, this);
}
}
});
getGoOnSignal();
}
/**
* 打印send 异常信息并暂停 seconds 时间等待重试
*
* @param milliseconds 单位 ms
*/
private static void printExceptionInfoAndSleep(int milliseconds, String extraInfo, Exception e) {
logger.warn("send to delivery data failed! will to retry after {}ms, extraInfo:{}, exception info :{}", milliseconds, extraInfo, e.getMessage());
try {
TimeUnit.MILLISECONDS.sleep(milliseconds);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
private static void getGoOnSignal(){
while(retringSignal.get()){
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
kakfa 生产者
if (isLogCheckDataSource(dataSourceName)) {
producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_BOOTSTRAP_SERVERS);
producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_COMPRESSION_TYPE);
} else {
producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_BOOTSTRAP_SERVERS);
producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_COMPRESSION_TYPE);
}
producerParams.put("key.serializer", ByteArraySerializer.class.getName());
producerParams.put("value.serializer", ByteArraySerializer.class.getName());
producerParams.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 3145728);
producerParams.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
producerParams.put(ProducerConfig.LINGER_MS_CONFIG, 5);
producerParams.put("acks", "1");
我的重试发送方案基本无用,因为达不到retryTimes,这是什么原因呢,同一个callback,retry次数应该是增加的?
然后这个报错一般是什么原因,该怎么解决呢?