RocketMQ
核心组件
NameServer:更新路由、管理节点
Broker:处理生产和消费(store)
Producer:生产者
Consumer:消费者
Producer
消息发送流程
- 验证消息
- 查找路由
- 选择队列
- 消息发送
- 默认选择队列策略
失败规避策略:
BrokerA topic1 topic2
BrokerB topic1 topic2 - 选择队列策略增强版(故障延迟)
每次发送消息的时候,记录发送的耗时
根据耗时估算(经验数组)Broker不可用时长
Kafka存储模型:(Topic过多的时候~100,会很慢)
每一个topic都是一个独立的文件。
RocketMQ
共享一个文件。
store
定时任务:
同步刷盘:20ms
异步刷盘:500ms
随机读的时候,
增加了索引文件:consumeQueue
- 零拷贝mmap(commitlog consumeQueue)
- 读写分离 写时复制容器
消费时,定时触发 reblance(10s)
NameServer
RouteInfoManager维护了:
1 | private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; |
消费者
1. 并发消费流程
- 获取topic配置信息
- 获取Group的ConsumerList
- 获取Queue的消费Offset
- 拉去Queue的消息
- 更新Queue的消费Offset
- 注销Consumer
2. 顺序消费流程
- 获取topic配置信息
- 获取Group的ConsumerList
- 加锁Queue (锁主节点的Broker)
- 获取Queue的消费Offset
- 拉去Queue的消息
- 更新Queue的消费Offset
- 解锁Queue
- 注销Consumer
加锁
ReentrantLock
(clientId, Group, Queue) 续锁60s。
安装
需要配置环境变量,通过bat文件去运行。
场景问题
1. 消息堆积
增加机器,consumer数量。来解决消费与生产的不对等
2. 消费幂等
生产者:重试(第一次失败、网络原因)
消费者:消费完(commit的时候)网络原因 重试队列。
解决方案:
- MVCC 乐观并发控制(带上版本号)
- 去重表(redis)数据表特性(唯一索引)
- update 的时候查询一次校验
3. 消费一直不成功怎么办?
写入死信队列,订阅处理。