Paddy

0 声望

这家伙太懒,什么都没留下

个人动态
  • 半兽人 回复 Paddy为什么使用ConsumerRecords获取到的timeStamp为-1 中 :

    你要不要换个新版的试试,我忘了这个有没有时间戳。

    5年前
  • Paddy 回复 半兽人为什么使用ConsumerRecords获取到的timeStamp为-1 中 :

    使用的是2.10-0.10.1.2的版本 时间索引文件是有的就是数据进不去

    5年前
  • 半兽人 回复 Paddy为什么使用ConsumerRecords获取到的timeStamp为-1 中 :

    你kafka什么版本呢,默认情况下,不需要任何配置,时间戳是可以获取到的,除非你的kafka版本过低。
    你可以关注我的微信公众号,里面分享了一个完整的例子。

    5年前
  • Paddy 回复 半兽人为什么使用ConsumerRecords获取到的timeStamp为-1 中 :

    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "172.30.79.134:6667,172.30.79.135:6667,192.168.1.219:6667,192.168.1.220:6667,192.168.1.221:6667");
    //props.put("bootstrap.servers", "47.107.148.97:9092");
    //props.put("bootstrap.servers", "106.12.73.246:9092");
    //props.put("config","message.timestamp.type=LogAppendTime");
    props.put("group.id", "test-consumer-group");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    Consumer consumer = new KafkaConsumer(props);
    //TopicPartition partition01 = new TopicPartition("gcgTest7", 0);
    //consumer.assign(Arrays.asList(new TopicPartition[]{partition01}));
    //consumer.seek(partition01,10);
    consumer.subscribe(Arrays.asList(new String[]{"testGcg1"}));
    while (true) {
    //--消费100ms内的数据
    ConsumerRecords rs = consumer.poll(100);
    //--遍历得到的结果集
    for (ConsumerRecord r : rs) {
    //--得到信息 打印
    System.out.println("--topic:[" + r.topic() + "]"

                        + "--partition:[" + r.partition() + "]"
                        + "--offset:[" + r.offset() + "]"
                        + "--timestamp:[" + r.timestamp() + "]"
                        + "--key:[" + r.key() + "]"
                        + "--value:[" + r.value() + "]--" +
                        "type="+r.timestampType());}
        }
    }</string,></string,></string,>
    
    5年前
  • 订阅了 kafka 主题! · 5年前
  • Paddy 回复 漂泊的美好kafka的consumer.offsetsForTimes根据timestamp查找offset返回结果为空 中 :

    请问,我在server.properties中已经加入了message.timestamp.type=LogAppendTime这个配置,重启后新建的topic中消费出来的时间还是-1 这个是咋回事

    5年前