RocketMQ
安装:
Windows:
- 下载
- 配置环境变量
1
2ROCKETMQ_HOME : E:\cloud\rocketmq-all
NAMESRV_ADDR : localhost:9876 - 启动:
PS D:\developer\rocketmq-all-4.3.2-bin-release\bin> .\mqnamesrv.cmd
PS D:\developer\rocketmq-all-4.3.2-bin-release\bin> .\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
- 检查:
PS C:\Windows\system32> netstat -an|findStr 9876
TCP 0.0.0.0:9876 0.0.0.0:0 LISTENING
遇到问题:
错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_162\lib;C:\Program
原因是因为安装路径包含了空格。打开 runbroker.cmd >
修改前:
set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH%
修改后:
set CLASSPATH=.;%BASE_DIR%conf;"%CLASSPATH%"
RocketMq扩展
RocketMQ Console 控制台的安装与使用
- git clone 到本地后
- mvn clean package -Dmaven.test.skip=true
- java -jar .\rocketmq-console-ng-2.0.0.jar
- 访问:
http://localhost:8080/#/
V2
java -jar rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876
rocketmq-spring
RocketMQ不支持任意精度的任务
延时消息的使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
概念
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。
建议consumerGroup与topic一一对应。
标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
如何指定topic的tags?
RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,
通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。
注意:
tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。
消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?
消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。
1 | @Slf4j |