kafka通过开始和结束时间戳,消费消息。

半兽人 发表于: 2018-06-20   最后更新时间: 2018-06-20  
  •   268 订阅,144 浏览

kafka通过开始和结束时间戳,消费消息。

package com.system.kafka.clients.demo.producer.time;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Counts how many messages a topic received between two wall-clock timestamps
 * by resolving each timestamp to a per-partition offset via
 * {@link KafkaConsumer#offsetsForTimes(Map)} and summing the offset deltas.
 *
 * <p>Usage: {@code ConsumerForTimeTest <bootstrapServers> <topic> <groupId> <startTime> <endTime>}
 * where the times use the pattern {@code yyyyMMddHHmmss} (interpreted in the
 * JVM's default time zone). When fewer than five arguments are given, the
 * built-in test defaults are used instead.
 */
public class ConsumerForTimeTest {

    /** Pattern of the CLI timestamp arguments, e.g. "20180619000000". Thread-safe, so cached. */
    private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    public static void main(String[] args) {
        // Defaults for local testing. BUG FIX: the original read args and then
        // unconditionally overwrote them with these constants, so the CLI
        // arguments were dead code; now the defaults apply only when fewer
        // than five arguments are supplied (also avoiding the original's
        // ArrayIndexOutOfBoundsException for 1-4 arguments).
        String bootstrapServers = "10.0.21.56:9092";
        String topic = "TRADE-NOTIFY";
        String groupId = "test_BILLING";
        String startTime = "20180619000000";
        String endTime = "20180620000000";

        if (args.length >= 5) {
            bootstrapServers = args[0];
            topic = args[1];
            groupId = args[2];
            startTime = args[3];
            endTime = args[4];
        }
        getOffsetsForTimes(bootstrapServers, topic, groupId, startTime, endTime);
    }

    /**
     * Prints the total number of messages in {@code topic} whose broker
     * timestamps fall in {@code [startTime, endTime)}.
     *
     * <p>Generalized from the original hard-coded partitions 0 and 1: all
     * partitions reported by the broker are now included.
     *
     * @param bootstrapServers Kafka bootstrap server list, host:port[,...]
     * @param topic            topic to inspect
     * @param groupId          consumer group id (only used for the metadata consumer)
     * @param startTime        inclusive start, pattern yyyyMMddHHmmss
     * @param endTime          exclusive end, pattern yyyyMMddHHmmss
     * @throws java.time.format.DateTimeParseException if a timestamp does not match the pattern
     *         (the original silently continued with epoch 0 after printing the stack trace)
     */
    public static void getOffsetsForTimes(String bootstrapServers, String topic, String groupId,
                                          String startTime, String endTime) {
        long start = toEpochMillis(startTime);
        long end = toEpochMillis(endTime);

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources: the original leaked the consumer (never closed).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            int partitionCount = consumer.partitionsFor(topic).size();
            List<TopicPartition> partitions = new ArrayList<>(partitionCount);
            Map<TopicPartition, Long> startQuery = new HashMap<>();
            Map<TopicPartition, Long> endQuery = new HashMap<>();
            for (int p = 0; p < partitionCount; p++) {
                TopicPartition tp = new TopicPartition(topic, p);
                partitions.add(tp);
                startQuery.put(tp, start);
                endQuery.put(tp, end);
            }

            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(startQuery);
            Map<TopicPartition, OffsetAndTimestamp> endOffsets = consumer.offsetsForTimes(endQuery);
            // Log-end offsets, used as fallback when endTime is past the newest message.
            Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(partitions);

            long total = 0;
            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp startEntry = startOffsets.get(tp);
                // null => no message at or after startTime in this partition.
                long startOffset = (startEntry != null) ? startEntry.offset() : 0;

                OffsetAndTimestamp endEntry = endOffsets.get(tp);
                long endOffset;
                if (endEntry != null) {
                    endOffset = endEntry.offset();
                } else if (startOffset > 0) {
                    // endTime is beyond the last message: count up to the log end.
                    endOffset = latestOffsets.get(tp);
                } else {
                    endOffset = 0;
                }
                total += endOffset - startOffset;
            }
            System.out.println(topic + " offsets:" + total);
        }
    }

    /** Parses a yyyyMMddHHmmss string in the JVM default zone to epoch milliseconds. */
    private static long toEpochMillis(String timestamp) {
        return LocalDateTime.parse(timestamp, TIME_FORMAT)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
    }
}






发表于: 29天前   最后更新时间: 29天前   浏览量:144
上一条: 手把手教你写Kafka Streams程序
下一条:

评论…


  • 评论…
    • in this conversation
      提问