消息队列
yudao-spring-boot-starter-mq
(opens new window) 技术组件,基于 Redis 实现分布式消息队列:
- 使用 Stream (opens new window) 特性,提供【集群】消费的能力。
- 使用 Pub/Sub (opens new window) 特性,提供【广播】消费的能力。
友情提示:
考虑到有部分同学对分布式消息队列了解的不多,所以在下文的广播消费、集群消费的描述,去除【消费者分组】的概念。如果你对这块感兴趣,可以看看艿艿写的系列文章:
- 《芋道 Spring Boot 消息队列 RocketMQ 入门》 (opens new window) 对应 lab-31 (opens new window)
- 《芋道 Spring Boot 消息队列 Kafka 入门》 (opens new window) 对应 lab-03-kafka (opens new window)
- 《芋道 Spring Boot 消息队列 RabbitMQ 入门》 (opens new window) 对应 lab-04-rabbitmq (opens new window)
- 《芋道 Spring Boot 消息队列 ActiveMQ 入门》 (opens new window) 对应 lab-32 (opens new window)
# 1. 集群消费
集群消费,是指消息发送到 Redis 时,有且只会被一个消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:
# 1.1 使用场景
集群消费在项目中的使用场景,主要是提供可靠的、可堆积的异步任务的能力。例如说:
- 短信模块,使用它异步 (opens new window)发送短信。
- 邮件模块,使用它异步 (opens new window)发送邮件。
相比 《开发指南 —— 异步任务》 来说,Spring Async 在 JVM 实例重启时,会导致未执行完的任务丢失。而集群消费,因为消息是存储在 Redis 中,所以不会存在该问题。
# 1.2 实现源码
集群消费基于 Redis Stream 实现:
- 实现 AbstractStreamMessage (opens new window) 抽象类,定义【集群】消息。
- 使用 RedisMQTemplate (opens new window) 的
#send(message)
(opens new window) 方法,发送消息。 - 实现 AbstractStreamMessageListener (opens new window) 接口,消费消息。
最终使用 YudaoMQAutoConfiguration (opens new window) 配置类,扫描所有的 AbstractStreamMessageListener 监听器,初始化对应的消费者。如下图所示:
# 1.3 实战案例
以短信模块异步发送短息为例子,讲解集群消费的使用。
# 1.3.1 引入依赖
在 yudao-module-system-biz
模块中,引入 yudao-spring-boot-starter-mq
技术组件。如下所示:
1 |
|
# 1.3.2 SmsSendMessage
在 yudao-module-system-biz
的
mq/message/sms
(
opens new window)
包下,创建
SmsSendMessage (
opens new window) 类,继承 AbstractStreamMessage
抽象类,短信发送消息。代码如下图:
# 1.3.3 SmsProducer
① 在 yudao-module-system-biz
的
mq/producer/sms
(
opens new window)
包下,创建
SmsProducer (
opens new window) 类,SmsSendMessage 的 Producer 生产者,核心是使用 RedisMQTemplate 发送 SmsSendMessage
消息。代码如下图:
② 发送短信时,需要使用 SmsProducer 发送消息。如下图所示:
# 1.3.4 SmsSendConsumer
在 yudao-module-system-biz
的
mq/consumer/sms
(
opens new window)
包下,创建
SmsSendConsumer (
opens new window) 类,SmsSendMessage 的 Consumer
消费者。代码如下图:
# 2. 广播消费
广播消费,是指消息发送到 Redis 时,所有消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:
# 2.1 使用场景
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Redis 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Redis 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
# 2.2 实现源码
广播消费基于 Redis Pub/Sub 实现:
- 实现 AbstractChannelMessage ( opens new window) 抽象类,定义【广播】消息。
-
使用
RedisMQTemplate (
opens new window)
的
#send( message)
( opens new window) 方法,发送消息。 - 实现 AbstractChannelMessageListener ( opens new window) 接口,消费消息。
最终使用 YudaoMQAutoConfiguration ( opens new window) 配置类,扫描所有的 AbstractChannelMessageListener 监听器,初始化对应的消费者。如下图所示: