SpringAMQP-Basic Queue、Work Queue、Fanout、Direct、Topic
创始人
2025-05-31 08:28:48

同步通讯、异步通讯、RabbitMQ的基本了解
https://blog.csdn.net/weixin_51351637/article/details/129501470?spm=1001.2014.3001.5502
上次跟着老师学完RabbitMQ之后,感觉白学了,也不知道企业中这两个哪个用的比较多,这次再来学学SpringAMQP

一、基本信息


SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

AMQP:高级消息队列协议,在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

Spring AMQP 特征:

  • 监听器容器,用于异步处理入站消息

  • 用于发送和接收消息的RabbitTemplate

  • RabbitAdmin,用于自动声明队列,交换和绑定

SpringAMQP大大简化了消息发送和接收的Api

要学习的知识

Basic Queue简单队列模型

Work Queue工作队列模型

发布、订阅模型-Fanout

发布、订阅模型-Direct

发布、订阅模型-Topic

消息转换器

二、入门案例的消息发送(Basic Queue简单队列模型)

2.1 利用SpringAMQP实现HelloWorld中的基础消息队列(Basic Queue)功能

2.1.1 父工程中引入spring-amqp的依赖

父亲引入之后,两个儿子也会有

        org.springframework.bootspring-boot-starter-amqp

2.1.2 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

2.1.2.1 在publisher服务中编写application.yaml,添加mq连接信息

在RabbitMQ中需要我们自己手写代码,但是在SpringAMQP中,我们直接在配置文件中配置就可以了

spring:rabbitmq:host: localhost   #rabbitMQ的IP地址port: 5672        # 端口  15672是访问页面的端口地址virtual-host: /   # 虚拟主机username: guest   #用户名password: guest   #密码

2.1.2.2 编写代码

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName ="simple.queue";String message ="hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}
}

我们查看一下,确实在这个队列中有内容!!

前提!!必须存在此队列,如果不存在是不行的

2.1.3 在consumer中编写消费逻辑,监听simple.queue队列(监听与接收)

因为之前我们已经父工程中导入坐标,现在就不需要了,之间看配置文件就行

2.1.3.1 consumer的配置文件信息

spring:rabbitmq:host: localhost   #rabbitMQ的IP地址port: 5672        # 端口  15672是访问页面的端口地址virtual-host: /   # 虚拟主机username: guest   #用户名password: guest   #密码

2.1.3.2 编写代码

在consumer服务中新建一个类,编写消费逻辑

@Component  // 注册成bean
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")   //监听的是哪个队列
//  将来如果有simple.queue队列的消息,会立即投放到下面的方法中,并且下面的方法还能处理对应的消息
//  Spring会自动把消息投递给这个方法,然后通过参数传递给我们,我们拿到消息后就可以进行处理
//    对于参数列表,那边发的什么类型,我们就用什么类型接收public void listenSimpleQueue(String msg) throws InterruptedException{System.out.println("接收到的消息:"+msg);}
}

确实没了

2.2 总结Basic Queue发送和接收消息的步骤

发送消息

  1. 父工程引入amqp的starter依赖

  1. 配置RabbitMQ地址

  1. 引入RabbitTemplate对象,并使用方法convertAndSend()发送消息

接收消息(监听)

  1. 父工程引入amqp的starter依赖

  1. 配置RabbitMQ地址

  1. 定义类,添加@Component注解

  1. 类中声明方法,添加@RabbitListener注解,方法参数就是消息

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

三、Work Queue工作队列模型

与Base Queue工作队列模型的差别就是,这个queue后面挂着两个consumer

我们的消息是阅后即焚。

当我们queue中有50条消息,不可能全部交给consumer1或者consumer2中的其中一个,比如consumer1处理20条,consumer2处理30条。此时consumer1和consumer2的关系就是合作关系

有些人会想,为什么要挂在两个消费者?

因为队列中存放的消息是有上限的,假设queue中只能存储60条,publisher每秒钟发送50条,一个consumer每秒钟只能处理40条,这样随着时间的推移,queue中存储的数据就会堆满。

如果是两个consumer后,加大了消息的处理速度,有效的避免了堆积问题

3.1 模拟Work Queue,实现一个队列绑定多个消费者

实现思路

  • 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

  • 在consumer服务中定义两个消息监听者,都监听simple.queue队列

  • consumer1每秒处理50条消息,consumer2每秒处理10条消息

说明:消息预取

当有大量的消息到达队列时,队列queue就会把消息进行投递,consumer1和consumer2的通道会先把消息拿过来(不管能不能处理的了,先拿过来),于是两边平均拿了所有的消息。
但是consumer1处理的速度快,很快就把消息处理了,consumer2处理的慢,处理需要一段时间

发送消息

    @Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName ="simple.queue";String message ="hello message__";
//      循环发送50for(int i=0;i<50;i++){rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}

接收消息

    @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName ="simple.queue";String message ="hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName ="simple.queue";String message ="hello message__";
//      循环发送50for(int i=0;i<50;i++){rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}

我们看到最后,consumer1很快就把消息处理完了,但是consumer2还在继续处理消息

3.1.1 控制consumer预取消息的上限

如果不设置的话,默认值是无线,有多少拿多少

spring:rabbitmq:host: localhost   #rabbitMQ的IP地址port: 5672        # 端口  15672是访问页面的端口地址virtual-host: /   # 虚拟主机username: guest   #用户名password: guest   #密码listener:simple:prefetch: 1 # 每次只能获取一条下下哦i,处理完成才能获取下一个消息

四、发布(Publish)、订阅(Subscribe)

4.1 基本介绍

exchange会将消息发送给与其绑定的所有队列

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入exchange(交换机)

消费者和队列之间依然会有一个绑定。

之前是publisher直接发送给queue,但是现在publisher先把消息发送给exchange(交换机),再有exchange将消息发送给queue,那这样来说,publisher不用知道queue是否存在

那到底交换机是给一个队列发信息还是给多个队列发信息呢?

那这就要看交换机的类型了

  • Fanout Exchange:广播

  • Direct Exchange:路由

  • Topic Exchange:主题

备注:exchange交换机负责消息路由而不是存储,路由失败则消息丢失!!!!

4.2 发布订阅-Fanout Exchange

发布订阅的第一种交换机Fanout Exchange

Fanout Exchange 会将接受到的消息路由到每一个跟其绑定的queue

4.2.1 利用SpringAMQP演示FanoutExchange的使用

实现思路:

  1. 在consumer服务中,利用代码声明队列、交换机,并将两者绑定

  1. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

  1. 在publisher中编写测试方法,向itcast.fanout(交换机)发送消息

4.2.2 在consumer服务声明Exchange(交换机)、Queue(队列)、Binding(绑定关系)

在consumer服务中添加一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding

@Configuration
public class FanoutConfig {
//  创建交换机@Beanpublic FanoutExchange fanoutExchange(){
//      创建交换机,交换机的名字叫itcast.fanoutreturn new FanoutExchange("itcast.fanout");}//  第一个队列@Beanpublic Queue fanoutQueue1(){
//        fanout.queue1 队列名称return new Queue("fanout.queue1");}//   绑定队列1和交换机
//     将队列1和交换机当作参数传递进来了@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
//      绑定队列1和交换机return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//  第一个队列@Beanpublic Queue fanoutQueue2(){
//        fanout.queue1 队列名称return new Queue("fanout.queue2");}//   绑定队列1和交换机
//     将队列1和交换机当作参数传递进来了@Beanpublic Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
//      绑定队列1和交换机return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

我们在页面上也确实能看到交换机icast.fanout

也可以点进去查看一下这个交换器

也确实能看到两个队列

4.2.3 消息的发送

我们以前是发送的队列,现在是发送到交换机

    @Testpublic void testSendFanoutExchange(){
//      交换机名称String exchangeName = "itcast.fanout";
//      消息String message ="hello everyone";rabbitTemplate.convertAndSend(exchangeName,"",message);}

4.2.4 消息的订阅(接收)

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到消息fanout.queue1:~~~~~~~~~"+msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者1接收到消息fanout.queue2:~~~~~~~~~"+msg);}

明显是可以收到的,两个队列都可以收到

4.2.5 总结

交换机的作用

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue

  • Exchange的子类FanoutExchange

  • Binding

4.3 发布订阅-DirectExchange(路由模式)

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)

  • 发布者发送消息时,指定消息的RoutingKey

  • 每一个Queue都与Exchange设置一个BindingKey(可以理解为约定的暗号)

  • Exchange将消息路由到BingingKey与消息RoutingKey一致的队列

队列在和交换机绑定的时候可以指定多个key,如下图所示

那这样看来DirectExchange比FanoutExchange更灵活一点,DirectExchange也可以模拟FanoutExchange

4.3.1 利用SpringAMQP演示DirectExchange的使用

实现思路

  • 利用@RabbitListener声明Exchange、Queue、RoutingKey(之前的交换机和队列都是@Bean的方式创建出来的)

  • 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  • 在publisher中编写测试方法,向itcast.direct发送消息

4.3.2 消息的发送

发送给blue

    @Testpublic void testSendDirectExchange(){
//      交换机名称String exchangeName = "itcast.direct";
//      消息String message ="hello  blue";
//      "blue"指定的是BindingKey  当与RoutingKey一样时,consumer便可以收到消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);}

此时确实只有队列一收到消息

4.3.3 消息的订阅(接收)

利用@RabbitListener声明Exchange、Queue、RoutingKey

在consumer包下

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1:~~~~~~~~~"+msg);}//    key 指定routingKey@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2:~~~~~~~~~"+msg);}

运行之后发送Queue被正确的声明

交换机也被正确的声明

我们也可以从下面看出来队列1和队列2各绑定了两个key

4.3.4 总结

描述下Direct交换机与Fanout交换机的差异?

  • · Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

@Queue

@Exchange

@QueueBinding

4.4 发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

Queue与Exchange指定BindingKey时可以使用通配符

  • # 代指0个或多个单词

  • * 代指一个单词

china.news 代表中国的新闻消息

china.weather 代表中国的天气消息

jepan.weather 代表日本的天气消息

4.4.1 利用SpringAMQP演示TopicExchange的使用

实现思路:

  • 并利用@RabbitListener声明Exchange、Queue、RoutingKey

  • 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

  • 在publisher中编写测试方法,向itcast.topic发送消息

4.4.2 声明队列和交换机(包括消息的接收-订阅)

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = {"china.#"}))public void listenTopicQueue1(String msg) {System.out.println("消费者接收到topic.queue1:~~~~~~~~~" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = {"#.news"}))public void listenTopicQueue2(String msg) {System.out.println("消费者接收到topic.queue2:~~~~~~~~~" + msg);}

交换机

绑定关系

两个队列也成功声明

4.4.3 消息的发送

    @Testpublic void testSendTopicExchange(){
//      交换机名称String exchangeName = "itcast.topic";
//      消息String message ="张靖奇最牛逼";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

当我们点击发送之后,我们也看到consumer都可以从对应的队列中取出我们想要的数据

4.4.4 总结Direct交换机与Topic交换机的差异

topic交换机中的BandingKey支持通配符,RoutingKey是多个单词以点分割

4.5 消息转换器-测试发送Object类型的消息

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

4.5.1 声明队列

//  声明队列@Beanpublic Queue objectQueue(){return new Queue("object.queue");}

4.5.2 向队列发布消息

    @Testpublic void testSendObjectQueue() {Map map = new HashMap<>();map.put("name","柳岩");rabbitTemplate.convertAndSend("object.queue",map);}

确实是有消息的

我们查看一下消息的内容,没有发现柳岩,而且类型是

content_type: application/x-java-serialized-object,将我们的对象做序列化,也就是jdk的序列化方式(性能比较差,安全性有问题,数据长度过长)

所以我们非常不推荐我们用这种默认的方式

4.5.3 修改JDK序列化使得数据是JSON形式发送

Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的

而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果修改只需要定义一个MessageConverter类型的Bean即可。推荐使用JSON方式序列化

4.5.3.1 依赖:

        com.fasterxml.jackson.corejackson-databind

也可以用下面这个

        com.fasterxml.jackson.dataformatjackson-dataformat-xml

4.5.3.2 覆盖默认bean

在publisher服务声明MessageConverter的Bean,我们一旦声明。就会把默认的给覆盖掉,这是spring自动装配的原理

注意!!! 一定是下面的两个类,不要覆盖错了
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}

在继续4.5.2发送新的消息,然后在页面的队列中查看,发现这次是汉字了

4.5.4 消息订阅(接收-JSON串)

4.5.4.1 依赖

在consumer中引入Jackson依赖

4.5.4.2 定义消息转换器覆盖默认转换器

    @Beanpublic MessageConverter messageConverter(){return  new Jackson2JsonMessageConverter();}

4.5.4.3 消息订阅-接收-消息的监听

    @RabbitListener(queues = "object.queue")public void listenObjectQueue(Map msg){System.out.println("收到消息"+msg);}

4.5.5 总结

SpringAMQP中消息的序列化和反序列化是怎么实现的?

  • 利用MessageConverter实现的,默认是JDK的序列化

  • 注意发送方和接收方必须使用相同的MessageConverter

相关内容

热门资讯

永远用英语怎么说,“永远”除了... 永远用英语怎么说目录永远用英语怎么说“永远”除了“forever”的英文翻译~~还有哪些
少年音怎么练,怎么配出清爽的少... 少年音怎么练目录少年音怎么练怎么配出清爽的少年音?怎么学正太音少年音,像是龙马啊、镜音连啊不二啊那种...
情侣之间的爱称有哪些,情侣称呼... 情侣之间的爱称有哪些目录情侣之间的爱称有哪些情侣称呼有创意的爱称情侣之间好听的称呼都有什么?情侣爱称...
共享汽车怎么租车 极速百科网 ... 共享汽车怎么租车目录共享汽车怎么租车共享汽车怎么租车gofun出行有人开吗?使用方法是什么?共享汽车...
Python应用之爬虫基础:r... 引言 在生活中,大家都使用过浏览器,通过输入要搜索的内容以及鼠标点击等操...
jsp医疗辅助诊断管理系统se... 一、源码特点      JSP医疗辅助诊断管理系统是一套完善的java web信息管理系统ÿ...
db19密钥库和加密 创建密钥库ENCRYPTION_WALLET_LOCATION =(SOURCE =...
开局之年是什么意思(开局之年之... 本篇文章极速百科给大家谈谈开局之年是什么意思,以及开局之年之后是什么年对应的知识点,希望对各位有所帮...
抖音gga什么意思(抖音gg是... 本篇文章极速百科给大家谈谈抖音gga什么意思,以及抖音gg是什么意思对应的知识点,希望对各位有所帮助...
DMZ是什么(防火墙的dmz是... 今天给各位分享DMZ是什么的知识,其中也会对防火墙的dmz是什么进行解释,如果能碰巧解决你现在面临的...
风行SX6Sx6后视镜加热打不... 本篇文章极速百科给大家谈谈风行SX6Sx6后视镜加热打不开,以及东风风行sx6反光镜多少钱对应的知识...
CKA-17 Check Da... 文章目录Issue summary:Useful comment:1. 创建场景1.1...
elasticsearch的入... 目录一.数据聚合1.聚合的种类2.DSL实现聚合2.1.Bucket聚合语法2.2.聚合结果排序2....
成都男子误入停车场51秒收费8... 本篇文章极速百科给大家谈谈成都男子误入停车场51秒收费8元,属于乱收费吗,以及成都停车费贵对应的知识...
城市的路灯系统是如何控制开灯和... 本篇文章极速百科给大家谈谈城市的路灯系统是如何控制开灯和熄灯时间的?,以及路灯咋调制,路灯的时控开关...
对抗雾霾的有效方法(对抗雾霾的... 本篇文章极速百科给大家谈谈对抗雾霾的有效方法,以及对抗雾霾的有效方法英语作文对应的知识点,希望对各位...
Kubernetes集群 服务... Kubernetes集群 服务暴露 Traefik 一、认识traefik1.1 traefik简介...
广汽菲克Jeep指南者真实油耗... 本篇文章极速百科给大家谈谈广汽菲克Jeep指南者真实油耗多少,以及广汽菲克jeep指南者真实油耗多少...
关于OpenResty+dou... 关于OpenResty+doujiang24/lua-resty-kafka写入kafka故...
STM32产品命名规则,系统结... 产品系列 STM32系列芯片是由意法半导体(ST Microelectronics&...
souho(搜猴浏览器下载) ... 今天给各位分享souho的知识,其中也会对搜猴浏览器下载进行解释,如果能碰巧解决你现在面临的问题,别...
什么是金棘末(金棘末什么危害)... 今天给各位分享什么是金棘末的知识,其中也会对金棘末什么危害进行解释,如果能碰巧解决你现在面临的问题,...
徐州周边100公里自驾游景点(... 本篇文章极速百科给大家谈谈徐州周边100公里自驾游景点,以及徐州周边100公里自驾游景点有哪些对应的...
帮帮侠热评:盲人被撞前一秒被公... 本篇文章极速百科给大家谈谈帮帮侠热评:盲人被撞前一秒被公交司机拉回,以及盲人被撞前1秒被公交司机拉回...
【教程】使用ChatGPT制作... 目录 描述 代码 效果 说明 描述         给ChatGPT的描述内容: ...
centos7安装mysql5... centos7安装mysql5.7.40 1.先去下载安装包 下载地址 Tip:使用迅雷下载会快一点...
牛客C/C++刷题笔记(六) 153、函数的递归调用不过是一个函数直接或间接地调用它自身。() 15...
100个常见车标大全新版,10... 今天给各位分享100个常见车标大全新版,100种常见的轿车车标和图片的知识,其中也会对50个常见车标...
奥迪a5敞篷是哪款?奥迪a5敞... 今天给各位分享奥迪a5敞篷是哪款?奥迪a5敞篷多少钱一辆的知识,其中也会对奥迪a5敞篷是哪款车型进行...
...天津北京重庆今日宣布公共... 本篇文章极速百科给大家谈谈...天津北京重庆今日宣布公共交通放宽查验核酸报告,以及对应的知识点,希望...