侧边栏壁纸
  • 累计撰写 27 篇文章
  • 累计创建 42 个标签
  • 累计收到 33 条评论

目 录CONTENT

文章目录

RocketMQ 安装及入门案例

miykah
2023-11-15 / 0 评论 / 0 点赞 / 58 阅读 / 11569 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-11-15,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RocketMQ 入门案例

参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart

Linux 安装 RocketMQ

安装 RocketMQ

下载 RocketMQ 二进制包,到服务器指定位置。

下载地址:https://rocketmq.apache.org/zh/download/

我这里放在了/opt/software 目录下

解压压缩包:

 unzip rocketmq-all-4.9.4-bin-release.zip 

解压后可以得到 rocketmq-all-4.9.4-bin-release 文件夹。

我这里将其移动到了固定的目录(/opt/install)

mv rocketmq-all-4.9.4-bin-release ../install/

rocketmq-all-4.9.4-bin-release 文件目录如下:

修改配置

在启动之前,我们需要先修改一点配置。默认情况下 RocketMQ 会吞掉我们机器的好多好多内存,为了避免 RocketMQ 吞掉过多的内存,所以我们需要修改启动文件 runserver.sh,将其中的 java 启动参数中占用内存降低一些。

修改 runserver.sh

vim bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改为

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改 runbroker.sh

vim bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"

修改为

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx512m"

启动 NameServer

安装完成后,启动 NameServer

nohup sh bin/mqnamesrv &

查看日志确认是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log

在namesrv.log 中看到 The Name Server boot success.., 表示NameServer 已成功启动。

启动 Broker

NameServer成功启动后,我们启动Broker

nohup sh bin/mqbroker -n localhost:9876 &

验证 Broker 是否启动成功

tail -f ~/logs/rocketmqlogs/broker.log 

在 broker.log 中看到 The broker[brokerName,ip:port] boot success..,这表明 broker 已成功启动。

关闭服务器

关闭 broker

sh bin/mqshutdown broker

提示信息:

The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

说明关闭成功

关闭 NameServer

sh bin/mqshutdown namesrv

提示信息:

The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

说明关闭成功

测试案例

新建项目,引入依赖

新建一个 maven 项目,引入 rocketmq-client 依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @Author: ranmaoqi
 * @Date: 2023/11/15 11:24
 */
public class ProducerExample {

    public static void main(String[] args) throws Exception {
        // 创建生产者,指定生产者组
        DefaultMQProducer producer = new DefaultMQProducer("testGroup");
        // 指定 NameServer 地址
        producer.setNamesrvAddr("192.168.31.128:9876");
        // 发送消息的超时时间
        producer.setSendMsgTimeout(60000);

        // 启动生产者
        producer.start();

        for (int i = 0; i < 100; i++) {
            // 创建消息,指定发送的 topic、tag、body
            Message message = new Message(
                    "testTopic",
                    "testTag",
                    ("这是第" + (i + 1) + "条消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送消息得到发送结果
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }

}

运行生产者,控制台打印如下:

SendResult [sendStatus=SEND_OK, msgId=7F0000011B7818B4AAC24B3333E30000, offsetMsgId=C0A81F8000002A9F0000000000000DBD, messageQueue=MessageQueue [topic=testTopic, brokerName=miykah, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F0000011B7818B4AAC24B3333E80001, offsetMsgId=C0A81F8000002A9F0000000000000E81, messageQueue=MessageQueue [topic=testTopic, brokerName=miykah, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F0000011B7818B4AAC24B3333E90002, offsetMsgId=C0A81F8000002A9F0000000000000F45, messageQueue=MessageQueue [topic=testTopic, brokerName=miykah, queueId=3], queueOffset=4]
SendResult

...

消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author: ranmaoqi
 * @Date: 2023/11/15 12:03
 */
public class ConsumerExample {

    public static void main(String[] args) throws Exception {
        // 创建消费者(push模式),指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
        // 指定 NameServer 地址
        consumer.setNamesrvAddr("192.168.31.128:9876");
        // 订阅 topic,订阅其下所有消息,使用"*", 或使用其他tag
        consumer.subscribe("testTopic", "*");

        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.println("消费者启动");
    }

}

运行,控制台打印如下:

消费者启动
收到消息:这是第3条消息
收到消息:这是第2条消息
收到消息:这是第1条消息
收到消息:这是第4条消息
收到消息:这是第6条消息

...

安装可视化工具 RocketMQ-Dashboard

RocketMQ Dashboard 是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。

安装 RocketMQ-Dashboard

通过源码安装(也可以通过docker安装),下载地址:https://github.com/apache/rocketmq-dashboard

下载好.zip 文件后上传到服务器指定位置,解压

unzip rocketmq-dashboard-rocketmq-dashboard-1.0.0.zip

解压后,修改文件夹中 src/main/resources/application.properties 文件

vim src/main/resources/application.properties 

添加内容:

server.address=0.0.0.0
server.port=19876 # 这个是dashboard启动后的访问端口
rocketmq.config.namesrvAddr=localhost:9876

打包源码文件

执行命令:

mvn clean package -Dmaven.test.skip=true

运行

nohup java -jar target/rocketmq-dashboard-1.0.0.jar --rocketmq.config.namesrvAddr=localhost:9876 > rocketmq-dashboard.log 2>&1 &

刚开始没加 --rocketmq.config.namesrvAddr=localhost:9876 , 但是运行之后,访问 ip:19876之后,会报错:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed。

搜索发现加上这个就没报错。

访问 Dashboard

访问地址为:服务器ip:19876 (dashboard配置文件中配置的端口)

0

评论区