【生产环境】向kafka生产数据,报数据超时,有producer I/O异常

SmallHeart 发表于: 2019-10-24   最后更新时间: 2019-10-24 15:26:26   4,195 游览

warn信息

2019-10-24 14:28:06.105 [kafka-producer-network-thread | producer-17] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.178 [kafka-producer-network-thread | producer-3] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
2019-10-24 14:28:06.404 [kafka-producer-network-thread | producer-20] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_APPURLSTATS retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 290 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_URL-10
2019-10-24 14:28:06.405 [kafka-producer-network-thread | producer-17] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.479 [kafka-producer-network-thread | producer-3] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10
2019-10-24 14:28:06.704 [kafka-producer-network-thread | producer-20] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_APPURLSTATS retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 290 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_URL-10
2019-10-24 14:28:06.706 [kafka-producer-network-thread | producer-17] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_REPORT      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 181 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_REPORT-14
2019-10-24 14:28:06.779 [kafka-producer-network-thread | producer-3] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO      retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 164 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-10

error:

screenshot

对应生产数据代码块:其中delivery.add跟生产者的send方法一样,delivery做了一层很简单的封装。

public class DeliveryUtils {
    private static final Logger logger = LoggerFactory.getLogger(DeliveryUtils.class);

    static AtomicBoolean retringSignal = new AtomicBoolean(false);

    public static void sendDataToDelivery(Delivery delivery, String tableName, Map<String, Object> data, String dataSource, String topic) {
        delivery.add(tableName, data, new Callback() {
            int retryTime = 0;
            @Override
            public void onSuccess(int i) {
                retringSignal.getAndSet(false);
                logger.debug("send data to delievery success! tableName:{}, dataLength:{}", tableName, i);
            }
            @Override
            public void onFail(Exception e) {
                retringSignal.getAndSet(true);
                retryTime++;
                printExceptionInfoAndSleep(300, tableName + "\tretryTimes" + retryTime, e);
                if(retryTime > 10){
                    // 多次重试之后改用新delivery对象
                    DeliveryPool.removeAndPolish(dataSource, tableName, delivery);
                    Delivery newDelivery = DeliveryPool.getDelivery(dataSource, topic, false);
                    newDelivery.add(tableName, data, this);
                    DeliveryPool.removeAndPolish(dataSource, tableName, newDelivery);
                }else{
                    delivery.add(tableName, data, this);
                }
            }
        });
        getGoOnSignal();
    }

    /**
     * 打印send 异常信息并暂停 seconds 时间等待重试
     *
     * @param milliseconds 单位 ms
     */
    private static void printExceptionInfoAndSleep(int milliseconds, String extraInfo, Exception e) {
        logger.warn("send to delivery data failed! will to retry after {}ms, extraInfo:{}, exception info :{}", milliseconds, extraInfo, e.getMessage());
        try {
            TimeUnit.MILLISECONDS.sleep(milliseconds);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }

    private static void getGoOnSignal(){
        while(retringSignal.get()){
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

kakfa 生产者

 if (isLogCheckDataSource(dataSourceName)) {
            producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_BOOTSTRAP_SERVERS);
            producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_COMPRESSION_TYPE);
        } else {
            producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_BOOTSTRAP_SERVERS);
            producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_COMPRESSION_TYPE);
        }
        producerParams.put("key.serializer", ByteArraySerializer.class.getName());
        producerParams.put("value.serializer", ByteArraySerializer.class.getName());
        producerParams.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 3145728);
        producerParams.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        producerParams.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        producerParams.put("acks", "1");

我的重试发送方案基本无用,因为达不到retryTimes,这是什么原因呢,同一个callback,retry次数应该是增加的?

然后这个报错一般是什么原因,该怎么解决呢?

发表于 2019-10-24
添加评论

java.util.ConcurrentModificationException异常

  • 使用iterator遍历集合的同时对集合进行修改就会出现这种异常。
SmallHeart -> 半兽人 5年前

嗯嗯,这也是kafka producer内部的行为,那发生这种行为的原因,以及造成数据过期数据发不上去的原因以及解决方式就是目前需要知道的

半兽人 -> SmallHeart 5年前

在补充多一些你的发送逻辑,
另外,callback不能利用将自身传回去再重新发送的,很可能是这块导致的。

SmallHeart -> 半兽人 5年前

这是那里面的两个类。不知道怎么重新编辑问题,就贴这里了。

DeliveryPool .java

public class DeliveryPool {

    private static final ConcurrentHashMap<String, ConcurrentLinkedQueue<Delivery>> deliveries = new ConcurrentHashMap<>();

    private static final Object lock = new Object();

    private static final int PEER_SIZE = 4;

    private static final Logger logger = LoggerFactory.getLogger(DeliveryPool.class);

    /**
     * 获取delivery对象
     *
     * @param dataSource
     * @param topic
     * @param autoReturn 自动归还
     * @return
     */
    public static Delivery getObj(String dataSource, String topic, boolean autoReturn) {
        ConcurrentLinkedQueue<Delivery> queue = DeliveryPool.deliveries.get(dataSource);
        if (queue == null) {
            synchronized (lock) {
                queue = DeliveryPool.deliveries.get(dataSource);
                if (queue == null) {
                    queue = new ConcurrentLinkedQueue<>();
                    queue.addAll(DeliveryFactory.build(dataSource, topic, PEER_SIZE));
                    DeliveryPool.deliveries.put(dataSource, queue);
                }
            }
        }

        Delivery poll = queue.poll();
        while (poll == null) {
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            poll = queue.poll();
        }
        if (autoReturn) {
            returnObj(dataSource, poll);
        }
        return poll;
    }

    /**
     * 归还
     *
     * @param dataSource
     */
    public static void returnObj(String dataSource, Delivery delivery) {
        DeliveryPool.deliveries.get(dataSource).add(delivery);
    }

    /**
     * 补齐
     *
     * @param dataSource
     * @param topic
     */
    public static void polishPool(String dataSource, String topic) {
        ConcurrentLinkedQueue<Delivery> queue = DeliveryPool.deliveries.get(dataSource);
        if (queue == null || queue.size() < PEER_SIZE) {
            synchronized (lock) {
                queue = DeliveryPool.deliveries.get(dataSource);
                int polishSize = 0;
                if (queue == null) {
                    polishSize = PEER_SIZE;
                    queue = new ConcurrentLinkedQueue<>();
                } else if (queue.size() < PEER_SIZE) {
                    polishSize = PEER_SIZE - queue.size();
                }
                if(polishSize > 0){
                    queue.addAll(DeliveryFactory.build(dataSource, topic, polishSize));
                    logger.info("delivery pool, success to polish delivery size:{}", polishSize);
                }
                DeliveryPool.deliveries.put(dataSource, queue);
            }
        }
    }

    /**
     * 先移除再补齐
     * @param dataSource
     * @param topic
     * @param delivery
     */
    public static void removeAndPolish(String dataSource, String topic, Delivery delivery){
        remove(dataSource, delivery);
        polishPool(dataSource, topic);
    }

    /**
     * 移除 并 进行资源close
     * @param dataSource
     * @param delivery
     */
    public static void remove(String dataSource, Delivery delivery){
        ConcurrentLinkedQueue<Delivery> queue = DeliveryPool.deliveries.get(dataSource);
        if(queue != null){
            queue.remove(delivery);
            logger.info("deliverey pool, success to remove delivery from pool name:{}", dataSource);
            close(dataSource, delivery);
        }
    }

    /**
     * 调用close方法关闭流并执行flush,此后此delivery不可用
     * @param delivery
     */
    public static void close(String dataSource, Delivery delivery){
        try {
            delivery.close();
        } catch (IOException e) {
            logger.error("deliverey pool, close delivery error! datasource:{}", dataSource);
        }
    }

    /**
     * 获取delivery  获取一个链接用,不从池中取出
     * @param dataSourceName
     * @param topic
     * @return
     */
    public static Delivery getDelivery(String dataSourceName, String topic) {
        return DeliveryPool.getObj(dataSourceName, topic, true);
    }

    /**
     * 获取delivery  获取一个链接用,根据autoReturn说明是否从池中取出
     * @param dataSourceName
     * @param topic
     * @param autoReturn
     * @return
     */
    public static Delivery getDelivery(String dataSourceName, String topic, boolean autoReturn) {
        return DeliveryPool.getObj(dataSourceName, topic, autoReturn);
    }


}

DeliveryFactory.java

public class DeliveryFactory {

    private static final Logger logger = LoggerFactory.getLogger(DeliveryFactory.class);

    protected static List<Delivery> build(String dataSourceName, String topic, int num) {
        ArrayList<Delivery> lists = new ArrayList<>();
        for (int i = 0; i < num; i++) {
            lists.add(create(dataSourceName, topic));
        }
        logger.info("delivery create, req to build {} delivery, and resp {} delivery", num, lists);
        return lists;
    }

    protected static Delivery build(String dataSourceName, String topic) {
        return create(dataSourceName, topic);
    }

    private static Delivery create(String dataSourceName, String topic) {
        Map<String, Object> producerParams = new HashMap<>(15);
        if (isLogCheckDataSource(dataSourceName)) {
            producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_BOOTSTRAP_SERVERS);
            producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_TRACK_LOG_COMPRESSION_TYPE);
        } else {
            producerParams.put("bootstrap.servers", CommonConfig.DELIVERY_KAFKA_BOOTSTRAP_SERVERS);
            producerParams.put("compression.type", CommonConfig.DELIVERY_KAFKA_COMPRESSION_TYPE);
        }
        producerParams.put("key.serializer", ByteArraySerializer.class.getName());
        producerParams.put("value.serializer", ByteArraySerializer.class.getName());
        producerParams.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 3145728);
        producerParams.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        producerParams.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        producerParams.put("acks", "1");

        producerParams.putAll(CommonConfig.DELIVERY_KAFKA_PRODUCER_PARAMS);
        // kafka加密参数
        producerParams.putAll(CommonConfig.getKafkaEncryptParams());
        Delivery delivery = null;
        try {
            logger.info("create delivery , dataSourceName : {}, topic : {}", dataSourceName, topic);
            delivery = new Delivery.Builder().setMetadataURL(CommonConfig.DELIVERY_CONNECTION_METADATA_URL).setUsername(CommonConfig.DELIVERY_CONNECTION_USERNAME).setToken(CommonConfig.DELIVERY_CONNECTION_TOKEN).setDataSource(dataSourceName).setProducerParams(producerParams).setTopic(topic).setVersion(Delivery.KafkaVersion.VERSION_10).build();
        } catch (Exception e) {
            logger.error("create delivery error, metadataUrl:{}, params:{}.\n", CommonConfig.DELIVERY_CONNECTION_METADATA_URL, producerParams, e);
        }
        logger.info("create delivery");
        return delivery;
    }

    /**
     * 是否是DELIVERY_DATASOURCE_LOG数据源
     *
     * @param dataSourceName
     * @return
     */
    public static boolean isLogCheckDataSource(String dataSourceName) {
        return dataSourceName.startsWith(CommonConfig.DELIVERY_DATASOURCE_LOG);
    }
}
SmallHeart -> SmallHeart 5年前

再补充个内部封装的Delivery的源码

public class Delivery implements Closeable, Flushable {
    private String metadataURL;
    private String username;
    private String token;
    private String dataSource;
    private Map<String, Object> producerParams;
    private String topic;
    private AtomicBoolean start = new AtomicBoolean(false);
    private MetaDataHelper mateDataHelper;
    private Producer producer;
    private KafkaVersion version;
    private ProbabilityPatition partitionStrategy = new ProbabilityPatition();
    private Delivery() {}
    public static enum KafkaVersion{
        VERSION_8,
        VERSION_10;
    }

    private void bootstrap() throws IOException{
        Preconditions.checkArgument(!Strings.isNullOrEmpty(metadataURL), "metadataURL cannot be null");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(dataSource), "dataSource cannot be null");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "topic cannot be null");
        Preconditions.checkArgument(version != null, "kafka version cannot be null");
        Preconditions.checkArgument(producerParams != null && !producerParams.isEmpty(), "producerParams cannot be null");

        try {
            mateDataHelper = new MetaDataHelper(metadataURL, username, token, dataSource);
            mateDataHelper.start();
            switch(version) {
            case VERSION_10:
                producer = new Producer10(topic, producerParams);
                break;
            case VERSION_8:
                producer = new Producer8(topic, producerParams);
                break;
            }
            producer.connect();
        } catch (Exception e) {
            if(mateDataHelper != null) {
                mateDataHelper.close();
            }
            if(producer != null) {
                producer.close();
            }
            throw e;
        }
        start.set(true);
    }
    /**
     *  添加一条数据
     * @param tableName 表名
     * @param row 行数据
     * @deprecated Use {@link #add(String, Map, Object)} instead.
     */
    @Deprecated
    public void add(String tableName, Map<String, Object> row) {
        add(tableName, row, null);
    }

    /**
     *     添加一批消息
     * @param tableName 表名
     * @param rows 多行数据
     * @param callback 回调函数,可以返回入库结果,如果为null,不会返回异常信息
     * @deprecated Use {@link #add(String, Map, Callback, Object)} instead
     */
    @Deprecated
    public void add(String tableName, List<Map<String, Object>> rows, Callback callback) {
        for(Map<String, Object> row : rows) {
            add(tableName, row, callback);
        }
    }
    /**
     *   添加一条数据并绑定回调函数
     * @param tableName 表名
     * @param row 行数据
     * @param callback 回调函数,可以返回入库结果,如果为null,不会返回异常信息
     * @deprecated Use {@link #add(String, Map, Callback, Object)} instead
     */
    @Deprecated 
    public void add(String tableName, Map<String, Object> row, Callback callback) {
        add(tableName, row, callback, null, 2);
    }

    /**
     * 添加一条数据
     * @param tableName 表名
     * @param row 行数据
     * @param callback 回调函数,可以返回入库结果,如果为null,不会返回异常信息
     * @param partitionVaue 分区值,一般使用appid字段
     */
    public void add(String tableName, Map<String, Object> row, Object partitionVaue) {
        add(tableName, row, null, partitionVaue);
    }
    /**
     * 添加一条数据并绑定回调函数
     * @param tableName 表名
     * @param row 行数据
     * @param callback 回调函数,可以返回入库结果,如果为null,不会返回异常信息
     * @param partitionVaue 分区值,一般使用appid字段
     */
    public void add(String tableName, Map<String, Object> row, Callback callback, Object partitionVaue) {
        add(tableName, row, callback, partitionVaue, 2);
    }

    /**
     * 添加一条数据并绑定回调函数
     * @param tableName 表名
     * @param row 行数据
     * @param callback 回调函数,可以返回入库结果,如果为null,不会返回异常信息
     * @param partitionVaue 分区值
     * @param partitionNum 分区个数
     */
    public void add(String tableName, Map<String, Object> row, Callback callback, Object partitionVaue, int partitionNum) {
        Preconditions.checkArgument(start.get(), "already closed!");
        if(!mateDataHelper.containsTable(tableName)) {
            if(callback != null) {
                callback.onFail(new TableNotExistException(String.format("datasource: %s, table: %s", dataSource, tableName)));
            }
            return;
        }
        Tuple2<Integer, Map<String, Object>> tuple2 = null; 
        try {
            tuple2 = mateDataHelper.convertField(tableName, row, partitionVaue);
        } catch (FieldNotExistException e) {
            if(callback != null) {
                callback.onFail(e);
            }
            return;
        }
        final int hashCode = tuple2.getLeft();
        final int partition = partitionStrategy.getPartition(tableName, Objects.toString(partitionVaue), hashCode, producer.getPartitionNumForKafka());
        producer.add(partition, JSON.toJSONString(tuple2.getRight()), callback);
    }

    @Override
    public void flush() {
        Preconditions.checkArgument(start.get(), "already closed!");
        producer.flush();
    }

    @Override
    public void close() throws IOException {
        Preconditions.checkArgument(start.getAndSet(false), "already closed!");
        mateDataHelper.close();
        producer.flush();
        producer.close();
    }


    public String getMetadataURL() {
        return metadataURL;
    }

    public String getUsername() {
        return username;
    }

    public String getToken() {
        return token;
    }

    public String getDataSource() {
        return dataSource;
    }

    public Map<String, Object> getProducerParams() {
        return producerParams;
    }

    public KafkaVersion getVersion() {
        return version;
    }

    /**
     * 触发表和字段缓存更新,同步为最新的结构
     */
    public void reloadCache() {
        this.mateDataHelper.updateMateData();
    }

    public static class Builder {
        private String metadataURL;
        private String username;
        private String token;
        private String dataSource;
        private Map<String, Object> producerParams;
        private String topic;
        private KafkaVersion version = KafkaVersion.VERSION_10;
        /**
         * 设置元信息更新地址
         * @param metadataURL
         * @return
         */
        public Builder setMetadataURL(String metadataURL) {
            this.metadataURL = metadataURL;
            return this;
        }

        public String getUsername() {
            return username;
        }
        /**
         * 设置用户名
         * @param username
         * @return
         */
        public Builder setUsername(String username) {
            this.username = username;
            return this;
        }

        public String getToken() {
            return token;
        }

        /**
         * 设置令牌
         * @param token
         * @return
         */
        public Builder setToken(String token) {
            this.token = token;
            return this;
        }

        public String getDataSource() {
            return dataSource;
        }
        /**
         * 设置数据源名称,格式为'名称_粒度'
         * @param dataSource
         * @return
         */
        public Builder setDataSource(String dataSource) {
            this.dataSource = dataSource;
            return this;
        }

        public Map<String, Object> getProducerParams() {
            return producerParams;
        }

        /**
         * 设置生产者参数
         * @param producerParams
         * @return
         */
        public Builder setProducerParams(Map<String, Object> producerParams) {
            this.producerParams = producerParams;
            return this;
        }

        public String getMetadataURL() {
            return metadataURL;
        }

        public String getTopic() {
            return topic;
        }

        public Builder setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        public KafkaVersion getVersion() {
            return version;
        }

        public Builder setVersion(KafkaVersion version) {
            this.version = version;
            return this;
        }

        public Delivery build() throws IOException {
            Delivery delivery = new Delivery();
            delivery.metadataURL = this.metadataURL;
            delivery.username = this.username;
            delivery.token = this.token;
            delivery.dataSource = this.dataSource;
            delivery.producerParams = this.producerParams;
            delivery.topic = this.topic;
            delivery.version = version;
            delivery.bootstrap();
            return delivery;
        }
    }

}
半兽人 -> SmallHeart 5年前

好长,先把new Callback的这个去掉,无法在这里调用的,我怀疑是这里导致的

newDelivery.add(tableName, data, this);
SmallHeart -> 半兽人 5年前

这个我在测试环境测了可以在这里调用,模拟的是buffser.size很小的一个值然后发大包数据引发生产不上去,然后这里会retrytimes达到10然后走这里可以发上去。

然后看上面的retrytimes其实一直都没达到10次,所以这段代码没有走到。看了下报错对应源码,好像意思是说该分区的请求响应超时,超过了requestTimeout
,如上报错的Sender.java205行,这个requestTimeout就是request.timeout.ms这个值,这个值我的生产者没配那就是默认值30000,但是这warn日志报的很频繁,几乎1s有近10条的刷的频率,似乎是单个patition很多batch数据都被判定为超时,但应该不会这么频繁吧,30s没这么快吧。从warn日志提示看,又是说请求patition元信息超时

List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
SmallHeart -> 半兽人 5年前

然后看我原问题的warn日志部分,第一行和第四行。两个日志间隔刚好300ms,并且同一producer线程,同一分区。这就奇怪了,这就说明发生了onFail里的重试,那重试就retrytime应该要加1,因为调用的callback对象是同一个,这个现象就好像两个callback不是同一个对象一样,就好像按您说的这里无法调用一样。

producer有自己的重试策略,但是我也没有配,那就默认值是0次,所以这里也不是producer自己重试的,就是走到了onFail这里对吧?
那这个retryTime没有自增就很奇怪了

半兽人 -> SmallHeart 5年前

callback每次都是new的,传递到客户端里用于回调,而你即使重新走原来的逻辑,也是new的,不会加1。

SmallHeart -> 半兽人 5年前

那日志里其实也有加了1的,只是加的很难一样,要很多个1,1,1,1之后才加个1变成2,并且自己测试的时候,是会加1的,如下是今天早些时候报的错

2019-10-24 03:01:47.494 [kafka-producer-network-thread | producer-6] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_VIEW        retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 124 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-9
2019-10-24 03:01:47.795 [kafka-producer-network-thread | producer-6] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_VIEW        retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 124 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-9
2019-10-24 03:01:48.095 [kafka-producer-network-thread | producer-6] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_VIEW        retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 124 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-9
2019-10-24 03:01:48.396 [kafka-producer-network-thread | producer-6] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_VIEW        retryTimes:1, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 124 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-9
2019-10-24 03:01:48.696 [kafka-producer-network-thread | producer-6] WARN  com.bonree.remote.zeus.DeliveryUtils - send to delivery data failed! will to retry after 300ms, extraInfo:T_SDK_STAT_DRU_SDKINFO     retryTimes:2, exception info :org.apache.kafka.common.errors.TimeoutException: Batch containing 124 record(s) expired due to timeout while requesting metadata from brokers for SDK_DELIVERY_GRAN-9
半兽人 -> 半兽人 5年前

我觉得反复的迭代传递producer对象,导致的,重试kafka有自己的策略,会每次调用callback。
另外,producer.add的实现可以贴下,

ps:问题可以编辑,在下一条上一条的上面

半兽人 -> SmallHeart 5年前

kafka生产者自己的重试,会调用同一个callback,这个会+1,而你本身又重新发送了消息的是新new callback。

SmallHeart -> 半兽人 5年前

传this也没用嘛

SmallHeart -> 半兽人 5年前
    @Override
    public void add(int partition, String value, Callback callback) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, partition, null, value.getBytes(Charsets.UTF_8));

        if(callback != null) {
            producer.send(record, new org.apache.kafka.clients.producer.Callback() {
                @Override
                public void onCompletion(RecordMetadata arg0, Exception arg1) {
                    if(arg1 == null) {
                        callback.onSuccess(arg0.serializedKeySize() + arg0.serializedValueSize());
                    }else {
                        callback.onFail(new MsgSendFailException(arg1));
                    }
                }
            });
        } else {
            producer.send(record);
        }
    }
半兽人 -> SmallHeart 5年前

callback的对象只实例化了一次,所有的回调通知共用这个对象,重点关注下onFail()的逻辑。
共享对象的300ms,一条300ms,那大家都得排着。
不知道你上面那个onFail()是不是你的逻辑,有休眠,有处理等等,还有重置生产传递对象,线程的可见性也要注意用volidate修饰。

SmallHeart -> 半兽人 5年前

onFail是我的逻辑,重试的逻辑。不知道这样设计有没有问题?我们系统不能丢一条数据,所以才设计的这个重试逻辑。
还有个疑点是自制的线程池那种方式应该没问题的吧?
还有那个getSignal..()的设计感觉也没问题?
另外,onComplete方法应该是kafka生产者自有的重试规则的次数试完之后仍然失败才会在这个回调方法里表现发送失败对吧?而不是试一次就回调一次

半兽人 -> SmallHeart 5年前

是的,callback你是单例,并且大家都会共同触发你的逻辑。
线程池疑点是对的,并发操作异常那块producer看到的消息缓存队列很可能是同一个,就用1个试试。

不知道你的并发量有多大。你可以producer.send(record).get();同步发送(性能会打折扣)。

你不想丢消息这么设计太复杂了,考虑过度,导致你的程序过度复杂。

  1. producer如果失效本身会自动重连,无需你自己替换。

  2. 同步发送失败信息可以直接捕获,我们捕获后直接打日志,然后人工处理(我们已经4年了,没有人工处理过,只要不是kill -9,普通的关闭是不会有问题的),另外一种异步callback将失败的加到失败队列里,从上层重新发送。

你可以参考这个例子,同步的,我们支付业务用的,运行5年没出过问题,没丢过消息。
https://www.orchome.com/1056

SmallHeart -> 半兽人 5年前

好的,我先按照您的点评和建议优化下重新测试下,现在看来确实太复杂了,看的头痛,假装不是自己写的然后心里默念哪个傻逼写的。
然后我们数据量是近pb级别的,所以就考虑的异步

你的答案

查看kafka相关的其他问题或提一个您自己的问题