The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.
推送 vs 拉取
An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume follow a very different push based path where data is pushed downstream. There are pros and cons to both approaches. However a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. This can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.
我们考虑的第一个问题是消费者应该从broker中pull数据还是broker向消费者push数据，在这方面，kafka遵循比较传统的设计，大多数消息系统，生产者推消息到broker，消费者从broker拉取消息，一些日志中心的系统，比如 Scribe 和Apache Flume ，采用非常不同的push模式（push数据到下游）。事实上，push模式和pull模式各有优劣。push模式很难适应消费速率不同的消费者，因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息，但是这样很容易造成消费者来不及处理消息，典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose to either send a request immediately or accumulate more data and then send it later without knowledge of whether the downstream consumer will be able to immediately process it. If tuned for low latency this will result in sending a single message at a time only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design fixes this as the consumer always pulls all available messages after its current position in the log (or up to some configurable max size). So one gets optimal batching without introducing unnecessary latency.
The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a "long poll" waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).
You could imagine other possible designs which would be only pull,
end-to-end. The producer would locally write to a local log, and brokers
would pull from that with consumers pulling from them. A similar type
of "store-and-forward" producer is often proposed. This is intriguing
but we felt not very suitable for our target use cases which have
thousands of producers. Our experience running persistent data systems
at scale led us to feel that involving thousands of disks in the system
across many applications would not actually make things more reliable
and would be a nightmare to operate. And in practice we have found that
we can run a pipeline with strong SLAs at large scale without a need for
Keeping track of what has been consumed, is, surprisingly, one of the key performance points of a messaging system.
Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structure used for storage in many messaging systems scale poorly, this is also a pragmatic choice--since the broker knows what is consumed it can immediately delete it, keeping the data size small.
大多数消息系统保持哪些消息在broker已经消耗的元数据，即，当一个消息被分发给消费者时，broker直接记录或者它也可以等待来自消费者的确认。很多消息系统用于存储的规模有限，这也是务实的选择 - - 因为broker知道已经被消耗的可以立刻删除，保持存储的数据大小。大多数消息系统保留在broker上消费消息的元数据。 也就是说，当消息发送给消费者时，broker本地立即记录该事实，或者可以等待消费者的应答确认。 这是一个相当直观的选择，实际上对于单个机器服务器来说，尚不清楚这个状态是什么。 由于许多消息系统中用于存储的数据结构规模不大，这也是务实的选择 - 因为broker知道哪些已经消费，可以立即删除它，从而保持数据大小不变。
What is perhaps not obvious, is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.
Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.
kafka处理方式不同。 我们的topic被分为一组完全有序的分区，每个分区在任何给定的时间都由每个订阅消费者组中的一个消费者消费。 这意味着消费者在每个分区中的位置只是一个整数，下一个消息消费的偏移量。 这使得关于已消费到哪里的状态变得非常的小，每个分区只有一个数字。 可以定期检查此状态。 这使得等同于消息应答并更轻量。
There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
这么做有一个好处。 消费者可以故意地回到旧的偏移量并重新消费数据。 这违反了一个队列的共同契约，但这被证明是许多消费者的基本特征。 例如，如果消费者代码有bug，并且在消费一些消息之后被发现，消费者可以在修复错误后重新消费这些消息。
Scalable persistence allows for the possibility of consumers that only periodically consume such as batch data loads that periodically bulk-load data into an offline system such as Hadoop or a relational data warehouse.
In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data—they simply restart from their original position.
在Hadoop的情况下，我们通过将负载分解为单独的map任务来并行化数据负载，每个node/topic/partition组合一个负载，允许在加载中完全并行。 Hadoop提供任务管理，无法重新启动的任务可以重新启动，而不会有重复数据的危险 - 他们只需从原始位置重新启动。
发表于: 1年前 最后更新时间: 1月前 游览量:5775