个人随笔
目录
一、RocketMQ入门(单机安装、控制台、SpringBoot整合)
2020-02-10 23:32:33

一直在用RocketMQ消息中间件,但是没怎么好好的研究,都只是单存的会用而已,基本上也不怎么懂得调优,现在开个标签来好好的研究下,那么久从这一篇开始吧,先介绍下RocketMQ,然后通过控制台查看,然后用SpringBoot整合写个例子!

一、RocketMQ介绍

RocketMQ是一个分布式消息中间件,并支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。

RocketMQ以Topic来管理不同应用的消息,对于生产者(producer)而言,发送消息时需要指定消息的Topic,对于消费者(consumer)而言,在启动后需要订阅相应的Topic,然后可以消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是可以将Broker存储分布式化,提高系统性能。

废话不多说,开始进行安装!

集群部署架构

结合部署结构图,描述集群工作流程:

1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。

2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。

3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。

4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。

5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。

二、单机版安装RocketMQ

其实我们完全可以参照官网的例子操作即可。

1、下载安装包

到官网:http://rocketmq.apache.org/ 下载即可,选择Quick Start跟着教程走,我这里下载的是:

  1. rocketmq-all-4.6.0-bin-release.zip

2、解压安装

解压就可以了,不过我们还可以把目录命名下:

  1. unzip rocketmq-all-4.6.0-bin-release.zip
  2. mv rocketmq-all-4.6.0-bin-release rocketmq

这里有可能找不到unzip命令,那么我们可以如下安装:

  1. yum install -y unzip zip

3、修改所需内存配置

rocketmq的nameserver和broker默认都需要很大的内存,最少2G的,我们的虚拟机怎么承受得了,启动都启动不了,所以这里需要去改一下:

修改NameServer所需内存

  1. cd bin/
  2. vi runserver.sh

将JAVA_OPT全部改为128m

  1. JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

修改broker所需内存

  1. cd bin/
  2. vi runbroker.sh

将JAVA_OPT全部改为128m

  1. JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"

4、启动NameServer

  1. nohup sh bin/mqnamesrv &

加上nohup让它在后台运行

5、启动broker

启动broker前,我们来看一下broker的配置文件

  1. cd conf/
  2. vi broker.conf

内容如下

  1. brokerClusterName = DefaultCluster
  2. brokerName = broker-a
  3. brokerId = 0
  4. deleteWhen = 04
  5. fileReservedTime = 48
  6. brokerRole = ASYNC_MASTER
  7. flushDiskType = ASYNC_FLUSH

虽然我们启动单机模式,这里是不需要进行任何修改的,但是还是需要讲解一下

brokerClusterName:顾名思义,如果是在一个集群下面,这个应该是要相同的
brokerName:broker节点的名称,这个是来区分各个broker的
brokerId :这个是用来区分主备(master,slaver),在brokerName相同的情况下,0是主,1是备

其他的内容这里暂时不讲解先!

也就是如果我们要实现三个broker的话,那么久需要有三个配置文件,每个配置文件的brokerName不同即可。如果我们要分主备的话,那么就可以在brokerName相同的情况下,0是主,1是备。

好了,我们现在是单机启动,直接执行下面的启动脚本启动即可!

  1. nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 autoCreateTopicEnable=true &

9876是我们NameServer的默认端口号, autoCreateTopicEnable=true让我们可以自动创建topic,不需要手动创建。如果不加上,可能有时候会在控制台报如下错误:

  1. No topic route info in name server for the topic:XXXXX

到这里就表明已经启动成功!

三、安装rocketmq console

这个也很简单,首先需要先去git下载项目(话说超级慢)!
https://github.com/apache/rocketmq-externals

下载到本地后解压进入文件夹,我们只需要rocketmq-console项目即可,当做maven项目导入eclipses.

在配置文件application.properties加上NameServer的ip和端口即可,我这里是:

  1. rocketmq.config.namesrvAddr=192.168.157.3:9876

记得这里一定要关掉防火墙哦,命令如下:

  1. systemctl stop firewalld

注:我发现,一开始一直报:connet fail的错误,不管我怎么百度怎么关防火墙怎么修改ip都不行,然后我对项目进行maven project下就可以了,我也不知道什么鬼~

启动后访问:localhost:8080 ,进入如下界面:

很清晰明了的界面,主题,消息,生产者,消费者,消息!

四、SpringBoot整合RocketMQ

这个真的是超级简单的,我这里就做个最简单的例子发送消息,然后业务层就消费消息,代码如下:

1、项目结构

2、项目依赖

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.0.5.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-web</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.rocketmq</groupId>
  13. <artifactId>rocketmq-spring-boot-starter</artifactId>
  14. <version>2.0.3</version>
  15. </dependency>
  16. </dependencies>

3、生产者

  1. @RestController
  2. public class ProducerController {
  3. @Autowired
  4. private RocketMQTemplate rocketMQTemplate;
  5. @RequestMapping("/sendMsg")
  6. public String sendMsg() {
  7. Message message= new Message("您好", "随笔博客");
  8. rocketMQTemplate.convertAndSend("my-topic", message);
  9. return "success";
  10. }
  11. }

4、消费者

  1. @Service
  2. @RocketMQMessageListener(topic="my-topic",consumerGroup="my-group")
  3. public class MessageEventConsumer implements RocketMQListener<Message> {
  4. public void onMessage(Message message) {
  5. System.out.println("Comsumer recive message :"+message);
  6. }
  7. }

5、消息

  1. /**
  2. * 消息体,需要序列化
  3. */
  4. public class Message implements Serializable {
  5. private static final long serialVersionUID = 1L;
  6. private String title;
  7. private String content;
  8. public String getTitle() {
  9. return title;
  10. }
  11. public void setTitle(String title) {
  12. this.title = title;
  13. }
  14. public String getContent() {
  15. return content;
  16. }
  17. public void setContent(String content) {
  18. this.content = content;
  19. }
  20. public Message(String title, String content) {
  21. this.title = title;
  22. this.content = content;
  23. }
  24. @Override
  25. public String toString() {
  26. return "Message [title=" + title + ", content=" + content + "]";
  27. }
  28. }

6、启动类

  1. @SpringBootApplication
  2. public class App {
  3. public static void main(String[] args) {
  4. SpringApplication.run(App.class, args);
  5. }
  6. }

7、配置文件

  1. rocketmq:
  2. name-server: 192.168.157.3:9876
  3. producer:
  4. group: my-group
  5. server:
  6. port: 8088

必须制定一个生产者组哦,不然会报错。

8、启动测试

启动后访问:http://localhost:8088/sendMsg ,页面会返回success,日志会打印:

  1. Comsumer recive message :Message [title=您好, content=随笔博客]
  2. 看控制台会有消息记录:
  3. ![](/fileupload/images/20200211/1581352204054.png)

完成,转载请注明来源:suibibk.com随笔博客.

 553

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号-2