你好,我是胡夕。今天,我们正式进入到第四大模块“状态机”的学习。
Kafka源码中有很多状态机和管理器,比如之前我们学过的Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager,等等。这些管理器和状态机,大多与各自的“宿主”组件关系密切,可以说是大小不同、功能各异。就比如Controller的这两个管理器,必须要与Controller组件紧耦合在一起才能实现各自的功能。
不过,Kafka中还是有一些状态机和管理器具有相对独立的功能框架,不严重依赖使用方,也就是我在这个模块为你精选的TopicDeletionManager(主题删除管理器)、ReplicaStateMachine(副本状态机)和PartitionStateMachine(分区状态机)。
无论是主题、分区,还是副本,它们在Kafka中的生命周期通常都有多个状态。而这3个状态机,就是来管理这些状态的。而如何实现正确、高效的管理,就是源码要解决的核心问题。
今天,我们先来学习TopicDeletionManager,看一下Kafka是如何删除一个主题的。
刚开始学习Kafka的时候,我对Kafka删除主题的认识非常“浅薄”。之前我以为成功执行了kafka-topics.sh --delete命令后,主题就会被删除。我相信,很多人可能都有过这样的错误理解。
这种不正确的认知产生的一个结果就是,我们经常发现主题没有被删除干净。于是,网上流传着一套终极“武林秘籍”:手动删除磁盘上的日志文件,以及手动删除ZooKeeper下关于主题的各个节点。
就我个人而言,我始终不推荐你使用这套“秘籍”,理由有二:
所谓“本事大不如不摊上”,我们与其琢磨删除主题失败之后怎么自救,不如踏踏实实地研究下Kafka底层是怎么执行这个操作的。搞明白它的原理之后,再有针对性地使用“秘籍”,才能做到有的放矢。你说是不是?
好了,我们就正式开始学习TopicDeletionManager吧。
这个管理器位于kafka.controller包下,文件名是TopicDeletionManager.scala。在总共不到400行的源码中,它定义了3个类结构以及20多个方法。总体而言,它还是比较容易学习的。
为了让你先有个感性认识,我画了一张TopicDeletionManager.scala的代码UML图:
TopicDeletionManager.scala这个源文件,包括3个部分。
接下来,我们逐一讨论下这3部分。首先是DeletionClient接口及其实现类。
就像前面说的,DeletionClient接口定义的方法用于删除主题,并将删除主题这件事儿同步给其他Broker。
目前,DeletionClient这个接口只有一个实现类,即ControllerDeletionClient。我们看下这个实现类的代码:
class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
// 删除给定主题
override def deleteTopic(topic: String, epochZkVersion: Int): Unit = {
// 删除/brokers/topics/<topic>节点
zkClient.deleteTopicZNode(topic, epochZkVersion)
// 删除/config/topics/<topic>节点
zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion)
// 删除/admin/delete_topics/<topic>节点
zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion)
}
// 删除/admin/delete_topics下的给定topic子节点
override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = {
zkClient.deleteTopicDeletions(topics, epochZkVersion)
}
// 取消/brokers/topics/<topic>节点数据变更的监听
override def mutePartitionModifications(topic: String): Unit = {
controller.unregisterPartitionModificationsHandlers(Seq(topic))
}
// 向集群Broker发送指定分区的元数据更新请求
override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
controller.sendUpdateMetadataRequest(
controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
}
}
这个类的构造函数接收两个字段。同时,由于是DeletionClient接口的实现类,因而该类实现了DeletionClient接口定义的四个方法。
先来说构造函数的两个字段:KafkaController实例和KafkaZkClient实例。KafkaController实例,我们已经很熟悉了,就是Controller组件对象;而KafkaZkClient实例,就是Kafka与ZooKeeper交互的客户端对象。
接下来,我们再结合代码看下DeletionClient接口实现类ControllerDeletionClient定义的4个方法。我来简单介绍下这4个方法大致是做什么的。
1.deleteTopic
它用于删除主题在ZooKeeper上的所有“痕迹”。具体方法是,分别调用KafkaZkClient的3个方法去删除ZooKeeper下/brokers/topics/
2.deleteTopicDeletions
它用于删除ZooKeeper下待删除主题的标记节点。具体方法是,调用KafkaZkClient的deleteTopicDeletions方法,批量删除一组主题在/admin/delete_topics下的子节点。注意,deleteTopicDeletions这个方法名结尾的Deletions,表示/admin/delete_topics下的子节点。所以,deleteTopic是删除主题,deleteTopicDeletions是删除/admin/delete_topics下的对应子节点。
到这里,我们还要注意的一点是,这两个方法里都有一个epochZkVersion的字段,代表期望的Controller Epoch版本号。如果你使用一个旧的Epoch版本号执行这些方法,ZooKeeper会拒绝,因为和它自己保存的版本号不匹配。如果一个Controller的Epoch值小于ZooKeeper中保存的,那么这个Controller很可能是已经过期的Controller。这种Controller就被称为Zombie Controller。epochZkVersion字段的作用,就是隔离Zombie Controller发送的操作。
3.mutePartitionModifications
它的作用是屏蔽主题分区数据变更监听器,具体实现原理其实就是取消/brokers/topics/
那为什么要取消这个监听器呢?其实,主要是为了避免操作之间的相互干扰。设想下,用户A发起了主题删除,而同时用户B为这个主题新增了分区。此时,这两个操作就会相互冲突,如果允许Controller同时处理这两个操作,势必会造成逻辑上的混乱以及状态的不一致。为了应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,以确保不再响应用户对该主题的其他操作。
mutePartitionModifications方法的实现原理很简单,它会调用unregisterPartitionModificationsHandlers,并接着调用KafkaZkClient的unregisterZNodeChangeHandler方法,取消ZooKeeper上对给定主题的分区节点数据变更的监听。
4.sendMetadataUpdate
它会调用KafkaController的sendUpdateMetadataRequest方法,给集群所有Broker发送更新请求,告诉它们不要再为已删除主题的分区提供服务。代码如下:
override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
// 给集群所有Broker发送UpdateMetadataRequest
// 通知它们给定partitions的状态变化
controller.sendUpdateMetadataRequest(
controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
}
该方法会给集群中的所有Broker发送更新元数据请求,告知它们要同步给定分区的状态。
有了这些铺垫,我们再来看主题删除管理器的主要入口:TopicDeletionManager类。这个类的定义代码,如下:
class TopicDeletionManager(
// KafkaConfig类,保存Broker端参数
config: KafkaConfig,
// 集群元数据
controllerContext: ControllerContext,
// 副本状态机,用于设置副本状态
replicaStateMachine: ReplicaStateMachine,
// 分区状态机,用于设置分区状态
partitionStateMachine: PartitionStateMachine,
// DeletionClient接口,实现主题删除
client: DeletionClient) extends Logging {
this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
// 是否允许删除主题
val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
......
}
该类主要的属性有6个,我们分别来看看。
好了,知道这些字段的含义,我们再看看TopicDeletionManager类实例是如何被创建的。
实际上,这个实例是在KafkaController类初始化时被创建的。在KafkaController类的源码中,你可以很容易地找到这行代码:
val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
partitionStateMachine, new ControllerDeletionClient(this, zkClient))
可以看到,代码实例化了一个全新的ControllerDeletionClient对象,然后利用这个对象实例和replicaStateMachine、partitionStateMachine,一起创建TopicDeletionManager实例。
为了方便你理解,我再给你画一张流程图:
除了类定义和初始化,TopicDeletionManager类还定义了16个方法。在这些方法中,最重要的当属resumeDeletions方法。它是重启主题删除操作过程的方法。
主题因为某些事件可能一时无法完成删除,比如主题分区正在进行副本重分配等。一旦这些事件完成后,主题重新具备可删除的资格。此时,代码就需要调用resumeDeletions重启删除操作。
这个方法之所以很重要,是因为它还串联了TopicDeletionManager类的很多方法,如completeDeleteTopic和onTopicDeletion等。因此,你完全可以从resumeDeletions方法开始,逐渐深入到其他方法代码的学习。
那我们就先学习resumeDeletions的实现代码吧。
private def resumeDeletions(): Unit = {
// 从元数据缓存中获取要删除的主题列表
val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
// 待重试主题列表
val topicsEligibleForRetry = mutable.Set.empty[String]
// 待删除主题列表
val topicsEligibleForDeletion = mutable.Set.empty[String]
if (topicsQueuedForDeletion.nonEmpty)
info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
// 遍历每个待删除主题
topicsQueuedForDeletion.foreach { topic =>
// 如果该主题所有副本已经是ReplicaDeletionSuccessful状态
// 即该主题已经被删除
if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
// 调用completeDeleteTopic方法完成后续操作即可
completeDeleteTopic(topic)
info(s"Deletion of topic $topic successfully completed")
// 如果主题删除尚未开始并且主题当前无法执行删除的话
} else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
// 把该主题加到待重试主题列表中用于后续重试
topicsEligibleForRetry += topic
}
}
// 如果该主题能够被删除
if (isTopicEligibleForDeletion(topic)) {
info(s"Deletion of topic $topic (re)started")
topicsEligibleForDeletion += topic
}
}
// 重试待重试主题列表中的主题删除操作
if (topicsEligibleForRetry.nonEmpty) {
retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
}
// 调用onTopicDeletion方法,对待删除主题列表中的主题执行删除操作
if (topicsEligibleForDeletion.nonEmpty) {
onTopicDeletion(topicsEligibleForDeletion)
}
}
通过代码我们发现,这个方法首先从元数据缓存中获取要删除的主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题。
然后,代码遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用completeDeleteTopic方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。
最后,该方法调用retryDeletionForIneligibleReplicas方法,来重试待重试主题列表中的主题删除操作。对于待删除主题列表中的主题则调用onTopicDeletion删除之。
值得一提的是,retryDeletionForIneligibleReplicas方法用于重试主题删除。这是通过将对应主题副本的状态,从ReplicaDeletionIneligible变更到OfflineReplica来完成的。这样,后续再次调用resumeDeletions时,会尝试重新删除主题。
看到这里,我们再次发现,Kafka的方法命名真的是非常规范。得益于这一点,很多时候,我们不用深入到方法内部,就能知道这个方法大致是做什么用的。比如:
这时,你再去阅读这3个方法的源码,就会发现它们的作用确实如其名字标识的那样。这也再次证明了Kafka源码质量是非常不错的。因此,不管你是不是Kafka的使用者,都可以把Kafka的源码作为阅读开源框架源码、提升自己竞争力的一个选择。
下面,我再用一张图来解释下resumeDeletions方法的执行流程:
到这里,resumeDeletions方法的逻辑我就讲完了,它果然是串联起了TopicDeletionManger中定义的很多方法。其中,比较关键的两个操作是completeDeleteTopic和onTopicDeletion。接下来,我们就分别看看。
先来看completeDeleteTopic方法代码,我给每行代码都加标了注解。
private def completeDeleteTopic(topic: String): Unit = {
// 第1步:注销分区变更监听器,防止删除过程中因分区数据变更
// 导致监听器被触发,引起状态不一致
client.mutePartitionModifications(topic)
// 第2步:获取该主题下处于ReplicaDeletionSuccessful状态的所有副本对象,
// 即所有已经被成功删除的副本对象
val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
// 第3步:利用副本状态机将这些副本对象转换成NonExistentReplica状态。
// 等同于在状态机中删除这些副本
replicaStateMachine.handleStateChanges(
replicasForDeletedTopic.toSeq, NonExistentReplica)
// 第4步:更新元数据缓存中的待删除主题列表和已开始删除的主题列表
// 因为主题已经成功删除了,没有必要出现在这两个列表中了
controllerContext.topicsToBeDeleted -= topic
controllerContext.topicsWithDeletionStarted -= topic
// 第5步:移除ZooKeeper上关于该主题的一切“痕迹”
client.deleteTopic(topic, controllerContext.epochZkVersion)
// 第6步:移除元数据缓存中关于该主题的一切“痕迹”
controllerContext.removeTopic(topic)
}
整个过程如行云流水般一气呵成,非常容易理解,我就不多解释了。
再来看看onTopicDeletion方法的代码:
private def onTopicDeletion(topics: Set[String]): Unit = {
// 找出给定待删除主题列表中那些尚未开启删除操作的所有主题
val unseenTopicsForDeletion = topics.diff(controllerContext.topicsWithDeletionStarted)
if (unseenTopicsForDeletion.nonEmpty) {
// 获取到这些主题的所有分区对象
val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
// 将这些分区的状态依次调整成OfflinePartition和NonExistentPartition
// 等同于将这些分区从分区状态机中删除
partitionStateMachine.handleStateChanges(
unseenPartitionsForDeletion.toSeq, OfflinePartition)
partitionStateMachine.handleStateChanges(
unseenPartitionsForDeletion.toSeq, NonExistentPartition)
// 把这些主题加到“已开启删除操作”主题列表中
controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
}
// 给集群所有Broker发送元数据更新请求,告诉它们不要再为这些主题处理数据了
client.sendMetadataUpdate(
topics.flatMap(controllerContext.partitionsForTopic))
// 分区删除操作会执行底层的物理磁盘文件删除动作
onPartitionDeletion(topics)
}
我在代码中用注释的方式,已经把onTopicDeletion方法的逻辑解释清楚了。你可以发现,这个方法也基本是串行化的流程,没什么难理解的。我再给你梳理下其中的核心点。
onTopicDeletion方法会多次使用分区状态机,来调整待删除主题的分区状态。在后两讲的分区状态机和副本状态机的课里面,我还会和你详细讲解它们,包括它们都定义了哪些状态,这些状态彼此之间的转换规则都是什么,等等。
onTopicDeletion方法的最后一行调用了onPartitionDeletion方法,来执行真正的底层物理磁盘文件删除。实际上,这是通过副本状态机状态转换操作完成的。下节课,我会和你详细聊聊这个事情。
学到这里,我还想提醒你的是,在学习TopicDeletionManager方法的时候,非常重要一点的是,你要理解主题删除的脉络。对于其他部分的源码,也是这个道理。一旦你掌握了整体的流程,阅读那些细枝末节的方法代码就没有任何难度了。照着这个方法去读源码,搞懂Kafka源码也不是什么难事!
今天,我们主要学习了TopicDeletionManager.scala中关于主题删除的代码。这里有几个要点,需要你记住。
今天我们看到的代码中出现了大量的replicaStateMachine和partitionStateMachine,其实这就是我今天反复提到的副本状态机和分区状态机。接下来两讲,我会逐步带你走进它们的代码世界,让你领略下Kafka通过状态机机制管理副本和分区的风采。
上节课,我们在学习processTopicDeletion方法的代码时,看到了一个名为markTopicIneligibleForDeletion的方法,这也是和主题删除相关的方法。现在,你能说说,它是做什么用的、它的实现原理又是怎样的吗?
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。