kafka消费者

半兽人 发表于: 2015-03-10   最后更新时间: 2017-08-05  
  •   109 订阅,7392 游览

The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.

kafka消费者通过向broker的leader分区发起“提取”请求。消费者指定每次请求日志的偏移量并收到那一块日志的起始位置。因此,消费者可以重新指定位置,重新消费。

推送 vs 拉取

An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume follow a very different push based path where data is pushed downstream. There are pros and cons to both approaches. However a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. This can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.

我们考虑的第一个问题是消费者应该从broker中pull数据还是broker向消费者push数据,在这方面,kafka遵循比较传统的设计,大多数消息系统,生产者推消息到broker,消费者从broker拉取消息,一些日志中心的系统,比如 Scribe 和Apache Flume ,采用非常不同的push模式(push数据到下游)。事实上,push模式和pull模式各有优劣。push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成消费者来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。


Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose to either send a request immediately or accumulate more data and then send it later without knowledge of whether the downstream consumer will be able to immediately process it. If tuned for low latency this will result in sending a single message at a time only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design fixes this as the consumer always pulls all available messages after its current position in the log (or up to some configurable max size). So one gets optimal batching without introducing unnecessary latency.

基于pull模式的另一个优点是,它有助于积极的批处理的数据发送到消费者。基于push模式必须选择要么立即发送请求或者积累更多的数据,然后在不知道下游消费者是否能够立即处理它的情况下发送,如果是低延迟,这将导致一次只发送一条消息,以便传输缓存,这是实在是一种浪费,基于pull的设计解决这个问题,消费者总是pull在日志的当前位置之后pull所有可用的消息(或配置一些大size),所以消费者可设置消费多大的量,也不会引起不必要的等待时间。


The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a "long poll" waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).

基于pull模式不足之处在于,如果broker没有数据,消费者会轮询,忙等待数据直到数据到达,为了避免这种情况,我们允许消费者在pull请求时候使用“long poll”进行阻塞,直到数据到达(并且设置等待时间的好处是可以积累消息,组成大数据块一并发送)。


You could imagine other possible designs which would be only pull, end-to-end. The producer would locally write to a local log, and brokers would pull from that with consumers pulling from them. A similar type of "store-and-forward" producer is often proposed. This is intriguing but we felt not very suitable for our target use cases which have thousands of producers. Our experience running persistent data systems at scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we can run a pipeline with strong SLAs at large scale without a need for producer persistence.
你可以想一些其他的可能性的设计,不仅仅是pull,端对端。生产者在本地写入本地日志,broker从那里pull,消费者再pull,类似一种“存储-转发”的生产者,这种方式很有意思,但是我们觉得不是很适合我们这种有成千上万的生产者的情况,我们的大规模运行的持久化数据系统的经验使我们觉得,在许多应用领域涉及数以千计的系统磁盘不会真的使事情变得更加可靠,将是操作的噩梦。而在实践中我们发现,我们可以运行与大型强大的SLA管道,而不需要生产的持久性。

消费者定位

Keeping track of what has been consumed, is, surprisingly, one of the key performance points of a messaging system.

追踪已经消费的消息是令人惊讶的,在消息系统中,这是关键的性能之一。


 Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structure used for storage in many messaging systems scale poorly, this is also a pragmatic choice--since the broker knows what is consumed it can immediately delete it, keeping the data size small.
大多数消息系统保留在broker上消费消息的元数据。 也就是说,当消息发送给消费者时,broker本地立即记录该事实,或者可以等待消费者的应答确认。 这是一个相当直观的选择,实际上对于单个机器服务器来说,尚不清楚这个状态是什么。 由于许多消息系统中用于存储的数据结构规模不大,这也是务实的选择 - 因为broker知道哪些已经消费,可以立即删除它,从而保持数据大小不变。


What is perhaps not obvious, is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

并不明显的是,让broker和消费者所消费的消息达成一致并不是一个微不足道的问题。如果broker每次通过网络发出消息立即记录的话,那么如果消费者无法处理该消息(比如崩溃或请求超时),则该消息将丢失。为了解决这个问题,许多消息系统添加了一个“应答”功能,这意味着当消息发送时,消息仅仅标记为“发送”而不是“已消费”。broker等待消费者应答该消息,消息才被标记为“已消费”。这确认解决了丢失消息的问题,但是产生了一个新的问题。首先,如果消费者处理了消息,但是在发送应答时失败了,那么该消息将会被处理两次。第二个问题是关于性能,现在broker必须保持关于每个单个消息的多个状态(首先锁定它,所以它不会被发送两次,然后将其标记为永久已消耗,以便可以被删除)。必须处理这些棘手的问题,就像发送但未应答的消息一样。

 
Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.

kafka处理方式不同。 我们的topic被分为一组完全有序的分区,每个分区在任何给定的时间都由每个订阅消费者组中的一个消费者消费。 这意味着消费者在每个分区中的位置只是一个整数,下一个消息消费的偏移量。 这使得关于已消费到哪里的状态变得非常的小,每个分区只有一个数字。 可以定期检查此状态。 这使得等同于消息应答并更轻量。

 
There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.

这么做有一个好处。 消费者可以故意地回到旧的偏移量并重新消费数据。 这违反了一个队列的共同契约,但这被证明是许多消费者的基本特征。 例如,如果消费者代码有bug,并且在消费一些消息之后被发现,消费者可以在修复错误后重新消费这些消息。

离线数据加载

Scalable persistence allows for the possibility of consumers that only periodically consume such as batch data loads that periodically bulk-load data into an offline system such as Hadoop or a relational data warehouse.

可扩展持久性,以允许消费者只需周期性地消费如批量数据加载的可能性,以便将数据定期批量加载到如Hadoop或关系数据仓库之类的离线系统中。


In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data—they simply restart from their original position.

在Hadoop的情况下,我们通过将负载分解为单独的map任务来并行化数据负载,每个node/topic/partition组合一个负载,允许在加载中完全并行。 Hadoop提供任务管理,无法重新启动的任务可以重新启动,而不会有重复数据的危险 - 他们只需从原始位置重新启动。







发表于: 1年前   最后更新时间: 17天前   游览量:7392
上一条: kafka生产者
下一条: kafka消息传递保障
评论…

  • 你可以想一些其他的可能性的设计,不仅仅是pull,端对端。生产者在本地写入本地日志,broker从那里pull,消费者再pull,我们经常推荐的类似一种“存储-转发”的生产者,这种方式很有意思,但是我们觉得不是很适合我们这种有成千上万的生产者的情况,我们的大规模运行的持久化数据系统的经验使我们觉得,在许多应用领域涉及数以千计的系统磁盘不会真的使事情变得更加可靠,将是操作的噩梦。而在实践中我们发现,我们可以运行与大型强大的销售管道,而不需要生产的持久性。你也可以想一些其他的可能性的设计,不仅仅是pull,端对端。生产者在本地写入本地日志,broker从那里pull,消费者再从broker那里pull,类似一种“存储-转发”的生产者,这种方式很有意思,但是我们觉得不是很适合我们这种有成千上万的生产者的情况,我们的大规模运行的持久化数据系统的经验使我们觉得,在许多应用领域涉及数以千计的系统磁盘不会真的使事情变得更加可靠,将是操作的噩梦。而在实践中我们发现,我们可以大规模运行具有强大的SLA管道,

    翻译重复了
    大多数消息系统保持哪些消息在broker已经消耗的元数据,即,当一个消息被分发给消费者时,broker直接记录或者它也可以等待来自消费者的确认。很多消息系统用于存储的规模有限,这也是务实的选择 - - 因为broker知道已经被消耗的可以立刻删除,保持存储的数据大小。大多数消息系统保留在broker上消费消息的元数据。 也就是说,当消息发送给消费者时,broker本地立即记录该事实,或者可以等待消费者的应答确认。 这是一个相当直观的选择,实际上对于单个机器服务器来说,尚不清楚这个状态是什么。 由于许多消息系统中用于存储的数据结构规模不大,这也是务实的选择 - 因为broker知道哪些已经消费,可以立即删除它,从而保持数据大小不变。

    这块也重复了
  • 评论…
    • in this conversation
      提问