Kafka Replication and Leader Election

半兽人   Posted: 2015-03-10   Last updated: 2017-03-13

4.7 Replication

Kafka replicates the log for each topic's partitions across a configurable number of servers (you can set this replication factor on a topic-by-topic basis). In other words, the messages themselves are backed up, and each copy is called a replica. This allows automatic failover to these replicas when a server in the cluster fails, so messages remain available in the presence of failures.


Other messaging systems provide some replication-related features, but, in our (totally biased) opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides: slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka is meant to be used with replication by default; in fact we implement un-replicated topics as replicated topics where the replication factor is one.


The unit of replication is the topic partition. Under non-failure conditions, each partition in Kafka has a single leader and zero or more followers. The total number of replicas, including the leader, constitutes the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader's log: all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).


Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing each follower to naturally batch together the log entries it is applying to its log.


As with most distributed systems, automatically handling failures requires a precise definition of what it means for a node to be "alive". For Kafka, node liveness has two conditions:

  1. A node must be able to maintain its session with ZooKeeper (via ZooKeeper's heartbeat mechanism).
  2. If it is a follower, it must replicate the writes happening on the leader and not fall "too far" behind.

We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower dies, gets stuck, or falls behind, the leader will remove it from the list of in-sync replicas. The definition of how far behind is too far is controlled by the replica.lag.max.messages configuration, and the definition of a stuck replica is controlled by the replica.lag.time.max.ms configuration.
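The time-based half of this rule can be sketched in a few lines. This is only an illustration of the idea, not Kafka's implementation: the function name, the broker names, and the 10-second threshold are assumptions made for the example.

```python
# Illustrative stand-in for the replica.lag.time.max.ms setting (value is an assumption).
REPLICA_LAG_TIME_MAX_MS = 10_000

def shrink_isr(isr, last_caught_up_ms, now_ms, max_lag_ms=REPLICA_LAG_TIME_MAX_MS):
    """Return the replicas that stay in sync: those that caught up recently enough."""
    return {r for r in isr if now_ms - last_caught_up_ms[r] <= max_lag_ms}

# Hypothetical state: broker-3 last caught up 20 seconds ago, so it is dropped.
isr = {"broker-1", "broker-2", "broker-3"}
last_caught_up_ms = {"broker-1": 100_000, "broker-2": 99_500, "broker-3": 80_000}
print(sorted(shrink_isr(isr, last_caught_up_ms, now_ms=100_000)))
# prints ['broker-1', 'broker-2']
```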


In distributed systems terminology, we only attempt to handle a "fail/recover" model of failures, where nodes suddenly cease working and then later recover (perhaps without knowing that they have died). Kafka does not handle so-called "Byzantine" failures, in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).


A message is considered "committed" when all in-sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preferred tradeoff between latency and durability. This preference is controlled by the request.required.acks setting that the producer uses.
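One way to picture the commit rule is as a minimum over the in-sync replicas. The sketch below is a simplified model with made-up replica names and offsets; it assumes each replica reports a log end offset (the offset of the next message it would write).

```python
def committed_offset(log_end_offsets, isr):
    """Offsets below this value have been applied by every in-sync replica."""
    return min(log_end_offsets[r] for r in isr)

def visible_to_consumers(leader_log, log_end_offsets, isr):
    """Only committed messages are handed out to consumers."""
    return leader_log[:committed_offset(log_end_offsets, isr)]

# Hypothetical partition: the leader holds 5 messages, the slowest in-sync follower 3.
leader_log = ["m0", "m1", "m2", "m3", "m4"]
log_end_offsets = {"leader": 5, "follower-a": 4, "follower-b": 3}
isr = {"leader", "follower-a", "follower-b"}
print(visible_to_consumers(leader_log, log_end_offsets, isr))
# prints ['m0', 'm1', 'm2']
```

If follower-b were removed from the in-sync set, the committed offset in this model would advance to 4 without waiting for it.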


The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in-sync replica alive at all times.


Kafka will remain available in the presence of node failures after a short fail-over period, but may not remain available in the presence of network partitions.


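Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

Quorum: originally, the number of representatives or senators who must be present for the body to conduct business and make decisions (generally more than half).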


At its heart, a Kafka partition is a replicated log. The replicated log is one of the most basic primitives in distributed data systems, and there are many approaches for implementing one. A replicated log can be used by other systems as a primitive for implementing other distributed systems in the state-machine style.


A replicated log models the process of coming into consensus on the order of a series of values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement this, but the simplest and fastest is with a leader who chooses the ordering of the values provided to it. As long as the leader remains alive, all followers need only copy the values and the ordering the leader chooses.


Of course, if leaders didn't fail we wouldn't need followers! When the leader does die, we need to choose a new leader from among the followers. But followers themselves may fall behind or crash, so we must ensure we choose an up-to-date follower. The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed, then there will be more potentially electable leaders.


If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.


A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let's explore it anyway to understand the tradeoffs. Let's say we have 2f+1 replicas. If f+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least f+1 replicas, then, with no more than f failures, the leader is guaranteed to have all committed messages. This is because among any f+1 replicas, there must be at least one replica that contains all committed messages. That replica's log will be the most complete and therefore will be selected as the new leader. There are many remaining details that each algorithm must handle (such as precisely defining what makes a log more complete, ensuring log consistency during leader failure, or changing the set of servers in the replica set), but we will ignore these for now.
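The overlap argument can be checked by brute force for small f. The sketch below is an illustration only: the function names are invented, and "most complete log" is simplified to "longest log", which glosses over the details the text says it is ignoring.

```python
from itertools import combinations

def quorums_always_overlap(f):
    """Check that any two groups of f+1 replicas out of 2f+1 share a member."""
    replicas = range(2 * f + 1)
    groups = [set(g) for g in combinations(replicas, f + 1)]
    return all(a & b for a in groups for b in groups)

def elect_leader(reachable_logs):
    """Pick the reachable replica with the most complete log (simplified: the longest)."""
    return max(reachable_logs, key=lambda r: len(reachable_logs[r]))

print(all(quorums_always_overlap(f) for f in (1, 2, 3)))  # prints True

# f = 1: message "b" was committed on r0 and r1, then r0 failed.
print(elect_leader({"r1": ["a", "b"], "r2": ["a"]}))  # prints r1
```

Because the f+1 replicas that acknowledged a commit and the f+1 replicas consulted in the election must share at least one member, the committed message is always visible to the election.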


This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster follower, not the slower one.


There is a rich variety of algorithms in this family, including ZooKeeper's Zab, Raft, and Viewstamped Replication. The most similar academic publication we are aware of to Kafka's actual implementation is PacificA from Microsoft.


The downside of majority vote is that it doesn't take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience, having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large-volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration, such as ZooKeeper, but are less common for primary data storage. For example, in HDFS the namenode's high-availability feature is built on a majority-vote-based journal, but this more expensive approach is not used for the data itself.


Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. This is an important factor for Kafka's usage model, where there are many partitions and ensuring leadership balance is important. With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.


For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice, to tolerate f failures, both the majority vote and the ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure, a majority quorum needs three replicas and one acknowledgement, and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach. However, we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.
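The replica counts behind this comparison are simple arithmetic: 2f+1 copies for majority vote versus f+1 for the ISR approach. A tiny sketch (the function name is made up for the example):

```python
def replicas_needed(f):
    """Copies of the data required to tolerate f failures under each scheme."""
    return {"majority_vote": 2 * f + 1, "isr": f + 1}

for f in (1, 2):
    print(f, replicas_needed(f))
# prints:
# 1 {'majority_vote': 3, 'isr': 2}
# 2 {'majority_vote': 5, 'isr': 3}
```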


Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems, and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees, as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even if it lost unflushed data in its crash.

Unclean leader election: What if they all die?


Note that Kafka's guarantee with respect to data loss is predicated on at least one replica remaining in sync. If all the nodes replicating a partition die, this guarantee no longer holds.

However, a practical system needs to do something reasonable when all the replicas die. If you are unlucky enough to have this occur, it is important to consider what will happen. There are two behaviors that could be implemented:

  1. Wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data).
  2. Choose the first replica (not necessarily in the ISR) that comes back to life as the leader.

This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. In our current release we choose the second strategy and favor choosing a potentially inconsistent replica when all replicas in the ISR are dead. This behavior can be disabled using the configuration property unclean.leader.election.enable, to support use cases where downtime is preferable to inconsistency.
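The two strategies differ only in what happens when no in-sync replica is back yet. The following sketch models that decision at a single point in time; the function, its arguments, and the broker names are hypothetical, and only the flag corresponds to the unclean.leader.election.enable property.

```python
def choose_leader(isr, recovered, unclean_election_enabled):
    """recovered: replicas currently back online, in order of recovery.

    Returns the chosen leader, or None if the partition stays unavailable.
    """
    for replica in recovered:
        if replica in isr:
            return replica       # strategy 1: an in-sync replica came back
    if unclean_election_enabled and recovered:
        return recovered[0]      # strategy 2: first replica back, may lack committed data
    return None                  # keep waiting for an in-sync replica

# Hypothetical partition whose last ISR was {b1}; only the out-of-sync b3 is back.
print(choose_leader({"b1"}, ["b3"], unclean_election_enabled=True))   # prints b3
print(choose_leader({"b1"}, ["b3"], unclean_election_enabled=False))  # prints None
```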


This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example, in a majority voting scheme, if a majority of servers suffer a permanent failure, then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.


Availability and Durability Guarantees


When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1, or all (-1) replicas. Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in-sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails. Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability. Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:


  1. Disable unclean leader election - if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
  2. Specify a minimum ISR size - the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all (-1), and it guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency, since the message is guaranteed to be written to more replicas, which reduces the probability that it will be lost. However, it reduces availability, since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
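The interaction between the producer's acks setting and the minimum ISR size can be written as a small predicate. This is a simplified model with invented names; it assumes the threshold is inclusive, so a write is rejected only when the ISR has fewer members than the minimum.

```python
def accept_write(acks, isr_size, min_isr):
    """Return True if the partition accepts the write in this simplified model.

    Assumption: the minimum only applies to acks="all", and it is inclusive.
    """
    if acks == "all" and isr_size < min_isr:
        return False
    return True

# Hypothetical topic with two replicas and a minimum ISR size of 2.
print(accept_write("all", isr_size=2, min_isr=2))  # prints True
print(accept_write("all", isr_size=1, min_isr=2))  # prints False
print(accept_write(1, isr_size=1, min_isr=2))      # prints True
```

The last call shows why the setting is tied to acks=all: a producer that waits for only the leader is not protected by the minimum.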

Replica Management


The above discussion on replicated logs really covers only a single log, i.e. one topic partition. However, a Kafka cluster will manage hundreds or thousands of these partitions. We attempt to balance partitions within a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics on a small number of nodes. Likewise, we try to balance leadership so that each node is the leader for a proportional share of its partitions.
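A minimal sketch of round-robin placement follows. It is not Kafka's actual assignment algorithm; the function and broker names are invented, and the first replica in each list is assumed to be the preferred leader.

```python
from collections import Counter

def assign_replicas(num_partitions, brokers, replication_factor):
    """Place each partition's replicas on consecutive brokers, rotating the start."""
    assignment = {}
    for p in range(num_partitions):
        assignment[p] = [brokers[(p + i) % len(brokers)]
                         for i in range(replication_factor)]
    return assignment

assignment = assign_replicas(6, ["b0", "b1", "b2"], replication_factor=2)
print(assignment[0], assignment[1], assignment[2])
# prints ['b0', 'b1'] ['b1', 'b2'] ['b2', 'b0']

# Leadership is spread evenly: each broker leads two of the six partitions.
leaders = Counter(replicas[0] for replicas in assignment.values())
print(sorted(leaders.items()))
# prints [('b0', 2), ('b1', 2), ('b2', 2)]
```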


It is also important to optimize the leadership election process, as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead, we elect one of the brokers as the "controller". This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications, which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.
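The controller's batching can be pictured as one pass over the partitions led by the failed broker. The sketch below is an illustration under stated assumptions: the data structures and names are hypothetical, and the new leader is simply the first surviving member of each partition's ISR.

```python
def handle_broker_failure(failed, leaders, isrs):
    """Return new leaders for every partition led by the failed broker, as one batch.

    leaders: partition -> current leader; isrs: partition -> ordered in-sync replicas.
    A value of None means no in-sync replica survives for that partition.
    """
    changes = {}
    for partition, leader in leaders.items():
        if leader != failed:
            continue  # partitions led elsewhere need no leadership change
        candidates = [r for r in isrs[partition] if r != failed]
        changes[partition] = candidates[0] if candidates else None
    return changes

leaders = {"t-0": "b1", "t-1": "b2", "t-2": "b1"}
isrs = {"t-0": ["b1", "b2"], "t-1": ["b2", "b3"], "t-2": ["b1"]}
print(handle_broker_failure("b1", leaders, isrs))
# prints {'t-0': 'b2', 't-2': None}
```

Partition t-2 in the example has no surviving in-sync replica, which is exactly the unclean leader election case discussed earlier.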









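  • The downside of majority vote is that it cannot tolerate many failures if you are still to have an electable leader.
    "We think that, by allowing clients to choose whether to mask the message commit, the extra throughput and disk space from the low-demand replication factor is a worthwhile improvement."
    should be:

    We think the problem is ameliorated by allowing the client to choose whether to block on the message commit, and the additional throughput and disk space resulting from the lower replication factor is worth it.
    The part about specifying a minimum ISR size is fine: it only takes effect when required.acks=1, and since the producer waits for only one acknowledgement, the more replicas are written, the safer it is.
    "If the size of the ISR is below the minimum, the partition only accepts writes (ps: I personally think the official docs are wrong here. How can this be accepting writes? I think it should be accepting reads only!!)"
    That one is your own mistranslation; "above" means higher than.
    The official docs say: if the size of the ISR is above the minimum, the partition accepts writes.
    • Haha, that was careless of me. No wonder it felt wrong when I read it.
      The core meaning is that the partition accepts writes only if the ISR size is above the required minimum, which prevents messages from being lost when a single node crashes without a backup.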