Kafka Consumer

Posted by 半兽人 on 2015-03-10 · Last updated 2016-10-27

The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. With each request, the consumer specifies its offset in the log and receives back a chunk of the log beginning at that position. The consumer therefore has significant control over this position and can rewind it to re-consume data if need be.
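The toy, in-memory model below makes the offset-driven fetch concrete for a single partition. `PartitionLog`, `fetch`, `ToyConsumer`, `poll` and `seek` are hypothetical names for this sketch, not the Kafka client API. The real Java `KafkaConsumer` does offer a `seek(TopicPartition, long)` method for rewinding. The sketch shows one thing: the consumer sends its offset with every request, so rewinding only means changing that number.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PartitionLog:
    """Toy in-memory stand-in for one partition's log on its leader broker."""
    messages: List[str] = field(default_factory=list)

    def append(self, msg: str) -> int:
        self.messages.append(msg)
        return len(self.messages) - 1  # offset assigned to the new message

    def fetch(self, offset: int, max_messages: int) -> List[Tuple[int, str]]:
        """Hypothetical fetch: return a chunk of the log starting at `offset`."""
        chunk = self.messages[offset:offset + max_messages]
        return [(offset + i, msg) for i, msg in enumerate(chunk)]


class ToyConsumer:
    """Hypothetical consumer; it, not the broker, owns its position."""

    def __init__(self, log: PartitionLog, position: int = 0) -> None:
        self.log = log
        self.position = position  # offset of the next message to request

    def poll(self, max_messages: int = 2) -> List[str]:
        batch = self.log.fetch(self.position, max_messages)
        if batch:
            self.position = batch[-1][0] + 1  # advance past the last message received
        return [msg for _, msg in batch]

    def seek(self, offset: int) -> None:
        self.position = offset  # rewinding just resets an integer


if __name__ == "__main__":
    log = PartitionLog()
    for m in ["a", "b", "c"]:
        log.append(m)
    consumer = ToyConsumer(log)
    print(consumer.poll())  # ['a', 'b']
    print(consumer.poll())  # ['c']
    consumer.seek(0)        # rewind to re-consume
    print(consumer.poll())  # ['a', 'b']
```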

Push vs. pull

An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems: data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume, follow a very different push-based path where data is pushed downstream. There are pros and cons to both approaches. However, a push-based system has difficulty dealing with diverse consumers, because the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate. Unfortunately, in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial-of-service attack, in essence). A push system can mitigate this with some kind of backoff protocol by which the consumer indicates it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. Previous attempts at building systems in the push fashion led us to go with a more traditional pull model.
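The rate argument above can be illustrated with a toy discrete-time simulation. Everything here is an assumption made for illustration:

- per-tick production and consumption rates are fixed;
- in the push case, the consumer has a local buffer of fixed size;
- overflow is counted as messages the consumer could not accept, although a real system might stall or crash instead.

The pull consumer never overflows. It simply accumulates lag that it can work off later.

```python
from typing import Tuple


def simulate(ticks: int, produce_rate: int, consume_rate: int,
             buffer_cap: int) -> Tuple[int, int]:
    """Toy comparison of push vs. pull when production may outpace consumption.

    Returns (push_overflow, pull_lag): messages the push consumer could not
    accept, and messages the pull consumer has yet to read from the log.
    """
    push_buffer = 0    # messages sitting in the push consumer's local buffer
    push_overflow = 0  # messages that arrived while that buffer was full
    log_end = 0        # pull model: total messages in the broker's log
    position = 0       # pull model: consumer's offset in that log

    for _ in range(ticks):
        # Push: the broker decides the rate and sends everything produced.
        push_buffer += produce_rate
        if push_buffer > buffer_cap:
            push_overflow += push_buffer - buffer_cap
            push_buffer = buffer_cap
        push_buffer -= min(push_buffer, consume_rate)

        # Pull: data waits in the log; the consumer takes only what it can handle.
        log_end += produce_rate
        position += min(consume_rate, log_end - position)

    return push_overflow, log_end - position


if __name__ == "__main__":
    print(simulate(ticks=10, produce_rate=10, consume_rate=6, buffer_cap=20))
    # (26, 40): push overflows its buffer; pull just falls 40 messages behind
```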


Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must either send a request immediately or accumulate more data and send it later, without knowing whether the downstream consumer will be able to process it immediately. If tuned for low latency, it will send a single message at a time, only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design avoids this because the consumer always pulls all available messages after its current position in the log (or up to some configurable maximum size). So one gets optimal batching without introducing unnecessary latency.
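Here is a minimal sketch of the batching rule described above: take everything after the current position, capped at a maximum size. `pull_batch`, `requests_needed` and `max_bytes` are hypothetical names. In the real consumer, the size caps are configuration settings such as `fetch.max.bytes` and `max.partition.fetch.bytes`. Always returning at least one message, even if it alone exceeds the cap, is a choice made in this sketch so the consumer never stalls.

```python
from typing import List


def pull_batch(log: List[bytes], position: int, max_bytes: int) -> List[bytes]:
    """Return all available messages after `position`, up to about max_bytes.

    A message is always returned if one is available, even if it alone exceeds
    max_bytes, so the consumer can still make progress (a choice of this sketch).
    """
    batch: List[bytes] = []
    size = 0
    for msg in log[position:]:
        if batch and size + len(msg) > max_bytes:
            break
        batch.append(msg)
        size += len(msg)
    return batch


def requests_needed(log: List[bytes], max_bytes: int) -> int:
    """Count the pull requests needed to read the whole log from offset 0."""
    position = requests = 0
    while position < len(log):
        position += len(pull_batch(log, position, max_bytes))
        requests += 1
    return requests


if __name__ == "__main__":
    log = [b"x" * 100] * 50  # 50 messages of 100 bytes each, already available
    print(requests_needed(log, max_bytes=1000))  # 5 requests of 10 messages each
    # A push system tuned for low latency would send these as 50 separate transfers.
```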


The deficiency of a naive pull-based system is that if the broker has no data, the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this, our pull requests have parameters that allow the consumer request to block in a "long poll", waiting until data arrives (and optionally waiting until a given number of bytes is available, to ensure large transfer sizes).
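The long poll described above can be sketched with a condition variable. A fetch parks until at least `min_bytes` are available or `max_wait_s` expires, instead of spinning. `LongPollLog`, `min_bytes` and `max_wait_s` are hypothetical names. In the real Kafka consumer, the corresponding settings are `fetch.min.bytes` and `fetch.max.wait.ms`.

```python
import threading
import time
from typing import List


class LongPollLog:
    """Toy broker-side log whose fetch can block until enough data arrives."""

    def __init__(self) -> None:
        self._messages: List[bytes] = []
        self._cond = threading.Condition()

    def append(self, msg: bytes) -> None:
        with self._cond:
            self._messages.append(msg)
            self._cond.notify_all()  # wake any parked fetch requests

    def fetch(self, position: int, min_bytes: int, max_wait_s: float) -> List[bytes]:
        def enough() -> bool:
            return sum(len(m) for m in self._messages[position:]) >= min_bytes

        with self._cond:
            # Block (no busy-wait) until min_bytes are available after `position`,
            # or until max_wait_s expires; then return whatever is there.
            self._cond.wait_for(enough, timeout=max_wait_s)
            return list(self._messages[position:])


if __name__ == "__main__":
    log = LongPollLog()
    # A producer appends after 0.2 s; the fetch parks until then instead of spinning.
    threading.Timer(0.2, log.append, args=(b"hello",)).start()
    start = time.monotonic()
    print(log.fetch(position=0, min_bytes=1, max_wait_s=5.0))  # [b'hello']
    print(f"waited about {time.monotonic() - start:.1f} s")
```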


You could imagine other possible designs that would be pull-only, end-to-end. The producer would write to a local log, brokers would pull from that, and consumers would pull from the brokers. A similar type of "store-and-forward" producer is often proposed. This is intriguing, but we felt it was not very suitable for our target use cases, which have thousands of producers. Our experience running persistent data systems at scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we can run a pipeline with strong SLAs at large scale without a need for producer persistence.


Consumer Position

Keeping track of what has been consumed is, surprisingly, one of the key performance points of a messaging system.


Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or waits for an acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single-machine server it is not clear where else this state could go. The data structures used for storage in many messaging systems scale poorly, so this is also a pragmatic choice: since the broker knows what has been consumed, it can immediately delete it, keeping the data size small.


What is perhaps not obvious is that getting the broker and consumer to agree about what has been consumed is not a trivial problem. Suppose the broker records a message as consumed immediately every time it is handed out over the network. If the consumer then fails to process the message (say because it crashes or the request times out), that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature. Messages are only marked as sent, not consumed, when they are sent, and the broker waits for a specific acknowledgement from the consumer before recording the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First, if the consumer processes the message but fails before it can send an acknowledgement, the message will be consumed twice. The second problem is performance: the broker must now keep multiple states about every single message. It must first lock the message so it is not given out a second time, and then mark it as permanently consumed so that it can be removed. Tricky problems must also be dealt with, such as what to do with messages that are sent but never acknowledged.
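The per-message bookkeeping described above can be sketched as a tiny state machine. `AckBroker`, its three states and the timeout-driven `expire_unacked` are hypothetical. They are not any particular broker's design. The demo reproduces the first problem: a crash between processing a message and acknowledging it leads to the message being consumed twice.

```python
from enum import Enum
from typing import Dict, List, Optional


class State(Enum):
    AVAILABLE = "available"
    SENT = "sent"          # locked: handed out, awaiting acknowledgement
    CONSUMED = "consumed"  # acknowledged; can now be deleted


class AckBroker:
    """Toy broker that keeps per-message state, as ack-based queues do."""

    def __init__(self, messages: List[str]) -> None:
        self.messages = messages
        self.state: Dict[int, State] = {i: State.AVAILABLE for i in range(len(messages))}

    def deliver(self) -> Optional[int]:
        """Hand out the first available message and lock it as SENT."""
        for i, s in self.state.items():
            if s is State.AVAILABLE:
                self.state[i] = State.SENT
                return i
        return None

    def ack(self, i: int) -> None:
        self.state[i] = State.CONSUMED

    def expire_unacked(self) -> None:
        """Sent-but-never-acknowledged messages must eventually be redelivered."""
        for i, s in self.state.items():
            if s is State.SENT:
                self.state[i] = State.AVAILABLE


def crash_before_ack() -> List[str]:
    """Consumer processes a message, then dies before acknowledging it."""
    broker = AckBroker(["m0", "m1"])
    processed: List[str] = []
    i = broker.deliver()
    assert i is not None
    processed.append(broker.messages[i])  # side effect already happened...
    # ...crash here: broker.ack(i) is never called
    broker.expire_unacked()               # e.g. after an ack timeout
    i = broker.deliver()                  # the same message comes back
    assert i is not None
    processed.append(broker.messages[i])
    broker.ack(i)
    return processed


if __name__ == "__main__":
    print(crash_before_ack())  # ['m0', 'm0']: consumed twice
```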

 
Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time. This means that the position of a consumer in each partition is just a single integer: the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed, which makes the equivalent of message acknowledgements very cheap.
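Here is a sketch of the consumer state described above: one integer per partition, checkpointed periodically rather than per message. `OffsetTracker`, `record`, `checkpoint` and `checkpoint_every` are hypothetical names. The convention that the stored value is the offset of the *next* message matches how Kafka consumers commit offsets, for example via `commitSync` in the Java client.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]  # hypothetical (topic, partition) key


class OffsetTracker:
    """Consumer position is one integer per partition: the next offset to consume."""

    def __init__(self, checkpoint_every: int = 100) -> None:
        self.positions: Dict[TopicPartition, int] = {}
        self.checkpointed: Dict[TopicPartition, int] = {}
        self.checkpoint_every = checkpoint_every
        self._since_checkpoint = 0

    def record(self, tp: TopicPartition, offset: int) -> None:
        # Having processed `offset`, the next message to consume is offset + 1.
        self.positions[tp] = offset + 1
        self._since_checkpoint += 1
        if self._since_checkpoint >= self.checkpoint_every:
            self.checkpoint()

    def checkpoint(self) -> None:
        # One saved number per partition acknowledges every earlier message.
        self.checkpointed = dict(self.positions)
        self._since_checkpoint = 0


if __name__ == "__main__":
    tracker = OffsetTracker(checkpoint_every=3)
    for off in range(3):
        tracker.record(("events", 0), off)
    print(tracker.checkpointed)  # {('events', 0): 3}
```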

 
There is a side benefit of this decision. A consumer can deliberately rewind to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug that is discovered after some messages have been consumed, the consumer can re-consume those messages once the bug is fixed.
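Here is a toy illustration of the bug-fix scenario above. Consuming does not remove messages from the log, so the fixed code can simply be re-run from the first affected offset. The handlers and the cents-to-dollars bug are made up for this sketch.

```python
from typing import Callable, List


def consume_from(log: List[int], start: int, handler: Callable[[int], int]) -> List[int]:
    """Process every retained message from offset `start` to the end of the log.
    Consuming does not delete anything, so the same offsets can be read again."""
    return [handler(msg) for msg in log[start:]]


def buggy_to_dollars(cents: int) -> int:
    return cents // 10   # hypothetical bug: divides by 10 instead of 100


def fixed_to_dollars(cents: int) -> int:
    return cents // 100


if __name__ == "__main__":
    log = [1000, 2500, 300]                        # amounts in cents
    print(consume_from(log, 0, buggy_to_dollars))  # [100, 250, 30]  (wrong)
    # After fixing the bug, rewind to the first affected offset and re-consume.
    print(consume_from(log, 0, fixed_to_dollars))  # [10, 25, 3]
```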

Offline Data Load

Scalable persistence allows for consumers that only consume periodically. One example is a batch job that periodically bulk-loads data into an offline system such as Hadoop or a relational data warehouse.


In the case of Hadoop, we parallelize the data load by splitting it over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks that fail can restart without danger of duplicate data: they simply restart from their original position.
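Here is a minimal sketch of the parallel load described above, with threads standing in for Hadoop map tasks. The sketch makes two assumptions:

- a task's output is kept only once the task finishes;
- a failed task is retried once from its original start offset.

Together these are why a restart produces no duplicates. `Split`, `run_task`, `run_with_restart` and `parallel_load` are hypothetical names, not Hadoop or Kafka APIs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

Split = Tuple[str, str, int]  # hypothetical (node, topic, partition) key


def run_task(log: List[str], start: int, crash_at: int = -1) -> List[str]:
    """Read log[start:] into a task-local buffer and return it only on success.
    `crash_at` simulates a failure after that many records (-1: no failure)."""
    buffer: List[str] = []
    for n, record in enumerate(log[start:]):
        if n == crash_at:
            raise RuntimeError("map task failed")
        buffer.append(record)
    return buffer


def run_with_restart(log: List[str], start: int, fail_first: bool) -> List[str]:
    try:
        return run_task(log, start, crash_at=1 if fail_first else -1)
    except RuntimeError:
        # The partial buffer is discarded; restart from the original position.
        return run_task(log, start)


def parallel_load(splits: Dict[Split, List[str]], starts: Dict[Split, int],
                  failing: Split) -> Dict[Split, List[str]]:
    """One task per node/topic/partition split, all running in parallel."""
    with ThreadPoolExecutor() as pool:
        futures = {s: pool.submit(run_with_restart, log, starts[s], s == failing)
                   for s, log in splits.items()}
        return {s: f.result() for s, f in futures.items()}


if __name__ == "__main__":
    splits = {("node1", "events", 0): ["a", "b", "c"],
              ("node2", "events", 1): ["d", "e"]}
    starts = {s: 0 for s in splits}
    print(parallel_load(splits, starts, failing=("node1", "events", 0)))
    # {('node1', 'events', 0): ['a', 'b', 'c'], ('node2', 'events', 1): ['d', 'e']}
```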






