你好,我是胡夕。从今天这节课开始,我们进入到最后一个模块的源码学习:消费者组管理模块。

在这个模块中,我将会带你详细阅读Kafka消费者组在Broker端的源码实现,包括消费者组元数据的定义与管理、组元数据管理器、内部主题__consumer_offsets和重要的组件GroupCoordinator。

我先来给你简单介绍下这4部分的功能,以方便你对消费者组管理有一个大概的理解。

我把这4部分源码的功能,梳理到了一张思维导图中,你可以保存下来随时查阅:

今天,我们首先学习消费者组元数据的源码实现,这是我们理解消费者组工作机制和深入学习消费者组管理组件的基础。除此之外,掌握这部分代码对我们还有什么实际意义吗?

当然有了。我想你肯定对下面这条命令不陌生吧,它是查询消费者组状态的命令行工具。我们在输出中看到了GROUP、COORDINATOR、ASSIGNMENT-STRATEGY、STATE和MEMBERS等数据。实际上,这些数据就是消费者组元数据的一部分。

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --verbose --state

GROUP     COORDINATOR (ID)  ASSIGNMENT-STRATEGY  STATE      #MEMBERS
mygroup  172.25.4.76:9092 (0)      range         Stable        2

所以你看,我们日常使用的命令和源码知识的联系是非常紧密的,弄明白了今天的内容,之后你在实际使用一些命令行工具时,就能理解得更加透彻了。

好了,我们现在就正式开始今天的学习吧。

就像我前面说的,元数据主要是由GroupMetadata和MemberMetadata两个类组成,它们分别位于GroupMetadata.scala和MemberMetadata.scala这两个源码文件中。从它们的名字上也可以看出来,前者是保存消费者组的元数据,后者是保存消费者组下成员的元数据。

由于一个消费者组下有多个成员,因此,一个GroupMetadata实例会对应于多个MemberMetadata实例。接下来,我们先学习下MemberMetadata.scala源文件。

组成员元数据(MemberMetadata)

MemberMetadata.scala源文件位于coordinator.group包下。事实上,coordinator.group包下的所有源代码,都是与消费者组功能息息相关的。下图是coordinator.group包的源码文件列表,你可以看到,MemberMetdata.scala和稍后我们要学到的GroupMetadata.scala,都在其中。

从这个包结构图中,我们还可以发现后面几节课中要学习的源码类(比如GroupCoordinator、GroupMetadataManager)也都在里面。当然了,你一定也发现了,coordinator包下还有个transcation包,里面保存了Kafka事务相关的所有源代码。如果你想深入学习事务机制的话,可以去阅读下这个包下的源代码。

现在,我们聚焦到MemberMetadata.scala文件,包括3个类和对象。

按照难易程度,我们从最简单的MemberSummary类开始学起。

MemberSummary类

MemberSummary类就是组成员元数据的一个概要数据类,它的代码本质上是一个POJO类,仅仅承载数据,没有定义任何逻辑。代码如下:

case class MemberSummary(
  memberId: String,                 // 成员ID,由Kafka自动生成
  groupInstanceId: Option[String],  // Consumer端参数group.instance.id值
  clientId: String,                 // client.id参数值
  clientHost: String,               // Consumer端程序主机名
  metadata: Array[Byte],            // 消费者组成员使用的分配策略
  assignment: Array[Byte])          // 成员订阅分区

可以看到,这个类定义了6个字段,我来详细解释下。

总之,MemberSummary类是成员概要数据的容器,类似于Java中的POJO类,不涉及任何操作逻辑,所以还是很好理解的。

MemberMetadata伴生对象

接下来,我们学习MemberMetadata伴生对象的代码。它只定义了一个plainProtocolSet方法,供上层组件调用。这个方法只做一件事儿,即从一组给定的分区分配策略详情中提取出分区分配策略的名称,并将其封装成一个集合对象,然后返回:

private object MemberMetadata {
  // 提取分区分配策略集合
  def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]) = supportedProtocols.map(_._1).toSet
}

我举个例子说明下。如果消费者组下有3个成员,它们的partition.assignment.strategy参数分别设置成RangeAssignor、RangeAssignor和RoundRobinAssignor,那么,plainProtocolSet方法的返回值就是集合[RangeAssignor,RoundRobinAssignor]。实际上,它经常被用来统计一个消费者组下的成员到底配置了多少种分区分配策略。

MemberMetadata类

现在,我们看下MemberMetadata类的源码。首先,我们看下该类的构造函数以及字段定义,了解下一个成员的元数据都有哪些。

@nonthreadsafe
private[group] class MemberMetadata(
  var memberId: String,
  val groupId: String,
  val groupInstanceId: Option[String],
  val clientId: String,
  val clientHost: String,
  val rebalanceTimeoutMs: Int,  // Rebalane操作超时时间
  val sessionTimeoutMs: Int,    // 会话超时时间
  val protocolType: String,     // 对消费者组而言,是"consumer"
  // 成员配置的多套分区分配策略
  var supportedProtocols: List[(String, Array[Byte])]) {
  // 分区分配方案
  var assignment: Array[Byte] = Array.empty[Byte]
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  var awaitingSyncCallback: SyncGroupResult => Unit = null
  var isLeaving: Boolean = false
  var isNew: Boolean = false
  val isStaticMember: Boolean = groupInstanceId.isDefined
  var heartbeatSatisfied: Boolean = false
  ......
}

MemberMetadata类保存的数据很丰富,在它的构造函数中,除了包含MemberSummary类定义的6个字段外,还定义了4个新字段。

除了构造函数中的10个字段之外,该类还定义了7个额外的字段,用于保存元数据和判断状态。这些扩展字段都是var型变量,说明它们的值是可以变更的。MemberMetadata源码正是依靠这些字段,来不断地调整组成员的元数据信息和状态。

我选择了5个比较重要的扩展字段,和你介绍下。

以上就是MemberMetadata类的构造函数以及字段定义了。它定义的方法都是操作这些元数据的,而且大多都是逻辑很简单的操作。这里我选取metadata方法带你熟悉下它们的编码实现风格。你可以在课后自行阅读其他的方法代码,掌握它们的工作原理。

我们看下metadata方法的代码:

def metadata(protocol: String): Array[Byte] = {
  // 从配置的分区分配策略中寻找给定策略
  supportedProtocols.find(_._1 == protocol) match {
    case Some((_, metadata)) => metadata
    case None =>
      throw new IllegalArgumentException("Member does not support protocol")
  }
}

它实现的逻辑是:从该成员配置的分区分配方案列表中寻找给定策略的详情。如果找到,就直接返回详情字节数组数据,否则,就抛出异常。怎么样,是不是很简单?

组元数据(GroupMetadata类)

说完了组成员元数据类,我们进入到组元数据类GroupMetadata的学习。它位于coordinator.group包下的同名scala文件下。

GroupMetadata管理的是消费者组而不是消费者组成员级别的元数据,所以,它的代码结构要比MemberMetadata类复杂得多。我先画一张思维导图帮你梳理下它的代码结构。

总体上来看,GroupMetadata.scala文件由6部分构成。

接下来,我们依次看下这些代码结构中都保存了哪些元数据信息。我们从最简单的GroupState类开始。

GroupState类及实现对象

GroupState类定义了消费者组的状态。这个类及其实现对象Stable的代码如下:

// GroupState trait
private[group] sealed trait GroupState {
  // 合法前置状态
  val validPreviousStates: Set[GroupState]
}
// Stable状态
private[group] case object Stable extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}
......

这里我只展示了Stable状态的代码,其他4个状态的代码都差不多。为了方便你理解消费者组之间的状态流转,我绘制了一张完整的状态流转图。

你需要记住的是,一个消费者组从创建到正常工作,它的状态流转路径是Empty -> PreparingRebalance -> CompletingRebalance -> Stable。

GroupOverview类

接下来,我们看下GroupOverview类的代码。就像我刚才说的,这是一个非常简略的组概览信息。当我们在命令行运行kafka-consumer-groups.sh --list的时候,Kafka就会创建GroupOverview实例返回给命令行。

我们来看下它的代码:

case class GroupOverview(
  groupId: String,      // 组ID信息,即group.id参数值
  protocolType: String, // 消费者组的协议类型
  state: String)        // 消费者组的状态

怎么样,很简单吧。GroupOverview类封装了最基础的组数据,包括组ID、协议类型和状态信息。如果你熟悉Java Web开发的话,可以把GroupOverview和GroupMetadata的关系,理解为DAO和DTO的关系。

GroupSummary类

它的作用和GroupOverview非常相似,只不过它保存的数据要稍微多一点。我们看下它的代码:

case class GroupSummary(
  state: String,                  // 消费者组状态
  protocolType: String,           // 协议类型 
  protocol: String,               // 消费者组选定的分区分配策略
  members: List[MemberSummary])   // 成员元数据

GroupSummary类有4个字段,它们的含义也都很清晰,看字段名就能理解。你需要关注的是members字段,它是一个MemberSummary类型的列表,里面保存了消费者组所有成员的元数据信息。通过这个字段,我们可以看到,消费者组元数据和组成员元数据是1对多的关系

GroupMetadata类

最后,我们看下GroupMetadata类的源码。我们先看下该类构造函数所需的字段和自定义的扩展元数据:

@nonthreadsafe
private[group] class GroupMetadata(
  val groupId: String,  // 组ID
  initialState: GroupState, // 消费者组初始状态
  time: Time) extends Logging {
  type JoinCallback = JoinGroupResult => Unit
  // 组状态
  private var state: GroupState = initialState
  // 记录状态最近一次变更的时间戳
  var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
  var protocolType: Option[String] = None
  var protocolName: Option[String] = None
  var generationId = 0
  // 记录消费者组的Leader成员,可能不存在
  private var leaderId: Option[String] = None
  // 成员元数据列表信息
  private val members = new mutable.HashMap[String, MemberMetadata]
  // 静态成员Id列表
  private val staticMembers = new mutable.HashMap[String, String]
  private var numMembersAwaitingJoin = 0
  // 分区分配策略支持票数
  private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
  // 保存消费者组订阅分区的提交位移值
  private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
  // 消费者组订阅的主题列表
  private var subscribedTopics: Option[Set[String]] = None
  ......
}

GroupMetadata类定义的字段非常多,也正因为这样,它保存的数据是最全的,绝对担得起消费者组元数据类的称号。

除了我们之前反复提到的字段外,它还定义了很多其他字段。不过,有些字段要么是与事务相关的元数据,要么是属于中间状态的临时元数据,不属于核心的元数据,我们不需要花很多精力去学习它们。我们要重点关注的,是上面的代码中所展示的字段,这些是GroupMetadata类最重要的字段。

这些扩展字段和构造函数中的字段,共同构建出了完整的消费者组元数据。就我个人而言,我认为这些字段中最重要的就是members和offsets,它们分别保存了组内所有成员的元数据,以及这些成员提交的位移值。这样看的话,这两部分数据不就是一个消费者组最关心的3件事吗:组里面有多少个成员每个成员都负责做什么它们都做到了什么程度

总结

今天,我带你深入到了GroupMetadata.scala和MemberMetadata.scala这两个源码文件中,学习了消费者组元数据和组成员元数据的定义。它们封装了一个消费者组及其成员的所有数据。后续的GroupCoordinator和其他消费者组组件,都会大量利用这部分元数据执行消费者组的管理。

为了让你更好地掌握今天的内容,我们来回顾下这节课的重点。

今天这节课的逻辑不是特别复杂,我们重点学习了消费者组元数据的构成,几乎未曾涉及元数据的操作。在下节课,我们将继续在这两个scala文件中探索,去学习操作这些元数据的方法实现。

但我要再次强调的是,今天学习的这些方法是上层组件调用的基础。如果你想彻底了解消费者组的工作原理,就必须先把这部分基础“铺平夯实”了,这样你才能借由它们到达“完全掌握消费者组实现源码”的目的地。

课后讨论

请你思考下,这节课最开始的工具行命令输出中的ASSIGNMENT-STRATEGY项,对应于咱们今天学习的哪一项元数据呢?

欢迎在留言区写下你的思考和答案,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

评论