RocketMQ

当前笔者使用版本为 RocketMQ 5.3.4-SNAPSHOT

保证顺序消费的四把锁

  1. 分布式锁(保证加入全局分区锁时的并发安全–broker中维护,类比redis

img

  1. 全局分区锁(key为消费者组名(因为多个topic被多个消费者组订阅),维护队列与单个消费者实例的锁)

img

img

img

  1. Synchronized(保证一个消费者实例只有一个线程拉取消息、消费消息、提交消息)

img

让单个实例的一个线程进行校验、拉取消息、消费消息、提交消息

  1. ReentrantLock(给队列加入一个锁,告诉别人我要进行使用,防止rebalance时队列调度到其他消费者组

img

可以看到processQueue的setDropped方法就是在重平衡的时候进行的

img

关于Kafka与RocketMQ的重平衡

对于顺序队列来说不会参与重平衡分配

RebalanceImpl&updateMessageQueueAssignment()

image-20250610175031735

维度 Kafka RocketMQ
触发机制 实时事件触发(消费者加入/离开、心跳失败、订阅变化) 定时轮询触发(默认 20 秒一次)
协调机制 中央协调器(GroupCoordinator)统一调度、Leader 选举 客户端自我协商,每个消费者独立发起
是否暂停消费全组 整个组暂停消费直到分配完成 局部影响,其余消费者继续工作
分配者角色 Leader 消费者计算新分配方案 每个客户端各自执行分配策略
分配粒度 Partition 粒度 MessageQueue 粒度
稳定性策略(新版) 支持 Cooperative Sticky(渐进式分配) 默认是平均分配策略,可自定义
消费者变动频率容忍性 容忍性差(高频上下线会频繁 Rebalance) 容忍性强(轮询机制稳定)

RocketMQ 重平衡—>RebalanceService

image-20250610172210297

RebalanceImpl&doReblance

public boolean doRebalance(final boolean isOrder) {
boolean balanced = true;

// 获取当前客户端订阅的所有 Topic
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

if (subTable != null) {
for (Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();

try {
if (!clientRebalance(topic)) {
// 如果客户端不允许自己重平衡(如顺序消费场景)
boolean result = this.getRebalanceResultFromBroker(topic, isOrder);
if (!result) {
balanced = false;
}
} else {
// 普通消费场景:客户端自行计算并执行 Rebalance
boolean result = this.rebalanceByTopic(topic, isOrder);
if (!result) {
balanced = false;
}
}

} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalance Exception", e);
balanced = false;
}
}
}
}

// 清理掉不再订阅的 Topic 对应的 MessageQueue
this.truncateMessageQueueNotMyTopic();

return balanced;
}

简单举例

有一个Topic 有6个queue/partition

消费者实例A,B

新加入实例C

Kafka行为

  • 系统出发Rebalance,暂停所有消费者
  • 由Leader消费者重新分配分区
  • 所有消费者接收新分配后再恢复消费
1. 所有消费者发送 JoinGroup 请求给 GroupCoordinator(Broker);
2. GroupCoordinator 选中最先成功响应的消费者 → Group Leader;
3. Leader 收集其他消费者的订阅信息;
4. Leader 调用分配策略(RoundRobin、Range、Sticky 等);
5. Leader 把结果发回 Broker;
6. Broker 将分配结果下发给所有消费者;

RocketMQ行为

  • 每个客户端RebalanceService启动&定时轮询
  • 发现C加入后,队列局部重新分配
  • C接手部分队列,其余消费者照常消费

也就是说可能是A和C进入分配(此时A的定时器刚好触发然后发现C实例加入了)B正常消费不受影响

总结:RocketMQ 的 Rebalance 存在天然“延迟生效”机制,新加入的消费者不会立刻就能分配到队列

时序演示
时间轴:        T0       T+5s        T+20s       T+21s
─────────┬────────────┬────────────┬────────────

NameServer: | ←—— 更新消费者列表(注册 C) ——→ |

A: Q0 Q1 Q2
| ← 感知新成员 C(下一轮 Rebalance) |
| ←—— 释放 Q2 给 C ——→ |

B: Q3 Q4 Q5
| (不变) |

C: (空转)
| ← 初次启动,未分到队列 |
| ←—— 分配 Q2,开始消费 ——→ |