同步通讯、异步通讯、RabbitMQ的基本了解
https://blog.csdn.net/weixin_51351637/article/details/129501470?spm=1001.2014.3001.5502
上次跟着老师学完RabbitMQ之后,感觉白学了,也不知道企业中这两个哪个用的比较多,这次再来学学SpringAMQP
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
AMQP:高级消息队列协议,在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
Spring AMQP 特征:
监听器容器,用于异步处理入站消息
用于发送和接收消息的RabbitTemplate
RabbitAdmin,用于自动声明队列,交换和绑定
SpringAMQP大大简化了消息发送和接收的Api
要学习的知识
Basic Queue简单队列模型
Work Queue工作队列模型
发布、订阅模型-Fanout
发布、订阅模型-Direct
发布、订阅模型-Topic
消息转换器
父亲引入之后,两个儿子也会有
org.springframework.boot spring-boot-starter-amqp
在RabbitMQ中需要我们自己手写代码,但是在SpringAMQP中,我们直接在配置文件中配置就可以了
spring:rabbitmq:host: localhost #rabbitMQ的IP地址port: 5672 # 端口 15672是访问页面的端口地址virtual-host: / # 虚拟主机username: guest #用户名password: guest #密码
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName ="simple.queue";String message ="hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}
}
我们查看一下,确实在这个队列中有内容!!
前提!!必须存在此队列,如果不存在是不行的
因为之前我们已经父工程中导入坐标,现在就不需要了,之间看配置文件就行
spring:rabbitmq:host: localhost #rabbitMQ的IP地址port: 5672 # 端口 15672是访问页面的端口地址virtual-host: / # 虚拟主机username: guest #用户名password: guest #密码
在consumer服务中新建一个类,编写消费逻辑
@Component // 注册成bean
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue") //监听的是哪个队列
// 将来如果有simple.queue队列的消息,会立即投放到下面的方法中,并且下面的方法还能处理对应的消息
// Spring会自动把消息投递给这个方法,然后通过参数传递给我们,我们拿到消息后就可以进行处理
// 对于参数列表,那边发的什么类型,我们就用什么类型接收public void listenSimpleQueue(String msg) throws InterruptedException{System.out.println("接收到的消息:"+msg);}
}
确实没了
发送消息
父工程引入amqp的starter依赖
配置RabbitMQ地址
引入RabbitTemplate对象,并使用方法convertAndSend()发送消息
接收消息(监听)
父工程引入amqp的starter依赖
配置RabbitMQ地址
定义类,添加@Component注解
类中声明方法,添加@RabbitListener注解,方法参数就是消息
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
与Base Queue工作队列模型的差别就是,这个queue后面挂着两个consumer
我们的消息是阅后即焚。
当我们queue中有50条消息,不可能全部交给consumer1或者consumer2中的其中一个,比如consumer1处理20条,consumer2处理30条。此时consumer1和consumer2的关系就是合作关系
有些人会想,为什么要挂在两个消费者?
因为队列中存放的消息是有上限的,假设queue中只能存储60条,publisher每秒钟发送50条,一个consumer每秒钟只能处理40条,这样随着时间的推移,queue中存储的数据就会堆满。
如果是两个consumer后,加大了消息的处理速度,有效的避免了堆积问题
实现思路
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
在consumer服务中定义两个消息监听者,都监听simple.queue队列
consumer1每秒处理50条消息,consumer2每秒处理10条消息
说明:消息预取
当有大量的消息到达队列时,队列queue就会把消息进行投递,consumer1和consumer2的通道会先把消息拿过来(不管能不能处理的了,先拿过来),于是两边平均拿了所有的消息。
但是consumer1处理的速度快,很快就把消息处理了,consumer2处理的慢,处理需要一段时间
发送消息
@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName ="simple.queue";String message ="hello message__";
// 循环发送50for(int i=0;i<50;i++){rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
接收消息
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName ="simple.queue";String message ="hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName ="simple.queue";String message ="hello message__";
// 循环发送50for(int i=0;i<50;i++){rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
我们看到最后,consumer1很快就把消息处理完了,但是consumer2还在继续处理消息
如果不设置的话,默认值是无线,有多少拿多少
spring:rabbitmq:host: localhost #rabbitMQ的IP地址port: 5672 # 端口 15672是访问页面的端口地址virtual-host: / # 虚拟主机username: guest #用户名password: guest #密码listener:simple:prefetch: 1 # 每次只能获取一条下下哦i,处理完成才能获取下一个消息
exchange会将消息发送给与其绑定的所有队列
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入exchange(交换机)
消费者和队列之间依然会有一个绑定。
之前是publisher直接发送给queue,但是现在publisher先把消息发送给exchange(交换机),再有exchange将消息发送给queue,那这样来说,publisher不用知道queue是否存在
那到底交换机是给一个队列发信息还是给多个队列发信息呢?
那这就要看交换机的类型了
Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
备注:exchange交换机负责消息路由而不是存储,路由失败则消息丢失!!!!
发布订阅的第一种交换机Fanout Exchange
Fanout Exchange 会将接受到的消息路由到每一个跟其绑定的queue
实现思路:
在consumer服务中,利用代码声明队列、交换机,并将两者绑定
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
在publisher中编写测试方法,向itcast.fanout(交换机)发送消息
在consumer服务中添加一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding
@Configuration
public class FanoutConfig {
// 创建交换机@Beanpublic FanoutExchange fanoutExchange(){
// 创建交换机,交换机的名字叫itcast.fanoutreturn new FanoutExchange("itcast.fanout");}// 第一个队列@Beanpublic Queue fanoutQueue1(){
// fanout.queue1 队列名称return new Queue("fanout.queue1");}// 绑定队列1和交换机
// 将队列1和交换机当作参数传递进来了@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
// 绑定队列1和交换机return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 第一个队列@Beanpublic Queue fanoutQueue2(){
// fanout.queue1 队列名称return new Queue("fanout.queue2");}// 绑定队列1和交换机
// 将队列1和交换机当作参数传递进来了@Beanpublic Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
// 绑定队列1和交换机return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
我们在页面上也确实能看到交换机icast.fanout
也可以点进去查看一下这个交换器
也确实能看到两个队列
我们以前是发送的队列,现在是发送到交换机
@Testpublic void testSendFanoutExchange(){
// 交换机名称String exchangeName = "itcast.fanout";
// 消息String message ="hello everyone";rabbitTemplate.convertAndSend(exchangeName,"",message);}
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到消息fanout.queue1:~~~~~~~~~"+msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者1接收到消息fanout.queue2:~~~~~~~~~"+msg);}
明显是可以收到的,两个队列都可以收到
交换机的作用
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
Queue
Exchange的子类FanoutExchange
Binding
DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)
发布者发送消息时,指定消息的RoutingKey
每一个Queue都与Exchange设置一个BindingKey(可以理解为约定的暗号)
Exchange将消息路由到BingingKey与消息RoutingKey一致的队列
队列在和交换机绑定的时候可以指定多个key,如下图所示
那这样看来DirectExchange比FanoutExchange更灵活一点,DirectExchange也可以模拟FanoutExchange
实现思路
利用@RabbitListener声明Exchange、Queue、RoutingKey(之前的交换机和队列都是@Bean的方式创建出来的)
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
在publisher中编写测试方法,向itcast.direct发送消息
发送给blue
@Testpublic void testSendDirectExchange(){
// 交换机名称String exchangeName = "itcast.direct";
// 消息String message ="hello blue";
// "blue"指定的是BindingKey 当与RoutingKey一样时,consumer便可以收到消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);}
此时确实只有队列一收到消息
利用@RabbitListener声明Exchange、Queue、RoutingKey
在consumer包下
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1:~~~~~~~~~"+msg);}// key 指定routingKey@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2:~~~~~~~~~"+msg);}
运行之后发送Queue被正确的声明
交换机也被正确的声明
我们也可以从下面看出来队列1和队列2各绑定了两个key
描述下Direct交换机与Fanout交换机的差异?
· Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange
@QueueBinding
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
Queue与Exchange指定BindingKey时可以使用通配符
# 代指0个或多个单词
* 代指一个单词
china.news 代表中国的新闻消息
china.weather 代表中国的天气消息
jepan.weather 代表日本的天气消息
实现思路:
并利用@RabbitListener声明Exchange、Queue、RoutingKey
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
在publisher中编写测试方法,向itcast.topic发送消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = {"china.#"}))public void listenTopicQueue1(String msg) {System.out.println("消费者接收到topic.queue1:~~~~~~~~~" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = {"#.news"}))public void listenTopicQueue2(String msg) {System.out.println("消费者接收到topic.queue2:~~~~~~~~~" + msg);}
交换机
绑定关系
两个队列也成功声明
@Testpublic void testSendTopicExchange(){
// 交换机名称String exchangeName = "itcast.topic";
// 消息String message ="张靖奇最牛逼";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
当我们点击发送之后,我们也看到consumer都可以从对应的队列中取出我们想要的数据
topic交换机中的BandingKey支持通配符,RoutingKey是多个单词以点分割
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
// 声明队列@Beanpublic Queue objectQueue(){return new Queue("object.queue");}
@Testpublic void testSendObjectQueue() {Map map = new HashMap<>();map.put("name","柳岩");rabbitTemplate.convertAndSend("object.queue",map);}
确实是有消息的
我们查看一下消息的内容,没有发现柳岩,而且类型是
content_type: application/x-java-serialized-object,将我们的对象做序列化,也就是jdk的序列化方式(性能比较差,安全性有问题,数据长度过长)
所以我们非常不推荐我们用这种默认的方式
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的
而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果修改只需要定义一个MessageConverter类型的Bean即可。推荐使用JSON方式序列化
com.fasterxml.jackson.core jackson-databind
也可以用下面这个
com.fasterxml.jackson.dataformat jackson-dataformat-xml
在publisher服务声明MessageConverter的Bean,我们一旦声明。就会把默认的给覆盖掉,这是spring自动装配的原理
注意!!! 一定是下面的两个类,不要覆盖错了
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
在继续4.5.2发送新的消息,然后在页面的队列中查看,发现这次是汉字了
在consumer中引入Jackson依赖
@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map msg){System.out.println("收到消息"+msg);}
SpringAMQP中消息的序列化和反序列化是怎么实现的?
利用MessageConverter实现的,默认是JDK的序列化
注意发送方和接收方必须使用相同的MessageConverter