第1章 项目概述及管理端
1. 项目概述
1.1 背景介绍
随着企业业务扩张、应用成倍的增加、短信规模化使用,传统短信平台的接入方式和单一的信息发送功能,已经不能完全满足现代企业管理的需求,所以统一入口、减少对接成本、同时兼顾多种短信业务、简单易行的操作与维护、高稳定、高可靠的移动信息化应用成为短信平台发展趋势。
- 服务越来越多,每个服务都有可能发送短信,是否每个服务都需要对接一遍?
- 多应用对接短信,如何做到短信发送服务高效、稳定?
- 短信通道出现异常时,如何快速切换通道?
- 切换通道时,如何做到应用服务无感知?
- 如何统计各服务短信发送情况,以便进行后续营销分析?
本项目(动力短信平台)的核心在于保证短信高效、准确的送达、简单易操作的对接方式。通过对服务的解耦、通讯方式的升级来提升系统的吞吐量。同时在多通道的加持下,通过智能动态的通道评级、选举、降级、热插拔,增强了系统的健壮性,摆脱对单一通道的依赖。并且提供多种对接方式,满足企业内部的各种需求。
动力短信平台的整体架构如下:
1.2 业务架构
1.3 技术架构
1.3.1 系统管理服务
1.3.2 短信接收服务
1.3.3 短信发送服务
1.4 项目模块介绍
动力短信平台,项目整体工程结构和模块功能如下:
From: 元动力1 2 3 4 5 6
| ydl-sms-backend # 动力短信平台父工程 ├── ydl-sms-entity # 短信平台实体 ├── ydl-sms-manage # 系统管理服务 ├── ydl-sms-api # 短信接收服务,应用系统调用接口、发送短信 ├── ydl-sms-server # 短信发送服务,调用短信通道、发送短信 └── ydl-sms-sdk # 短信SDK,应用系统引入、发送短信
|
动力短信服务有三个:后台管理服务,短信接收服务,短信发送服务:
应用 | 端口 | 说明 | 启动命令 |
---|
ydl-sms-manage | 8770 | 后台管理服务 | java -jar ydl-sms-manage.jar & |
ydl-sms-api | 8771 | 短信接收服务 | java -jar ydl-sms-api.jar & |
ydl-sms-server | 8772 | 短信发送服务 | java -jar ydl-sms-server.jar & |
2. 项目环境准备
2.1 环境要求
2.2 Redis集群
Redis集群的哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。 哨兵模式作用:
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。
除了监控Redis服务之外,哨兵之间也会互相监控。本文采用一主、双从、三哨兵方式
部署方式为:docker compose:
安装、升级docker图文请看:https://mp.weixin.qq.com/s/F3kQvSMYSKbnyLaDIbVl1Qopen in new window
第一步:创建redis docker-compose.yml配置文件 目录,并复制docs/dockerfile/redis/docker-compose.yml 到当前目录,配置文件可根据需要调整
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| version: '3.4' services: master: image: redis restart: always container_name: redis-master network_mode: "host" command: redis-server --port 16380 --requirepass 123456 --protected-mode no --daemonize no ports: - 16380:16380 slave1: image: redis restart: always container_name: redis-slave-1 network_mode: "host" command: redis-server --slaveof 127.0.0.1 16380 --port 16381 --requirepass 123456 --masterauth 123456 --protected-mode no --daemonize no ports: - 16381:16381 slave2: image: redis restart: always container_name: redis-slave-2 network_mode: "host" command: redis-server --slaveof 127.0.0.1 16380 --port 16382 --requirepass 123456 --masterauth 123456 --protected-mode no --daemonize no ports: - 16382:16382
|
第二步:执行启动命令 在当前目录下执行启动命令
From: 元动力1
| docker-compose -f docker-compose.yml up -d
|
第三步:创建sentinel docker-compose.yml配置文件 目录,并复制配置文件 复制 docs/dockerfile/sentinel/docker-compose.yml 到当前目录 复制 docs/dockerfile/sentinel/sentinel1.conf 到当前目录 复制 docs/dockerfile/sentinel/sentinel2.conf 到当前目录 复制 docs/dockerfile/sentinel/sentinel3.conf 到当前目录 docker-compose.yml
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| version: '3.4' services: sentinel1: image: redis restart: always container_name: redis-sentinel-1 network_mode: "host" command: redis-sentinel /root/dockerfile/sentinel/sentinel1.conf volumes: - ./sentinel1.conf:/root/dockerfile/sentinel/sentinel1.conf sentinel2: image: redis restart: always container_name: redis-sentinel-2 network_mode: "host" command: redis-sentinel /root/dockerfile/sentinel/sentinel2.conf volumes: - ./sentinel2.conf:/root/dockerfile/sentinel/sentinel2.conf sentinel3: image: redis restart: always container_name: redis-sentinel-3 network_mode: "host" command: redis-sentinel /root/dockerfile/sentinel/sentinel3.conf volumes: - ./sentinel3.conf:/root/dockerfile/sentinel/sentinel3.conf
|
sentinel1.conf
From: 元动力1 2 3 4 5 6 7 8 9 10
| port 26380 daemonize no pidfile /var/run/redis-sentinel1.pid dir /tmp sentinel monitor mymaster 127.0.0.1 16380 2 sentinel auth-pass mymaster 123456 sentinel down-after-milliseconds mymaster 30000 sentinel parallel-syncs mymaster 1 sentinel failover-timeout mymaster 180000 sentinel deny-scripts-reconfig yes
|
sentinel2.conf
From: 元动力1 2 3 4 5 6 7 8 9 10 11
| port 26381 daemonize no pidfile /var/run/redis-sentinel2.pid dir /tmp sentinel monitor mymaster 127.0.0.1 16380 2 sentinel auth-pass mymaster 123456 sentinel down-after-milliseconds mymaster 30000 sentinel parallel-syncs mymaster 1 sentinel failover-timeout mymaster 180000 sentinel deny-scripts-reconfig yes
|
sentinel3.conf
From: 元动力1 2 3 4 5 6 7 8 9 10 11
| port 26382 daemonize no pidfile /var/run/redis-sentinel3.pid dir /tmp sentinel monitor mymaster 127.0.0.1 16380 2 sentinel auth-pass mymaster 123456 sentinel down-after-milliseconds mymaster 30000 sentinel parallel-syncs mymaster 1 sentinel failover-timeout mymaster 180000 sentinel deny-scripts-reconfig yes
|
第四步:执行启动命令 在当前目录下执行启动命令
From: 元动力1
| docker-compose -f docker-compose.yml up -d
|
2.3 后端工程导入
将 资料夹\资料\后端初始项目\jars 中的jar包install入本地仓库:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| mvn install:install-file -Dfile=./ydl-tools-common.jar -DgroupId=com.ydl -DartifactId=ydl-tools-common -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-core.jar -DgroupId=com.ydl -DartifactId=ydl-tools-core -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-databases.jar -DgroupId=com.ydl -DartifactId=ydl-tools-databases -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-dozer.jar -DgroupId=com.ydl -DartifactId=ydl-tools-dozer -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-j2cache.jar -DgroupId=com.ydl -DartifactId=ydl-tools-j2cache -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-jwt.jar -DgroupId=com.ydl -DartifactId=ydl-tools-jwt -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-log.jar -DgroupId=com.ydl -DartifactId=ydl-tools-log -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-swagger2.jar -DgroupId=com.ydl -DartifactId=ydl-tools-swagger2 -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-user.jar -DgroupId=com.ydl -DartifactId=ydl-tools-user -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-validator.jar -DgroupId=com.ydl -DartifactId=ydl-tools-validator -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=./ydl-tools-xss.jar -DgroupId=com.ydl -DartifactId=ydl-tools-xss -Dversion=1.0-SNAPSHOT -Dpackaging=jar
|
将资料中的初始工程导入开发工具,如下:
2.4 数据库
数据库脚本位于导入的初始工程/docs/mysql/ydl-sms.sql
,创建ydl_sms数据库并执行ydl-sms.sql脚本文件完成建表操作。创建完成后可以看到如下表:
2.5 前端工程
前端工程采用vue2 + ts + spa架构,前端工程在资料中已经提供,位置为:/资料/前端工程
前端代码结构和核心代码
编译、运行前端代码
From: 元动力1 2
| npm install npm run serve
|
登陆页面效果
进入效果
3. 后台管理服务
3.1. 项目结构
3.1.1 基础工程
基础工程为ydl-sms-entity工程,主要是一些实体类、DTO、工具类、Mapper接口等,作为基础模块,其他几个服务都会依赖此基础模块。
注意:dto entity vo 关系图
3.1.2 管理端工程
ydl-sms-manage作为后台管理服务的maven工程,主要功能是对基础数据进行维护操作,例如签名管理、模板管理、通道管理、通道优先级配置、数据统计等。
3.2. 功能清单
下图展示了后台管理服务实现的功能清单:
3.3. 数据模型与类
序号 | 表名 | 类名 | 说明 |
---|
1 | signature | SignatureEntity | 短信签名 |
2 | template | TemplateEntity | 短信模板 |
3 | config | ConfigEntity | 短信通道配置 |
4 | config_signature | ConfigSignatureEntity | 通道与签名关系 |
5 | config_template | ConfigTemplateEntity | 通道与模板关系 |
6 | platform | PlatformEntity | 接入平台(应用管理) |
7 | receive_log | ReceiveLogEntity | 短信接收日志 |
8 | manual_process | ManualProcessEntity | 人工处理任务 |
9 | send_log | SendLogEntity | 短信发送日志 |
10 | black_list | BlackListEntity | 黑名单 |
11 | timing_push | TimingPushEntity | 定时发送 |
名词解释:
- 短信签名:是指主叫用户在发送短信过程中,附加主叫用户的个性化签名,发送到被叫手机用户的业务。
- 短信模板:即具体发送的短信内容,短信模版通常可以支持验证码、通知、推广三种短信类型。
举例:尊敬的【变量a】先生,您尾号为【变量b】的卡,消费了【变量c】元。【农业银行】
- 短信通道:指第三方短信平台,例如阿里云短信、乐信短信、梦网短信等。
3.4. 基础属性自动注入
功能:通过自定义注解和切面,在进行数据维护时实现实体中基础属性的自动赋值(创建者、创建时间、修改者、修改者)。
3.4.1 基础属性
基础实体类,业务实体类的基类:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.ydl.sms.entity.base;
import com.baomidou.mybatisplus.annotation.TableId; import io.swagger.annotations.ApiModelProperty; import lombok.Data;
import java.io.Serializable; import java.time.LocalDateTime;
@Data public abstract class BaseEntity implements Serializable {
@TableId @ApiModelProperty(value = "主键") private String id;
@ApiModelProperty(value = "创建时间") private LocalDateTime createTime;
@ApiModelProperty(value = "创建人") private String createUser = "0";
@ApiModelProperty(value = "修改时间") private LocalDateTime updateTime;
@ApiModelProperty(value = "修改人") private String updateUser;
@ApiModelProperty(value = "逻辑删除:0删除") private Integer isDelete;
}
|
3.4.2 自定义注解
From: 元动力1 2 3 4 5
| @Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface DefaultParams { }
|
3.4.3 定义切面类
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
@Component @Aspect @Slf4j public class DefaultParamsAspect { @SneakyThrows @Before("@annotation(com.itheima.sms.annotation.DefaultParams)") public void beforeEvent(JoinPoint point) { Long userId = BaseContextHandler.getUserId(); if (userId == null) { userId = 0L; } Object[] args = point.getArgs(); for (int i = 0; i < args.length; i++) { Class<?> classes = args[i].getClass(); Object id = null; Method method = getMethod(classes, "getId"); if (null != method) { id = method.invoke(args[i]); }
if (null == id) { method = getMethod(classes, "setCreateUser", String.class); if (null != method) { method.invoke(args[i], userId.toString()); } method = getMethod(classes, "setCreateTime", LocalDateTime.class); if (null != method) { method.invoke(args[i], LocalDateTime.now()); } }
method = getMethod(classes, "setUydlateUser", String.class); if (null != method) { method.invoke(args[i], userId.toString()); } method = getMethod(classes, "setUydlateTime", LocalDateTime.class); if (null != method) { method.invoke(args[i], LocalDateTime.now()); } } }
private Method getMethod(Class classes, String name, Class... types) { try { return classes.getMethod(name, types); } catch (NoSuchMethodException e) { return null; } } }
|
3.4.4 使用注解
在Controller方法上添加注解,保存基础属性。
From: 元动力1 2 3 4 5 6 7
| @PostMapping @ApiOperation("保存") @DefaultParams public R save(@RequestBody ObjectEntity entity) { return R.success(); }
|
3.5. Redis发布订阅模式
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。
Redis的发布订阅模式本质和传统的MQ的发布订阅类似,但是相对于其它几款MQ产品来说,redis的使用更加便捷,也更加轻量化,不需要单独去搭建集成一套繁重的MQ框架。但缺点也很明显,redis发布的消息不会持久化,所以当某一台服务器出现问题的时候,这个消息会丢失,所以在考虑使用之前要慎重,当前的业务是否对数据一致性要求很高,如果要求很高,还是建议使用MQ产品。
在发布者订阅者模式下,发布者将消息发布到指定的 channel 里面, 凡是监听该 channel 的消费者都会收到同样的一份消息,这种模式类似于是收音机模式,即凡是收听某个频道的听众都会收到主持人发布的相同的消息内容。 此模式常用于群聊天、 群通知、群公告等场景。
发布订阅模式下的几个概念:
- Publisher: 发布者
- Subscriber:订阅者
- Channel: 频道
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client1、 client2 和 client3 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
3.5.1 案例演示
1.首先远程启动redis服务启动
2.启动4个客户端 redis-cli
From: 元动力1
| redis-cli.exe -h 192.168.200.131 -p 16380 -a 123456
|
3.将其中三个客户端设置监听频道 test
4.将第四个客户端作为消息发布的客户端,向频道 ydlchannel发布消息
From: 元动力1
| publish ydlchannel 'im itlilaoshi'
|
可以看到另外三个客户端都收到了消息
3.5.2 代码案例
1、导入spring-boot-starter-data-redis依赖
From: 元动力1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
|
2、编写消息监听器,作为消息的订阅者
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.ydl.sms.listener;
import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MyListener implements MessageListener {
@Override public void onMessage(Message message, byte[] pattern) { log.info("接收到消息:" + message); } }
|
3、编写订阅发布模式的容器配置
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.ydl.sms.config;
import com.ydl.sms.listener.MyListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration @AutoConfigureAfter({MyListener.class}) public class SubscriberConfig {
@Autowired private MyListener myListener;
@Bean public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter(myListener), new PatternTopic("MYTOPIC"));
return redisMessageListenerContainer; } }
|
4、编写消息发布者
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.ydl.test;
import com.ydl.sms.SmsManageApplication; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest(classes = SmsManageApplication.class) public class RedisTest { @Autowired private RedisTemplate redisTemplate;
public void test1(){ for (int i = 0; i < 10; i++) { redisTemplate.convertAndSend("MYTOPIC","im itlilaoshi"); } }
}
|
3.6. 通道管理
3.6.1 产品原型
3.6.2 需求分析
- 通道信息增、删、改、查(分页、详情)
- 通道排序:通过拖动对前端通道进行排序
- 关联通道与短信签名:一个通道可以有多个签名
- 关联通道与短信模板:一个通道可以有多个模板
- 通道优先级排序后通知短信发送服务,更新缓存中的通道优先级
3.6.3 具体实现
管理端服务有一个场景使用了redis的发布订阅模式:短信通道的优先级发生变化(如人工设置)后,通过redis发布订阅模式通知短信发送服务,短信发送服务接收到消息后自动调整短信发送时使用的通道的优先级(短信发送服务缓存了短信通道的配置信息)。
基础代码都已经实现,此处只需要实现通道排序后通知短信发送服务的代码即可。也就是ConfigServiceImpl类的sendUpdateMessage方法。
短信发送服务业务逻辑说明:
1、为了保证短信发送服务的可用性,在短信发送服务启动时会自动生成当前服务实例的一个uuid作为服务标识保存到redis中,并且每隔3分钟上报服务信息证明服务状态正常
2、短信发送服务启动后会每隔10分钟检查redis中的服务上报信息,如果某个实例超过5分钟没有上报则认为此服务下线,就会从redis中将此服务实例信息删除
3、短信发送服务在启动时会从数据库中查询出可用通道列表并按照优先级排序,然后缓存到redis中
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public void sendUpdateMessage() { Map map = redisTemplate.opsForHash().entries("SERVER_ID_HASH"); log.info("全部服务:{}", map); Long current = System.currentTimeMillis();
for (Object key : map.keySet()) { long valueLong = Long.parseLong(map.get(key).toString());
if (current - valueLong < (1000 * 60 * 5)) { redisTemplate.delete("listForConnect");
redisTemplate.convertAndSend("TOPIC_HIGH_SERVER", ServerTopic .builder() .option(ServerTopic.INIT_CONNECT) .value(key.toString()) .build() .toString() ); log.info("发送消息通知短信发送服务重新构建通道"); return; } } }
|
第2章 短信接收服务
1. 短信接收服务介绍
短信接收服务的作用就是为应用提供访问接口,应用需要发送短信时只需要调用短信接收服务,由短信接收服务将信息保存到消息缓冲区(Mysql、Redis)。后续会由短信发送服务从消息缓冲区获取消息并发送短信。
动力短信短信平台整体架构:
动力短信短信平台业务架构:
通过上面的业务架构可以看到,短信接收服务(pd-sms-api)提供3种方式供业务系统调用:
短信接收服务通过资质验证(可开关)、短信内容校验后将短信信息发送到对应中间件中(Redis、MySQL)。
短信发送方式分为两种类型:
1、定时发送短信:将短信内容存储到MySQL数据库中,由短信发送服务通过定时任务获取并发送 2、普通短信:将短信内容推送到Redis队列中,由短信发送服务异步接收并发送
2. Redis消息队列
2.1 Redis队列介绍
Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。
Redis的list是简单的字符串列表,按照插入顺序排序。可以添加一个元素到列表的头部(左边)或者尾部(右边)。
使用Redis的list可以模拟消息队列,即使用rpush和lpush命令将数据插入队列(生产消息),使用lpop和rpop命令将数据弹出队列(消费消息)。
队列中的消息可以由不同的生产者写入,也可以由不同的消费者消费,但是一个消息一定是只能被消费一次。
redis所有命令,可从官网查看:http://redis.cn/commands.html#listopen in new window
2.2 案例演示
发布消息:
From: 元动力1 2 3 4 5 6 7 8 9 10 11
| root@77889f10b0c8:/data 127.0.0.1:6379> LPUSH ydllist msg1 (integer) 1 127.0.0.1:6379> LPUSH ydllist msg2 (integer) 2 127.0.0.1:6379> LPUSH ydllist msg3 (integer) 3 127.0.0.1:6379> LPUSH ydllist msg4 (integer) 4 127.0.0.1:6379> LPUSH ydllist msg5 (integer) 5
|
查看消息:
From: 元动力1 2 3 4 5 6
| 127.0.0.1:6379> LRANGE ydllist 0 -1 1) "msg5" 2) "msg4" 3) "msg3" 4) "msg2" 5) "msg1"
|
消费消息:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 127.0.0.1:6379> RPOP ydllist "msg1" 127.0.0.1:6379> RPOP ydllist "msg2" 127.0.0.1:6379> LRANGE ydllist 0 -1 1) "msg5" 2) "msg4" 3) "msg3" 127.0.0.1:6379> RPOP ydllist "msg3" 127.0.0.1:6379> RPOP ydllist "msg4" 127.0.0.1:6379> RPOP ydllist "msg5" 127.0.0.1:6379> RPOP ydllist (nil) 127.0.0.1:6379> LRANGE channel1 0 -1 (empty list or set) 127.0.0.1:6379>
|
RPOP命令不具有阻塞功能,如果需要阻塞功能可以使用BRPOP命令。
2.3 代码案例
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.ydl.test;
import com.ydl.sms.SmsApiApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.TimeUnit;
@RunWith(SpringRunner.class) @SpringBootTest(classes = SmsApiApplication.class) public class RedisListTest { @Autowired RedisTemplate redisTemplate;
@Test public void testPush(){ for (int i = 0; i < 10; i++) { redisTemplate.opsForList().leftPush("ydllist123", "msg"+i); } }
@Test public void testPop(){ for (int i = 0; i < 11; i++) { Object value = redisTemplate.opsForList().rightPop("ydllist123"); System.out.println("取出的值是:"+value); } }
@Test public void testPopBlock(){ while (true){ Object value = redisTemplate.opsForList().rightPop("ydllist123", 10L, TimeUnit.SECONDS); System.out.println("取出的值是:"+value); } } }
|
3. 短信接收服务
3.1 需求分析-重点
功能需求:
- 应用系统调用短信接收服务提供的接口,由短信接收服务将信息保存到消息缓冲区(Mysql、Redis)
- 调用方式:HTTP、TCP、SDK
处理流程:
短信接收服务接收到应用系统请求后,会进行相关的校验处理,校验通过则将信息保存到消息缓存区,具体处理流程如下:
3.2 项目结构
3.3 数据模型与类
序号 | 表名 | 类名 | 说明 |
---|
1 | signature | SignatureEntity | 短信签名 |
2 | template | TemplateEntity | 短信模板 |
3 | config | ConfigEntity | 短信通道配置 |
4 | config_signature | ConfigSignatureEntity | 通道与签名关系 |
5 | config_template | ConfigTemplateEntity | 通道与模板关系 |
6 | platform | PlatformEntity | 接入平台(应用管理) |
7 | receive_log | ReceiveLogEntity | 短信接收日志 |
8 | black_list | BlackListEntity | 黑名单 |
9 | timing_push | TimingPushEntity | 定时发送 |
注意:此处只是列出和短信接收服务有关的数据模型和类。
3.4 消息存储
导入的初始工程中已经实现了大部分代码,主要逻辑为通过Controller提供HTTP接口服务接收应用系统请求,然后调用Service,在Service中进行一系列校验,如果校验通过则需要将消息保存到消息缓冲区。
将消息保存到消息缓冲区的业务逻辑为:
1、进行短信分类,分为实时发送短信和定时发送短信
2、如果是定时发送短信则将消息保存到Mysql数据库
3、如果是实时发送短信则将消息保存到Redis队列,判断短信模板类型,如果是验证码类型则将消息保存到高优先级队列TOPIC_HIGH_SMS,如果是其他类型则将消息保存到普通队列TOPIC_GENERAL_SMS
4、保存短信接收日志到Mysql数据库
具体实现代码如下(SmsSendServiceImpl类的pushSmsMessage方法):
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
private void pushSmsMessage(TemplateEntity templateEntity, SmsSendDTO smsSendDTO, PlatformEntity platformEntity) { ReceiveLogEntity receiveLogEntity = new ReceiveLogEntity(); receiveLogEntity.setApiLogId(UUID.randomUUID().toString()); long start = System.currentTimeMillis(); try { String sendTime = smsSendDTO.getSendTime();
String request = JSON.toJSONString(smsSendDTO); if (StringUtils.isNotEmpty(sendTime)) { TimingPushEntity timingPushEntity = new TimingPushEntity(); timingPushEntity.setMobile(smsSendDTO.getMobile()); timingPushEntity.setTemplate(smsSendDTO.getTemplate()); timingPushEntity.setRequest(request); timingPushEntity.setSignature(smsSendDTO.getSignature()); timingPushEntity.setTiming(sendTime);
timingPushService.save(timingPushEntity); } else {
if (templateEntity.getType() == TemplateType.VERIFICATION.getCode()) { redisTemplate.opsForList().leftPush("TOPIC_HIGH_SMS", request); } else { redisTemplate.opsForList().leftPush("TOPIC_GENERAL_SMS", request); } } receiveLogEntity.setStatus(1); } catch (Exception e) { receiveLogEntity.setStatus(0); receiveLogEntity.setError(ExceptionUtils.getErrorStackTrace(e)); } finally { receiveLogEntity.setPlatformId(platformEntity.getId()); receiveLogEntity.setPlatformName(platformEntity.getName()); receiveLogEntity.setBusiness(smsSendDTO.getBatchCode()); receiveLogEntity.setConfigIds(StringUtils.join(smsSendDTO.getConfigIds(), ",")); receiveLogEntity.setTemplate(smsSendDTO.getTemplate()); receiveLogEntity.setSignature(smsSendDTO.getSignature()); receiveLogEntity.setMobile(smsSendDTO.getMobile()); receiveLogEntity.setRequest(JSON.toJSONString(smsSendDTO.getParams())); receiveLogEntity.setUseTime(System.currentTimeMillis() - start);
receiveLogMapper.insert(receiveLogEntity); } }
|
测试:
3.5 TCP接口
基于Netty进行网络编程,为短信接收服务提供TCP接口,应用系统可以通过TCP调用此接口来和短信接收服务对接。
涉及到的类:
导入的初始工程中主体代码已经完成,只需要实现服务端处理器的具体处理逻辑即可(NettyServerHandler的channelRead0方法):
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { String restMsg="success"; log.info("tcp接口接受到消息:"+msg); try { SmsParamsDTO smsParamsDTO = parseMessage(msg); if(null==smsParamsDTO){ log.info("报文解析失败!"); return; } SpringUtils.getBean(SmsSendService.class).send(smsParamsDTO);
}catch (Exception e){ log.error("netty发送时,报错了!",e); restMsg=e.getMessage(); }
log.info("回推报文============="+restMsg); ctx.writeAndFlush(restMsg+"\n"); }
|
测试:
打开telnet功能
公众号添加“IT李哥交朋友”,定期技术分享哦。
可以使用telnet作为Netty客户端来测试Netty服务,报文如下:
From: 元动力1
| {"accessKeyId": "dcbd824751cf448fa0b569f1f07b9b32","encryption": "2f2e4da7fffd4391a9aaabd594f37db3","mobile": "15718888888","params": {"code":"79857"},"signature": "DXQM_000000001","template": "DXMB_000000001","timestamp": "","sendTime":"2021-12-25 12:00"}
|
3.6 SDK
3.6.1 说明
SDK 是 Software Development Kit 的缩写,即软件开发工具包。SDK被开发出来是为了减少程序员工作量的,比如公司开发出某种软件的某一功能,把它封装成SDK,提供给其他公司和个人使用。
本小节需要开发短信接收服务SDK,通过SDK可以使应用系统更加方便的调用短信接收服务。
通过SDK方式调用短信接收服务,本质上还是调用的短信接收服务提供的HTTP接口(Controller),只不过是调用的过程在SDK中进行了封装。
项目结构:
3.6.2 实现
导入的初始工程中主体代码已经完成,只需要实现业务处理类的具体逻辑即可(SmsSendServiceImpl的send方法):
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
private R send(BaseParamsDTO baseParamsDTO, String url) { baseParamsDTO.setAccessKeyId(accessKeyId);
if (auth) { if (StringUtils.isBlank(accessKeyId) || StringUtils.isBlank(accessKeySecret)) { return R.fail("accessKeyId或accessKeySecret不能为空"); } }
baseParamsDTO.setTimestamp(System.currentTimeMillis() + ""); baseParamsDTO.setEncryption(SmsEncryptionUtils.encode(baseParamsDTO.getTimestamp(), baseParamsDTO.getAccessKeyId(), accessKeySecret));
if (StringUtils.isBlank(domain)) { return R.fail("domain不能为空"); }
CloseableHttpClient httpClient = HttpClients.createDefault(); HttpPost post = new HttpPost(url); post.setHeader("Content-Type", "application/json; charset=UTF-8"); StringEntity stringEntity = new StringEntity(JSON.toJSONString(baseParamsDTO), "UTF-8"); post.setEntity(stringEntity);
try { CloseableHttpResponse response = httpClient.execute(post); HttpEntity responseEntity = response.getEntity();
if (response.getStatusLine().getStatusCode() == 200) { log.info("httpRequest access success, StatusCode is:{}", response.getStatusLine().getStatusCode()); String responseEntityStr = EntityUtils.toString(responseEntity); log.info("responseContent is :" + responseEntityStr);
return JSON.parseObject(responseEntityStr, R.class); } else { log.error("httpRequest access fail ,StatusCode is:{}", response.getStatusLine().getStatusCode()); return R.fail("status is " + response.getStatusLine().getStatusCode()); }
} catch (Exception e) { log.error("error", e); return R.fail(e.getMessage()); } finally { post.releaseConnection(); } }
|
SDK开发完成后,为了方便其他应用使用,通常会将SDK打成jar包上传到远程maven仓库,在应用系统中直接通过maven坐标导入SDK即可使用。
如下是将SDK上传到Nexus后的效果:
如何搭建私服,详见公众号:IT李哥交朋友
3.6.3 测试
第一步:引入SDK的maven坐标
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13
| <repositories> <repository> <id>ydlrepo</id> <url>http://192.168.200.131:8081/repository/ydlrepo/</url> </repository> </repositories>
<dependency> <groupId>com.ydl</groupId> <artifactId>ydl-sms-sdk</artifactId> <version>1.0.0</version> </dependency>
|
第二步:编写配置文件
From: 元动力1 2 3 4 5 6 7
| ydlclass: sms: auth: true domain: http://localhost:8771 accessKeyId: dcbd824751cf448fa0b569f1f07b9b32 accessKeySecret: 2f2e4da7fffd4391a9aaabd594f37db3
|
第三步:编写单元测试
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.ydl.test;
import com.itheima.sms.SmsManageApplication; import com.itheima.sms.sms.dto.SmsParamsDTO; import com.itheima.sms.sms.service.SmsSendService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map;
@RunWith(SpringRunner.class) @SpringBootTest(classes = SmsManageApplication.class) public class SdkTest { @Autowired private SmsSendService smsSendService;
@Test public void testSend(){ SmsParamsDTO dto = new SmsParamsDTO(); dto.setMobile("15718888888"); dto.setSignature("DXQM_000000001"); dto.setTemplate("DXMB_000000001"); Map<String, String> map = new HashMap<>(); map.put("code","79857"); dto.setParams(map); dto.setSendTime("2021-12-18 10:00"); dto.setTimestamp(System.currentTimeMillis() +"");
return smsSendService.sendSms(dto); } }
|
第3章 短信发送服务
1. 短信发送服务介绍
短信发送服务的作用就是从消息缓冲区获取消息并和具体的短信通道(例如:阿里云短信、梦网短信、乐信短信等)对接来发送短信。
动力短信短信平台整体架构:
- 发送短信:实时发送、定时发送
- 通道降级:通道发送失败,选择下一通道发送短信
- 通道选举:同一通道多次发送失败,降级通道
- 服务注册:有且只有一台机器执行通道选举
2. Redis实现分布式锁
对于简单的单体项目,即运行时程序在同一个Java虚拟机中,使用Java的锁机制(synchronized或者ReentrantLock)可以解决多线程并发问题。
可以运行资料/redis-lock-demo
来重现线程并发问题。
测试过程:
第一步:启动redis-lock-demo服务
第二步:设置redis中库存stock值为100
第三步:使用apache jmeter进行压力测试
注:Apache JMeter是Apache组织开发的基于Java的压力测试工具。用于对软件做压力测试,它最初被设计用于Web应用测试,但后来扩展到其他测试领域。
可以发现对于单实例的应用来说,使用Java锁机制就可以解决线程并发问题。
但是在分布式环境中,程序是集群方式部署,如下图:
可以通过启动两个服务实例来测试集群部署时线程并发问题,具体测试步骤如下:
第一步:分别启动两个redis-lock-demo服务实例,端口号分别为8001和8002
第二步:配置Nginx负载均衡,通过Nginx将压力分发到两个实例上
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| upstream upstream_name{ server 127.0.0.1:8001; server 127.0.0.1:8002; }
server { listen 8080; server_name localhost;
location / { proxy_pass http://upstream_name; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } }
|
第三步:使用Apache jmeter进行压力测试
可以发现对于集群环境下的多个服务实例又产生了线程并发问题。
上面的集群部署方式依然会产生线程并发问题,因为synchronized、ReentrantLock只是jvm级别的加锁,没有办法控制其他jvm。也就是上面两个tomcat实例还是可以出现并发执行的情况。要解决分布式环境下的并发问题,则必须使用分布式锁。
分布式锁可以理解为:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
实现分布式锁的方式很多,例如:Redis、数据库、Zookeeper等。
本小节主要讲Redis实现分布式锁的方式。
2.1 SETNX
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
- 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功 mykey->myvalue
- 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败 mykey->myvalue
- 客户端A执行代码完成,删除锁
- 客户端B在等待一段时间后再去请求设置key的值,设置成功 mykey->myvalue
- 客户端B执行代码完成,删除锁
格式:
From: 元动力1 2 3 4 5 6
| 127.0.0.1:6379> SETNX key value
127.0.0.1:6379> EXPIRE key seconds
127.0.0.1:6379> DEL key
|
为什么要设置key过期时间呢?
如果某个客户端获得锁后因为某些原因意外退出了,导致创建了锁但是没有来得及删除锁,那么这个锁将一直存在,后面所有的客户端都无法再获得锁,所以必须要设置过期时间。
2.2 SET
通过前面的expire命令来设置锁过期时间还存在一个问题,就是SETNX和EXPIRE两个命令不是原子性操作。在极端情况下可能会出现获取锁后还没来得及设置过期时间程序就挂掉了,这样就又出现了锁一直存在,后面所有的客户端都无法再获得锁的问题。
如何解决这个问题?答案是使用SET命令。
SET 命令从Redis 2.6.12 版本开始包含设置过期时间的功能,这样获取锁和设置过期时间就是一个原子操作了。
格式:
From: 元动力1
| SET key value [EX seconds] [NX]
|
示例:
From: 元动力1
| 127.0.0.1:6379> SET mykey myvalue EX 5 NX
|
- EX seconds :将键的过期时间设置为 seconds 秒
- NX :只在key不存在时才对键进行设置操作
2.3 代码实现
基于redis实现分布式锁:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Autowired RedisLock redisLock;
@GetMapping("/stock3") public String stock3() { String mylock = redisLock.tryLock("MYLOCK", 2000); if(mylock!=null){ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { stock--; stringRedisTemplate.opsForValue().set("stock", stock + ""); System.out.println("减库存成功,剩余库存是:" + stock); } else { System.out.println("库存不足了!"); }
redisLock.unlock("MYLOCK",mylock); } return "OK"; }
@GetMapping("/stock4") public String stock4() { String mylock = redisLock.lock("MYLOCK", 2000,1000); if(mylock!=null){ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { stock--; stringRedisTemplate.opsForValue().set("stock", stock + ""); System.out.println("减库存成功,剩余库存是:" + stock); } else { System.out.println("库存不足了!"); }
redisLock.unlock("MYLOCK",mylock); } return "OK"; }
|
3. Spring Task定时任务
3.1 Spring Task介绍
在企业软件开发中经常会遇到定时任务处理的业务场景,例如银行系统每晚会定时执行对账业务,很多系统每晚都会定时执行数据备份等等。
定时任务的实现方案有很多,例如:Quartz、Spring Task、XXL job等。对于简单的应用场景可以使用Spring Task。Spring Task相关API在spring-context包中已经提供了,不需要额外导入其他jar包。
3.2 代码案例
在Spring Boot项目中使用Spring Task非常简单,只需要在Bean的方法上加入@Scheduled注解,并通过cron表达式指定触发的时间即可。
第一步:在启动类上加入@EnableScheduling注解
第二步:编写定时任务类
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.itheima.job;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
import java.util.Date;
@Component public class DemoJob {
@Scheduled(cron = "0/5 * * * * ?") public void testJob() { System.out.println("现在时间是" + new Date()); }
}
|
4. 短信发送服务
4.1 需求分析
功能需求:
- 和具体的短信通道对接(例如:阿里云短信、梦网短信等),发送短信
- 短信定时发送
- 短信实时发送
- 服务注册,保证短信发送服务高可用
- 通道自动选举、降级
处理过程:
4.2 项目结构
4.3 核心代码
短信发送服务核心类:
- ServerRegister:服务注册器,用于将短信发送服务注册到Redis中,定时服务上报,定时服务检查
- ConfigServiceImpl:通道配置器,用于查询可用通道(阿里短信、华为短信等),通道选举、降级
- AbstractSmsService:短信发送器抽象父类,子类需要和具体的短信通道对接来完成发送短信的工作
- SmsConnectLoader:通道实例加载器,根据通道配置,初始化每个通道的Bean对象
- SmsFactory:短信发送工厂,获取具体的通道实例Bean对象(例如AliyunSmsService)来发送短信, 如果发送出现异常,触发通道选举和通道降级策略
- SendTimingSmsImpl:定时短信业务处理器,具体负责定时短信的发送
- SendSmsJob:短信发送定时任务,用于定时短信的发送,调用SendTimingSmsImpl发送定时短信
- GeneralSmsListener、HighSmsListener:短信接收器,Redis队列的消费者,监听队列中的消息,如果有消息则调用SmsFactory发送实时短信
- HighServerReceiver:通道消息监听器,通过Redis的发布订阅模式监听通道相关消息,调用SmsConnectLoader初始化通道和更新通道
- SubscriberConfig:订阅发布模式的容器配置,创建消息监听容器,并将HighServerReceiver加入容器中
4.4 功能实现
4.4.1 服务注册器
服务注册器对应的为ServerRegister类。
短信发送服务支持分布式集群部署,可以是多个实例,实例越多,发送短信的能力越强。但是对于通道选举、持久化通道等操作,只能有一个服务实例执行,其他服务实例通过redis的广播机制获得通道变化。
如果要实现这一功能,需要将所有短信发送服务实例注册到某个地方,当前实现是将所有服务实例注册到Redis中。并且为了能够监控每个服务实例运行状态,需要每个服务实例定时上报并且定时进行服务检查。
业务逻辑:
1、服务注册,项目启动时将当前服务实例id注册到redis
2、服务上报,每三分钟报告一次,并传入当前时间戳
3、服务检查,每十分钟检查一次服务列表,清空超过五分钟没有报告的服务
代码实现:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| package com.ydl.sms.factory;
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID;
@Component @Slf4j @Order(value = 100) public class ServerRegister implements CommandLineRunner { public static String SERVER_ID = null;
@Autowired private RedisTemplate redisTemplate;
@Override public void run(String... args) { SERVER_ID = UUID.randomUUID().toString(); log.info("server实例启动,生成的id:{}", SERVER_ID); redisTemplate.opsForHash().put("SERVER_ID_HASH", SERVER_ID, System.currentTimeMillis()); }
@Scheduled(cron = "1 1/3 * * * ?") public void serverReport() {
log.info("服务定时上报。id:{}",SERVER_ID); redisTemplate.opsForHash().put("SERVER_ID_HASH", SERVER_ID, System.currentTimeMillis()); }
@Scheduled(cron = "30 1/10 * * * ?") public void checkServer() { log.info("定时服务检查。id:{}",SERVER_ID); Map map = redisTemplate.opsForHash().entries("SERVER_ID_HASH"); log.info("当前服务有:"+map);
long now = System.currentTimeMillis(); List removeKeys=new ArrayList(); map.forEach((key,value)->{ long registrTime = Long.parseLong(value.toString()); if(now-registrTime>(5*60*1000)){ removeKeys.add(key); } });
log.info("该要删除的key有:{}",removeKeys); removeKeys.forEach(key->{ redisTemplate.opsForHash().delete("SERVER_ID_HASH", key); });
} }
|
可以启动两个服务实例来测试服务注册、服务上报、服务检查是否能够正常执行。
4.4.2 通道实例加载器
通道实例加载器对应的为SmsConnectLoader类。
短信发送服务存在多个通道(例如阿里云短信、华为云短信等),这些通道是通过后台管理系统设置的,包括通道的名称、签名、模板、连接方式等信息。当短信发送服务启动时,或者后台管理系统设置通道时,将会初始化短信通道。
通道实例加载器的作用就是根据通道配置,初始化每个通道的Bean对象(例如AliyunSmsService、MengWangSmsService等)。
SmsConnectLoader类中只需要实现initConnect方法即可,其他方法已经提供好。
业务逻辑:
1、查询数据库获得通道列表
2、遍历通道列表,通过反射创建每个通道的Bean对象(例如AliyunSmsService、MengWangSmsService等)
3、将每个通道的Bean对象保存到CONNECT_LIST集合中
具体代码如下:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
@SneakyThrows public void initConnect() { List<ConfigEntity> configEntitiesList = configService.listForConnect(); List constructorList=new ArrayList(); configEntitiesList.forEach(configEntity->{ try { SmsConfig config=new SmsConfig(); config.setId(configEntity.getId()); config.setDomain(configEntity.getDomain()); config.setName(configEntity.getName()); config.setPlatform(configEntity.getPlatform().trim()); config.setAccessKeyId(configEntity.getAccessKeyId().trim()); config.setAccessKeySecret(configEntity.getAccessKeySecret().trim()); if(StringUtils.isNotBlank(configEntity.getOther())){ LinkedHashMap linkedHashMap = JSON.parseObject(configEntity.getOther(), LinkedHashMap.class); config.setOtherConfig(linkedHashMap); }
String className="com.ydl.sms.sms."+config.getPlatform()+"SmsService"; System.out.println(className);
Class<?> aClass = Class.forName(className); Constructor<?> constructor = aClass.getConstructor(SmsConfig.class); Object obj = constructor.newInstance(config);
SignatureService signatureService = SpringUtils.getBean(SignatureService.class); TemplateService templateService = SpringUtils.getBean(TemplateService.class); Field signatureServiceField = aClass.getSuperclass().getDeclaredField("signatureService"); Field templateServiceField = aClass.getSuperclass().getDeclaredField("templateService"); signatureServiceField.setAccessible(true); templateServiceField.setAccessible(true); signatureServiceField.set(obj,signatureService); templateServiceField.set(obj,templateService);
constructorList.add(obj);
log.info("初始化通道成功:{},{}",config.getName(),config.getPlatform()); }catch (Exception e){ log.warn("初始化通道异常:{}",e.getMessage()); }
}); if(!CONNECT_LIST.isEmpty()){ CONNECT_LIST.clear(); } CONNECT_LIST.addAll(constructorList);
if(StringUtils.isNotBlank(BUILD_NEW_CONNECT_TOKEN)){ redisLock.unlock("buildNewConnect", BUILD_NEW_CONNECT_TOKEN); }
log.info("通道初始化完成了。{}",CONNECT_LIST); }
|
4.4.3 定时短信业务处理器
定时短信业务处理器对应的是SendTimingSmsImpl类,具体负责定时短信的发送。
业务逻辑:
1、查询数据库获取本次需要发送的定时短信
2、调用短信工厂发送短信
3、更新短信发送状态为“已处理”
实现代码:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package com.ydl.sms.job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.ydl.sms.entity.TimingPushEntity; import com.ydl.sms.factory.SmsFactory; import com.ydl.sms.mapper.TimingPushMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.util.List;
@Component @Slf4j public class SendTimingSmsImpl implements SendTimingSms {
@Autowired private TimingPushMapper timingPushMapper;
@Autowired private SmsFactory smsFactory;
@Override @Async public void execute(String timing) { LambdaQueryWrapper<TimingPushEntity> wrapper = new LambdaQueryWrapper<>(); wrapper.eq(TimingPushEntity::getStatus,0); wrapper.eq(TimingPushEntity::getTiming,timing); wrapper.orderByAsc(TimingPushEntity::getCreateTime); List<TimingPushEntity> list = timingPushMapper.selectList(wrapper);
log.info("这一批次要发的短信条数是:{},{}",timing,list.size());
list.forEach(x->{ String request = x.getRequest(); smsFactory.send(request); x.setStatus(1); x.setUpdateTime(LocalDateTime.now()); x.setUpdateUser("system"); timingPushMapper.updateById(x); });
log.info("任务执行完毕"+timing);
} }
|
4.4.4 短信发送定时任务
短信发送定时任务对应的是SendSmsJob类,用于定时短信的发送,调用SendTimingSmsImpl发送定时短信。
业务逻辑:
1、每分钟触发一次定时任务
2、为了防止短信重复发送,需要使用分布式锁
3、调用SendTimingSmsImpl发送定时短信
代码实现:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.ydl.sms.job;
import com.ydl.sms.config.RedisLock; import com.ydl.utils.DateUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.Date;
@Component @Slf4j public class SendSmsJob {
@Autowired private SendTimingSms sendTimingSms;
@Autowired private RedisLock redisLock;
@Scheduled(cron = "10 0/1 * * * ?") public void sendTimingSms() throws InterruptedException { String lock = redisLock.tryLock("SEND_TIMING_SMS", 30 * 1000); if(StringUtils.isNotBlank(lock)){ sendTimingSms.execute(DateUtils.format(new Date(),"yyyy-MM-dd HH:mm")); } } }
|
4.4.5 短信监听器
应用系统调用短信接收服务,短信接收服务将短信消息放入消息队列,实现短信的接收和发送的解耦,这种方案可以最大化提高短信接收服务性能,不会由于外部短信通道影响应用系统的响应,从而起到削峰填谷的效果。
本系统使用的Redis集群作为消息队列,在传输短信时,采用的是生产者消费者模式。
短信接收器对应的类为GeneralSmsListener和HighSmsListener,角色是Redis队列的消费者,监听队列中的消息,如果有消息则调用SmsFactory发送实时短信。
业务逻辑:
1、HighSmsListener监听TOPIC_HIGH_SMS队列,如果有消息则调用短信发送工厂SmsFactory发送实时短信
2、GeneralSmsListener监听TOPIC_GENERAL_SMS队列,如果有消息则调用短信发送工厂SmsFactory发送实时短信
HighSmsListener具体实现:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package com.ydl.sms.redismq;
import com.ydl.sms.factory.SmsFactory; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.ListOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import java.util.concurrent.TimeUnit;
@Component @Slf4j public class HighSmsListener extends Thread { @Autowired private RedisTemplate redisTemplate;
@Autowired private SmsFactory smsFactory;
private String queueKey = "TOPIC_HIGH_SMS";
@Value("${spring.redis.queue.pop.timeout}") private Long popTimeout = 8000L;
private ListOperations listOps;
@PostConstruct private void init() { listOps = redisTemplate.opsForList(); this.start(); }
@Override public void run() { while (true){ log.debug("队列{}正在监听中",queueKey); String message = (String) listOps.rightPop(queueKey, popTimeout, TimeUnit.MILLISECONDS); if(StringUtils.isNotBlank(message)){ log.info("{}收到消息了:{}",queueKey,message); smsFactory.send(message); } } } }
|
GeneralSmsListener具体实现:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.ydl.sms.redismq;
import com.ydl.sms.factory.SmsFactory; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.ListOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import java.util.concurrent.TimeUnit;
@Component @Slf4j public class GeneralSmsListener extends Thread { @Autowired private RedisTemplate redisTemplate;
@Autowired private SmsFactory smsFactory;
private String queueKey = "TOPIC_GENERAL_SMS";
@Value("${spring.redis.queue.pop.timeout}") private Long popTimeout = 8000L;
private ListOperations listOps;
@PostConstruct private void init() { listOps = redisTemplate.opsForList(); this.start(); }
@Override public void run() { while (true){ log.debug("队列{}正在监听中",queueKey); String message = (String) listOps.rightPop(queueKey, popTimeout, TimeUnit.MILLISECONDS); if(StringUtils.isNotBlank(message)){ log.info("{}收到消息了:{}",queueKey,message); smsFactory.send(message); } } } }
|
4.4.6 通道消息监听器
通道消息监听器对应的类是HighServerReceiver,作用是通过Redis的发布订阅模式监听TOPIC_HIGH_SERVER频道中的消息,调用SmsConnectLoader初始化通道和更新通道。
业务逻辑:
1、通过Redis发布订阅模式监听TOPIC_HIGH_SERVER频道中的消息
2、如果消息为USE_NEW_CONNECT,表示通道更新,调用通道实例加载器SmsConnectLoader更新通道
3、如果消息为INIT_CONNECT,表示通道初始化,调用通道实例加载器SmsConnectLoader初始化通道
实现代码:
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package com.ydl.sms.redismq;
import com.ydl.sms.factory.SmsConnectLoader; import com.ydl.sms.model.ServerTopic; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.stereotype.Component;
@Component @Slf4j public class HighServerReceiver implements MessageListener {
@Autowired private RedisTemplate redisTemplate;
@Autowired private SmsConnectLoader smsConnectLoader;
@Override public void onMessage(Message message, byte[] pattern) {
RedisSerializer<?> valueSerializer = redisTemplate.getDefaultSerializer(); String deserialize = valueSerializer.deserialize(message.getBody()).toString();
ServerTopic serverTopic = ServerTopic.load(deserialize);
switch (serverTopic.getOption()) { case ServerTopic.USE_NEW_CONNECT: log.info("服务:{} 发起【通道更新】", serverTopic.getValue()); smsConnectLoader.changeNewConnect(); break; case ServerTopic.INIT_CONNECT: log.info("服务:{} 发起【通道初始化】", serverTopic.getValue()); smsConnectLoader.initConnect(); break; default: break; }
} }
|
5对接阿里短信进行真实发送!
1申请阿里短信的签名 和 模板
2获取accessKey和secret
3参考文档经行代码编写
From: 元动力1 2 3 4 5 6 7 8 9 10 11
| private void init() { //初始化acsClient,暂不支持region化 "cn-hangzhou" //profile = DefaultProfile.getProfile(config.get("RegionId"), config.getAccessKeyId(), config.getAccessKeySecret()); aliconfig = new Config() // 您的AccessKey ID .setAccessKeyId(config.getAccessKeyId()) // 您的AccessKey Secret .setAccessKeySecret( config.getAccessKeySecret()); // 访问的域名 aliconfig.endpoint = "dysmsapi.aliyuncs.com"; }
|
From: 元动力1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Override protected String sendSms(String mobile, Map<String, String> params, String signature, String template) { // 获取 签名内容 和模板id SignatureEntity signatureEntity = signatureService.getByCode(signature); String code = templateService.getConfigCodeByCode(config.getId(), template); try { Client client = new Client(aliconfig);
SendSmsRequest request=new SendSmsRequest(); request.setPhoneNumbers(mobile); request.setTemplateCode(code); request.setTemplateParam(JSON.toJSONString(params)); request.setSignName(signatureEntity.getContent());
SendSmsResponse response = client.sendSms(request); JSONObject jsonObject = JSON.parseObject(String.valueOf(response.getBody())); if (response.getBody().getCode().equals("OK")) { return response.getBody().toString(); } else { return failResponse(jsonObject.getString("Message"),response.getBody().getMessage()); } } catch (Exception e) { log.error("Aliyun 短信发送失败:{} ,{}", mobile, template, e); return failResponse(e.getMessage(), e.getMessage()); } //"{\"Message\":\"OK\",\"RequestId\":\"" + UUID.randomUUID().toString().toUpperCase() + "-@\",\"BizId\":\"" + System.currentTimeMillis() + "\",\"Code\":\"OK\"}"; }
|
4平台配置相应正确参数
5启动三个平台,和shop服务,进行发送
6结果