返回到文章

采纳

编辑于

【生产环境】向kafka生产数据,报数据超时,有producer I/O异常

kafka

warn信息

2019-10-24 14:28:06.105 [kafka-producer-network-thread | producer-17] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.178 [kafka-producer-network-thread | producer-3] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
2019-10-24 14:28:06.404 [kafka-producer-network-thread | producer-20] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_APPURLSTATS retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 290 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_URL-10
2019-10-24 14:28:06.405 [kafka-producer-network-thread | producer-17] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.479 [kafka-producer-network-thread | producer-3] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
2019-10-24 14:28:06.704 [kafka-producer-network-thread | producer-20] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_APPURLSTATS retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 290 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_URL-10
2019-10-24 14:28:06.706 [kafka-producer-network-thread | producer-17] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.779 [kafka-producer-network-thread | producer-3] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10

error:

screenshot

对应生产数据代码块:其中delivery.add跟生产者的send方法一样,delivery做了一层很简单的封装。

public class DeliveryUtils {
    private static final Logger logger = LoggerFactory.getLogger(DeliveryUtils.class);

    static AtomicBoolean retringSignal = new AtomicBoolean(false);

    public static void sendDataToDelivery(Delivery delivery, String tableName, Map<String, Object> data, String dataSource, String topic) {
        delivery.add(tableName, data, new Callback() {
            int retryTime = 0;
            @Override
            public void onSuccess(int i) {
                retringSignal.getAndSet(false);
                logger.debug("send data to delievery success! tableName:{}, dataLength:{}", tableName, i);
            }
            @Override
            public void onFail(Exception e) {
                retringSignal.getAndSet(true);
                retryTime++;
                printExceptionInfoAndSleep(300, tableName + "\tretryTimes" + retryTime, e);
                if(retryTime > 10){
                    // 多次重试之后改用新delivery对象
                    DeliveryPool.removeAndPolish(dataSource, tableName, delivery);
                    Delivery newDelivery = DeliveryPool.getDelivery(dataSource, topic, false);
                    newDelivery.add(tableName, data, this);
                    DeliveryPool.removeAndPolish(dataSource, tableName, newDelivery);
                }else{
                    delivery.add(tableName, data, this);
                }
            }
        });
        getGoOnSignal();
    }

    /**
     * 打印send 异常信息并暂停 seconds 时间等待重试
     *
     * @param milliseconds 单位 ms
     */
    private static void printExceptionInfoAndSleep(int milliseconds, String extraInfo, Exception e) {
        logger.warn("send to delivery data failed! will to retry after {}ms, extraInfo:{}, exception info :{}", milliseconds, extraInfo, e.getMessage());
        try {
            TimeUnit.MILLISECONDS.sleep(milliseconds);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }

    private static void getGoOnSignal(){
        while(retringSignal.get()){
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

kakfa 生产者

 if (isLogCheckDataSource(dataSourceName)) {
            producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_BOOTSTRAP_SERVERS);
            producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_COMPRESSION_TYPE);
        } else {
            producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_BOOTSTRAP_SERVERS);
            producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_COMPRESSION_TYPE);
        }
        producerParams.put("key.serializer", ByteArraySerializer.class.getName());
        producerParams.put("value.serializer", ByteArraySerializer.class.getName());
        producerParams.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 3145728);
        producerParams.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        producerParams.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        producerParams.put("acks", "1");

我的重试发送方案基本无用,因为达不到retryTimes,这是什么原因呢,同一个callback,retry次数应该是增加的?

然后这个报错一般是什么原因,该怎么解决呢?