-
Notifications
You must be signed in to change notification settings - Fork 4
Kafka深度解析3
(本节所有描述都是基于consumer hight level API而非low level API)
每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)
很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。下图展示了Kafka在Linkedin的一种简化部署。
为了更清晰展示Kafka consumer group的特性,笔者作了一项测试。创建一个topic (名为topic1),创建一个属于group1的consumer实例,并创建三个属于group2的consumer实例,然后通过producer向topic1发送key分别为1,2,3r的消息。结果发现属于group1的consumer收到了所有的这三条消息,同时group2中的3个consumer分别收到了key为1,2,3的消息。如下图所示。
(本节所讲述内容均基于Kafka consumer high level API)
Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。
如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。
如下例所示,如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。
增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。
再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。
此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
consumer rebalance算法如下:
- Sort PT (all partitions in topic T)
- Sort CG(all consumers in consumer group G)
- Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
- Remove current entries owned by Ci from the partition owner registry
- Assign partitions from iN to (i+1)N-1 to consumer Ci
- Add newly assigned partitions to the partition owner registry
目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:
- Register itself in the consumer id registry under its group.
- Register a watch on changes under the consumer id registry.
- Register a watch on changes under the broker id registry.
- If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the * broker topic registry.
- Force itself to rebalance within in its consumer group.
在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。 目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。但该方式有不利的方面:
Herd effect
任何broker或者consumer的增减都会触发所有的consumer的rebalance
Split Brain
每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的consumer都认为rebalance已经完成了,但实际上可能并非如此。
根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。流程如下:
通过上文介绍,想必读者已经明天了producer和consumer是如何工作的,以及Kafka是如何做replication的,接下来要讨论的是Kafka如何确保消息在producer和consumer之间传输。有这么几种可能的delivery guarantee:
- At most once 消息可能会丢,但绝不会重复传输
- At least one 消息绝不会丢,但可能会重复传输
- Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了At least once,但可通过设置producer异步发送实现At most once)。
接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
-
读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚* 刚已提交而未处理的消息,这就对应于At most once
读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性—如是否幂等性,当成Kafka本身的feature)
-
如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。而Exactly once要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。