一直在用RocketMQ消息中间件,但是没怎么好好的研究,都只是单存的会用而已,基本上也不怎么懂得调优,现在开个标签来好好的研究下,那么久从这一篇开始吧,先介绍下RocketMQ,然后通过控制台查看,然后用SpringBoot整合写个例子!
一、RocketMQ介绍
RocketMQ是一个分布式消息中间件,并支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
RocketMQ以Topic来管理不同应用的消息,对于生产者(producer)而言,发送消息时需要指定消息的Topic,对于消费者(consumer)而言,在启动后需要订阅相应的Topic,然后可以消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是可以将Broker存储分布式化,提高系统性能。
废话不多说,开始进行安装!
集群部署架构
结合部署结构图,描述集群工作流程:
1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。
5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。
二、单机版安装RocketMQ
其实我们完全可以参照官网的例子操作即可。
1、下载安装包
到官网:http://rocketmq.apache.org/ 下载即可,选择Quick Start跟着教程走,我这里下载的是:
rocketmq-all-4.6.0-bin-release.zip
2、解压安装
解压就可以了,不过我们还可以把目录命名下:
unzip rocketmq-all-4.6.0-bin-release.zip
mv rocketmq-all-4.6.0-bin-release rocketmq
这里有可能找不到unzip命令,那么我们可以如下安装:
yum install -y unzip zip
3、修改所需内存配置
rocketmq的nameserver和broker默认都需要很大的内存,最少2G的,我们的虚拟机怎么承受得了,启动都启动不了,所以这里需要去改一下:
修改NameServer所需内存
cd bin/
vi runserver.sh
将JAVA_OPT全部改为128m
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
修改broker所需内存
cd bin/
vi runbroker.sh
将JAVA_OPT全部改为128m
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
4、启动NameServer
nohup sh bin/mqnamesrv &
加上nohup让它在后台运行
5、启动broker
启动broker前,我们来看一下broker的配置文件
cd conf/
vi broker.conf
内容如下
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
虽然我们启动单机模式,这里是不需要进行任何修改的,但是还是需要讲解一下
brokerClusterName:顾名思义,如果是在一个集群下面,这个应该是要相同的
brokerName:broker节点的名称,这个是来区分各个broker的
brokerId :这个是用来区分主备(master,slaver),在brokerName相同的情况下,0是主,1是备
其他的内容这里暂时不讲解先!
也就是如果我们要实现三个broker的话,那么久需要有三个配置文件,每个配置文件的brokerName不同即可。如果我们要分主备的话,那么就可以在brokerName相同的情况下,0是主,1是备。
好了,我们现在是单机启动,直接执行下面的启动脚本启动即可!
nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 autoCreateTopicEnable=true &
9876是我们NameServer的默认端口号, autoCreateTopicEnable=true让我们可以自动创建topic,不需要手动创建。如果不加上,可能有时候会在控制台报如下错误:
No topic route info in name server for the topic:XXXXX
到这里就表明已经启动成功!
三、安装rocketmq console
这个也很简单,首先需要先去git下载项目(话说超级慢)!
https://github.com/apache/rocketmq-externals
下载到本地后解压进入文件夹,我们只需要rocketmq-console项目即可,当做maven项目导入eclipses.
在配置文件application.properties加上NameServer的ip和端口即可,我这里是:
rocketmq.config.namesrvAddr=192.168.157.3:9876
记得这里一定要关掉防火墙哦,命令如下:
systemctl stop firewalld
注:我发现,一开始一直报:connet
启动后访问:localhost:8080 ,进入如下界面:
很清晰明了的界面,主题,消息,生产者,消费者,消息!
四、SpringBoot整合RocketMQ
这个真的是超级简单的,我这里就做个最简单的例子发送消息,然后业务层就消费消息,代码如下:
1、项目结构
2、项目依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>
3、生产者
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/sendMsg")
public String sendMsg() {
Message message= new Message("您好", "随笔博客");
rocketMQTemplate.convertAndSend("my-topic", message);
return "success";
}
}
4、消费者
@Service
@RocketMQMessageListener(topic="my-topic",consumerGroup="my-group")
public class MessageEventConsumer implements RocketMQListener<Message> {
public void onMessage(Message message) {
System.out.println("Comsumer recive message :"+message);
}
}
5、消息
/**
* 消息体,需要序列化
*/
public class Message implements Serializable {
private static final long serialVersionUID = 1L;
private String title;
private String content;
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Message(String title, String content) {
this.title = title;
this.content = content;
}
@Override
public String toString() {
return "Message [title=" + title + ", content=" + content + "]";
}
}
6、启动类
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
7、配置文件
rocketmq:
name-server: 192.168.157.3:9876
producer:
group: my-group
server:
port: 8088
必须制定一个生产者组哦,不然会报错。
8、启动测试
启动后访问:http://localhost:8088/sendMsg ,页面会返回success,日志会打印:
Comsumer recive message :Message [title=您好, content=随笔博客]
看控制台会有消息记录:

完成,转载请注明来源:suibibk.com随笔博客.