第1章 项目概述及管理端

1. 项目概述

1.1 背景介绍

随着企业业务扩张、应用成倍的增加、短信规模化使用,传统短信平台的接入方式和单一的信息发送功能,已经不能完全满足现代企业管理的需求,所以统一入口、减少对接成本、同时兼顾多种短信业务、简单易行的操作与维护、高稳定、高可靠的移动信息化应用成为短信平台发展趋势。

image-20210916094348274
image-20210916094348274
  • 服务越来越多,每个服务都有可能发送短信,是否每个服务都需要对接一遍?
  • 多应用对接短信,如何做到短信发送服务高效、稳定?
  • 短信通道出现异常时,如何快速切换通道?
  • 切换通道时,如何做到应用服务无感知?
  • 如何统计各服务短信发送情况,以便进行后续营销分析?

本项目(动力短信平台)的核心在于保证短信高效、准确的送达、简单易操作的对接方式。通过对服务的解耦、通讯方式的升级来提升系统的吞吐量。同时在多通道的加持下,通过智能动态的通道评级、选举、降级、热插拔,增强了系统的健壮性,摆脱对单一通道的依赖。并且提供多种对接方式,满足企业内部的各种需求。

动力短信平台的整体架构如下:

image-20210916095249795
image-20210916095249795

1.2 业务架构

image-20210916100828128
image-20210916100828128

1.3 技术架构

1.3.1 系统管理服务

image-20210916104038523
image-20210916104038523

1.3.2 短信接收服务

image-20210916104817659
image-20210916104817659

1.3.3 短信发送服务

image-20210916105359377
image-20210916105359377

1.4 项目模块介绍

动力短信平台,项目整体工程结构和模块功能如下:

From: 元动力
1
2
3
4
5
6
ydl-sms-backend            # 动力短信平台父工程
├── ydl-sms-entity # 短信平台实体
├── ydl-sms-manage # 系统管理服务
├── ydl-sms-api # 短信接收服务,应用系统调用接口、发送短信
├── ydl-sms-server # 短信发送服务,调用短信通道、发送短信
└── ydl-sms-sdk # 短信SDK,应用系统引入、发送短信

动力短信服务有三个:后台管理服务,短信接收服务,短信发送服务:

应用端口说明启动命令
ydl-sms-manage8770后台管理服务java -jar ydl-sms-manage.jar &
ydl-sms-api8771短信接收服务java -jar ydl-sms-api.jar &
ydl-sms-server8772短信发送服务java -jar ydl-sms-server.jar &

2. 项目环境准备

2.1 环境要求

2.2 Redis集群

Redis集群的哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。 哨兵模式作用:

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

除了监控Redis服务之外,哨兵之间也会互相监控。本文采用一主、双从、三哨兵方式

image-20210916120800855
image-20210916120800855

部署方式为:docker compose:

安装、升级docker图文请看:https://mp.weixin.qq.com/s/F3kQvSMYSKbnyLaDIbVl1Qopen in new window

第一步:创建redis docker-compose.yml配置文件 目录,并复制docs/dockerfile/redis/docker-compose.yml 到当前目录,配置文件可根据需要调整

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
version: '3.4'
services:
master:
image: redis
restart: always
container_name: redis-master
network_mode: "host"
command: redis-server --port 16380 --requirepass 123456 --protected-mode no --daemonize no
ports:
- 16380:16380
slave1:
image: redis
restart: always
container_name: redis-slave-1
network_mode: "host"
command: redis-server --slaveof 127.0.0.1 16380 --port 16381 --requirepass 123456 --masterauth 123456 --protected-mode no --daemonize no
ports:
- 16381:16381
slave2:
image: redis
restart: always
container_name: redis-slave-2
network_mode: "host"
command: redis-server --slaveof 127.0.0.1 16380 --port 16382 --requirepass 123456 --masterauth 123456 --protected-mode no --daemonize no
ports:
- 16382:16382

第二步:执行启动命令 在当前目录下执行启动命令

From: 元动力
1
docker-compose -f docker-compose.yml up -d
image-20210922111122880
image-20210922111122880

第三步:创建sentinel docker-compose.yml配置文件 目录,并复制配置文件 复制 docs/dockerfile/sentinel/docker-compose.yml 到当前目录 复制 docs/dockerfile/sentinel/sentinel1.conf 到当前目录 复制 docs/dockerfile/sentinel/sentinel2.conf 到当前目录 复制 docs/dockerfile/sentinel/sentinel3.conf 到当前目录 docker-compose.yml

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
version: '3.4'
services:
sentinel1:
image: redis
restart: always
container_name: redis-sentinel-1
network_mode: "host"
command: redis-sentinel /root/dockerfile/sentinel/sentinel1.conf
volumes:
- ./sentinel1.conf:/root/dockerfile/sentinel/sentinel1.conf
sentinel2:
image: redis
restart: always
container_name: redis-sentinel-2
network_mode: "host"
command: redis-sentinel /root/dockerfile/sentinel/sentinel2.conf
volumes:
- ./sentinel2.conf:/root/dockerfile/sentinel/sentinel2.conf
sentinel3:
image: redis
restart: always
container_name: redis-sentinel-3
network_mode: "host"
command: redis-sentinel /root/dockerfile/sentinel/sentinel3.conf
volumes:
- ./sentinel3.conf:/root/dockerfile/sentinel/sentinel3.conf

sentinel1.conf

From: 元动力
1
2
3
4
5
6
7
8
9
10
port 26380
daemonize no
pidfile /var/run/redis-sentinel1.pid
dir /tmp
sentinel monitor mymaster 127.0.0.1 16380 2
sentinel auth-pass mymaster 123456
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel deny-scripts-reconfig yes

sentinel2.conf

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
port 26381
daemonize no
pidfile /var/run/redis-sentinel2.pid
dir /tmp
sentinel monitor mymaster 127.0.0.1 16380 2
sentinel auth-pass mymaster 123456
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel deny-scripts-reconfig yes

sentinel3.conf

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
port 26382
daemonize no
pidfile /var/run/redis-sentinel3.pid
dir /tmp
sentinel monitor mymaster 127.0.0.1 16380 2
sentinel auth-pass mymaster 123456
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel deny-scripts-reconfig yes

第四步:执行启动命令 在当前目录下执行启动命令

From: 元动力
1
docker-compose -f docker-compose.yml up -d
image-20210913180549154
image-20210913180549154

2.3 后端工程导入

将 资料夹\资料\后端初始项目\jars 中的jar包install入本地仓库:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
mvn install:install-file -Dfile=./ydl-tools-common.jar -DgroupId=com.ydl -DartifactId=ydl-tools-common -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-core.jar -DgroupId=com.ydl -DartifactId=ydl-tools-core -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-databases.jar -DgroupId=com.ydl -DartifactId=ydl-tools-databases -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-dozer.jar -DgroupId=com.ydl -DartifactId=ydl-tools-dozer -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-j2cache.jar -DgroupId=com.ydl -DartifactId=ydl-tools-j2cache -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-jwt.jar -DgroupId=com.ydl -DartifactId=ydl-tools-jwt -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-log.jar -DgroupId=com.ydl -DartifactId=ydl-tools-log -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-swagger2.jar -DgroupId=com.ydl -DartifactId=ydl-tools-swagger2 -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-user.jar -DgroupId=com.ydl -DartifactId=ydl-tools-user -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-validator.jar -DgroupId=com.ydl -DartifactId=ydl-tools-validator -Dversion=1.0-SNAPSHOT -Dpackaging=jar

mvn install:install-file -Dfile=./ydl-tools-xss.jar -DgroupId=com.ydl -DartifactId=ydl-tools-xss -Dversion=1.0-SNAPSHOT -Dpackaging=jar

将资料中的初始工程导入开发工具,如下:

image-20210914111704042
image-20210914111704042

2.4 数据库

数据库脚本位于导入的初始工程/docs/mysql/ydl-sms.sql,创建ydl_sms数据库并执行ydl-sms.sql脚本文件完成建表操作。创建完成后可以看到如下表:

image-20210914111857698
image-20210914111857698

2.5 前端工程

前端工程采用vue2 + ts + spa架构,前端工程在资料中已经提供,位置为:/资料/前端工程

  1. 前端代码结构和核心代码

    image-20210914114002904
    image-20210914114002904
  2. 编译、运行前端代码

    From: 元动力
    1
    2
    npm install
    npm run serve
image-20210914112925481
image-20210914112925481
image-20210914113139513
image-20210914113139513

登陆页面效果

image-20210916201900615
image-20210916201900615

进入效果

image-20210916201911039
image-20210916201911039

3. 后台管理服务

image-20210916095249795
image-20210916095249795

3.1. 项目结构

3.1.1 基础工程

基础工程为ydl-sms-entity工程,主要是一些实体类、DTO、工具类、Mapper接口等,作为基础模块,其他几个服务都会依赖此基础模块。

image-20210916220116083
image-20210916220116083

注意:dto entity vo 关系图

image-20210916221225307
image-20210916221225307

3.1.2 管理端工程

ydl-sms-manage作为后台管理服务的maven工程,主要功能是对基础数据进行维护操作,例如签名管理、模板管理、通道管理、通道优先级配置、数据统计等。

image-20210916220303128
image-20210916220303128

3.2. 功能清单

下图展示了后台管理服务实现的功能清单:

image-20210916224738144
image-20210916224738144

3.3. 数据模型与类

序号表名类名说明
1signatureSignatureEntity短信签名
2templateTemplateEntity短信模板
3configConfigEntity短信通道配置
4config_signatureConfigSignatureEntity通道与签名关系
5config_templateConfigTemplateEntity通道与模板关系
6platformPlatformEntity接入平台(应用管理)
7receive_logReceiveLogEntity短信接收日志
8manual_processManualProcessEntity人工处理任务
9send_logSendLogEntity短信发送日志
10black_listBlackListEntity黑名单
11timing_pushTimingPushEntity定时发送

名词解释:

  • 短信签名:是指主叫用户在发送短信过程中,附加主叫用户的个性化签名,发送到被叫手机用户的业务。
image-20210916194042731
image-20210916194042731
  • 短信模板:即具体发送的短信内容,短信模版通常可以支持验证码、通知、推广三种短信类型。

举例:尊敬的【变量a】先生,您尾号为【变量b】的卡,消费了【变量c】元。【农业银行】

  • 短信通道:指第三方短信平台,例如阿里云短信、乐信短信、梦网短信等。

3.4. 基础属性自动注入

功能:通过自定义注解和切面,在进行数据维护时实现实体中基础属性的自动赋值(创建者、创建时间、修改者、修改者)。

3.4.1 基础属性

基础实体类,业务实体类的基类:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.ydl.sms.entity.base;

import com.baomidou.mybatisplus.annotation.TableId;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* 基础实体类,所有实体都需要继承
*/
@Data
public abstract class BaseEntity implements Serializable {

@TableId
@ApiModelProperty(value = "主键")
private String id;

@ApiModelProperty(value = "创建时间")
private LocalDateTime createTime;

@ApiModelProperty(value = "创建人")
private String createUser = "0";

@ApiModelProperty(value = "修改时间")
private LocalDateTime updateTime;

@ApiModelProperty(value = "修改人")
private String updateUser;

@ApiModelProperty(value = "逻辑删除:0删除")
private Integer isDelete;

}

3.4.2 自定义注解

From: 元动力
1
2
3
4
5
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DefaultParams {
}

3.4.3 定义切面类

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 通过切面方式,自定义注解,实现实体基础数据的注入(创建者、创建时间、修改者、修改时间)
*/
@Component
@Aspect
@Slf4j
public class DefaultParamsAspect {
@SneakyThrows
@Before("@annotation(com.itheima.sms.annotation.DefaultParams)")
public void beforeEvent(JoinPoint point) {
//从threadlocal中获取用户id
Long userId = BaseContextHandler.getUserId();
if (userId == null) {
userId = 0L;
}
Object[] args = point.getArgs();
for (int i = 0; i < args.length; i++) {
Class<?> classes = args[i].getClass();
Object id = null;
Method method = getMethod(classes, "getId");
if (null != method) {
id = method.invoke(args[i]);
}

//请求操作的对象的id为空时,为创建操作
if (null == id) {
method = getMethod(classes, "setCreateUser", String.class);
if (null != method) {
method.invoke(args[i], userId.toString());
}
method = getMethod(classes, "setCreateTime", LocalDateTime.class);
if (null != method) {
method.invoke(args[i], LocalDateTime.now());
}
}

//新建修改更新
method = getMethod(classes, "setUydlateUser", String.class);
if (null != method) {
method.invoke(args[i], userId.toString());
}
method = getMethod(classes, "setUydlateTime", LocalDateTime.class);
if (null != method) {
method.invoke(args[i], LocalDateTime.now());
}
}
}

private Method getMethod(Class classes, String name, Class... types) {
try {
return classes.getMethod(name, types);
} catch (NoSuchMethodException e) {
return null;
}
}
}

3.4.4 使用注解

在Controller方法上添加注解,保存基础属性。

From: 元动力
1
2
3
4
5
6
7
@PostMapping
@ApiOperation("保存")
@DefaultParams
public R save(@RequestBody ObjectEntity entity) {
//具体实现逻辑...
return R.success();
}

3.5. Redis发布订阅模式

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。

Redis的发布订阅模式本质和传统的MQ的发布订阅类似,但是相对于其它几款MQ产品来说,redis的使用更加便捷,也更加轻量化,不需要单独去搭建集成一套繁重的MQ框架。但缺点也很明显,redis发布的消息不会持久化,所以当某一台服务器出现问题的时候,这个消息会丢失,所以在考虑使用之前要慎重,当前的业务是否对数据一致性要求很高,如果要求很高,还是建议使用MQ产品。

在发布者订阅者模式下,发布者将消息发布到指定的 channel 里面, 凡是监听该 channel 的消费者都会收到同样的一份消息,这种模式类似于是收音机模式,即凡是收听某个频道的听众都会收到主持人发布的相同的消息内容。 此模式常用于群聊天、 群通知、群公告等场景。

发布订阅模式下的几个概念:

  • Publisher: 发布者
  • Subscriber:订阅者
  • Channel: 频道

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client1、 client2 和 client3 之间的关系:

image-20210922171439646
image-20210922171439646

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

image-20210922171712064
image-20210922171712064

3.5.1 案例演示

1.首先远程启动redis服务启动

2.启动4个客户端 redis-cli

From: 元动力
1
redis-cli.exe -h 192.168.200.131 -p 16380 -a 123456

3.将其中三个客户端设置监听频道 test

From: 元动力
1
subscribe ydlchannel
image-20210922172222994
image-20210922172222994

4.将第四个客户端作为消息发布的客户端,向频道 ydlchannel发布消息

From: 元动力
1
publish ydlchannel 'im itlilaoshi'

可以看到另外三个客户端都收到了消息

image-20210922172441645
image-20210922172441645

3.5.2 代码案例

1、导入spring-boot-starter-data-redis依赖

From: 元动力
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、编写消息监听器,作为消息的订阅者

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.ydl.sms.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
* 自定义消息监听器,用于监听Redis频道中的消息
*/
@Component
@Slf4j
public class MyListener implements MessageListener {
/**
* 监听方法
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("接收到消息:" + message);
}
}

3、编写订阅发布模式的容器配置

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.ydl.sms.config;

import com.ydl.sms.listener.MyListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
* 订阅发布模式的容器配置
*/
@Configuration
@AutoConfigureAfter({MyListener.class})
public class SubscriberConfig {

@Autowired
private MyListener myListener;

/**
* 创建消息监听容器
*
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);

//可以添加多个监听订阅频道
//当前监听的是通道:MYTOPIC
redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter(myListener), new PatternTopic("MYTOPIC"));

return redisMessageListenerContainer;
}
}

4、编写消息发布者

  • 通过redis客户端发送消息
image-20210922182117550
image-20210922182117550
  • 通过Java代码发送消息
From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.ydl.test;

import com.ydl.sms.SmsManageApplication;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SmsManageApplication.class)
public class RedisTest {
@Autowired
private RedisTemplate redisTemplate;

//@Test
public void test1(){
for (int i = 0; i < 10; i++) {
redisTemplate.convertAndSend("MYTOPIC","im itlilaoshi");
}
}

}

3.6. 通道管理

3.6.1 产品原型

image-20210922182054655
image-20210922182054655

3.6.2 需求分析

  • 通道信息增、删、改、查(分页、详情)
  • 通道排序:通过拖动对前端通道进行排序
  • 关联通道与短信签名:一个通道可以有多个签名
  • 关联通道与短信模板:一个通道可以有多个模板
  • 通道优先级排序后通知短信发送服务,更新缓存中的通道优先级

3.6.3 具体实现

管理端服务有一个场景使用了redis的发布订阅模式:短信通道的优先级发生变化(如人工设置)后,通过redis发布订阅模式通知短信发送服务,短信发送服务接收到消息后自动调整短信发送时使用的通道的优先级(短信发送服务缓存了短信通道的配置信息)。

基础代码都已经实现,此处只需要实现通道排序后通知短信发送服务的代码即可。也就是ConfigServiceImpl类的sendUpdateMessage方法。

短信发送服务业务逻辑说明:

1、为了保证短信发送服务的可用性,在短信发送服务启动时会自动生成当前服务实例的一个uuid作为服务标识保存到redis中,并且每隔3分钟上报服务信息证明服务状态正常

2、短信发送服务启动后会每隔10分钟检查redis中的服务上报信息,如果某个实例超过5分钟没有上报则认为此服务下线,就会从redis中将此服务实例信息删除

3、短信发送服务在启动时会从数据库中查询出可用通道列表并按照优先级排序,然后缓存到redis中

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//发送消息,通知短信发送服务更新缓存中的通道优先级
public void sendUpdateMessage() {
//获得所有注册到redis中的短信发送服务实例
Map map = redisTemplate.opsForHash().entries("SERVER_ID_HASH");
log.info("全部服务:{}", map);
Long current = System.currentTimeMillis();

for (Object key : map.keySet()) {
long valueLong = Long.parseLong(map.get(key).toString());

//五分钟内报告,说明短信发送服务状态正常
if (current - valueLong < (1000 * 60 * 5)) {
//删除redis中缓存的可用通道,因为通道优先级发生变化,redis中缓存的可用通道需要重新加载
redisTemplate.delete("listForConnect");

//发布消息
redisTemplate.convertAndSend("TOPIC_HIGH_SERVER",
ServerTopic
.builder()
.option(ServerTopic.INIT_CONNECT)
.value(key.toString())
.build()
.toString()
);
log.info("发送消息通知短信发送服务重新构建通道");
return;
}
}
}

第2章 短信接收服务

1. 短信接收服务介绍

短信接收服务的作用就是为应用提供访问接口,应用需要发送短信时只需要调用短信接收服务,由短信接收服务将信息保存到消息缓冲区(Mysql、Redis)。后续会由短信发送服务从消息缓冲区获取消息并发送短信。

动力短信短信平台整体架构:

image-20210916095249795
image-20210916095249795

动力短信短信平台业务架构:

image-20210916100828128
image-20210916100828128

通过上面的业务架构可以看到,短信接收服务(pd-sms-api)提供3种方式供业务系统调用:

  • HTTP接口
  • TCP
  • SDK形式

短信接收服务通过资质验证(可开关)、短信内容校验后将短信信息发送到对应中间件中(Redis、MySQL)。

短信发送方式分为两种类型:

1、定时发送短信:将短信内容存储到MySQL数据库中,由短信发送服务通过定时任务获取并发送 2、普通短信:将短信内容推送到Redis队列中,由短信发送服务异步接收并发送

2. Redis消息队列

2.1 Redis队列介绍

Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。

Redis的list是简单的字符串列表,按照插入顺序排序。可以添加一个元素到列表的头部(左边)或者尾部(右边)。

使用Redis的list可以模拟消息队列,即使用rpushlpush命令将数据插入队列(生产消息),使用lpoprpop命令将数据弹出队列(消费消息)。

队列中的消息可以由不同的生产者写入,也可以由不同的消费者消费,但是一个消息一定是只能被消费一次。

redis所有命令,可从官网查看:http://redis.cn/commands.html#listopen in new window

image-20210923092616090
image-20210923092616090

2.2 案例演示

发布消息:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
root@77889f10b0c8:/data# redis-cli
127.0.0.1:6379> LPUSH ydllist msg1
(integer) 1
127.0.0.1:6379> LPUSH ydllist msg2
(integer) 2
127.0.0.1:6379> LPUSH ydllist msg3
(integer) 3
127.0.0.1:6379> LPUSH ydllist msg4
(integer) 4
127.0.0.1:6379> LPUSH ydllist msg5
(integer) 5

查看消息:

From: 元动力
1
2
3
4
5
6
127.0.0.1:6379> LRANGE ydllist 0 -1
1) "msg5"
2) "msg4"
3) "msg3"
4) "msg2"
5) "msg1"

消费消息:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
127.0.0.1:6379> RPOP ydllist
"msg1"
127.0.0.1:6379> RPOP ydllist
"msg2"
127.0.0.1:6379> LRANGE ydllist 0 -1
1) "msg5"
2) "msg4"
3) "msg3"
127.0.0.1:6379> RPOP ydllist
"msg3"
127.0.0.1:6379> RPOP ydllist
"msg4"
127.0.0.1:6379> RPOP ydllist
"msg5"
127.0.0.1:6379> RPOP ydllist
(nil)
127.0.0.1:6379> LRANGE channel1 0 -1
(empty list or set)
127.0.0.1:6379>

RPOP命令不具有阻塞功能,如果需要阻塞功能可以使用BRPOP命令。

image-20210923105449996
image-20210923105449996

2.3 代码案例

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.ydl.test;

import com.ydl.sms.SmsApiApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SmsApiApplication.class)
public class RedisListTest {
@Autowired
RedisTemplate redisTemplate;

//消息发送者
@Test
public void testPush(){
for (int i = 0; i < 10; i++) {
redisTemplate.opsForList().leftPush("ydllist123", "msg"+i);
}
}

//消息消费者
@Test
public void testPop(){
for (int i = 0; i < 11; i++) {
Object value = redisTemplate.opsForList().rightPop("ydllist123");
System.out.println("取出的值是:"+value);
}
}

//消费者阻塞监听队列
@Test
public void testPopBlock(){
while (true){
Object value = redisTemplate.opsForList().rightPop("ydllist123", 10L, TimeUnit.SECONDS);
//业务逻辑
System.out.println("取出的值是:"+value);
}
}
}

3. 短信接收服务

3.1 需求分析-重点

功能需求:

  • 应用系统调用短信接收服务提供的接口,由短信接收服务将信息保存到消息缓冲区(Mysql、Redis)
  • 调用方式:HTTP、TCP、SDK

处理流程:

短信接收服务接收到应用系统请求后,会进行相关的校验处理,校验通过则将信息保存到消息缓存区,具体处理流程如下:

image-20210923174235681
image-20210923174235681

3.2 项目结构

image-20210923100907654
image-20210923100907654

3.3 数据模型与类

序号表名类名说明
1signatureSignatureEntity短信签名
2templateTemplateEntity短信模板
3configConfigEntity短信通道配置
4config_signatureConfigSignatureEntity通道与签名关系
5config_templateConfigTemplateEntity通道与模板关系
6platformPlatformEntity接入平台(应用管理)
7receive_logReceiveLogEntity短信接收日志
8black_listBlackListEntity黑名单
9timing_pushTimingPushEntity定时发送

注意:此处只是列出和短信接收服务有关的数据模型和类。

3.4 消息存储

导入的初始工程中已经实现了大部分代码,主要逻辑为通过Controller提供HTTP接口服务接收应用系统请求,然后调用Service,在Service中进行一系列校验,如果校验通过则需要将消息保存到消息缓冲区。

将消息保存到消息缓冲区的业务逻辑为:

1、进行短信分类,分为实时发送短信和定时发送短信

2、如果是定时发送短信则将消息保存到Mysql数据库

3、如果是实时发送短信则将消息保存到Redis队列,判断短信模板类型,如果是验证码类型则将消息保存到高优先级队列TOPIC_HIGH_SMS,如果是其他类型则将消息保存到普通队列TOPIC_GENERAL_SMS

4、保存短信接收日志到Mysql数据库

具体实现代码如下(SmsSendServiceImpl类的pushSmsMessage方法):

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//缺啥补啥
/**
* 根据短信模板分类 并分发
*
* @param templateEntity
* @param smsSendDTO
* @param platformEntity
*/
private void pushSmsMessage(TemplateEntity templateEntity, SmsSendDTO smsSendDTO, PlatformEntity platformEntity) {
// TODO 短信接收服务:将短信信息保存到数据库或者Redis队列
ReceiveLogEntity receiveLogEntity = new ReceiveLogEntity();
receiveLogEntity.setApiLogId(UUID.randomUUID().toString());
long start = System.currentTimeMillis();
try {
//1、进行短信分类,分为实时发送短信和定时发送短信
String sendTime = smsSendDTO.getSendTime();

String request = JSON.toJSONString(smsSendDTO);
if (StringUtils.isNotEmpty(sendTime)) {
//2、如果是定时发送短信则将消息保存到Mysql数据库
TimingPushEntity timingPushEntity = new TimingPushEntity();
timingPushEntity.setMobile(smsSendDTO.getMobile());
timingPushEntity.setTemplate(smsSendDTO.getTemplate());
timingPushEntity.setRequest(request);
timingPushEntity.setSignature(smsSendDTO.getSignature());
timingPushEntity.setTiming(sendTime);

timingPushService.save(timingPushEntity);
} else {
//3、如果是实时发送短信则将消息保存到Redis队列,判断短信模板类型,如果是验证码类型则将消息保存到高优先级队列TOPIC_HIGH_SMS,如果是其他类型则将消息保存到普通队列TOPIC_GENERAL_SMS

if (templateEntity.getType() == TemplateType.VERIFICATION.getCode()) {
//您的验证码是${code}
redisTemplate.opsForList().leftPush("TOPIC_HIGH_SMS", request);
} else {
//理财产品${product}
redisTemplate.opsForList().leftPush("TOPIC_GENERAL_SMS", request);
}
}
receiveLogEntity.setStatus(1);
} catch (Exception e) {
receiveLogEntity.setStatus(0);
receiveLogEntity.setError(ExceptionUtils.getErrorStackTrace(e));
} finally {
//4、保存短信接收日志到Mysql数据库
receiveLogEntity.setPlatformId(platformEntity.getId());
receiveLogEntity.setPlatformName(platformEntity.getName());
receiveLogEntity.setBusiness(smsSendDTO.getBatchCode());
receiveLogEntity.setConfigIds(StringUtils.join(smsSendDTO.getConfigIds(), ",")); //[1,2,3]--->1,2,3
receiveLogEntity.setTemplate(smsSendDTO.getTemplate());
receiveLogEntity.setSignature(smsSendDTO.getSignature());
receiveLogEntity.setMobile(smsSendDTO.getMobile());
receiveLogEntity.setRequest(JSON.toJSONString(smsSendDTO.getParams()));
receiveLogEntity.setUseTime(System.currentTimeMillis() - start);

receiveLogMapper.insert(receiveLogEntity);
}
}

测试:

image-20210924114955634
image-20210924114955634

3.5 TCP接口

基于Netty进行网络编程,为短信接收服务提供TCP接口,应用系统可以通过TCP调用此接口来和短信接收服务对接。

涉及到的类:

image-20210924114505723
image-20210924114505723
  • Netty服务启动类:用于启动Netty服务

  • 通道初始化器:主要目的是为程序员提供一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作,例如可以添加用户自定义的服务端处理器

  • 服务端处理器:具体执行处理逻辑,例如读取消息

导入的初始工程中主体代码已经完成,只需要实现服务端处理器的具体处理逻辑即可(NettyServerHandler的channelRead0方法):

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//业务逻辑
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// TODO 短信接收服务:接收应用系统的报文并解析,调用Service将消息保存到消息缓冲区
//就是和controller一样 调用service层即可
String restMsg="success";
log.info("tcp接口接受到消息:"+msg);
try {
//1sring msg--> SmsParamsDTO
SmsParamsDTO smsParamsDTO = parseMessage(msg);
if(null==smsParamsDTO){
log.info("报文解析失败!");
return;
}
//2调用service层即可
SpringUtils.getBean(SmsSendService.class).send(smsParamsDTO);

}catch (Exception e){
log.error("netty发送时,报错了!",e);
restMsg=e.getMessage();
}

log.info("回推报文============="+restMsg);
ctx.writeAndFlush(restMsg+"\n"); //为什么要加\n 不加客户端接收不到消息
}

测试:

打开telnet功能

公众号添加“IT李哥交朋友”,定期技术分享哦。

可以使用telnet作为Netty客户端来测试Netty服务,报文如下:

From: 元动力
1
{"accessKeyId": "dcbd824751cf448fa0b569f1f07b9b32","encryption": "2f2e4da7fffd4391a9aaabd594f37db3","mobile": "15718888888","params": {"code":"79857"},"signature": "DXQM_000000001","template": "DXMB_000000001","timestamp": "","sendTime":"2021-12-25 12:00"}

3.6 SDK

3.6.1 说明

SDK 是 Software Development Kit 的缩写,即软件开发工具包。SDK被开发出来是为了减少程序员工作量的,比如公司开发出某种软件的某一功能,把它封装成SDK,提供给其他公司和个人使用。

本小节需要开发短信接收服务SDK,通过SDK可以使应用系统更加方便的调用短信接收服务。

通过SDK方式调用短信接收服务,本质上还是调用的短信接收服务提供的HTTP接口(Controller),只不过是调用的过程在SDK中进行了封装。

项目结构:

image-20210924143027782
image-20210924143027782

3.6.2 实现

导入的初始工程中主体代码已经完成,只需要实现业务处理类的具体逻辑即可(SmsSendServiceImpl的send方法):

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 通过HttpClient发送post请求,请求短信接收服务HTTP接口
*
* @param baseParamsDTO
* @param url
* @return
*/
private R send(BaseParamsDTO baseParamsDTO, String url) {
//设置此平台的秘钥
baseParamsDTO.setAccessKeyId(accessKeyId);

//电商 平台是否认证
if (auth) {
//key secret 都要有
if (StringUtils.isBlank(accessKeyId) || StringUtils.isBlank(accessKeySecret)) {
return R.fail("accessKeyId或accessKeySecret不能为空");
}
}

baseParamsDTO.setTimestamp(System.currentTimeMillis() + "");
baseParamsDTO.setEncryption(SmsEncryptionUtils.encode(baseParamsDTO.getTimestamp(), baseParamsDTO.getAccessKeyId(), accessKeySecret));

if (StringUtils.isBlank(domain)) {
return R.fail("domain不能为空");
}

//1 httpclien okhttp
CloseableHttpClient httpClient = HttpClients.createDefault();
//构造post请求
HttpPost post = new HttpPost(url);
//设置请求头
post.setHeader("Content-Type", "application/json; charset=UTF-8");
//设置请求体
StringEntity stringEntity = new StringEntity(JSON.toJSONString(baseParamsDTO), "UTF-8");
post.setEntity(stringEntity);

try {
//发送post请求
CloseableHttpResponse response = httpClient.execute(post);
HttpEntity responseEntity = response.getEntity();

//解析响应码
if (response.getStatusLine().getStatusCode() == 200) {
//发送成功
log.info("httpRequest access success, StatusCode is:{}", response.getStatusLine().getStatusCode());
String responseEntityStr = EntityUtils.toString(responseEntity);
log.info("responseContent is :" + responseEntityStr);

return JSON.parseObject(responseEntityStr, R.class);
} else {
//发送失败
log.error("httpRequest access fail ,StatusCode is:{}", response.getStatusLine().getStatusCode());
return R.fail("status is " + response.getStatusLine().getStatusCode());
}

} catch (Exception e) {
log.error("error", e);
return R.fail(e.getMessage());
} finally {
post.releaseConnection();
}
}

SDK开发完成后,为了方便其他应用使用,通常会将SDK打成jar包上传到远程maven仓库,在应用系统中直接通过maven坐标导入SDK即可使用。

如下是将SDK上传到Nexus后的效果:

image-20210924221808372
image-20210924221808372

如何搭建私服,详见公众号:IT李哥交朋友

3.6.3 测试

第一步:引入SDK的maven坐标

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
<repositories>
<repository>
<id>ydlrepo</id>
<url>http://192.168.200.131:8081/repository/ydlrepo/</url>
</repository>
</repositories>


<dependency>
<groupId>com.ydl</groupId>
<artifactId>ydl-sms-sdk</artifactId>
<version>1.0.0</version>
</dependency>

第二步:编写配置文件

From: 元动力
1
2
3
4
5
6
7
# 服务端使用sdk配置信息
ydlclass:
sms:
auth: true
domain: http://localhost:8771
accessKeyId: dcbd824751cf448fa0b569f1f07b9b32
accessKeySecret: 2f2e4da7fffd4391a9aaabd594f37db3

第三步:编写单元测试

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.ydl.test;

import com.itheima.sms.SmsManageApplication;
import com.itheima.sms.sms.dto.SmsParamsDTO;
import com.itheima.sms.sms.service.SmsSendService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SmsManageApplication.class)
public class SdkTest {
@Autowired
private SmsSendService smsSendService;

/**
* 通过SDK方式调用短信接收服务
*/
@Test
public void testSend(){
SmsParamsDTO dto = new SmsParamsDTO();
dto.setMobile("15718888888");
dto.setSignature("DXQM_000000001");
dto.setTemplate("DXMB_000000001");
Map<String, String> map = new HashMap<>();
map.put("code","79857");
dto.setParams(map);
dto.setSendTime("2021-12-18 10:00");
dto.setTimestamp(System.currentTimeMillis() +"");

return smsSendService.sendSms(dto);
}
}

第3章 短信发送服务

1. 短信发送服务介绍

短信发送服务的作用就是从消息缓冲区获取消息并和具体的短信通道(例如:阿里云短信、梦网短信、乐信短信等)对接来发送短信。

动力短信短信平台整体架构:

image-20210924192730206
image-20210924192730206
  • 发送短信:实时发送、定时发送
  • 通道降级:通道发送失败,选择下一通道发送短信
  • 通道选举:同一通道多次发送失败,降级通道
  • 服务注册:有且只有一台机器执行通道选举

2. Redis实现分布式锁

对于简单的单体项目,即运行时程序在同一个Java虚拟机中,使用Java的锁机制(synchronized或者ReentrantLock)可以解决多线程并发问题。

可以运行资料/redis-lock-demo来重现线程并发问题。

测试过程:

第一步:启动redis-lock-demo服务

第二步:设置redis中库存stock值为100

image-20210925110700603
image-20210925110700603

第三步:使用apache jmeter进行压力测试

image-20210925110733847
image-20210925110733847
image-20210925110744160
image-20210925110744160

注:Apache JMeter是Apache组织开发的基于Java的压力测试工具。用于对软件做压力测试,它最初被设计用于Web应用测试,但后来扩展到其他测试领域。

可以发现对于单实例的应用来说,使用Java锁机制就可以解决线程并发问题。

但是在分布式环境中,程序是集群方式部署,如下图:

image-20210925114905838
image-20210925114905838

可以通过启动两个服务实例来测试集群部署时线程并发问题,具体测试步骤如下:

第一步:分别启动两个redis-lock-demo服务实例,端口号分别为8001和8002

image-20210925113940599
image-20210925113940599

第二步:配置Nginx负载均衡,通过Nginx将压力分发到两个实例上

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
upstream upstream_name{
server 127.0.0.1:8001;
server 127.0.0.1:8002;
}

server {
listen 8080;
server_name localhost;

location / {
proxy_pass http://upstream_name;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}

第三步:使用Apache jmeter进行压力测试

可以发现对于集群环境下的多个服务实例又产生了线程并发问题。

上面的集群部署方式依然会产生线程并发问题,因为synchronized、ReentrantLock只是jvm级别的加锁,没有办法控制其他jvm。也就是上面两个tomcat实例还是可以出现并发执行的情况。要解决分布式环境下的并发问题,则必须使用分布式锁。

分布式锁可以理解为:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

实现分布式锁的方式很多,例如:Redis、数据库、Zookeeper等。

本小节主要讲Redis实现分布式锁的方式。

2.1 SETNX

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功 mykey->myvalue
  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败 mykey->myvalue
  • 客户端A执行代码完成,删除锁
  • 客户端B在等待一段时间后再去请求设置key的值,设置成功 mykey->myvalue
  • 客户端B执行代码完成,删除锁

格式

From: 元动力
1
2
3
4
5
6
#尝试获取锁
127.0.0.1:6379> SETNX key value
#设置锁过期时间
127.0.0.1:6379> EXPIRE key seconds
#删除锁
127.0.0.1:6379> DEL key

为什么要设置key过期时间呢?

如果某个客户端获得锁后因为某些原因意外退出了,导致创建了锁但是没有来得及删除锁,那么这个锁将一直存在,后面所有的客户端都无法再获得锁,所以必须要设置过期时间。

2.2 SET

通过前面的expire命令来设置锁过期时间还存在一个问题,就是SETNX和EXPIRE两个命令不是原子性操作。在极端情况下可能会出现获取锁后还没来得及设置过期时间程序就挂掉了,这样就又出现了锁一直存在,后面所有的客户端都无法再获得锁的问题。

如何解决这个问题?答案是使用SET命令。

SET 命令从Redis 2.6.12 版本开始包含设置过期时间的功能,这样获取锁和设置过期时间就是一个原子操作了。

格式

From: 元动力
1
SET key value [EX seconds] [NX]

示例

From: 元动力
1
127.0.0.1:6379> SET mykey myvalue EX 5 NX
  • EX seconds :将键的过期时间设置为 seconds 秒
  • NX :只在key不存在时才对键进行设置操作

2.3 代码实现

基于redis实现分布式锁:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Autowired
RedisLock redisLock;

//使用redis分布式锁,无阻塞
@GetMapping("/stock3")
public String stock3() {
String mylock = redisLock.tryLock("MYLOCK", 2000);
if(mylock!=null){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock--;
//放回redis
stringRedisTemplate.opsForValue().set("stock", stock + "");
System.out.println("减库存成功,剩余库存是:" + stock);
} else {
System.out.println("库存不足了!");
}

redisLock.unlock("MYLOCK",mylock);
}
return "OK";
}

//使用redis分布式锁,有阻塞
@GetMapping("/stock4")
public String stock4() {
String mylock = redisLock.lock("MYLOCK", 2000,1000);
if(mylock!=null){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock--;
//放回redis
stringRedisTemplate.opsForValue().set("stock", stock + "");
System.out.println("减库存成功,剩余库存是:" + stock);
} else {
System.out.println("库存不足了!");
}

redisLock.unlock("MYLOCK",mylock);
}
return "OK";
}

3. Spring Task定时任务

3.1 Spring Task介绍

在企业软件开发中经常会遇到定时任务处理的业务场景,例如银行系统每晚会定时执行对账业务,很多系统每晚都会定时执行数据备份等等。

定时任务的实现方案有很多,例如:Quartz、Spring Task、XXL job等。对于简单的应用场景可以使用Spring Task。Spring Task相关API在spring-context包中已经提供了,不需要额外导入其他jar包。

3.2 代码案例

在Spring Boot项目中使用Spring Task非常简单,只需要在Bean的方法上加入@Scheduled注解,并通过cron表达式指定触发的时间即可。

第一步:在启动类上加入@EnableScheduling注解

第二步:编写定时任务类

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.itheima.job;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class DemoJob {


@Scheduled(cron = "0/5 * * * * ?")
public void testJob() {
System.out.println("现在时间是" + new Date());
}

}

4. 短信发送服务

4.1 需求分析

功能需求:

  • 和具体的短信通道对接(例如:阿里云短信、梦网短信等),发送短信
  • 短信定时发送
  • 短信实时发送
  • 服务注册,保证短信发送服务高可用
  • 通道自动选举、降级

处理过程:

image-20210925174705923
image-20210925174705923

4.2 项目结构

image-20210925165514680
image-20210925165514680

4.3 核心代码

短信发送服务核心类:

  • ServerRegister:服务注册器,用于将短信发送服务注册到Redis中,定时服务上报,定时服务检查
  • ConfigServiceImpl:通道配置器,用于查询可用通道(阿里短信、华为短信等),通道选举、降级
  • AbstractSmsService:短信发送器抽象父类,子类需要和具体的短信通道对接来完成发送短信的工作
  • SmsConnectLoader:通道实例加载器,根据通道配置,初始化每个通道的Bean对象
  • SmsFactory:短信发送工厂,获取具体的通道实例Bean对象(例如AliyunSmsService)来发送短信, 如果发送出现异常,触发通道选举和通道降级策略
  • SendTimingSmsImpl:定时短信业务处理器,具体负责定时短信的发送
  • SendSmsJob:短信发送定时任务,用于定时短信的发送,调用SendTimingSmsImpl发送定时短信
  • GeneralSmsListener、HighSmsListener:短信接收器,Redis队列的消费者,监听队列中的消息,如果有消息则调用SmsFactory发送实时短信
  • HighServerReceiver:通道消息监听器,通过Redis的发布订阅模式监听通道相关消息,调用SmsConnectLoader初始化通道和更新通道
  • SubscriberConfig:订阅发布模式的容器配置,创建消息监听容器,并将HighServerReceiver加入容器中

4.4 功能实现

4.4.1 服务注册器

服务注册器对应的为ServerRegister类。

短信发送服务支持分布式集群部署,可以是多个实例,实例越多,发送短信的能力越强。但是对于通道选举、持久化通道等操作,只能有一个服务实例执行,其他服务实例通过redis的广播机制获得通道变化。

如果要实现这一功能,需要将所有短信发送服务实例注册到某个地方,当前实现是将所有服务实例注册到Redis中。并且为了能够监控每个服务实例运行状态,需要每个服务实例定时上报并且定时进行服务检查。

业务逻辑:

1、服务注册,项目启动时将当前服务实例id注册到redis

2、服务上报,每三分钟报告一次,并传入当前时间戳

3、服务检查,每十分钟检查一次服务列表,清空超过五分钟没有报告的服务

代码实现:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.ydl.sms.factory;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* 服务注册器,将短信发送服务注册到Redis中,定时服务上报,定时服务检查
*/
@Component
@Slf4j
@Order(value = 100)
public class ServerRegister implements CommandLineRunner {
//当前服务实例的唯一标识,可以使用UUID随机生成
public static String SERVER_ID = null;

@Autowired
private RedisTemplate redisTemplate;


/**
* 项目启动时自动执行此方法,将当前服务实例注册到redis
*
* @param args
*/
@Override
public void run(String... args) {
//TODO 服务注册器,项目启动时将当前服务id注册到Redis中,使用Redis的Hash结构,key为SERVER_ID_HASH,Hash结构的key为服务id,value为时间戳
// SERVER_ID_HASH key value
SERVER_ID = UUID.randomUUID().toString(); //当前实例key
log.info("server实例启动,生成的id:{}", SERVER_ID);
redisTemplate.opsForHash().put("SERVER_ID_HASH", SERVER_ID, System.currentTimeMillis());
}

/**
* 定时服务报告
* 报告服务信息证明服务存在 每三分钟报告一次,并传入当前时间戳
*/
@Scheduled(cron = "1 1/3 * * * ?")
public void serverReport() {
//TODO 服务注册器,每三分钟报告一次,并传入当前时间戳

log.info("服务定时上报。id:{}",SERVER_ID);
redisTemplate.opsForHash().put("SERVER_ID_HASH", SERVER_ID, System.currentTimeMillis());
}

/**
* 定时服务检查
* 每十分钟检查一次服务列表,清空超过五分钟没有报告的服务
*/
@Scheduled(cron = "30 1/10 * * * ?")
public void checkServer() {
//TODO 服务注册器,定时检查redis,每隔10分钟查看,超过5分钟还没上报自己信息的服务,清除掉
log.info("定时服务检查。id:{}",SERVER_ID);
Map map = redisTemplate.opsForHash().entries("SERVER_ID_HASH");
log.info("当前服务有:"+map);

long now = System.currentTimeMillis();
List removeKeys=new ArrayList(); //该要删除的key
//jdk8 lambda表达式
map.forEach((key,value)->{
long registrTime = Long.parseLong(value.toString());
if(now-registrTime>(5*60*1000)){
removeKeys.add(key);
}
});

log.info("该要删除的key有:{}",removeKeys);
removeKeys.forEach(key->{
redisTemplate.opsForHash().delete("SERVER_ID_HASH", key);
});

}
}

可以启动两个服务实例来测试服务注册、服务上报、服务检查是否能够正常执行。

4.4.2 通道实例加载器

通道实例加载器对应的为SmsConnectLoader类。

短信发送服务存在多个通道(例如阿里云短信、华为云短信等),这些通道是通过后台管理系统设置的,包括通道的名称、签名、模板、连接方式等信息。当短信发送服务启动时,或者后台管理系统设置通道时,将会初始化短信通道。

image-20210925180125503
image-20210925180125503

通道实例加载器的作用就是根据通道配置,初始化每个通道的Bean对象(例如AliyunSmsService、MengWangSmsService等)。

SmsConnectLoader类中只需要实现initConnect方法即可,其他方法已经提供好。

业务逻辑:

1、查询数据库获得通道列表

2、遍历通道列表,通过反射创建每个通道的Bean对象(例如AliyunSmsService、MengWangSmsService等)

3、将每个通道的Bean对象保存到CONNECT_LIST集合中

具体代码如下:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* 根据通道配置,初始化每个通道的bean对象
*/
@SneakyThrows
public void initConnect() {
//TODO 根据通道配置,初始化每个通道的bean对象
//1、查询数据库获得通道列表
List<ConfigEntity> configEntitiesList = configService.listForConnect();
List constructorList=new ArrayList();
//2、遍历通道列表,通过反射创建每个通道的Bean对象(例如AliyunSmsService、MengWangSmsService等)
configEntitiesList.forEach(configEntity->{
try {
SmsConfig config=new SmsConfig();
config.setId(configEntity.getId());
config.setDomain(configEntity.getDomain());
config.setName(configEntity.getName());
config.setPlatform(configEntity.getPlatform().trim());
config.setAccessKeyId(configEntity.getAccessKeyId().trim());
config.setAccessKeySecret(configEntity.getAccessKeySecret().trim());
if(StringUtils.isNotBlank(configEntity.getOther())){
LinkedHashMap linkedHashMap = JSON.parseObject(configEntity.getOther(), LinkedHashMap.class);
config.setOtherConfig(linkedHashMap);
}

//反射 创建Service
//全限定名 com.ydl.sms.sms.AliyunSmsService
String className="com.ydl.sms.sms."+config.getPlatform()+"SmsService";
System.out.println(className);

Class<?> aClass = Class.forName(className); //加载类
Constructor<?> constructor = aClass.getConstructor(SmsConfig.class);//拿到具体的构造方法
Object obj = constructor.newInstance(config); //创建对象

//从容器中获取签名和模板的service
SignatureService signatureService = SpringUtils.getBean(SignatureService.class);
TemplateService templateService = SpringUtils.getBean(TemplateService.class);
//找到这两个service在父类中的属性
Field signatureServiceField = aClass.getSuperclass().getDeclaredField("signatureService");
Field templateServiceField = aClass.getSuperclass().getDeclaredField("templateService");
//打开访问权限
signatureServiceField.setAccessible(true);
templateServiceField.setAccessible(true);
//设置属性值了
signatureServiceField.set(obj,signatureService);
templateServiceField.set(obj,templateService);

constructorList.add(obj);

log.info("初始化通道成功:{},{}",config.getName(),config.getPlatform());
}catch (Exception e){
log.warn("初始化通道异常:{}",e.getMessage());
}

});
//3、将每个通道的Bean对象保存到CONNECT_LIST集合中
if(!CONNECT_LIST.isEmpty()){
CONNECT_LIST.clear();
}
CONNECT_LIST.addAll(constructorList);

//解锁逻辑
if(StringUtils.isNotBlank(BUILD_NEW_CONNECT_TOKEN)){
redisLock.unlock("buildNewConnect", BUILD_NEW_CONNECT_TOKEN);
}

log.info("通道初始化完成了。{}",CONNECT_LIST);
}

4.4.3 定时短信业务处理器

定时短信业务处理器对应的是SendTimingSmsImpl类,具体负责定时短信的发送。

业务逻辑:

1、查询数据库获取本次需要发送的定时短信

2、调用短信工厂发送短信

3、更新短信发送状态为“已处理”

实现代码:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.ydl.sms.job;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ydl.sms.entity.TimingPushEntity;
import com.ydl.sms.factory.SmsFactory;
import com.ydl.sms.mapper.TimingPushMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;
/**
* 定时短信业务处理器
*/
@Component
@Slf4j
public class SendTimingSmsImpl implements SendTimingSms {

@Autowired
private TimingPushMapper timingPushMapper;

@Autowired
private SmsFactory smsFactory;

/**
* 发送定时短信
* @param timing
*/
@Override
@Async
public void execute(String timing) {//timing格式:yyyy-MM-dd HH:mm 2021-12-25 18:00
//TODO 查询数据库获取本次需要发送的定时短信,调用短信工厂发送短信
//1、查询数据库获取本次需要发送的定时短信
LambdaQueryWrapper<TimingPushEntity> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(TimingPushEntity::getStatus,0);
wrapper.eq(TimingPushEntity::getTiming,timing);
wrapper.orderByAsc(TimingPushEntity::getCreateTime);
List<TimingPushEntity> list = timingPushMapper.selectList(wrapper);

log.info("这一批次要发的短信条数是:{},{}",timing,list.size());

list.forEach(x->{
//2、调用短信工厂发送短信
String request = x.getRequest();
smsFactory.send(request);
//3、更新短信发送状态为“已处理”
x.setStatus(1);
x.setUpdateTime(LocalDateTime.now());
x.setUpdateUser("system");
timingPushMapper.updateById(x);
});

log.info("任务执行完毕"+timing);

}
}

4.4.4 短信发送定时任务

短信发送定时任务对应的是SendSmsJob类,用于定时短信的发送,调用SendTimingSmsImpl发送定时短信。

业务逻辑:

1、每分钟触发一次定时任务

2、为了防止短信重复发送,需要使用分布式锁

3、调用SendTimingSmsImpl发送定时短信

代码实现:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.ydl.sms.job;

import com.ydl.sms.config.RedisLock;
import com.ydl.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
* 定时任务,用于发送定时短信
*/
@Component
@Slf4j
public class SendSmsJob {

@Autowired
private SendTimingSms sendTimingSms;

@Autowired
private RedisLock redisLock;

/**
* 每分钟检查一次是否有定时短信需要发送
* @throws InterruptedException
*/
//1、每分钟触发一次定时任务
@Scheduled(cron = "10 0/1 * * * ?") //每分钟的第10秒执行一次
public void sendTimingSms() throws InterruptedException {
//TODO 定时任务,每分钟检查一次是否有定时短信需要发送
//1、每分钟触发一次定时任务
//2、为了防止短信重复发送,需要使用分布式锁
String lock = redisLock.tryLock("SEND_TIMING_SMS", 30 * 1000);
if(StringUtils.isNotBlank(lock)){
//3、调用SendTimingSmsImpl发送定时短信
sendTimingSms.execute(DateUtils.format(new Date(),"yyyy-MM-dd HH:mm"));
}
}
}

4.4.5 短信监听器

应用系统调用短信接收服务,短信接收服务将短信消息放入消息队列,实现短信的接收和发送的解耦,这种方案可以最大化提高短信接收服务性能,不会由于外部短信通道影响应用系统的响应,从而起到削峰填谷的效果。

本系统使用的Redis集群作为消息队列,在传输短信时,采用的是生产者消费者模式。

短信接收器对应的类为GeneralSmsListener和HighSmsListener,角色是Redis队列的消费者,监听队列中的消息,如果有消息则调用SmsFactory发送实时短信。

业务逻辑:

1、HighSmsListener监听TOPIC_HIGH_SMS队列,如果有消息则调用短信发送工厂SmsFactory发送实时短信

2、GeneralSmsListener监听TOPIC_GENERAL_SMS队列,如果有消息则调用短信发送工厂SmsFactory发送实时短信

HighSmsListener具体实现:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.ydl.sms.redismq;

import com.ydl.sms.factory.SmsFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
* Redis队列-----消费者
* 监听消息队列:TOPIC_HIGH_SMS,高优先级的短信,如验证码之类的短信
*/
@Component
@Slf4j
public class HighSmsListener extends Thread {
@Autowired
private RedisTemplate redisTemplate;

@Autowired
private SmsFactory smsFactory;

private String queueKey = "TOPIC_HIGH_SMS";

@Value("${spring.redis.queue.pop.timeout}")
private Long popTimeout = 8000L;

private ListOperations listOps;

@PostConstruct
private void init() {
listOps = redisTemplate.opsForList();
this.start();
}

@Override
public void run() {
//TODO 监听TOPIC_HIGH_SMS队列,如果有消息则调用短信发送工厂发送实时短信
//监听 TOPIC_HIGH_SMS 发送
while (true){
log.debug("队列{}正在监听中",queueKey);
//SmsSendDTO -->string
String message = (String) listOps.rightPop(queueKey, popTimeout, TimeUnit.MILLISECONDS);
if(StringUtils.isNotBlank(message)){
log.info("{}收到消息了:{}",queueKey,message);
//发送
smsFactory.send(message);
}
}
}
}

GeneralSmsListener具体实现:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.ydl.sms.redismq;

import com.ydl.sms.factory.SmsFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
* Redis队列消费者,监听消息队列TOPIC_GENERAL_SMS,普通优先级的短信,如营销短信
*/
@Component
@Slf4j
public class GeneralSmsListener extends Thread {
@Autowired
private RedisTemplate redisTemplate;

@Autowired
private SmsFactory smsFactory;

private String queueKey = "TOPIC_GENERAL_SMS";

@Value("${spring.redis.queue.pop.timeout}")
private Long popTimeout = 8000L;

private ListOperations listOps;

@PostConstruct
private void init() {
listOps = redisTemplate.opsForList();
this.start();
}

@Override
public void run() {
//TODO 监听TOPIC_GENERAL_SMS队列,如果有消息则调用短信发送工厂发送实时短信
//监听 TOPIC_GENERAL_SMS 发送
while (true){
log.debug("队列{}正在监听中",queueKey);
//SmsSendDTO -->string
String message = (String) listOps.rightPop(queueKey, popTimeout, TimeUnit.MILLISECONDS);
if(StringUtils.isNotBlank(message)){
log.info("{}收到消息了:{}",queueKey,message);
//发送
smsFactory.send(message);
}
}
}
}

4.4.6 通道消息监听器

通道消息监听器对应的类是HighServerReceiver,作用是通过Redis的发布订阅模式监听TOPIC_HIGH_SERVER频道中的消息,调用SmsConnectLoader初始化通道和更新通道。

业务逻辑:

1、通过Redis发布订阅模式监听TOPIC_HIGH_SERVER频道中的消息

2、如果消息为USE_NEW_CONNECT,表示通道更新,调用通道实例加载器SmsConnectLoader更新通道

3、如果消息为INIT_CONNECT,表示通道初始化,调用通道实例加载器SmsConnectLoader初始化通道

实现代码:

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.ydl.sms.redismq;

import com.ydl.sms.factory.SmsConnectLoader;
import com.ydl.sms.model.ServerTopic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

/**
* Redis发布订阅----订阅者,通过Redis的发布订阅模式监听TOPIC_HIGH_SERVER频道
*/
@Component
@Slf4j
public class HighServerReceiver implements MessageListener {

@Autowired
private RedisTemplate redisTemplate;

@Autowired
private SmsConnectLoader smsConnectLoader;

/**
* 消息监听
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
//TODO 消息监听,根据消息内容调用smsConnectLoader进行通道初始化或者通道更新

RedisSerializer<?> valueSerializer = redisTemplate.getDefaultSerializer();
String deserialize = valueSerializer.deserialize(message.getBody()).toString();

ServerTopic serverTopic = ServerTopic.load(deserialize);

switch (serverTopic.getOption()) {
case ServerTopic.USE_NEW_CONNECT:
log.info("服务:{} 发起【通道更新】", serverTopic.getValue());
smsConnectLoader.changeNewConnect();
break;
case ServerTopic.INIT_CONNECT:
log.info("服务:{} 发起【通道初始化】", serverTopic.getValue());
smsConnectLoader.initConnect();
break;
default:
break;
}

}
}

5对接阿里短信进行真实发送!

1申请阿里短信的签名 和 模板

image-20210926040554940
image-20210926040554940

2获取accessKey和secret

image-20210926040656499
image-20210926040656499

3参考文档经行代码编写

From: 元动力
1
2
3
4
5
6
7
8
9
10
11
private void init() {
//初始化acsClient,暂不支持region化 "cn-hangzhou"
//profile = DefaultProfile.getProfile(config.get("RegionId"), config.getAccessKeyId(), config.getAccessKeySecret());
aliconfig = new Config()
// 您的AccessKey ID
.setAccessKeyId(config.getAccessKeyId())
// 您的AccessKey Secret
.setAccessKeySecret( config.getAccessKeySecret());
// 访问的域名
aliconfig.endpoint = "dysmsapi.aliyuncs.com";
}
From: 元动力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
protected String sendSms(String mobile, Map<String, String> params, String signature, String template) {
// 获取 签名内容 和模板id
SignatureEntity signatureEntity = signatureService.getByCode(signature);
String code = templateService.getConfigCodeByCode(config.getId(), template);
try {
Client client = new Client(aliconfig);

SendSmsRequest request=new SendSmsRequest();
request.setPhoneNumbers(mobile);
request.setTemplateCode(code);
request.setTemplateParam(JSON.toJSONString(params));
request.setSignName(signatureEntity.getContent());

SendSmsResponse response = client.sendSms(request);
JSONObject jsonObject = JSON.parseObject(String.valueOf(response.getBody()));
if (response.getBody().getCode().equals("OK")) {
return response.getBody().toString();
} else {
return failResponse(jsonObject.getString("Message"),response.getBody().getMessage());
}
} catch (Exception e) {
log.error("Aliyun 短信发送失败:{} ,{}", mobile, template, e);
return failResponse(e.getMessage(), e.getMessage());
}
//"{\"Message\":\"OK\",\"RequestId\":\"" + UUID.randomUUID().toString().toUpperCase() + "-@\",\"BizId\":\"" + System.currentTimeMillis() + "\",\"Code\":\"OK\"}";
}

4平台配置相应正确参数

image-20210926040844298
image-20210926040844298
image-20210926040853475
image-20210926040853475
image-20210926040905138
image-20210926040905138

image-20210926040915170image-20210926040924104

image-20210926040930674
image-20210926040930674

5启动三个平台,和shop服务,进行发送

image-20210926041010750
image-20210926041010750

6结果

image-20210926041050855
image-20210926041050855

本站由 钟意 使用 Stellar 1.28.1 主题创建。
又拍云 提供CDN加速/云存储服务
vercel 提供托管服务
湘ICP备2023019799号-1
总访问 次 | 本页访问