你好,我是胡夕。今天,我们继续学习消费者组Rebalance流程,这节课我们重点学习这个流程的第2大步,也就是组同步。
组同步,也就是成员向Coordinator发送SyncGroupRequest请求,等待Coordinator发送分配方案。在GroupCoordinator类中,负责处理这个请求的入口方法就是handleSyncGroup。它进一步调用doSyncGroup方法完成组同步的逻辑。后者除了给成员下发分配方案之外,还需要在元数据缓存中注册组消息,以及把组状态变更为Stable。一旦完成了组同步操作,Rebalance宣告结束,消费者组开始正常工作。
接下来,我们就来具体学习下组同步流程的实现逻辑。我们先从顶层的入口方法handleSyncGroup方法开始学习,该方法被KafkaApis类的handleSyncGroupRequest方法调用,用于处理消费者组成员发送的SyncGroupRequest请求。顺着这个入口方法,我们会不断深入,下沉到具体实现组同步逻辑的私有化方法doSyncGroup。
我们从handleSyncGroup的方法签名开始学习,代码如下:
def handleSyncGroup(
groupId: String, // 消费者组名
generation: Int, // 消费者组Generation号
memberId: String, // 消费者组成员ID
protocolType: Option[String], // 协议类型
protocolName: Option[String], // 分区消费分配策略名称
groupInstanceId: Option[String], // 静态成员Instance ID
groupAssignment: Map[String, Array[Byte]], // 按照成员分组的分配方案
responseCallback: SyncCallback // 回调函数
): Unit = {
......
}
该方法总共定义了8个参数,你可以看下注释,了解它们的含义,我重点介绍6个比较关键的参数。
你可能已经注意到了,protocolType和protocolName都是Option类型,这说明,它们的取值可能是None,即表示没有值。这是为什么呢?
目前,这两个字段的取值,其实都是Coordinator帮助消费者组确定的,也就是在Rebalance流程的上一步加入组中确定的。
如果成员成功加入组,那么,Coordinator会给这两个字段赋上正确的值,并封装进JoinGroupRequest的Response里,发送给消费者程序。一旦消费者拿到了Response中的数据,就提取出这两个字段的值,封装进SyncGroupRequest请求中,再次发送给Coordinator。
如果成员没有成功加入组,那么,Coordinator会将这两个字段赋值成None,加到Response中。因此,在这里的handleSyncGroup方法中,它们的类型就是Option。
说完了handleSyncGroup的方法签名,我们看下它的代码:
// 验证消费者状态及合法性
validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
// 如果未通过合法性检查,且错误原因是Coordinator正在加载
// 那么,封装REBALANCE_IN_PROGRESS异常,并调用回调函数返回
case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
// 如果是其它错误,则封装对应错误,并调用回调函数返回
case Some(error) => responseCallback(SyncGroupResult(error))
case None =>
// 获取消费者组元数据
groupManager.getGroup(groupId) match {
// 如果未找到,则封装UNKNOWN_MEMBER_ID异常,并调用回调函数返回
case None =>
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
// 如果找到的话,则调用doSyncGroup方法执行组同步任务
case Some(group) => doSyncGroup(
group, generation, memberId, protocolType, protocolName,
groupInstanceId, groupAssignment, responseCallback)
}
}
为了方便你理解,我画了一张流程图来说明此方法的主体逻辑。
handleSyncGroup方法首先会调用上一节课我们学习过的validateGroupStatus方法,校验消费者组状态及合法性。这些检查项包括:
前两个检查项很容易理解,我重点解释一下最后两项的含义。
当Coordinator变更到其他Broker上时,需要从内部位移主题中读取消息数据,并填充到内存上的消费者组元数据缓存,这就是所谓的加载。
代码对消费者组依次执行上面这4项校验,一旦发现有项目校验失败,validateGroupStatus方法就会将检查失败的原因作为结果返回。如果是因为Coordinator正在执行加载,就意味着本次Rebalance的所有状态都丢失了。这里的状态,指的是消费者组下的成员信息。那么,此时最安全的做法,是让消费者组重新从加入组开始,因此,代码会封装REBALANCE_IN_PROGRESS异常,然后调用回调函数返回。一旦消费者组成员接收到此异常,就会知道,它至少找到了正确的Coordinator,只需要重新开启Rebalance,而不需要在开启Rebalance之前,再大费周章地去定位Coordinator组件了。但如果是其它错误,就封装该错误,然后调用回调函数返回。
倘若消费者组通过了以上校验,那么,代码就会获取该消费者组的元数据信息。如果找不到对应的元数据,就封装UNKNOWN_MEMBER_ID异常,之后调用回调函数返回;如果找到了元数据信息,就调用doSyncGroup方法执行真正的组同步逻辑。
显然,接下来我们应该学习doSyncGroup方法的源码了,这才是真正实现组同步功能的地方。
doSyncGroup方法接收的输入参数,与它的调用方法handleSyncGroup如出一辙,所以这里我就不再展开讲了,我们重点关注一下它的源码实现。
鉴于它的代码很长,我把它拆解成两个部分,并配以流程图进行介绍。
我先给出第1部分的流程图,你可以先看一下,对这个流程有个整体的感知。
下面,我们来看这部分的代码:
if (group.is(Dead)) {
responseCallback(
SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
} else if (group.isStaticMemberFenced(memberId, groupInstanceId, "sync-group")) {
responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId)) {
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
} else if (generationId != group.generationId) {
responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
} else if (protocolType.isDefined && !group.protocolType.contains(protocolType.get)) {
responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (protocolName.isDefined && !group.protocolName.contains(protocolName.get)) {
responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
// 第2部分源码......
}
可以看到,代码非常工整,全是if-else类型的判断。
首先,这部分代码会判断消费者组的状态是否是Dead。如果是的话,就说明该组的元数据信息已经被其他线程从Coordinator中移除了,这很可能是因为Coordinator发生了变更。此时,最佳的做法是拒绝该成员的组同步操作,封装COORDINATOR_NOT_AVAILABLE异常,显式告知它去寻找最新Coordinator所在的Broker节点,然后再尝试重新加入组。
接下来的isStaticMemberFenced方法判断是有关静态成员的,我们可以不用理会。
之后,代码判断memberId字段标识的成员是否属于这个消费者组。如果不属于的话,就封装UNKNOWN_MEMBER_ID异常,并调用回调函数返回;如果属于的话,则继续下面的判断。
再之后,代码判断成员的Generation是否和消费者组的相同。如果不同的话,则封装ILLEGAL_GENERATION异常给回调函数;如果相同的话,则继续下面的判断。
接下来,代码判断成员和消费者组的协议类型是否一致。如果不一致,则封装INCONSISTENT_GROUP_PROTOCOL异常给回调函数;如果一致,就进行下一步。
最后,判断成员和消费者组的分区消费分配策略是否一致。如果不一致,同样封装INCONSISTENT_GROUP_PROTOCOL异常给回调函数。
如果这些都一致,则顺利进入到第2部分。在开始之前,我依然用一张图来展示一下这里的实现逻辑。
进入到这部分之后,代码要做什么事情,完全取决于消费者组的当前状态。如果消费者组处于CompletingRebalance状态,这部分代码要做的事情就比较复杂,我们一会儿再说,现在先看除了这个状态之外的逻辑代码。
group.currentState match {
case Empty =>
// 封装UNKNOWN_MEMBER_ID异常,调用回调函数返回
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
// 封装REBALANCE_IN_PROGRESS异常,调用回调函数返回
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
case CompletingRebalance =>
// 下面详细展开......
case Stable =>
// 获取消费者组成员元数据
val memberMetadata = group.get(memberId)
// 封装组协议类型、分配策略、成员分配方案,调用回调函数返回
responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
// 设定成员下次心跳时间
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
case Dead =>
// 抛出异常
throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
}
如果消费者组的当前状态是Empty或PreparingRebalance,那么,代码会封装对应的异常给回调函数,供其调用。
如果是Stable状态,则说明,此时消费者组已处于正常工作状态,无需进行组同步的操作。因此,在这种情况下,简单返回消费者组当前的分配方案给回调函数,供它后面发送给消费者组成员即可。
如果是Dead状态,那就说明,这是一个异常的情况了,因为理论上,不应该为处于Dead状态的组执行组同步,因此,代码只能选择抛出IllegalStateException异常,让上层方法处理。
如果这些状态都不是,那么,消费者组就只能处于CompletingRebalance状态,这也是执行组同步操作时消费者组最有可能处于的状态。因此,这部分的逻辑要复杂一些,我们看下代码:
// 为该消费者组成员设置组同步回调函数
group.get(memberId).awaitingSyncCallback = responseCallback
// 组Leader成员发送的SyncGroupRequest请求需要特殊处理
if (group.isLeader(memberId)) {
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
// 如果有成员没有被分配任何消费方案,则创建一个空的方案赋给它
val missing = group.allMembers.diff(groupAssignment.keySet)
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
if (missing.nonEmpty) {
warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
}
// 把消费者组信息保存在消费者组元数据中,并且将其写入到内部位移主题
groupManager.storeGroup(group, assignment, (error: Errors) => {
group.inLock {
// 如果组状态是CompletingRebalance以及成员和组的generationId相同
if (group.is(CompletingRebalance) && generationId == group.generationId) {
// 如果有错误
if (error != Errors.NONE) {
// 清空分配方案并发送给所有成员
resetAndPropagateAssignmentError(group, error)
// 准备开启新一轮的Rebalance
maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
// 如果没错误
} else {
// 在消费者组元数据中保存分配方案并发送给所有成员
setAndPropagateAssignment(group, assignment)
// 变更消费者组状态到Stable
group.transitionTo(Stable)
}
}
}
})
groupCompletedRebalanceSensor.record()
}
第1步,为该消费者组成员设置组同步回调函数。我们总说回调函数,其实它的含义很简单,也就是将传递给回调函数的数据,通过Response的方式发送给消费者组成员。
第2步,判断当前成员是否是消费者组的Leader成员。如果不是Leader成员,方法直接结束,因为,只有Leader成员的groupAssignment字段才携带了分配方案,其他成员是没有分配方案的;如果是Leader成员,则进入到下一步。
第3步,为没有分配到任何分区的成员创建一个空的分配方案,并赋值给这些成员。这一步的主要目的,是构造一个统一格式的分配方案字段assignment。
第4步,调用storeGroup方法,保存消费者组信息到消费者组元数据,同时写入到内部位移主题中。一旦完成这些动作,则进入到下一步。
第5步,在组状态是CompletingRebalance,而且成员和组的Generation ID相同的情况下,就判断一下刚刚的storeGroup操作过程中是否出现过错误:
倘若组状态不是CompletingRebalance,或者是成员和组的Generation ID不相同,这就说明,消费者组可能开启了新一轮的Rebalance,那么,此时就不能继续给成员发送分配方案。
至此,CompletingRebalance状态下的组同步操作完成。总结一下,组同步操作完成了以下3件事情:
我建议你对照着代码,自行寻找并阅读一下完成这3件事情的源码,这不仅有助于你复习下今天所学的内容,还可以帮你加深对源码的理解。阅读的时候,你思考一下,这些代码的含义是否真的如我所说。如果你有不一样的理解,欢迎写在留言区,我们可以开放式讨论。
今天,我们重点学习了Rebalance流程的第2步,也就是组同步。至此,关于Rebalance的完整流程,我们就全部学完了。
Rebalance流程是Kafka提供的一个非常关键的消费者组功能。由于它非常重要,所以,社区在持续地对它进行着改进,包括引入增量式的Rebalance以及静态成员等。我们在这两节课学的Rebalance流程,是理解这些高级功能的基础。如果你不清楚Rebalance过程中的这些步骤都是做什么的,你就无法深入地掌握增量式Rebalance或静态成员机制所做的事情。
因此,我建议你结合上节课的内容,好好学习一下消费者组的Rebalance,彻底弄明白一个消费者组成员是如何参与其中并最终完成Rebalance过程的。
我们来回顾一下这节课的重点。
讲到这里,Coordinator组件的源码,我就介绍完了。在这个模块中,我们基本上还是践行“自上而下+自下而上”的学习方式。我们先从最低层次的消费者组元数据类开始学习,逐渐上浮到它的管理器类GroupMetadataManager类以及顶层类GroupCoordinator类。接着,在学习Rebalance流程时,我们反其道而行之,先从GroupCoordinator类的入口方法进行拆解,又逐渐下沉到GroupMetadataManager和更底层的GroupMetadata以及MemberMetadata。
如果你追随着课程的脚步一路走来,你就会发现,我经常采用这种方式讲解源码。我希望,你在日后的源码学习中,也可以多尝试运用这种方法。所谓择日不如撞日,我今天就给你推荐一个课后践行此道的绝佳例子。
我建议你去阅读下clients工程中的实现消息、消息批次以及消息集合部分的源码,也就是Record、RecordBatch和Records这些接口和类的代码,去反复实践“自上而下”和“自下而上”这两种阅读方法。
其实,这种方式不只适用于Kafka源码,在阅读其他框架的源码时,也可以采用这种方式。希望你可以不断总结经验,最终提炼出一套适合自己的学习模式。
Coordinator不会将所有消费者组的所有成员的分配方案下发给单个成员,这就是说,成员A看不到成员B的分区消费分配方案。那么,你能找出来,源码中的哪行语句做了这件事情吗?
欢迎在留言区写下你的思考和答案,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。
评论