你好,我是胡夕。

上节课,我们学习了单线程事件队列模型处理Controller事件的代码。Controller组件通过ControllerEventManager类构造了一个阻塞队列,同时配以专属的事件处理线程,实现了对各类ControllerEvent的处理。

这种设计思路既保证了多线程访问所需的线程安全,还简化了Controller端的代码结构,极大地提升了代码的可维护性。

今天,我们学习下Controller选举部分的源码。

还记得我在第11节课的案例中提到的“恢复大法”——删除ZooKeeper的/controller节点吗?当时,我们靠着这个“秘籍”涉险过关,既恢复了错误的集群状态,又避免了重启整个生产环境。

但你有没有想过,为什么删除/controller节点能够令集群元数据重新保持同步呢?如果不了解这背后的原理,我们是不敢贸然在生产环境做这种操作的。今天,我们要学习的就是这背后的一整套实现逻辑,重点关注下Controller是怎么被选举出来的。

我始终认为,只有掌握了这些知识,才算真正入门Kafka服务器端的代码了。作为Broker端最重要的组件之一,Controller在Kafka中的地位无可替代。整个Kafka集群就只有一个Controller,从某种意义上来说,它是目前Kafka这个分布式系统中唯一的“单点”。

因此,了解这个“单点”的选举触发场景,以及如何被选举出来的,对于我们后面深入理解Controller在集群中的作用非常有帮助。毕竟,Controller对外提供的一些服务也是采用了类似的实现原理。

概览

ZooKeeper /controller节点

再次强调下,在一个Kafka集群中,某段时间内只能有一台Broker被选举为Controller。随着时间的推移,可能会有不同的Broker陆续担任过Controller的角色,但是在某一时刻,Controller只能由一个Broker担任

那选择哪个Broker充当Controller呢?当前,Controller的选举过程依赖ZooKeeper完成。ZooKeeper除了扮演集群元数据的“真理之源”角色,还定义了/controller临时节点(Ephemeral Node),以协助完成Controller的选举。

下面这段代码展示的是一个双Broker的Kafka集群上的ZooKeeper中/controller节点:

{"version":1,"brokerid":0,"timestamp":"1585098432431"}
cZxid = 0x1a
ctime = Wed Mar 25 09:07:12 CST 2020
mZxid = 0x1a
mtime = Wed Mar 25 09:07:12 CST 2020
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002d3a1f0000
dataLength = 54
numChildren = 0

有两个地方的内容,你要重点关注一下。

既然是临时节点,那么,一旦Broker与ZooKeeper的会话终止,该节点就会消失。Controller选举就依靠了这个特性。每个Broker都会监听/controller节点随时准备应聘Controller角色。下图展示了Broker与/controller节点的交互关系:

如图所示,集群上所有的Broker都在实时监听ZooKeeper上的这个节点。这里的“监听”有两个含义。

掌握了这些基础之后,下面我们来阅读具体的源码文件:KafkaController.scala。这是一个2200行的大文件。我先向你介绍一下这个文件的大致结构,以免你陷入到一些繁枝末节中。

源码结构

KafkaController文件的代码结构如下图所示:

整体而言,该文件大致由五部分组成。

接下来,我会给你重点介绍KafkaController类、ZooKeeper监听器和Controller选举这三大部分。在众多的ZooKeeper监听器中,我会详细介绍监听Controller变更的监听器,它也是我们了解Controller选举流程的核心环节。

KafkaController类

这个类大约有1900行代码,里面定义了非常多的变量和方法。这些方法大多是处理不同Controller事件的。后面讲到选举流程的时候,我会挑一些有代表性的来介绍。我希望你能举一反三,借此吃透其他方法的代码。毕竟,它们做的事情大同小异,至少代码风格非常相似。

在学习重要的方法之前,我们必须要先掌握KafkaController类的定义。接下来,我们从4个维度来进行学习,分别是原生字段、辅助字段、各类ZooKeeper监听器字段和统计字段。

弄明白了这些字段的含义之后,再去看操作这些字段的方法,会更加有的放矢,理解起来也会更加容易。

原生字段

首先来看原生字段。所谓的原生字段,是指在创建一个KafkaController实例时,需要指定的字段。

先来看下KafkaController类的定义代码:

// 字段含义:
// config:Kafka配置信息,通过它,你能拿到Broker端所有参数的值
// zkClient:ZooKeeper客户端,Controller与ZooKeeper的所有交互均通过该属性完成
// time:提供时间服务(如获取当前时间)的工具类
// metrics:实现指标监控服务(如创建监控指标)的工具类
// initialBrokerInfo:Broker节点信息,包括主机名、端口号,所用监听器等
// initialBrokerEpoch:Broker Epoch值,用于隔离老Controller发送的请求
// tokenManager:实现Delegation token管理的工具类。Delegation token是一种轻量级的认证机制
// threadNamePrefix:Controller端事件处理线程名字前缀
class KafkaController(val config: KafkaConfig,
                      zkClient: KafkaZkClient,
                      time: Time,
                      metrics: Metrics,
                      initialBrokerInfo: BrokerInfo,
                      initialBrokerEpoch: Long,
                      tokenManager: DelegationTokenManager,
                      threadNamePrefix: Option[String] = None)
  extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
  ......
}

就像我上节课说过的,KafkaController实现了ControllerEventProcessor接口,因而也就实现了处理Controller事件的process方法。这里面比较重要的字段有3个。

其他字段要么是像time、metrics一样,是工具类字段,要么是像initialBrokerInfo、tokenManager字段一样,使用场景很有限,我就不展开讲了。

辅助字段

除了原生字段之外,KafkaController还定义了很多辅助字段,帮助实现Controller的各类功能。

我们来看一些重要的辅助字段:

......
// 集群元数据类,保存集群所有元数据
val controllerContext = new ControllerContext
// Controller端通道管理器类,负责Controller向Broker发送请求
var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
  stateChangeLogger, threadNamePrefix)
// 线程调度器,当前唯一负责定期执行Leader重选举
private[controller] val kafkaScheduler = new KafkaScheduler(1)
// Controller事件管理器,负责管理事件处理线程
private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
  controllerContext.stats.rateAndTimeMetrics)
......
// 副本状态机,负责副本状态转换
val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
// 分区状态机,负责分区状态转换
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
// 主题删除管理器,负责删除主题及日志
val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
  partitionStateMachine, new ControllerDeletionClient(this, zkClient))
......

其中,有7个字段是重中之重。

各类ZooKeeper监听器

我们今天开头学到的ControllerChangeHandler仅仅是其中的一种。实际上,该类定义了很多监听器,如下所示:

// Controller节点ZooKeeper监听器
private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
// Broker数量ZooKeeper监听器
private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
// Broker信息变更ZooKeeper监听器集合
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
// 主题数量ZooKeeper监听器
private val topicChangeHandler = new TopicChangeHandler(eventManager)
// 主题删除ZooKeeper监听器
private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
// 主题分区变更ZooKeeper监听器
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
// 主题分区重分配ZooKeeper监听器
private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
// Preferred Leader选举ZooKeeper监听器
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
// ISR副本集合变更ZooKeeper监听器
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
// 日志路径变更ZooKeeper监听器
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)

我分别解释一下这些ZooKeeper监听器的作用:

我画了一张脑图,希望可以帮助你更高效地记住这些ZooKeeper监听器:

统计字段

最后,我们来看统计字段。

这些统计字段大多用于计算统计指标。有的监控指标甚至是非常重要的Controller监控项,比如ActiveControllerCount指标。下面,我们来了解下KafkaController都定义了哪些统计字段。这些指标的含义一目了然,非常清晰,我用注释的方式给出每个字段的含义:

// 当前Controller所在Broker Id
@volatile private var activeControllerId = -1
// 离线分区总数
@volatile private var offlinePartitionCount = 0
// 满足Preferred Leader选举条件的总分区数
@volatile private var preferredReplicaImbalanceCount = 0
// 总主题数
@volatile private var globalTopicCount = 0
// 总主题分区数
@volatile private var globalPartitionCount = 0
// 待删除主题数
@volatile private var topicsToDeleteCount = 0
//待删除副本数
@volatile private var replicasToDeleteCount = 0
// 暂时无法删除的主题数
@volatile private var ineligibleTopicsToDeleteCount = 0
// 暂时无法删除的副本数
@volatile private var ineligibleReplicasToDeleteCount = 0

好了,KafkaController类的定义我们就全部介绍完了。再次强调下,因为KafkaController类的代码很多,我强烈建议你熟练掌握这些字段的含义,因为后面的所有方法都是围绕着这些字段进行操作的。

接下来,我以Controller的选举流程为例,引出KafkaController的一些方法的实现原理。不过,在此之前,我们要学习监听Controller变更的ZooKeeper监听器:ControllerChangeHandler的源码。

ControllerChangeHandler监听器

就像我前面说到的,KafkaController定义了十几种ZooKeeper监听器。和Controller相关的监听器是ControllerChangeHandler,用于监听Controller的变更,定义代码如下:

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  // ZooKeeper中Controller节点路径,即/controller
  override val path: String = ControllerZNode.path
  // 监听/controller节点创建事件
  override def handleCreation(): Unit = eventManager.put(ControllerChange)
  // 监听/controller节点被删除事件
  override def handleDeletion(): Unit = eventManager.put(Reelect)
  // 监听/controller节点数据变更事件
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

该监听器接收ControllerEventManager实例,实现了ZNodeChangeHandler接口的三个方法:handleCreationhandleDeletionhandleDataChange。该监听器下的path变量,实际上就是/controller字符串,表示它监听ZooKeeper的这个节点。

3个handle方法都用于监听/controller节点的变更,但实现细节上稍有不同。

handleCreation和handleDataChange的处理方式是向事件队列写入ControllerChange事件;handleDeletion的处理方式是向事件队列写入Reelect事件。

Deletion表明ZooKeeper中/controller节点不存在了,即Kafka集群中的Controller暂时空缺了。因为它和Creation和DataChange是不同的状态,需要区别对待,因此,Reelect事件做的事情要比ControllerChange的多:处理ControllerChange事件,只需要当前Broker执行“卸任Controller”的逻辑即可,而Reelect事件是重选举,除了Broker执行卸任逻辑之外,还要求Broker参与到重选举中来。

由于KafkaController的process方法代码非常长,因此,我节选了刚刚提到的那两个事件的处理代码:

// process方法(部分)
override def process(event: ControllerEvent): Unit = {
    try {
      event match {
       ......
       // ControllerChange事件
       case ControllerChange =>
          processControllerChange()
       // Reelect事件
       case Reelect =>
          processReelect()
        ......
      }
    }
    ......
}
// 如果是ControllerChange事件,仅执行卸任逻辑即可
private def processControllerChange(): Unit = {
    maybeResign()
  }
// 如果是Reelect事件,还需要执行elect方法参与新一轮的选举
private def processReelect(): Unit = {
    maybeResign()
    elect()
}

可以看到,虽然代码非常长,但整体结构却工整清晰,全部都是基于模式匹配的事件处理。process方法会根据给定的Controller事件类型,调用对应的process***方法处理该事件。这里只列举了ZooKeeper端/controller节点监听器监听的两类事件,以及对应的处理方法。

对于ControllerChange事件而言,处理方式是调用maybeResign去执行Controller的卸任逻辑。如果是Reelect事件,除了执行卸任逻辑之外,还要额外执行elect方法进行新一轮的Controller选举。

Controller选举流程

说完了ControllerChangeHandler源码,我们来看下Controller的选举。所谓的Controller选举,是指Kafka选择集群中一台Broker行使Controller职责。整个选举过程分为两个步骤:触发选举和开始选举。

触发选举

我先用一张图展示下可能触发Controller选举的三个场景。

这三个场景是:

  1. 集群从零启动时;
  2. Broker侦测/controller节点消失时;
  3. Broker侦测到/controller节点数据发生变更时。

这三个场景殊途同归,最后都要执行选举Controller的动作。我来一一解释下这三个场景,然后再介绍选举Controller的具体操作。

场景一:集群从零启动

集群首次启动时,Controller尚未被选举出来。于是,Broker启动后,首先将Startup这个ControllerEvent写入到事件队列中,然后启动对应的事件处理线程和ControllerChangeHandler ZooKeeper监听器,最后依赖事件处理线程进行Controller的选举。

在源码中,KafkaController类的startup方法就是做这些事情的。当Broker启动时,它会调用这个方法启动ControllerEventThread线程。值得注意的是,每个Broker都需要做这些事情,不是说只有Controller所在的Broker才需要执行这些逻辑

startup方法的主体代码如下:

def startup() = {
  // 第1步:注册ZooKeeper状态变更监听器,它是用于监听Zookeeper会话过期的
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name: String = StateChangeHandlers.ControllerHandler
    override def afterInitializingSession(): Unit = {
      eventManager.put(RegisterBrokerAndReelect)
    }
    override def beforeInitializingSession(): Unit = {
      val queuedEvent = eventManager.clearAndPut(Expire)
      queuedEvent.awaitProcessing()
    }
  })
  // 第2步:写入Startup事件到事件队列
  eventManager.put(Startup)
  // 第3步:启动ControllerEventThread线程,开始处理事件队列中的ControllerEvent
  eventManager.start()
}

首先,startup方法会注册ZooKeeper状态变更监听器,用于监听Broker与ZooKeeper之间的会话是否过期。接着,写入Startup事件到事件队列,然后启动ControllerEventThread线程,开始处理事件队列中的Startup事件。

接下来,我们来学习下KafkaController的process方法处理Startup事件的方法:

// KafkaController的process方法,
override def process(event: ControllerEvent): Unit = {
    try {
      event match {
       ......
       case Startup =>
          processStartup() // 处理Startup事件
      }
    }
    ......
}
private def processStartup(): Unit = {
   // 注册ControllerChangeHandler ZooKeeper监听器
   zkClient.registerZNodeChangeHandlerAndCheckExistence(
    controllerChangeHandler)
   // 执行Controller选举
   elect()
}

从这段代码可知,process方法调用processStartup方法去处理Startup事件。而processStartup方法又会调用zkClient的registerZNodeChangeHandlerAndCheckExistence方法注册ControllerChangeHandler监听器。

值得注意的是,虽然前面的三个场景是并列的关系,但实际上,后面的两个场景必须要等场景一的这一步成功执行之后,才能被触发。

这三种场景都要选举Controller,因此,我们最后统一学习elect方法的代码实现。

总体来说,集群启动时,Broker通过向事件队列“塞入”Startup事件的方式,来触发Controller的竞选。

场景二:/controller节点消失

Broker检测到/controller节点消失时,就意味着,此时整个集群中没有Controller。因此,所有检测到/controller节点消失的Broker,都会立即调用elect方法执行竞选逻辑。

你可能会问:“Broker是怎么侦测到ZooKeeper上的这一变化的呢?”实际上,这是ZooKeeper监听器提供的功能,换句话说,这是Apache ZooKeeper自己实现的功能,所以我们才说,Kafka依赖ZooKeeper完成Controller的选举。

讲到这里,我说点题外话,社区最近正在酝酿彻底移除ZooKeeper依赖。具体到Controller端的变化,就是在Kafka内部实现一个类似于Raft的共识算法来选举Controller。我会在后面的特别放送里详细讲一下社区移除ZooKeeper的全盘计划。

场景三:/controller节点数据变更

Broker检测到/controller节点数据发生变化,通常表明,Controller“易主”了,这就分为两种情况:

具体到代码层面,maybeResign方法形象地说明了这两种情况。你要注意方法中的maybe字样,这表明,Broker可能需要执行卸任操作,也可能不需要。Kafka源码非常喜欢用maybe***来命名方法名,以表示那些在特定条件下才需要执行的逻辑。以下是maybeResign的实现:

private def maybeResign(): Unit = {
  // 非常关键的一步!这是判断是否需要执行卸任逻辑的重要依据!
  // 判断该Broker之前是否是Controller
  val wasActiveBeforeChange = isActive
  // 注册ControllerChangeHandler监听器  
  zkClient.registerZNodeChangeHandlerAndCheckExistence(
    controllerChangeHandler)
  // 获取当前集群Controller所在的Broker Id,如果没有Controller则返回-1
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // 如果该Broker之前是Controller但现在不是了
  if (wasActiveBeforeChange && !isActive) {
    onControllerResignation() // 执行卸任逻辑
  }
}

代码的第一行非常关键,它是决定是否需要执行卸任的重要依据。毕竟,如果Broker之前不是Controller,那何来“卸任”一说呢?之后代码要注册ControllerChangeHandler监听器,获取当前集群Controller所在的Broker ID,如果没有Controller,则返回-1。有了这些数据之后,maybeResign方法需要判断该Broker是否之前是Controller但现在不是了。如果是这种情况的话,则调用onControllerResignation方法执行Controller卸任逻辑。

说到“卸任”,你可能会问:“卸任逻辑是由哪个方法执行的呢?”实际上,这是由onControllerResignation方法执行的,它主要是用于清空各种数据结构的值、取消ZooKeeper监听器、关闭各种状态机以及管理器,等等。我用注释的方式给出它的逻辑实现:

private def onControllerResignation(): Unit = {
  debug("Resigning")
  // 取消ZooKeeper监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(
    isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(
    partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(
    preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(
    logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(
    brokerModificationsHandlers.keySet)
  // 关闭Kafka线程调度器,其实就是取消定期的Leader重选举
  kafkaScheduler.shutdown()
  // 将统计字段全部清0
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0
  topicsToDeleteCount = 0
  replicasToDeleteCount = 0
  ineligibleTopicsToDeleteCount = 0
  ineligibleReplicasToDeleteCount = 0
  // 关闭Token过期检查调度器
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()
  // 取消分区重分配监听器的注册
  unregisterPartitionReassignmentIsrChangeHandlers()
  // 关闭分区状态机
  partitionStateMachine.shutdown()
  // 取消主题变更监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  // 取消分区变更监听器的注册
  unregisterPartitionModificationsHandlers(
    partitionModificationsHandlers.keys.toSeq)
  // 取消主题删除监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(
    topicDeletionHandler.path)
  // 关闭副本状态机
  replicaStateMachine.shutdown()
  // 取消Broker变更监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
  // 关闭Controller通道管理器
  controllerChannelManager.shutdown()
  // 清空集群元数据
  controllerContext.resetContext()
  info("Resigned")
}

选举Controller

讲完了触发场景,接下来,我们就要学习Controller选举的源码了。前面说过了,这三种选举场景最后都会调用elect方法来执行选举逻辑。我们来看下它的实现:

private def elect(): Unit = {
    // 第1步:获取当前Controller所在Broker的序号,如果Controller不存在,显式标记为-1
    activeControllerId = zkClient.getControllerId.getOrElse(-1)

    // 第2步:如果当前Controller已经选出来了,直接返回即可
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    try {
      // 第3步:注册Controller相关信息
      // 主要是创建/controller节点
      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
      controllerContext.epoch = epoch
      controllerContext.epochZkVersion = epochZkVersion
      activeControllerId = config.brokerId

      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
        s"and epoch zk version is now ${controllerContext.epochZkVersion}")

      // 第4步:执行当选Controller的后续逻辑
      onControllerFailover()
    } catch {
      case e: ControllerMovedException =>
        maybeResign()

        if (activeControllerId != -1)
          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
        else
          warn("A controller has been elected but just resigned, this will result in another round of election", e)

      case t: Throwable =>
        error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
          s"Trigger controller movement immediately", t)
        triggerControllerMove()
    }
  }

为了帮助你更好地理解这个方法,我再画一张图来进行说明:

该方法首先检查Controller是否已经选出来了。要知道,集群中的所有Broker都要执行这些逻辑,因此,非常有可能出现某些Broker在执行elect方法时,Controller已经被选出来的情况。如果Controller已经选出来了,那么,自然也就不用再做什么了。相反地,如果Controller尚未被选举出来,那么,代码会尝试创建/controller节点去抢注Controller。

一旦抢注成功,就调用onControllerFailover方法,执行选举成功后的动作。这些动作包括注册各类ZooKeeper监听器、删除日志路径变更和ISR副本变更通知事件、启动Controller通道管理器,以及启动副本状态机和分区状态机。

如果抢注失败了,代码会抛出ControllerMovedException异常。这通常表明Controller已经被其他Broker抢先占据了,那么,此时代码调用maybeResign方法去执行卸任逻辑。

总结

今天,我们梳理了Controller选举的全过程,包括Controller如何借助ZooKeeper监听器实现监听Controller节点,以及Controller的选举触发场景和完整流程。我们来回顾一下这节课的重点。

下节课,我将带你学习Controller的其他重要功能,包括它如何管理Broker和副本等。你千万不要错过。

课后讨论

在这节课刚开始的时候,我提到,删除/controller会触发Controller选举,之后会同步集群元数据信息。那么,你知道源码是在哪里更新的元数据请求吗?

欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

评论