kafka消费者客户端(0.9.0.1API)

半兽人 发表于: 2016-05-23   最后更新时间: 2016-10-25 22:22:38  
{{totalSubscript}} 订阅, 17,189 游览

kafka客户端从kafka集群消费消息(记录)。它会透明地处理kafka集群中服务器的故障。它获取集群内数据的分区,也和服务器进行交互,允许消费者组进行负载平衡消费。(见下文)。


消费者维持TCP连接到必要的broker来获取消息。故障导致消费者关闭使用,会泄露这些连接,消费者不是线程安全的,可以查看更多关于Multi-threaded(多线程)处理的细节。


偏移量和消费者的位置

kafka为每个分区的每条消息保持偏移量的值,这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。也就是说,一个位置是5的消费者,说明已经消费了0到4的消息(记录)并下一个接收消息的偏移量设置为5。关于的消费者,实际上“位置”有2个概念。


消费者将给出下一个消息的偏移量的位置,这个是消费者在分区中能看到的最后的偏移量,消费者收到的数据称为poll(long)[长轮询],每次接收消息,偏移量会自动增长,


“已提交”的位置是已安全保存的最后偏移量,如果处理失败,这个偏移量会恢复并重新开始。消费者可以自动定期提交偏移量,也可以选择通过调用commitSync来控制,这是阻塞的,直到偏移量提交成功或在提交过程中发生致命的错误,commitAsync是非阻塞式的,当成功或失败时,会引发OffsetCommitCallback。


这个区别是当一条消息已认为已被消费,控制权在消费者,下面我们进一步更详细地讨论。


消费者组和主题订阅

Kafka使用消费者组概念,允许进程池瓜分消费和处理消息的工作。这些进程可以在同一台机器运行,或更可能的是,可以部分到多台机器上,以提供额外的可扩展性和容错性处理。


每个kafka消费者都能配置一个属于它自己的消费者组。并可以动态的配置它需要订阅的主题列表,通过`subscribe(List, ConsumerRebalanceListener)`,或订阅匹配特定模式的主题,通过`subscribe(Pattern, ConsumerRebalanceListener)`,kafka将已订阅主题的每个消费者组中的每条消息发送给一个进程。这是通过在每个组的消费者进程平衡主题的分区来实现的。 所有,如果一个主题有4个分区,并且一个消费者组有2个进程,每个进程将从2个分区来进行消费,这个是动态维护的:如果一个进程故障,分区将重新分派到同组的其他的进程。如果有新的进程加入该组,分区将现有消费者移动到新的进程。


所以,如果2个进程订阅了一个主题,指定不同的组,他们将获取这个主题所有的消息,如果他们指定相同的组,那么它们将每个获取大约一半的消息。

从概念上讲,你可以把消费者组看作一个单一的逻辑用户(订阅者),碰巧组成了多个进程。作为一个多用户系统。kafka也支持任意数量的消费者组提供一个指定的主题不重复的数据,

这是关于常用消息系统功能的简单的概述,类似于传统消息系统中的队列,所有进程将是一个单独的消费组的一部分,因此,消息的交付由该组进行平衡,类似于队列。与传统的消息传递系统不同的是,你可以有多个这样的组。在传统的消息传递系统中,每个进程都有它自己的消费组,所以每个进程都会订阅发布到主题的所有消息。


此外,当组重新分配自动发生,消费者可以通过调用`ConsumerRebalanceListener`通知,这使得他们能够完成必要的应用程序级的逻辑,如状态清除,手动偏移提交(注意,指定的消费者组的偏移量总是已经提交的)


它也有可能为消费者手动指定分配给它通过`assign(List)`,这将禁用动态分区分配。


示例

这个消费者API提供了灵活性,以涵盖各种消费场景,下面是一些例子来演示如何使用它们。

自动提交偏移量

这是个【自动提交偏移量】的简单的kafka消费者API。

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }
设置`enable.auto.commit`,偏移量由`auto.commit.interval.ms`控制自动提交的频率。

集群是通过配置bootstrap.servers指定一个或多个broker。不需要制定全部的broker,会自动发现在集群中的其余的borker(最好指定多个,万一有服务器故障)。

在这个例子中,客户端订阅了主题`foo`和`bar`。消费者组叫`test`。

broker通过心跳机器自动检测test组中失败的进程,消费者会自动`ping`集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过`session.timeout.ms`,那么就会认为是故障的,它的分区将被分配到别的进程。

这个`deserializer`设置是指定如何去把byte转为为object类型,例子中,通过指定string的 deserializers,我们告诉我们获取到的消息的key和value只是简单的string类型。


手动控制偏移量

不依赖于定期提交偏移量,你可以自己控制偏移量,当消息认为已消费过了,这个时候再去提交它们的偏移量。这个是很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。在这里例子中,当我们有足够的消息进行批处理时我们将它们插入到数据。它们接收了消费者的消息之后,消息将被认为已经消费了,这个时候我们的过程失败了,我们读取我们的内存缓存区的消息,有可能他们已被插入到数据库中了。为了避免这一点,我们将手动提交的偏移量,当相应的消息已被插入到数据库中。我们准确控制一条消息才被认为是消费的。提出了一个相反的可能性:在插入到数据库中,但在提交之前,这个过程可能会失败(尽管这可能只有几毫秒,但它是一种可能性)。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。用这种方式,被称为`“至少一次”`担保,因为每个消息可能会提供一次,但在故障情况下,可以重复。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }
上面的例子使用commitSync表示所有收到的消息为”已提交",在某些情况下,你可以希望更精细的控制,通过指定一个明确消息的偏移量为“已提交”。在下面,我们的例子中,我们处理完每个分区中的消息后,提交偏移量。
try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注:已提交的偏移量应该一直是你的应用程序将读取的下一条消息来的偏移量。因此,调用commitSync(offsets)时,你应该添加最后一条消息的偏移量。


订阅指定的分区

在前面的例子中,我们订阅我们感兴趣的主题,让kafka提供给我们平分后的主题的分区,它提供了一个简单的负载平衡机制,所以我们的程序通过多个实例来瓜分处理这些消息。

在这种模式下,消费者将只会获取它订阅的分区,如果消费者实例故障,不会尝试重新平衡分区到其他的实例。

  • 第一种情况是,如果这个进程与该分区保持某种本地状态(如 本地磁盘上的key-value存储)因此它应该只能获取这个分区的消息,它是保持在磁盘上。
  • 另一种情况是,如果这个过程本身是高度可用的,将重新启动,如果失败(可能使用集群管理框架如YARN,Mesos,或者AWS设施,或作为一个流处理框架的一部分)。在这种情况下不需要kafka来发现故障和重新分配,而消费进程将在另一台机器重新启动。


这种模式很容易指定,不是订阅主题,只需要消费者订阅特定的分区即可:

String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));
消费者指定的组仍然用于提交偏移量。但现在分区设置只会改变消费者的分区。如果消费者指定新的分区,并不会尝试故障检查。

它不能即订阅了指定的分区(无负载平衡),又使用相同的消费者实例的主题(负载平衡)。


存储kafka之外的偏移量

消费者应用程序不必使用kafka内置的偏移量仓库,它可以自己选择存储偏移量的仓库。主要的一点是允许应用程序存储偏移量和偏移量的结果存储在同一个系统中,原子的消费情况,这并不一定,但将使消费完全原子,并给出“恰好一次”的语义比默认“至少一次”语义更强壮的,你要用kafka的偏移提交功能。


这有结合的例子。

  • 如果消费的结果被存储在关系数据库中,存储在数据库中的偏移,也允许提交偏移量和结果,并在单个事务中。因此,事物将成功和偏移量将被更新的基础上。如果被消耗或结果将不会被存储并且偏移量也不会被更新。
  • 如果结果存储在一个本地仓库,它也可能存储偏移量。例如,一个搜索索引可以通过订阅一个特定的分区和存储两个偏移和索引数据一起建立。如果这是在一种原子的方式进行的,它通常是可能的,即使发生事故导致unsync数据丢失,剩下是也相应的偏移量存储。这意味着,在这种情况下,当恢复后,失去最近更新索引进程从刚刚地方恢复索引,它确保没有更新丢失。

每个消息都有自己的偏移,所以要管理自己的偏移,你只需要做到以下几点:

  • 配置 enable.auto.commit=false
  • 使用提供的每个 ConsumerRecord 来保存你的位置。.
  • 在重启时使用恢复消费者的位置用 seek(TopicPartition, long).

当分区分配也手动完成,这种类型的使用是最小的。(像上文搜索索引的情况).如果分区分配是自动完成的,需要特别小心处理分区分配变更的情况。可以通过提供的 `ConsumerRebalanceListener`调用`subscribe(Collection, ConsumerRebalanceListener) `和`subscribe(Pattern, ConsumerRebalanceListener)`。例如,当分区从消费者拿一条消息,消息费想要提交这些分区的偏移量,通过执行`ConsumerRebalanceListener.onPartitionsRevoked(Collection)`,当分区分配给消费者,消费者通过`ConsumerRebalanceListener.onPartitionsAssigned(Collection)`,为新的分区和正确初始化位置的消费者找到偏移。

kafka允许指定位置,使用`seek(TopicPartition, long)`来指定新的位置,一些特别的方法寻找最早和最晚的偏移,服务器维护也可用(`seekToBeginning(Collection)` 和 `seekToEnd(Collection))`。

消费流控制

如果多个分区分配一个消费者获取的数据,它将试图同时消费所有的,有效地给这些分区为消费相同的优先级。然而在某些情况下,消费者可能首先想全速专注于获取的一些子集分配分区,当这些分区很少或已经没有消费数据了在去抓取其他分区。

还有这样一种情况,流处理,其中处理器由两个topic获取和执行这两个流的连接。当的topic之一是早已落后于其他落后,处理器想暂停为了得到滞后流赶上从前面的topic取。另一个例子是在消费者的Bootstrap启动,其中有很多历史数据的追赶中,应用程序通常想要得到的一些话题考虑获取其他topic之前的最新数据。

kafka支持动态控制消费流量,分别在future的`poll(long)`调用中执行中使用 `pause(Collection)` 和 `resume(Collection)` 来暂停消费指定分配的分区,重新开始消费指定暂停的分区。


更新于 2016-10-25
在线,7小时前登录

七安 7年前

现在用的版本是0.8.2.2,暂时无法升级 不知道为什么consume一段时候后 就不能再接收新的消息了,偏移量不再改变,没有任何报错,

    public void taskConsume() {

        ConsumerConnector consumer = createConsumer();

        Map<String, Integer> topicCountMap = new HashMap<>();

        topicCountMap.put("rcmlog", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> message = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = message.get("rcm_log").get(0);

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        logger.info("consume starts");

        while (iterator.hasNext()) {

            try {

                String log = new String(iterator.next().message(), "utf-8");

                logger.info("print: " + log);

                processData(JsonUtil.readValue(log, KafkaLogInstance.class));

            } catch (Exception e) {

                logger.error("processing log error: ",e.getMessage());

                if(e instanceof  InterruptedException){

                    taskConsume();

                }

                continue;

            }

        }

    }



    public ConsumerConnector createConsumer() {

        Properties properties = new Properties();

        zookeeperConnect = getDirPath();

        properties.put("zookeeper.connect", zookeeperConnect);

        properties.put("group.id", "discovery_msg");

        properties.put("fetch.message.max.bytes", "20971520");

        properties.put("auto.commit.interval.ms", "1000");

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

    }

<img src="file://C:\Users\ANRAN\AppData\Roaming\Tencent\Users\307587866\QQ\WinTemp\RichOle\QD}2%$E)
@]E]4Y30(8ZX]O.png" />


七安 -> 七安 7年前

之前考虑是不是跟client链接断开 加了try/catch 但是好像并不是这个原因;

尝试换group id, 消费一段时间,就停止了,日志一直在print:Got ping response for sessionid: 0x15d4b69db43018e after 0ms

半兽人 -> 七安 7年前

kafka集群中有错误吗?

七安 -> 半兽人 7年前

没有报错

七安 -> 半兽人 7年前

刚发现zookeeper 在不停打这个日志

 Accepted socket connection from /10.65.10.228:64971
2017-07-24 19:29:09,419 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@362] - Exception causing close of session 0x0 due to java.io.EOFException
2017-07-24 19:29:09,419 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /10.65.10.228:64971 (no session established for client)



半兽人 -> 七安 7年前

升级zookeeper。我遇到过新版kafka,zookeeper升到3.4.9后就没问题了。但是老版本就不确定,你可以尝试一下。

ruiliang 7年前

网页翻译出来的哈,有些句子读不通

半兽人 -> ruiliang 7年前

这个好老了哦,看看新的
https://www.orchome.com/451

查看history更多相关的文章或提一个关于history的问题,也可以与我们一起分享文章