RocketMQ

By youfang

RocketMQ

核心组件

NameServer:更新路由、管理节点
Broker:处理生产和消费(store)
Producer:生产者
Consumer:消费者

Producer

消息发送流程

  1. 验证消息
  2. 查找路由
  3. 选择队列
  4. 消息发送
  • 默认选择队列策略
    失败规避策略:
    BrokerA topic1 topic2
    BrokerB topic1 topic2
  • 选择队列策略增强版(故障延迟)
    每次发送消息的时候,记录发送的耗时
    根据耗时估算(经验数组)Broker不可用时长

Kafka存储模型:(Topic过多的时候~100,会很慢)
每一个topic都是一个独立的文件。
RocketMQ
共享一个文件。

store

定时任务:
同步刷盘:20ms
异步刷盘:500ms

随机读的时候,
增加了索引文件:consumeQueue

  1. 零拷贝mmap(commitlog consumeQueue)
  2. 读写分离 写时复制容器

消费时,定时触发 reblance(10s)

NameServer

RouteInfoManager维护了:

1
2
3
4
5
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

消费者

1. 并发消费流程
  1. 获取topic配置信息
  2. 获取Group的ConsumerList
  3. 获取Queue的消费Offset
  4. 拉去Queue的消息
  5. 更新Queue的消费Offset
  6. 注销Consumer
2. 顺序消费流程
  1. 获取topic配置信息
  2. 获取Group的ConsumerList
  3. 加锁Queue (锁主节点的Broker)
  4. 获取Queue的消费Offset
  5. 拉去Queue的消息
  6. 更新Queue的消费Offset
  7. 解锁Queue
  8. 注销Consumer

    加锁 ReentrantLock(clientId, Group, Queue) 续锁60s。

安装

需要配置环境变量,通过bat文件去运行。

场景问题

1. 消息堆积

增加机器,consumer数量。来解决消费与生产的不对等

2. 消费幂等

生产者:重试(第一次失败、网络原因)

消费者:消费完(commit的时候)网络原因 重试队列。

解决方案:

  1. MVCC 乐观并发控制(带上版本号)
  2. 去重表(redis)数据表特性(唯一索引)
  3. update 的时候查询一次校验
3. 消费一直不成功怎么办?

写入死信队列,订阅处理。