When a consumer fails to process a message, how does Kafka handle it?

坏孩子 posted on: 2018-03-07   Last updated: 2018-03-07 19:07:55   14,787 views

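When a consumer fails to process a message, how does Kafka handle it? Does it redeliver the message, or do something else? In the code below the consumer is configured for manual offset commits, yet when I restart the consumer it does not resume from the position where processing failed.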

Relevant configuration

package com.jasongj.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class DemoConsumerCommitCallback {


    public static void main(String[] args) throws Exception {


        args = new String[] { "192.168.2.131:9092,192.168.2.132:9092,192.168.2.133:9092", "test3", "group11", "consumer2" };
        if (args == null || args.length != 4) {
            System.err.println(
                    "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}");
            System.exit(1);
        }
        String bootstrap = args[0];
        String topic = args[1];
        String groupid = args[2];
        String clientid = args[3];

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupid);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "500");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        AtomicLong atomicLong = new AtomicLong();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                // Simulate a processing failure: 1/0 throws ArithmeticException when the value is 5.
                if (Integer.valueOf(record.value())== 5){
                    System.out.println(1/0);
                }

                System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n",
                        clientid, record.topic(), record.partition(), record.offset(), record.key(), record.value());
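                // Note: atomicLong is never incremented, so this condition is always true
                // and a commit is issued for every record. Called without arguments,
                // commitAsync() commits the offsets returned by the last poll().
                if (atomicLong.get() % 10 == 0){
//                    consumer.commitSync();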
                    consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
                        offsets.forEach((TopicPartition partition, OffsetAndMetadata offset) ->
                            System.out.printf("Commit %s-%s-%s", partition.topic(), partition.partition(), offset.offset())
                        );
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
                }

            });
        }
    }

}
Posted on 2018-03-07

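consumer.commitSync();
Put this call after the code that throws the exception, so that an offset is committed only once its record has been processed successfully.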
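
To illustrate why the restart position depends on what was committed, here is a minimal in-memory sketch in plain Java. It does not use the Kafka client: the class CommitOffsetModel, the LOG contents, and the process and consume helpers are all hypothetical. It relies on one documented behavior: commitSync() and commitAsync() called without arguments commit the offsets returned by the last poll(), that is, the end of the whole polled batch, not the record currently being processed.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory model of one partition and one group's committed offset. */
class CommitOffsetModel {

    /** Messages of a single partition; the list index plays the role of the offset. */
    static final List<String> LOG = Arrays.asList("1", "2", "3", "4", "5", "6", "7");

    /** Stand-in for application processing; fails on "5" like the code in the question. */
    static void process(String value) {
        if (Integer.parseInt(value) == 5) {
            throw new ArithmeticException("simulated processing failure");
        }
    }

    /**
     * Simulates one consumer run that starts at the committed offset, polls one batch,
     * and stops at the first processing failure. Returns the committed offset that a
     * restarted consumer would resume from.
     *
     * commitExplicitOffset = false: after each record, commit the end of the polled batch
     *                               (models the no-argument commit in the question).
     * commitExplicitOffset = true:  after each record, commit that record's offset + 1.
     */
    static long consume(long committed, int maxPollRecords, boolean commitExplicitOffset) {
        long end = Math.min(committed + maxPollRecords, LOG.size()); // position after poll
        for (long offset = committed; offset < end; offset++) {
            try {
                process(LOG.get((int) offset));
            } catch (RuntimeException e) {
                return committed; // consumer stops; only what was committed so far survives
            }
            committed = commitExplicitOffset ? offset + 1 : end;
        }
        return committed;
    }

    public static void main(String[] args) {
        // Whole batch was committed after the first record, so "5", "6", "7" are skipped.
        System.out.println("no-arg style commit, restart at offset " + consume(0, 500, false));
        // Only processed records were committed, so the restart begins at the failed record.
        System.out.println("explicit offset commit, restart at offset " + consume(0, 500, true));
    }
}
```

In this model the first strategy restarts at offset 7 (past the failed record) and the second at offset 4 (the failed record "5"). With the real client, the second strategy would correspond to passing an explicit map to commitSync(Map<TopicPartition, OffsetAndMetadata>) with record.offset() + 1 as the offset, since the committed offset is the offset of the next record to read.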
