RocketMQ安装

By youfang

RocketMQ

安装:

Windows:

  1. 下载
  2. 配置环境变量
    1
    2
    ROCKETMQ_HOME :   E:\cloud\rocketmq-all
    NAMESRV_ADDR : localhost:9876
  3. 启动:
    • PS D:\developer\rocketmq-all-4.3.2-bin-release\bin> .\mqnamesrv.cmd
    • PS D:\developer\rocketmq-all-4.3.2-bin-release\bin> .\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
  4. 检查:
    • PS C:\Windows\system32> netstat -an|findStr 9876
      TCP 0.0.0.0:9876 0.0.0.0:0 LISTENING

遇到问题:

错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_162\lib;C:\Program

原因是因为安装路径包含了空格。打开 runbroker.cmd >

修改前: set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH%

修改后: set CLASSPATH=.;%BASE_DIR%conf;"%CLASSPATH%"

RocketMq扩展

RocketMQ Console 控制台的安装与使用

参考文献1

GitHub官方地址
Gitee镜像地址

  1. git clone 到本地后
  2. mvn clean package -Dmaven.test.skip=true
  3. java -jar .\rocketmq-console-ng-2.0.0.jar
  4. 访问:http://localhost:8080/#/
V2

java -jar rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876

rocketmq-spring

RocketMQ不支持任意精度的任务

延时消息的使用限制

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java

概念

主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。
建议consumerGroup与topic一一对应。

标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

如何指定topic的tags?

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,
通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。

注意:

tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。

1
2
3
4
5
6
7
8
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer2 implements RocketMQListener<MessageExt>{
public void onMessage(MessageExt messageExt) {
log.info("received messageExt: {}", messageExt);
}
}