你好,我是胡夕。今天我们进入到分区状态机(PartitionStateMachine)源码的学习。
PartitionStateMachine负责管理Kafka分区状态的转换,和ReplicaStateMachine是一脉相承的。从代码结构、实现功能和设计原理来看,二者都极为相似。上节课我们已经学过了ReplicaStateMachine,相信你在学习这节课的PartitionStateMachine时,会轻松很多。
在面试的时候,很多面试官都非常喜欢问Leader选举的策略。学完了今天的课程之后,你不但能够说出4种Leader选举的场景,还能总结出它们的共性。对于面试来说,绝对是个加分项!
话不多说,我们就正式开始吧。
PartitionStateMachine.scala文件位于controller包下,代码结构不复杂,可以看下这张思维导图:
代码总共有5大部分。
PartitionStateMachine和ReplicaStateMachine非常类似,我们先看下面这两段代码:
// PartitionStateMachine抽象类定义
abstract class PartitionStateMachine(
controllerContext: ControllerContext) extends Logging {
......
}
// ZkPartitionStateMachine继承子类定义
class ZkPartitionStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
zkClient: KafkaZkClient,
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends PartitionStateMachine(controllerContext) {
// Controller所在Broker的Id
private val controllerId = config.brokerId
......
}
从代码中,可以发现,它们的类定义一模一样,尤其是ZkPartitionStateMachine和ZKReplicaStateMachine,它们接收的字段列表都是相同的。此刻,你应该可以体会到它们要做的处理逻辑,其实也是差不多的。
同理,ZkPartitionStateMachine实例的创建和启动时机也跟ZkReplicaStateMachine的完全相同,即:每个Broker进程启动时,会在创建KafkaController对象的过程中,生成ZkPartitionStateMachine实例,而只有Controller组件所在的Broker,才会启动分区状态机。
下面这段代码展示了ZkPartitionStateMachine实例创建和启动的位置:
class KafkaController(......) {
......
// 在KafkaController对象创建过程中,生成ZkPartitionStateMachine实例
val partitionStateMachine: PartitionStateMachine =
new ZkPartitionStateMachine(config, stateChangeLogger,
controllerContext, zkClient,
new ControllerBrokerRequestBatch(config,
controllerChannelManager, eventManager, controllerContext,
stateChangeLogger))
......
private def onControllerFailover(): Unit = {
......
replicaStateMachine.startup() // 启动副本状态机
partitionStateMachine.startup() // 启动分区状态机
......
}
}
有句话我要再强调一遍:每个Broker启动时,都会创建对应的分区状态机和副本状态机实例,但只有Controller所在的Broker才会启动它们。如果Controller变更到其他Broker,老Controller所在的Broker要调用这些状态机的shutdown方法关闭它们,新Controller所在的Broker调用状态机的startup方法启动它们。
既然ZkPartitionStateMachine是管理分区状态转换的,那么,我们至少要知道分区都有哪些状态,以及Kafka规定的转换规则是什么。这就是PartitionState接口及其实现对象做的事情。和ReplicaState类似,PartitionState定义了分区的状态空间以及流转规则。
我以OnlinePartition状态为例,说明下代码是如何实现流转的:
sealed trait PartitionState {
def state: Byte // 状态序号,无实际用途
def validPreviousStates: Set[PartitionState] // 合法前置状态集合
}
case object OnlinePartition extends PartitionState {
val state: Byte = 1
val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}
如代码所示,每个PartitionState都定义了名为validPreviousStates的集合,也就是每个状态对应的合法前置状态集。
对于OnlinePartition而言,它的合法前置状态集包括NewPartition、OnlinePartition和OfflinePartition。在Kafka中,从合法状态集以外的状态向目标状态进行转换,将被视为非法操作。
目前,Kafka为分区定义了4类状态。
下图展示了完整的分区状态转换规则:
图中的双向箭头连接的两个状态可以彼此进行转换,如OnlinePartition和OfflinePartition。Kafka允许一个分区从OnlinePartition切换到OfflinePartition,反之亦然。
另外,OnlinePartition和OfflinePartition都有一根箭头指向自己,这表明OnlinePartition切换到OnlinePartition的操作是允许的。当分区Leader选举发生的时候,就可能出现这种情况。接下来,我们就聊聊分区Leader选举那些事儿。
刚刚我们说了两个状态机的相同点,接下来,我们要学习的分区Leader选举,可以说是PartitionStateMachine特有的功能了。
每个分区都必须选举出Leader才能正常提供服务,因此,对于分区而言,Leader副本是非常重要的角色。既然这样,我们就必须要了解Leader选举什么流程,以及在代码中是如何实现的。我们重点学习下选举策略以及具体的实现方法代码。
先明确下分区Leader选举的含义,其实很简单,就是为Kafka主题的某个分区推选Leader副本。
那么,Kafka定义了哪几种推选策略,或者说,在什么情况下需要执行Leader选举呢?
这就是PartitionLeaderElectionStrategy接口要做的事情,请看下面的代码:
// 分区Leader选举策略接口
sealed trait PartitionLeaderElectionStrategy
// 离线分区Leader选举策略
final case class OfflinePartitionLeaderElectionStrategy(
allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
// 分区副本重分配Leader选举策略
final case object ReassignPartitionLeaderElectionStrategy
extends PartitionLeaderElectionStrategy
// 分区Preferred副本Leader选举策略
final case object PreferredReplicaPartitionLeaderElectionStrategy
extends PartitionLeaderElectionStrategy
// Broker Controlled关闭时Leader选举策略
final case object ControlledShutdownPartitionLeaderElectionStrategy
extends PartitionLeaderElectionStrategy
当前,分区Leader选举有4类场景。
针对这4类场景,分区状态机的PartitionLeaderElectionAlgorithms对象定义了4个方法,分别负责为每种场景选举Leader副本,这4种方法是:
offlinePartitionLeaderElection方法的逻辑是这4个方法中最复杂的,我们就先从它开始学起。
def offlinePartitionLeaderElection(assignment: Seq[Int],
isr: Seq[Int], liveReplicas: Set[Int],
uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
// 从当前分区副本列表中寻找首个处于存活状态的ISR副本
assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
// 如果找不到满足条件的副本,查看是否允许Unclean Leader选举
// 即Broker端参数unclean.leader.election.enable是否等于true
if (uncleanLeaderElectionEnabled) {
// 选择当前副本列表中的第一个存活副本作为Leader
val leaderOpt = assignment.find(liveReplicas.contains)
if (leaderOpt.isDefined)
controllerContext.stats.uncleanLeaderElectionRate.mark()
leaderOpt
} else {
None // 如果不允许Unclean Leader选举,则返回None表示无法选举Leader
}
}
}
我再画一张流程图,帮助你理解它的代码逻辑:
这个方法总共接收5个参数。除了你已经很熟悉的ControllerContext类,其他4个非常值得我们花一些时间去探究下。
1.assignments
这是分区的副本列表。该列表有个专属的名称,叫Assigned Replicas,简称AR。当我们创建主题之后,使用kafka-topics脚本查看主题时,应该可以看到名为Replicas的一列数据。这列数据显示的,就是主题下每个分区的AR。assignments参数类型是Seq[Int]。这揭示了一个重要的事实:AR是有顺序的,而且不一定和ISR的顺序相同!
2.isr
ISR在Kafka中很有名气,它保存了分区所有与Leader副本保持同步的副本列表。注意,Leader副本自己也在ISR中。另外,作为Seq[Int]类型的变量,isr自身也是有顺序的。
3.liveReplicas
从名字可以推断出,它保存了该分区下所有处于存活状态的副本。怎么判断副本是否存活呢?可以根据Controller元数据缓存中的数据来判定。简单来说,所有在运行中的Broker上的副本,都被认为是存活的。
4.uncleanLeaderElectionEnabled
在默认配置下,只要不是由AdminClient发起的Leader选举,这个参数的值一般是false,即Kafka不允许执行Unclean Leader选举。所谓的Unclean Leader选举,是指在ISR列表为空的情况下,Kafka选择一个非ISR副本作为新的Leader。由于存在丢失数据的风险,目前,社区已经通过把Broker端参数unclean.leader.election.enable的默认值设置为false的方式,禁止Unclean Leader选举了。
值得一提的是,社区于2.4.0.0版本正式支持在AdminClient端为给定分区选举Leader。目前的设计是,如果Leader选举是由AdminClient端触发的,那就默认开启Unclean Leader选举。不过,在学习offlinePartitionLeaderElection方法时,你可以认为uncleanLeaderElectionEnabled=false,这并不会影响你对该方法的理解。
了解了这几个参数的含义,我们就可以研究具体的流程了。
代码首先会顺序搜索AR列表,并把第一个同时满足以下两个条件的副本作为新的Leader返回:
倘若无法找到这样的副本,代码会检查是否开启了Unclean Leader选举:如果开启了,则降低标准,只要满足上面第一个条件即可;如果未开启,则本次Leader选举失败,没有新Leader被选出。
其他3个方法要简单得多,我们直接看代码:
def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
}
def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
}
def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
}
可以看到,它们的逻辑几乎是相同的,大概的原理都是从AR,或给定的副本列表中寻找存活状态的ISR副本。
讲到这里,你应该已经知道Kafka为分区选举Leader的大体思路了。基本上就是,找出AR列表(或给定副本列表)中首个处于存活状态,且在ISR列表的副本,将其作为新Leader。
掌握了刚刚的这些知识之后,现在,我们正式来看PartitionStateMachine的工作原理。
前面我提到过,handleStateChanges是入口方法,所以我们先看它的方法签名:
def handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]):
Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
如果用一句话概括handleStateChanges的作用,应该这样说:handleStateChanges把partitions的状态设置为targetState,同时,还可能需要用leaderElectionStrategy策略为partitions选举新的Leader,最终将partitions的Leader信息返回。
其中,partitions是待执行状态变更的目标分区列表,targetState是目标状态,leaderElectionStrategy是一个可选项,如果传入了,就表示要执行Leader选举。
下面是handleStateChanges方法的完整代码,我以注释的方式给出了主要的功能说明:
override def handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
if (partitions.nonEmpty) {
try {
// 清空Controller待发送请求集合,准备本次请求发送
controllerBrokerRequestBatch.newBatch()
// 调用doHandleStateChanges方法执行真正的状态变更逻辑
val result = doHandleStateChanges(
partitions,
targetState,
partitionLeaderElectionStrategyOpt
)
// Controller给相关Broker发送请求通知状态变化
controllerBrokerRequestBatch.sendRequestsToBrokers(
controllerContext.epoch)
// 返回状态变更处理结果
result
} catch {
// 如果Controller易主,则记录错误日志,然后重新抛出异常
// 上层代码会捕获该异常并执行maybeResign方法执行卸任逻辑
case e: ControllerMovedException =>
error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
throw e
// 如果是其他异常,记录错误日志,封装错误返回
case e: Throwable =>
error(s"Error while moving some partitions to $targetState state", e)
partitions.iterator.map(_ -> Left(e)).toMap
}
} else { // 如果partitions为空,什么都不用做
Map.empty
}
}
整个方法就两步:第1步是,调用doHandleStateChanges方法执行分区状态转换;第2步是,Controller给相关Broker发送请求,告知它们这些分区的状态变更。至于哪些Broker属于相关Broker,以及给Broker发送哪些请求,实际上是在第1步中被确认的。
当然,这个方法的重点,就是第1步中调用的doHandleStateChanges方法。
先来看这个方法的实现:
private def doHandleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
val traceEnabled = stateChangeLog.isTraceEnabled
// 初始化新分区的状态为NonExistentPartition
partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
// 找出要执行非法状态转换的分区,记录错误日志
val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
// 根据targetState进入到不同的case分支
targetState match {
......
}
}
这个方法首先会做状态初始化的工作,具体来说就是,在方法调用时,不在元数据缓存中的所有分区的状态,会被初始化为NonExistentPartition。
接着,检查哪些分区执行的状态转换不合法,并为这些分区记录相应的错误日志。
之后,代码携合法状态转换的分区列表进入到case分支。由于分区状态只有4个,因此,它的case分支代码远比ReplicaStateMachine中的简单,而且,只有OnlinePartition这一路的分支逻辑相对复杂,其他3路仅仅是将分区状态设置成目标状态而已,
所以,我们来深入研究下目标状态是OnlinePartition的分支:
case OnlinePartition =>
// 获取未初始化分区列表,也就是NewPartition状态下的所有分区
val uninitializedPartitions = validPartitions.filter(
partition => partitionState(partition) == NewPartition)
// 获取具备Leader选举资格的分区列表
// 只能为OnlinePartition和OfflinePartition状态的分区选举Leader
val partitionsToElectLeader = validPartitions.filter(
partition => partitionState(partition) == OfflinePartition ||
partitionState(partition) == OnlinePartition)
// 初始化NewPartition状态分区,在ZooKeeper中写入Leader和ISR数据
if (uninitializedPartitions.nonEmpty) {
val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
successfulInitializations.foreach { partition =>
stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
controllerContext.putPartitionState(partition, OnlinePartition)
}
}
// 为具备Leader选举资格的分区推选Leader
if (partitionsToElectLeader.nonEmpty) {
val electionResults = electLeaderForPartitions(
partitionsToElectLeader,
partitionLeaderElectionStrategyOpt.getOrElse(
throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
)
)
electionResults.foreach {
case (partition, Right(leaderAndIsr)) =>
stateChangeLog.info(
s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
)
// 将成功选举Leader后的分区设置成OnlinePartition状态
controllerContext.putPartitionState(
partition, OnlinePartition)
case (_, Left(_)) => // 如果选举失败,忽略之
}
// 返回Leader选举结果
electionResults
} else {
Map.empty
}
虽然代码有点长,但总的步骤就两步。
第1步是为NewPartition状态的分区做初始化操作,也就是在ZooKeeper中,创建并写入分区节点数据。节点的位置是/brokers/topics/<topic>/partitions/<partition>
,每个节点都要包含分区的Leader和ISR等数据。而Leader和ISR的确定规则是:选择存活副本列表的第一个副本作为Leader;选择存活副本列表作为ISR。至于具体的代码,可以看下initializeLeaderAndIsrForPartitions方法代码片段的倒数第5行:
private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
......
// 获取每个分区的副本列表
val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
// 获取每个分区的所有存活副本
val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
partition -> liveReplicasForPartition
}
// 按照有无存活副本对分区进行分组
// 分为两组:有存活副本的分区;无任何存活副本的分区
val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
......
// 为"有存活副本的分区"确定Leader和ISR
// Leader确认依据:存活副本列表的首个副本被认定为Leader
// ISR确认依据:存活副本列表被认定为ISR
val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
......
}.toMap
......
}
第2步是为具备Leader选举资格的分区推选Leader,代码调用electLeaderForPartitions方法实现。这个方法会不断尝试为多个分区选举Leader,直到所有分区都成功选出Leader。
选举Leader的核心代码位于doElectLeaderForPartitions方法中,该方法主要有3步。
代码很长,我先画一张图来展示它的主要步骤,然后再分步骤给你解释每一步的代码,以免你直接陷入冗长的源码行里面去。
看着好像图也很长,别着急,我们来一步步拆解下。
就像前面说的,这个方法大体分为3步。第1步是从ZooKeeper中获取给定分区的Leader、ISR信息,并将结果封装进名为validLeaderAndIsrs的容器中,代码如下:
// doElectLeaderForPartitions方法的第1部分
val getDataResponses = try {
// 批量获取ZooKeeper中给定分区的znode节点数据
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
}
// 构建两个容器,分别保存可选举Leader分区列表和选举失败分区列表
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
// 遍历每个分区的znode节点数据
getDataResponses.foreach { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val currState = partitionState(partition)
// 如果成功拿到znode节点数据
if (getDataResponse.resultCode == Code.OK) {
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
// 节点数据中含Leader和ISR信息
case Some(leaderIsrAndControllerEpoch) =>
// 如果节点数据的Controller Epoch值大于当前Controller Epoch值
if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
s"already written by another controller. This probably means that the current controller $controllerId went through " +
s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
// 将该分区加入到选举失败分区列表
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
} else {
// 将该分区加入到可选举Leader分区列表
validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
}
// 如果节点数据不含Leader和ISR信息
case None =>
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
// 将该分区加入到选举失败分区列表
failedElections.put(partition, Left(exception))
}
// 如果没有拿到znode节点数据,则将该分区加入到选举失败分区列表
} else if (getDataResponse.resultCode == Code.NONODE) {
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
} else {
failedElections.put(partition, Left(getDataResponse.resultException.get))
}
}
if (validLeaderAndIsrs.isEmpty) {
return (failedElections.toMap, Seq.empty)
}
首先,代码会批量读取ZooKeeper中给定分区的所有Znode数据。之后,会构建两个容器,分别保存可选举Leader分区列表和选举失败分区列表。接着,开始遍历每个分区的Znode节点数据,如果成功拿到Znode节点数据,节点数据包含Leader和ISR信息且节点数据的Controller Epoch值小于当前Controller Epoch值,那么,就将该分区加入到可选举Leader分区列表。倘若发现Zookeeper中保存的Controller Epoch值大于当前Epoch值,说明该分区已经被一个更新的Controller选举过Leader了,此时必须终止本次Leader选举,并将该分区放置到选举失败分区列表中。
遍历完这些分区之后,代码要看下validLeaderAndIsrs容器中是否包含可选举Leader的分区。如果一个满足选举Leader的分区都没有,方法直接返回。至此,doElectLeaderForPartitions方法的第一大步完成。
下面,我们看下该方法的第2部分代码:
// doElectLeaderForPartitions方法的第2部分
// 开始选举Leader,并根据有无Leader将分区进行分区
val (partitionsWithoutLeaders, partitionsWithLeaders) =
partitionLeaderElectionStrategy match {
case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
validLeaderAndIsrs,
allowUnclean
)
// 为OffinePartition分区选举Leader
leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
case ReassignPartitionLeaderElectionStrategy =>
// 为副本重分配的分区选举Leader
leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case PreferredReplicaPartitionLeaderElectionStrategy =>
// 为分区执行Preferred副本Leader选举
leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case ControlledShutdownPartitionLeaderElectionStrategy =>
// 为因Broker正常关闭而受影响的分区选举Leader
leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
}
这一步是根据给定的PartitionLeaderElectionStrategy,调用PartitionLeaderElectionAlgorithms的不同方法执行Leader选举,同时,区分出成功选举Leader和未选出Leader的分区。
前面说过了,这4种不同的策略定义了4个专属的方法来进行Leader选举。其实,如果你打开这些方法的源码,就会发现它们大同小异。基本上,选择Leader的规则,就是选择副本集合中首个存活且处于ISR中的副本作为Leader。
现在,我们再来看这个方法的最后一部分代码,这一步主要是更新ZooKeeper节点数据,以及Controller端元数据缓存信息。
// doElectLeaderForPartitions方法的第3部分
// 将所有选举失败的分区全部加入到Leader选举失败分区列表
partitionsWithoutLeaders.foreach { electionResult =>
val partition = electionResult.topicPartition
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
}
val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
// 使用新选举的Leader和ISR信息更新ZooKeeper上分区的znode节点数据
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
// 对于ZooKeeper znode节点数据更新成功的分区,封装对应的Leader和ISR信息
// 构建LeaderAndIsr请求,并将该请求加入到Controller待发送请求集合
// 等待后续统一发送
finishedUpdates.foreach { case (partition, result) =>
result.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
}
}
// 返回选举结果,包括成功选举并更新ZooKeeper节点的分区、选举失败分区以及
// ZooKeeper节点更新失败的分区
(finishedUpdates ++ failedElections, updatesToRetry)
首先,将上一步中所有选举失败的分区,全部加入到Leader选举失败分区列表。
然后,使用新选举的Leader和ISR信息,更新ZooKeeper上分区的Znode节点数据。对于ZooKeeper Znode节点数据更新成功的那些分区,源码会封装对应的Leader和ISR信息,构建LeaderAndIsr请求,并将该请求加入到Controller待发送请求集合,等待后续统一发送。
最后,方法返回选举结果,包括成功选举并更新ZooKeeper节点的分区列表、选举失败分区列表,以及ZooKeeper节点更新失败的分区列表。
这会儿,你还记得handleStateChanges方法的第2步是Controller给相关的Broker发送请求吗?那么,到底要给哪些Broker发送哪些请求呢?其实就是在上面这步完成的,即这行语句:
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
recipientsPerPartition(partition), partition,
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
今天,我们重点学习了PartitionStateMachine.scala文件的源码,主要是研究了Kafka分区状态机的构造原理和工作机制。
学到这里,我们再来回答开篇面试官的问题,应该就不是什么难事了。现在我们知道了,Kafka目前提供4种Leader选举策略,分别是分区下线后的Leader选举、分区执行副本重分配时的Leader选举、分区执行Preferred副本Leader选举,以及Broker下线时的分区Leader选举。
这4类选举策略在选择Leader这件事情上有着类似的逻辑,那就是,它们几乎都是选择当前副本有序集合中的、首个处于ISR集合中的存活副本作为新的Leader。当然,个别选举策略可能会有细小的差别,你可以结合我们今天学到的源码,课下再深入地研究一下每一类策略的源码。
我们来回顾下这节课的重点。
下个模块,我们将来到Kafka延迟操作代码的世界。在那里,你能了解Kafka是如何实现一个延迟请求的处理的。另外,一个O(N)时间复杂度的时间轮算法也等候在那里,到时候我们一起研究下它!
源码中有个triggerOnlineStateChangeForPartitions方法,请你分析下,它是做什么用的,以及它何时被调用?
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。
评论