原文地址 关注design部分


动机

设计kafka是为了处理所有的很多大公司可能会有的实时数据流。为此需要考虑相当广泛的使用场景。

应该对大量的事件流有保持高吞吐量,比如日志聚合。

需要优雅的处理大数据的积压,以便支持离线系统数据的定期加载。

同时需要低延时,以便支撑传统的消息队列的应用场景。

我们希望支持分区的/分布式的/实时的流处理去创建新的继承流。这个驱动我们的分区(partitioning)和消费者(consumer)模型。

最终数据流被发送到其他的数据系统服务。系统应该能够容错,当出现机器失败的过程中。

为了支持这些用例导致我们设计一些独一无二的组件,相比传统的消息队列更像是数据库的日志系统。我们将在下面的章节指出设计元素。

存储

不要担心文件系统!

kafka严重依赖文件系统存储和缓存消息。人们通常有一个”磁盘很慢”的意识,对使用文件系统作为持久化结构持怀疑态度。事实上磁盘要比人们期待的慢得多或快得多,这取决于使用方式;好的磁盘设计的存储结构可以像网络一样快。

过去十年关于磁盘性能的事实是硬盘驱动的吞吐量和磁盘寻址延迟之间的巨大差异。在配置7200转,SATA RAID-5 array的一堆磁盘JBOD顺序写入的速率为600M/s而随机写的速率为100k/s,相差6000倍。在硬盘所有的使用模式中这些线性的读写是最容易预测的,同时被操作系统很好的优化了。现代的操作系统提供预读(read-ahead)和后写(write-behind)技术,以要读去的大小的倍数来读取数据,积累小块的写组成一个大块的写写入磁盘。这个问题的进一步讨论可以在ACM Queue article中找到,他们实际上发现了这篇文章,某种情况下顺序的硬盘访问设置比随机的内存访问更快sequential disk access can in some cases be faster than random memory access!

为了弥补性能差异,现代操作系统在使用主存作为硬盘缓存的时候表现的很激进。一个现代的操作系统会很高兴将所有的可用内存转移为硬盘的缓存,当然当内存被声明使用时这会伴随少量的额外性能损失。所有的硬盘的读写都会通过这个统一的缓存。除非使用直接IO(dirct I/O),否则这个特性很难关闭,即使进程内部缓存了一份数据,这份数据仍然会被操作系统重复缓存在页缓存中。有效的将所有的东西都缓存了两次。

此外,我们建立在JVM之上,接触过java内存使用的人都知道的两点:

1) 对象的内存开销非常高,通常是对象大小的一倍(或者更糟)。

2) 随着堆内存的增加,java的gc变得越来越繁琐和缓慢。

基于以上事实,使用文件系统以及依靠页缓存要优于维持一个内存缓存或其他结构-直接使用内存的方式会直接导致两倍的可用存储,如果要存储一个压缩的对象可能又会导致缓存加倍。这样做在没有gc惩罚的情况下32G的机器会缓存到28-30G。另外即使服务重启,缓存还是热的,然而进程内缓存需要在内存中重建(10G缓存可能需要10分钟)或者完全冷启动(意味着糟糕的初始化性能)。这也极大的简化了代码,因为用于维护缓存和文件系统之间一致性的所有逻辑现在都在操作系统中,这往往比一次性的进程内尝试更有效、更正确。如果你的磁盘使用偏好线性读取,那么预读实际上就是在每次磁盘读取时使用有用的数据预先填充此缓存。

这暗示了一种非常简单的设计: 相比起来尽可能多的在内存中缓存并刷新到文件系统导致可能的用尽内存的恐慌,我们反着来。所有数据立刻写到文件系统上的持久化日志,不必刷新到磁盘。实际上这只意味着传送到内核的页缓存。

这种以页面缓存为中心的设计在这篇文章中有描述。

恒定时间

使用在消息系统中的持久化数据结构通常是每个消费者队列伴随一个BTree或其他一般意义上的随机存储结构来保持消息的元数据。BTree是普适性最好的可用数据结构,在消息系统中可以支持事务的或非事务的多种情况。他们确实有相当高的成本,尽管BTree的操作是O(logN)。一般情况下O(logN)基本等同于常数时间,但是对于磁盘操作来说并不是这样。磁盘寻道10ms一次弹出,每个磁盘在一个时间只能进行一次寻道,所以并发是受限的。因此即便一次小的磁盘寻道也会导致很高的开销。存储系统混合了很快的缓存操作和很慢的物理磁盘操作,随着固定缓存的数据增长,可以观测到的树形结构的性能通常是超线性的。

直观的可以类似日志系统的方式建立在简单的读和追加文件上。这个结构的优势在于所有的操作都是O(1)的,且读写不会相互影响。这将有明显的效率改进,由于影响效率的操作完全和数据大小解藕了。服务可以完全发挥便宜/低转速1+TB SATA的驱动。即便他们有低效的寻道效率,这些驱动有大块读写的可接受的性能,且价格只是1/3容量是3倍。

在没有效率损失的情况下接入无限制的虚拟统一磁盘,意味着我们可以提供一些常规消息系统通常找不到的特性。例如,在kafka中当消息被消费后我们可以不删除消息,可以保留相对来说长一点的时间(比如一个星期)。像我们描述的那样,这可以使消费者有极大的灵活性。

效率

我们在效率上付出很大的努力。一个主要使用目标是网站活动数据,这个量很大:每次页面访问导致数十次写入。另外我们假设每个发布的消息被至少一个消费者消费(经常是更多)。因此我们努力让消费的代价尽量低廉。

我们同时发现,从相似系统的构建和运行看,效率是影响多租户操作的关键。如果下游的基础服务由于上层应用的使用波动很容易变成系统瓶颈,这样小的改动将经常导致问题。通过变得很快我们能确保在应用程序在基础设施之前变得满载,也就是说基础设施不会影响应用程序。这对于运行在中心分区上运行的中心服务支撑数十上百个应用的服务很重要,因为使用场景几乎每天都变。

前面章节我们讨论的磁盘效率。一旦消除了低效的磁盘访问方式,有两个常见的低效的访问原因:过多的小I/O操作和过多的字节拷贝。

为了避免这种情况,我们的协议建立在”message set”之上,自然将消息进行聚合。这样允许网络请求聚合消息减少网络访问的往返次数,而不是每次只发送一个消息。服务一次性追加一个消息块到日志,消费者一次性获得大的线性块。

这个简单的优化产生了数量级的加速。批量操作导致更大的网络包,更大的顺序磁盘操作,连续的内存块等等。所有的这些措施允许kafka将突发的随机写入流转换为流向消费者的线性写入。

另外一个低效操作是字节拷贝。消息速率低的情况下这并不是一个问题,但是高负载情况下影响比较重大。为了避免这种情况,我们采用了一个标准的二进制文件格式,这个格式被生产者broker和消费者共同使用(这样数据块在流动过程中不会被修改)。

broker本身维护的消息日志就是一个文件目录,每个都由一些列消息结合填充,这些消息集合以生产者和消费者相同的文件格式写入磁盘。保持这种通用格式可以优化最重要的操作:持久日志块的网络传输。现代 Unix 操作系统提供了高度优化的代码,用于将数据从页面缓存传输到套接字。在Linux系统中,这个操作是sendfile system call

为了理解sendfile的影响,理解从文件到socket的通用数据路径很重要。

1.操作系统在内核空间中将数据从磁盘读入页缓存。

2.应用程序把数据从内核空间读入用户空间缓冲。

3.应用程序将数据写回到内核空间的socket缓冲。

4.操作系统从socket缓冲中拷贝数据到网卡(NIC)缓冲,发送给网络。

这很明显不高效,有四次拷贝和两次系统调用。使用sendfile技术重复的拷贝被避免了,允许系统从页缓冲直接拷贝数据到网卡。因此在如此优化的路径上,仅仅保留最后拷贝到网卡的步骤。

我们期望的一个通用用例是一个topic下多个consumer。使用zero-copy优化技术,数据被拷贝到内核空间一次,每个消费进行复用,而不是每次consumer进行读取时从存储在内存并拷贝的用户空间。这允许了以接近网络链接限制的速率来消费消息。

页缓存和sendfile的这种组合意味着在consumer所在的kafka集群上几乎看不到磁盘的任何读取活动,因为其完全从缓存中读取数据。

更多的sendfile和zero-copy在java中的支持,可以参考这篇文章

端到端的批量压缩

在一些情况下,短板不在CPU或磁盘,但是在网络带宽。这些现象在数据管道应用中需要数据中心在广域网中传输数据尤其明显。当然,用户始终可以一次压缩一条消息,而无需kafka的任何支持,但这会导致压缩率特别低,因为大部分的冗余是由于不同消息间的重复造成的(例如JSON 中的字段名称或 Web 日志中的用户代理或常见字符串值)。有效的压缩依赖于一起压缩多条消息而不是单独压缩每条消息。

kafka使用高效率的批压缩格式支持。一批消息可以聚集在一起压缩并以这种形式发送到服务器。这批消息要以压缩方式写到日志中,同时将会被consumer解压。

kafka支持GZIP,Snappy,LZ4和ZStardard压缩协议。更多压缩细节参见这里

生产者(Producer)

负载均衡

生产者直接发送数据给作为分区领导的broker,而不需要中间路由。为了帮助生产者实现这样的功能,所有的kafka节点需要在任何时间都能回答哪个服务是存活的,以及一个topic下的分区领导者是谁,以允许生产者直链这个请求。

生产者控制发送消息到哪个分区。可以是随机的,实现了一系列随机的负载均衡算法,或者可以被一些语义分区函数。我们允许用户通过设置键值,以不同的哈希值来指定分区(必要情况下我们可以重载分区函数)。例如如果key被选为UserId,那么所有该用户的消息都将发送到一个分区。这反过来又允许消费者对他们的消费做出局部性假设。这种分区的风格被明确设计为允许消费者进行局部敏感性处理。

异步发送

批处理是提高效率的一大因素,为了能够批量,kafka生产者将尝试在内存中积累数据,并在一次请求中发送他们。批处理过程可以被配置为不超过一定量的消息或不超过固定的时长(比如64k或10ms)。这将允许积累更多的字节去发送,以及服务器上更少的大的I/O操作。这个缓冲以牺牲了少许的延迟而增加了吞吐量。

详细的信息在configurationapi章节。

消费者(Consumer)

kafka的consumer通过向他想消费的分区领导者的broker发送“fetch”请求来消费。消费者在每个请求中指定日志中的offset来取回一大块从那个位置来的日志。因此消费者对这个位置有显著的控制权,如果有需要可以返回来重复消费。

推和拉

我们最初考虑的问题是,consumer是否应该从broker拉消息,还是broker给consumer推消息。在这方面kafka遵循了大多数消息队列的传统设计,数据被producer推到broker,消费者从broker拉去数据。一些日志为中心的系统比如SucribeApache Flume遵循了一个不同的基于推的路径,数据是被推到下游的。每中方法都有优缺点。基于推的系统很难和各种各样的consumer达成一致,因为是由broker控制数据流速的。目标是让消费者能有最大的可能的消费速率;不幸的是,在推的模式下,当consumer的速率低于生产者时,consumer将变的不知所措(本质上是拒绝服务攻击)。基于拉的系统有一个好的属性,在这种情况下消费者只是落后然后等适时再赶上来。可以通过某种backoff协议来缓解,consumer表明已经不堪重负,但是充分使用转换率(而不是过载)consumer的处理比看起来更棘手。以前构建系统的尝试是我们更加倾向于采用传统的拉模式。

基于拉的系统的另外一个好处是,consumer可以自己聚合发送到consumer的批量数据。基于推的系统必须选择要么一条一条数据给到consumer或者一下给一批数据给consumer,而不知道下游的consumer是否可以一起处理他。如果调整为低延迟,只是传输上一次一条消息,但是如果仍被缓存,那这将是浪费。一个基于拉的设计修复了这种情况,因为consumer总是拉取日志中当前位置的所有可用消息(或者是配置的最大大小)。因此在不引入不必要延迟的情况下获得了更好的批处理。

基于拉系统的缺陷是,如果broker中没有数据,consumer会陷入紧密的无限循环,紧紧等待数据的来临。为了避免这种情况我们在pull请求中有一个参数可以设置consumer阻塞一个“长轮训”来等待数据到达。(并可以设置给定大小的数据来临时结束等待,以保证大的传输块)。

你可能会想到另外一个可能的设计仅有拉模式,点到点的。producer写到本地日志,broker从producer的本地日志拉取,consumer从broker拉取。一个相同“存储转发”(store-and-forward)生产者经常被提议。这很耐人寻味但是并不符合我们的目标场景,我们的目标场景可能有上千个生产者。从我们的经验来看,系统中规模的运行上千个磁盘跨多个应用进行写入存储不会很可靠,操作将是一场噩梦。在实践中我们发现我们可以运行大规模的有着强大的服务质量的管道,而不需要producer持久化。

Consumer Position

低于一个消息系统来说跟踪已经消费的内容很重要。

大部分消息系统将哪些消息已经消费了的消息元数据放在broker上。也就是说一旦消息被发送到消费者,broker要么立即本地为消息记录状态,或者等待消费者给一个确认后再记录状态。这是一个相当直观的选择,事实上对于单机服务器还不清楚除了broker这个状态还能记录在哪。由于在许多消息系统中针对数据存储的数据结构扩展性很差,这也是务实的选择-因为broker知道哪些被消费了,他能立刻删除被消费的数据,保持数据大小较小。

也许不明显的是,让broker和consumer达成一致并非小问题。如果broker当消息被网络发送出去后就记录该消息为consumed,如果consumer处理失败(应用崩溃,或超时等等),消息将会丢失。为了解决这个问题很多消息系统添加了回执机制,当消息被网络发送后只是表明为sent而不是consumed;broker等待指定的回执后才将状态置为consumed。这个策略解决了上述的问题,但是带来了新的问题。首先消费端在消费完发送回执的过程中可能出错,将会导致消息被消费两次。第二个问题是性能,现在broker必须记录每个消息的不同状态。(首先锁定他,以防止发送两次,然后将他标记为永久消费,以便删除)。必须处理棘手的问题,比如已经发送但是一直没有收到确认的消息。

kafka用不同的方式处理。我们的topic被分到一组完全有序的分区,在给定时间内一个分区只被一个订阅的消费组里的消费者消费。这意味着consumer的消费位置在每个分区中只是一个整数,笑一个要被消费的消息。这使要被消费的状态变得很小,每个分区仅仅是一个整数。可以定时检查该状态。这等效于消息确认,同时代价又很低。

还有另外一个好处,consumer可以估计重新消费已经消费的消息。这违反了队列的公共契约,但是对于许多consumer来说是必不可少的特性。例如消费端在消费了一些数据后发现了消费端的bug,当bug修复后,消费端可以重新消费这些消息。

离线数据加载

可扩展的持久化允许consumer可以定期处理批数据,定期将批量数据加载到离线系统,例如Hadoop或者关系型数据库。

在Hadoop的例子中我们通过将负载分拆到各个map任务上,实现并行化处理数据。每个节点/主题/分区组合一个,允许完全并行加载。Hadoop提供任务管理,任务失败后可以重启,而不用担心重复数据-他们仅仅从原位重启。

静态成员关系 Static MemberShip

静态成员关系旨在提高流应用,consumer分组和其他基于分组rebanlance协议的应用的可用性。rebanlance协议依赖于分组协调器给组内成员分配实体ID。这些生成的ID是临时性的,当成员重启或重新加入的时候将会改变。针对基于消费者的应用程序,这种“动态的成员关系“在管理员操作例如代码部署,配置更新或定期启动应用的过程中很大程度的将任务重新分配给不同的实例。对于大型状态应用,重新分配任务会在处理任务前需要长时间恢复其本地状态。引起应用程序部分或完全不可用。为防止这种情况,kafka的组管理协议允许给成员提供持久化的实体ID。根据这些实体ID组成员关系保持不变,所以不会触发rebalance。

如果你想使用静态成员关系:

  • 升级broker cluster和客户端应用到2.3或以上,同时保证升级后的brokers使用inter.broker.protocol.version大于2.3或以上。
  • 为一个组下的每个消费者实例配置ConsumerConfig#GROUP_INSTANCE_ID_CONFIG为唯一值。
  • 对于kafka stream应用程序,为每个程序设置ConsumerConfig#GROUP_INSTANCE_ID_CONFIG为唯一值,与实例使用的线程数无关。

如果使用的broker小于2.3,但是在客户端选择设置了ConsumerConfig#GROUP_INSTANCE_ID_CONFIG,程序将探测到并抛出一个UnsupportedException异常。如果不小心给不同的实例设置了重复的id,broker的围栏机制将抛出异常org.apache.kafka.common.errors.FencedInstanceIdException,通知你的重复的客户端立即关闭。更多详情KIP-345

消息传递语义

现在我们理解了producer和consumer的一些工作原理,我们讨论下kafka在producer和consumer之间提供的语义保证。显然有多种可能的消息传递语义可以保证被提供:

  • 最多一次(At most once)-消息可以丢,但是不能重新投递。
  • 至少一次(At last once)-消息不能丢,但是可以重复投递。
  • 正好一次(Exactly once)-这是人们最想到的,每个消息被投递一次,且仅有一次。

值得注意到是这分为两个问题:发布消息的保证和消费消息的保证。

很多系统生成提供“正好一次”的投递语义。但是需要用户认真阅读细则,这些声明大多数都是误导的(比如,没有考虑生产者或消费者失败的情况,多个消费者进程的情况,或写磁盘失败的情况)

kafka的语义是直接的。当发送消息时,我们有一个消息被“commited”到日志的概念。一旦一个发布的消息被提交,只要消息所写入的分区的broker是活跃的,消息就不会丢。已提交消息的定义,活跃的分区,以及我们将要处理怎样的错误,我们将在下章详细讲述。现在我们我们假设一个完美的,不丢失消息的broker同时试着理解对producer和consumer的保证。如果producer尝试发布一个消息,但是经历了网络错误,且不能确定是在消息提交之前还是之后。这很像使用自动生成的键值插入数据库表的语义。

0.11.0.0之前,如果一个producer没有收到消费是否提交的反馈,除了重新发送这个消息之外几乎没有其他选择。这个提供了at-last-once语义,因为如果原始请求已经成功,重发的消息可能又被写入日志一遍。从0.11.0.0开始kafka生产者支持幂等传输选项,以保证重发的消息不会在日志中导致重复的条目。为了实现幂等,broker给每个producer分配了一个ID,并且被producer发送的使用同一个序列号的消息会被broker删除。同样的从0.11.0.0开始,producer支持使用类似事务的语法发送消息到多个topic:要么所有的消息都成功写入,要么所有的消息都没有写入。主要的使用场景是kafka topic之间恰好一次(exactly-once)的场景(描述如下)。

并不是所有的使用场景要求这么强的一致性。对用延迟敏感的使用场景我们允许producer指定他所想要的durability level。如果producer指定他想让消息变为已提交状态,那么可能需要10ms的时间。然而producer可以指定完全异步的发送或者指定只想等到领导者节点(不必须让跟随者节点)收到这个消息。

现在让我们从consumer角度描述语义。所有副本有带有相同便宜的相同日志。consumer控制日志中的位置。如果consumer用不崩溃我们可以选择将offset存储在内存中,但是如果consumer崩溃了,我们希望让另外一个进程处理这个topic下分区的消息,当启动进程时我们需要选择一个恰当的起始位置。我们说consumer读取了一些消息 - 处理消息以及更新offset时他有几个选项。

  1. consumer可以读取消息,在日志中保存位置,然后处理消息。在这种情况下,有这样的一种可能,consumer在存储位置成功,但是在保存消息处理结果输出的时候崩溃。这种情况下新的进程将在存储的位置之后进行处理,该位置之前的一小部分消息将被丢失。这对应”至多一次(at-most-once)”语义,在consumer失败的情况下,消息有可能不会被处理。

  2. consumer可以读取消息,处理消息再保存位置。这种情况下有可能出现处理完消息,保存处理结果后,在保存位置前发生崩溃。这种情况下当新的进程将重复处理已经处理过的消息。这种情况下consumer的失败对应的是“至少一下(at-last-once)”语义。多数情况下消息有一个主键因此更新是幂等的(收到两次相同的消息,并使用他自己的副本覆盖自己的)。

那么恰好一次的语义呢(正是你想要的)?当从一个kafka的topic中消费同时生产到另外一个topic(就像在Kafka Streams应用),我们可以利用在前面提到过的0.11.0.0新的事务生产者特性。消费者的位置被作为消息存储在topic中,因此我们可以和输出接收到处理数据的topic在同一个事务中给kakfa写这个offset。如果事务异常退出,consumer的offset将会恢复到原值,同时根据“隔离级别(isolation level)“产生的输出到另外topic的数据将对consumer不可见。在默认的”读未提交(read_uncommitted)“隔离级别对用consumer的所有消息都可见,即便是异常退出的事务的消息,但是在”读提交(read_committed)“隔离级别,consumer只会返回事务中已经是提交的消息(以及不是事务一部分的消息)。

当写入外部系统时,实际的限制是将consumer写入的位置与实际的输出的内容进行协调。实现这一点的经典方法是在消费者位置的存储和消费者输出的存储之间引入两阶段提交(two-phase commit)。但是可以让consumer的位置和消息输出存储在同一个地方这样更简单,更通用。consumer的offset和消息输出写入同一个地方这种策略很好,因为大部分的consumer的输出系统不知道两阶段提交(two-phase commit)。举个例子,考虑Kafka Connectconnector会将读取的数据和consumer的offset一并填充到HDFS,这样保证消费的数据和偏移要么同时写入,要么同时没写入。对于要求更强语义的很多其他数据系统我们使用了相同的策略,同时每个消息的主键不允许重复。

kafka非常有效的支持了kafka streams的exactly-once语义,事务性producer/consumer通常被用来能够在kafka主题之间提供exactly-once传递语义。其他目标系统的exactly-once语义需要与这样的系统配合,kafka提供了consumer的offset是这变的可行(查看Kafka Connect)。除此之外kafka默认提供at-last-once语义,同时在处理一批消息之前允许客户通过关闭producer的重试并在consuer中提交偏移量来实现at-most-once语义。

副本

kafka为topic下的partiion跨可配置数量的服务器上进行日志备份(可以基于逐个topic设置副本因子)。这样在集群中服务器崩溃后可以自动故障转移到其他副本,因此在出现故障时消息仍然可用。

其他的消息系统提供了副本相关的特性,但是在我们(totally biased)看来,这似乎是一个附加的东西,没有被大量使用,有很大的负面作用:副本不处于活跃状态,吞吐量严重受影响,需要极其繁琐的人工配置等。默认情况下kafka使用副本。-事实上我们使用有副本的topic来实现非复制的topic,其副本因子是1。

副本的单位是topic partition。在没有失败的情况下,kakfa中的每个partition有一个leader和0个或多个follower。包含leader和副本总数组成副本因子配置。所有的读写操作都进入partition的leader。通常,partition比broker多很多,leader在broker上均匀分配。follower的log和leader的log完全一样-有同样的offset和相同顺序的消息(但是,当然,在给定的时间点leader在其日志文件末尾都有尚未复制的消息)。

follower就像普通的kafka的consumer一样从leader消费消息,并应用到他们自己的日志。follower从leader拉消息有一个好处,允许follower自然的将日志批量聚合到他们的日志。

就像大多数自动进行故障转移的分布式系统,对节点是否“alive“有定义。对kafka节点来说活动对节点有两个条件:

  1. 节点必须和zookeeper保持会话(通过zookeeper的心跳机制保证)。
  2. 如果follower,他必须复制leader上的消息复制,同时不能落后特别多。

我们参考满足如上两个条件的节点为“同步中(in sync)”,避免使用含糊不清的“alive“或“failed”。leader持续跟踪“同步中(in sync)”的节点集合。如果一个节点死掉了,堵住了或者消息很落后,leader将把他从in sync集合中移除。堵住或落后的副本被replica.lag.time.max.ms配置设置。

在分布式系统术语中我们仅尝试处理“失败/恢复(fail/recover)“类型的错误,其中节点突然停止工作然后恢复工作(也许节点并不知道他们曾经失败)。kafka并不处理所谓的“拜占庭“将军问题(“Byzantine“ failures),出现“拜占庭”将军问题的节点将产生恶意或者随意的回应(可能由于bug或者foul play)。

我们现在可以更加精确的定义消息被提交,当分区中所有的in sync的副本将消息写入log时,我们认为是消息被提交。只有提交的消息才能被发送给consumer。这意味着consumer不用担心如果这个leader失败可能导致的消息丢失。另一方面producer可以选择是否等待消息被提交,依赖于他的在延迟(latency)和持久性(durability)之间的权衡。这个权衡可以在producer的ack setting中控制。注意topic有一个in sync副本的”最小数量(minimum number)“设置,如果producer请求写入完全的in-sync的副本时会检查这个设置。如果producer不要求特别严格的确认,消息可以被commited,消费,即使同步副本的数量低于最小值。(例如他可以低到只保留leader)。

kafka提供了只要消息被提交就不会丢失的保证,只要同步副本中始终至少有一个处于活动状态。

节点失败出现短暂故障转移期间,kafka仍然是保持可用的。但是如果发生网络分区(network partitions)后,kafka将不可用。

备份日志:仲裁,中断处理和状态机(天啊!)Replicated Logs:Quorums, ISRs, and State Machine(Oh my!)

kafka分区的核心是一个备份的日志。分布式系统中备份日志是最基本的原语,有很多方式实现。备份的日志可以通过状态机风格state-machine style作为其他的系统的实现分布式的原语。

备份日志建模了一系列值(通常编号日志条目1,2,3)达成共识的过程。有很多方式实现,但最简单,最快速但是leader选择数值但顺序并提供序号。只要leader保持活跃,所有但follower只需拷贝数值,按照leader选择的顺序排序数值。

当然如果leader不失败,我们不需要follower!当leader失败的的时候我们需要从follower中选出一个新的leader。但follower本身可能失败导致落后或者崩溃,因此我们必须选举出最新但follower。日志备份算法的一个基本保证应该是当我们告诉客户端消息已经被提交,此时如果leader失败了,我们新选出来的leader一定得有这条消息。这产生了一个权衡:如果leader在生成消息已经被提交前等待更多的follower确认消息,则将会有更多的潜在的可选leader。

如果你选择的必要的follower的确认个数和选取一个leader需要比较的日志数量保证有一定的重叠,这就叫法定人数(Quorum)。

这个权衡的一个通用方式是对提交消息的决定和leader的选举都使用多数投票。这不是kafka使用的方式,但是为了理解这个权衡还是让我们看下这个方式。假设我们有2f+1个副本,如果leader对消息被提交的定义为至少f+1个副本接收到消息,同时如果我们要选举出来一个新的leader需要至少f+1个副本的完整日志,那么失败次数不超过f,leader将会保证有完整的已提交的消息。这是因为在f+1个副本中一定有至少一个副本包含所有的提交的消息。这个副本的日志是最完整的所以将会被选举为新的leader。剩下的有很多算法需要考虑的细节(例如日志更加完整的定义,leader失败期间保证日志一致性或修改备份集中的服务)但我们现在将要忽略这些。

多数投票方式有很好的特性:延迟依赖于最快的服务器。也就是说如果备份因子设置为3,延迟是由follower中最快的而不是最慢的那个决定的。

在这个家族中有很丰富种类的算法,包括ZooKeeper的ZapRaftViewstamped Replication。我们所知道与kafka实际实现最相似的学术报告是来自微软的PacificA

大多数选举的缺点是,当发生很多错误时可能导致没有可选举的leader。为了容忍一个失败需要数据的3个备份,容忍两个失败需要数据的5个备份。从我们的经验来看在实际的操作中只有一份冗余来做预防单次错误是不足够的,但是每次写入发生5倍的磁盘占用,以及5分之1的吞吐量在大数据问题上并不是非常实用。这就是为什么法定人数(quorum)算法经常用在共享类似zookeeper的集群配置上但是很少用在主数据存储上。例如在HDFS的命名节点的高可用特性是基于各个节点的多数投票上(majority-voted-based journal),但这种更加昂贵的方式并没用用在数据本身。

kafka采用了轻微不同的方式去选择他的法定人数。不同于使用多数投票,kafak动态维护了一组同步中的副本(in-sync replicas-ISR)追赶leader。只有这些成员有资格成为leader。对kafka分区的一次写入只有向所有的ISR中都写入了才会认定为已提交。无论何时变化这个ISR集合在zookeeper中进行的持久化。因为这样任何在ISR中的副本都可能被选举为leader。对于kakfa的使用模型来说这是一个重要因素,对很多分区以及保证领导权的平衡很重要。使用ISR模型和f+1备份,kafka的topic能够容忍f个错误而不丢失消息。

对于我们希望处理的更多的使用场景,我们认为这种权衡是合理的。实践中,为了容忍f次错误,无论法定人数方式还是ISR方式都需要在确认消息提交前有相同的数量的备份进行确认。(例如一次失败针对法定人数方法需要3个备份和一个备份的确认,ISR方式需要2个备份和一个备份的确认)。多数投票方式的一个优势是提交能避免最慢的备份服务。然而我们认为允许客户端选择在提交消息时是否阻塞来改善他,更低的复制因子带来的吞吐量的提升已经硬盘空间的节省是值得的。

另外一个重要的设计决策是kafka并不要求崩溃的节点所有的数据完好的进行恢复。依赖稳定存储的复制算法并不少见,在没有潜在的一致性冲突的情况下不能丢失任何失败-恢复的场景。这个假设有两个主要的问题:第一,在真实的持久化数据的系统中我们观察到磁盘错误是常见的问题,经常不能保证数据完整性。第二,即使磁盘没有问题,我们也不希望为了一致性保证每次都强制调用fsync,因为这样会导致2到3个数量级的性能减少。我们的协议允许一个副本重新加入ISR保证在加入之前必须全部重新同步,即使在他崩溃时丢失了没有刷新的数据。

不洁的leader选举:如果都死掉会怎么样?

注意kafka基于消息的保证是至少有一个同步中的副本。如果所有复制分区的节点挂掉,这个保证将不再有用。

然而实际的系统当所有副本挂掉的时候需要做些合理的事情。如果很不幸发生了,那么需要考虑将发生什么。有两种行为可以实现:

  1. 等待ISR中的副本回到集群,并将其作为leader(希望他仍有所有的数据)。
  2. 选择第一个副本(不一定在ISR中)回来作为leader。

这是在一致性和可用性之间的一个简单权衡。如果我们等待ISR中的副本回来,那么当ISR中的副本没有启动,那么我们就始终不可用。如果这些副本被销毁或他们的数据丢失了,我们就永久的垮了。如果,另一方面我们让不在in-sync中的副本做leader,将会使用他的日志,即便他可能没有所有的消息。从0.11.0.0版本起,默认使用第一个方式,永远等待一致性的副本。这个行为可以用配置unclean.leader.election.enable修改,以支持对可用性要求比一致性要求高的情况的使用场景。

这个困境并不是针对kafka的,所有使用法定人数投票的系统都会出现这个问题。例如在法定人数投票系统中,如果多数服务都永久失败了,你必须抉择100%丢失你的数据或随便挑选一个副本作为数据来源而损失了一致性。

可用性和持久性保证

当写入kafka,producer可以选择等待写入副本数量,0,1或全部(-1)。注意“被所有副本确认(acknowledgement by all replicas)“并不能保证所有的分配的副本确认了消息。默认情况下,当acks=all时,只要所有的in-sync的副本对消息进行了确认那么就认为完成了确认。例如一个topic被配置为仅有两个副本一个失败(例如,只有一个in-sync副本),写入指定的acks=all将会成功。然而这些写入将会丢失如果剩下的in-sync中的副本也失败。即使这保证了分区的最大的可用性,然而对于更倾向于持久性而非可用性的一些用户将是不受欢迎的。所以我们提供了两个topic级别的配置,使消息增强持久性而非可用性:

  1. 禁用unclean.leader.election - 如果所有的副本变为不可用,分区将保持不可用,直到最近的leader再次变为可用。这实际上更倾向于不可用而非消息丢失。详情可以看前面的Unclean Leader Election章节
  2. 指定一个最小的ISR大小 - 分区将仅接受ISR大于一个最小值的写入,为了防止数据只写到一个副本中造成到数据丢失,随后导致到不可用。这个设定在生产者设置ack=all时起作用,保证消息至少被in-sync的副本确认。这个设置提供了一个一致性和可用性的权衡。更高的最小ISR设置保证了更好的一致性,因为消息被保证写入多个副本,以减少丢失的可能。但是减少了可用性,因为如果in-sync的副本小于最小ISR的阈值,分区将不可用。

副本管理

上述关于副本日志的讨论仅局限于一个单独的日志,例如,一个topic分区。然而,一个kafka集群将管理成百上千个这些分区。我们尝试使用round-robin的方式来平衡集群中的分区以避免一小部分节点服务所有高容量的分区。同样的我们试着平衡leadership,以便每个节点都是其分区比例的领导者。

优化领导权选举过程同样重要,因为有不可用临界窗口。一个naive的leader选举的实现是,当节点失效后,为节点上托管的所有的分区运行每个分区的选举。相反的我们选举一个broker作为“controller”。这个管理者在broker级别探寻失败的分区,负责为在一个失败的broker下面受到影响的分区选举新的leader。结果就是我们可以批量一起处理需要进行领导权变更分区的通知,对于大量的分区来说,使选举过程更加的低廉和快速。如果管理者失败,一个幸存下来的broker会成为新的管理者。