RocketMQ 入门案例
参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart
Linux 安装 RocketMQ
安装 RocketMQ
我这里放在了/opt/software
目录下
解压压缩包:
unzip rocketmq-all-4.9.4-bin-release.zip
解压后可以得到 rocketmq-all-4.9.4-bin-release
文件夹。
我这里将其移动到了固定的目录(/opt/install
)
mv rocketmq-all-4.9.4-bin-release ../install/
rocketmq-all-4.9.4-bin-release
文件目录如下:
修改配置
在启动之前,我们需要先修改一点配置。默认情况下 RocketMQ 会吞掉我们机器的好多好多内存,为了避免 RocketMQ 吞掉过多的内存,所以我们需要修改启动文件 runserver.sh
,将其中的 java 启动参数中占用内存降低一些。
修改 runserver.sh
vim bin/runserver.sh
将
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改为
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改 runbroker.sh
vim bin/runbroker.sh
将
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改为
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx512m"
启动 NameServer
安装完成后,启动 NameServer
nohup sh bin/mqnamesrv &
查看日志确认是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
在namesrv.log 中看到 The Name Server boot success.., 表示NameServer 已成功启动。
启动 Broker
NameServer成功启动后,我们启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
验证 Broker 是否启动成功
tail -f ~/logs/rocketmqlogs/broker.log
在 broker.log 中看到 The broker[brokerName,ip:port] boot success..,这表明 broker 已成功启动。
关闭服务器
关闭 broker
sh bin/mqshutdown broker
提示信息:
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
说明关闭成功
关闭 NameServer
sh bin/mqshutdown namesrv
提示信息:
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
说明关闭成功
测试案例
新建项目,引入依赖
新建一个 maven 项目,引入 rocketmq-client 依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @Author: ranmaoqi
* @Date: 2023/11/15 11:24
*/
public class ProducerExample {
public static void main(String[] args) throws Exception {
// 创建生产者,指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
// 指定 NameServer 地址
producer.setNamesrvAddr("192.168.31.128:9876");
// 发送消息的超时时间
producer.setSendMsgTimeout(60000);
// 启动生产者
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,指定发送的 topic、tag、body
Message message = new Message(
"testTopic",
"testTag",
("这是第" + (i + 1) + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息得到发送结果
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
运行生产者,控制台打印如下:
SendResult [sendStatus=SEND_OK, msgId=7F0000011B7818B4AAC24B3333E30000, offsetMsgId=C0A81F8000002A9F0000000000000DBD, messageQueue=MessageQueue [topic=testTopic, brokerName=miykah, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F0000011B7818B4AAC24B3333E80001, offsetMsgId=C0A81F8000002A9F0000000000000E81, messageQueue=MessageQueue [topic=testTopic, brokerName=miykah, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F0000011B7818B4AAC24B3333E90002, offsetMsgId=C0A81F8000002A9F0000000000000F45, messageQueue=MessageQueue [topic=testTopic, brokerName=miykah, queueId=3], queueOffset=4]
SendResult
...
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author: ranmaoqi
* @Date: 2023/11/15 12:03
*/
public class ConsumerExample {
public static void main(String[] args) throws Exception {
// 创建消费者(push模式),指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
// 指定 NameServer 地址
consumer.setNamesrvAddr("192.168.31.128:9876");
// 订阅 topic,订阅其下所有消息,使用"*", 或使用其他tag
consumer.subscribe("testTopic", "*");
// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println("收到消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者启动");
}
}
运行,控制台打印如下:
消费者启动
收到消息:这是第3条消息
收到消息:这是第2条消息
收到消息:这是第1条消息
收到消息:这是第4条消息
收到消息:这是第6条消息
...
安装可视化工具 RocketMQ-Dashboard
RocketMQ Dashboard
是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。
安装 RocketMQ-Dashboard
通过源码安装(也可以通过docker安装),下载地址:https://github.com/apache/rocketmq-dashboard
下载好.zip
文件后上传到服务器指定位置,解压
unzip rocketmq-dashboard-rocketmq-dashboard-1.0.0.zip
解压后,修改文件夹中 src/main/resources/application.properties
文件
vim src/main/resources/application.properties
添加内容:
server.address=0.0.0.0
server.port=19876 # 这个是dashboard启动后的访问端口
rocketmq.config.namesrvAddr=localhost:9876
打包源码文件
执行命令:
mvn clean package -Dmaven.test.skip=true
运行
nohup java -jar target/rocketmq-dashboard-1.0.0.jar --rocketmq.config.namesrvAddr=localhost:9876 > rocketmq-dashboard.log 2>&1 &
刚开始没加
--rocketmq.config.namesrvAddr=localhost:9876
, 但是运行之后,访问 ip:19876之后,会报错:org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed。
搜索发现加上这个就没报错。
访问 Dashboard
访问地址为:服务器ip:19876 (dashboard配置文件中配置的端口)
评论区