使用消息机制补偿处理实例

在分布式环境中,可以利用消息的补偿(重试)机制解决一些问题,例如下面这个发送邮件的例子:

创建消息生产者

创建工程,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the message producer module of the compensation demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hu</groupId>
    <artifactId>descout-buchang-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot parent supplies the versions of the unversioned dependencies below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Web starter: serves the REST endpoint that triggers sending. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- AMQP starter: RabbitMQ client support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!-- JSON serialization of the message body; version kept in line with the
             consumer and email-service modules (1.2.49). -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>

配置:

1
2
3
4
5
6
# RabbitMQ broker connection used by the producer.
spring:
  rabbitmq:
    host: 192.168.6.238
    port: 5672
    username: guest
    password: guest
1
2
3
4
5
6
/**
 * Spring Boot entry point of the message producer service.
 */
@SpringBootApplication
public class ProcucerApplication {

    /**
     * Starts the application by handing this class and the command-line arguments
     * to {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        final Class<?> bootClass = ProcucerApplication.class;
        SpringApplication.run(bootClass, args);
    }
}

rabbitmq配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Declares the RabbitMQ topology used by the demo: an email queue, an SMS queue and a
 * fanout exchange, with both queues bound to that exchange.
 *
 * <p>NOTE(review): the class is annotated with {@code @Component}; the bindings below receive
 * their collaborators as method parameters, so they do not rely on inter-bean method calls.
 * Consider {@code @Configuration} if that is ever needed.
 */
@Component
public class FanoutConfig {

    // Name of the email queue.
    private static final String FANOUT_EMAIL_QUEUE = "email_queue";

    // Name of the SMS queue.
    private static final String FANOUT_SMS_QUEUE = "sms_queue";

    // Name of the fanout exchange.
    private static final String EXCHANGE_NAME = "fanoutExchange";

    // 1. Define the email queue.
    @Bean
    public Queue fanOutEamilQueue() {
        return new Queue(FANOUT_EMAIL_QUEUE);
    }

    // 2. Define the SMS queue.
    @Bean
    public Queue fanOutSmsQueue() {
        return new Queue(FANOUT_SMS_QUEUE);
    }

    // 3. Define the fanout exchange.
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    // 4. Bind the email queue to the exchange. The parameter name matches the bean
    //    method name above, which is how the right Queue bean is selected.
    @Bean
    Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
    }

    // 5. Bind the SMS queue to the exchange.
    @Bean
    Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
    }
}

发送消息到消息队列,通知消费者发送邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Publishes "send email" notifications to RabbitMQ as JSON text.
 */
@Service
public class FanoutProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Builds the JSON payload for {@code msg}, prints it, and sends it with the
     * routing key {@code "email_queue"}.
     *
     * @param msg free-form text placed in the payload's {@code msg} field
     */
    public void send(String msg) {
        String body = buildPayload(msg);
        System.out.println("jsonString:" + body);
        amqpTemplate.convertAndSend("email_queue", body);
    }

    // Serializes the recipient address, the text and the current time into one JSON string.
    private String buildPayload(String msg) {
        JSONObject payload = new JSONObject();
        payload.put("email", "404914989@qq.com");
        payload.put("msg", msg);
        payload.put("Time", System.currentTimeMillis());
        return payload.toJSONString();
    }
}

控制器

1
2
3
4
5
6
7
8
9
10
11
/**
 * HTTP entry point that triggers publishing of a message.
 */
@RestController
public class ProducerController {

    @Autowired
    private FanoutProducer fanoutProducer;

    /**
     * Passes the {@code msg} request parameter to the producer.
     *
     * @param msg text to publish
     * @return the fixed string {@code "success"} once the producer call has returned
     */
    @RequestMapping("/sendFanout")
    public String sendFanout(String msg) {
        final String reply = "success";
        fanoutProducer.send(msg);
        return reply;
    }
}
消费消息

消费消息,远程调邮件服务发邮件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the message consumer module of the compensation demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hu</groupId>
    <artifactId>descout-buchang-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot parent supplies the versions of the unversioned dependencies below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- AMQP starter: RabbitMQ listener support. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!-- JSON parsing of the received message body. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>

    </dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Consumer configuration: broker connection, listener retry policy and HTTP port.
spring:
  rabbitmq:
    host: 192.168.6.238
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:
          # Enable retry when the consumer throws an exception.
          enabled: true
          # Maximum number of delivery attempts.
          max-attempts: 5
          # Interval between attempts, in milliseconds.
          initial-interval: 2000
server:
  port: 8081
1
2
3
4
5
6
7
8
9
10
11
/**
 * Spring Boot entry point of the consumer service; also exposes the HTTP client bean.
 */
@SpringBootApplication
public class Application {

    /**
     * Provides the {@link RestTemplate} instance that other beans can inject.
     *
     * @return a new, default-configured {@code RestTemplate}
     */
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate client = new RestTemplate();
        return client;
    }

    /**
     * Starts the application.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        final Class<?> bootClass = Application.class;
        SpringApplication.run(bootClass, args);
    }
}

获取发邮件消息,之后调用邮件服务器发邮件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Listens on {@code email_queue} and asks the remote email service to send the mail.
 */
@Component
public class FanoutEamilConsumer {

    @Autowired
    RestTemplate restTemplate;

    /**
     * Handles one message: prints it, reads the {@code email} field from the JSON body and
     * issues a GET request to the email service with that address.
     *
     * @param msg JSON text received from the queue
     * @throws Exception if the email service returns no body; failures raised by the HTTP
     *                   call itself also propagate to the caller
     */
    @RabbitListener(queues = "email_queue")
    public void process(String msg) throws Exception {
        System.out.println("邮件消费者获取生产者消息msg:" + msg);

        String email = JSONObject.parseObject(msg).getString("email");
        String emailUrl = "http://192.168.6.238:8083/sendEmail?email=" + email;
        JSONObject result = restTemplate.getForObject(emailUrl, JSONObject.class);

        // An empty response counts as a failed call and is reported by throwing.
        if (result != null) {
            System.out.println("执行结束....");
            return;
        }
        throw new Exception("调用接口失败!");
    }
}
邮件服务

下面模拟邮件服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the (simulated) email service module of the compensation demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hu</groupId>
    <artifactId>descout-buchang-emailser</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot parent supplies the versions of the unversioned dependencies below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Web starter: serves the /sendEmail endpoint. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Stand-in for a real email service: it only logs and reports success.
 */
@RestController
public class MsgController {

    /**
     * Simulates sending an email. No mail is actually sent; the method prints two log lines
     * and returns a fixed success payload.
     *
     * @param email recipient address taken from the request
     * @return a map with {@code code} = "200" and a success text under {@code msg}
     */
    @RequestMapping("/sendEmail")
    public Map<String, Object> sendEmail(String email) {
        System.out.println("开始发送邮件:" + email);

        Map<String, Object> response = new HashMap<>();
        response.put("code", "200");
        response.put("msg", "发送邮件成功..");

        System.out.println("发送邮件成功");
        return response;
    }
}

启动邮件服务和生产者服务,浏览器中输入:http://192.168.6.238:8080/sendFanout?msg=0001
可以看到消息队列中有消息:

20200423155603

启动消费者服务:

20200423155620
服务消息被消费,并且消费者服务日志已经获取消息,并且调用邮件服务来发送邮件,而且邮件服务也有了日志信息。

1
2
 INFO 14684 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#3b152928:0/SimpleConnection@61b9a318 [delegate=amqp://guest@192.168.6.238:5672/, localPort= 59727]
邮件消费者获取生产者消息msg:{"msg":"0001","Time":1564124703209,"email":"404914989@qq.com"}
1
2
开始发送邮件:404914989@qq.com
发送邮件成功

20200423155636

假如邮件服务器未启动,消费者调用接口失败后会自动重试,最多尝试五次(max-attempts: 5);在此期间,如果邮件服务器启动成功,则重试成功,不再继续重试,也不再进行补偿。
停止邮件服务器,http://192.168.6.238:8080/sendFanout?msg=456541再发送一次消息,可以看到消费者日志:

1
2
3
4
5
6
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
WARN 14684 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=email_queue, deliveryTag=4, consumerTag=amq.ctag-y3aGTKHoCsx2ydYRm4fflA, consumerQueue=email_queue])

服务器一直没有启动,重复消费了5次,之后不再补偿,并且有了异常信息。

在网络传输延迟、消费出现异常或消费超时等情况下,会触发 MQ 的重试补偿机制;在重试过程中,可能会造成消息被重复消费。
如果不希望消息被重复消费:可以使用全局唯一的 MessageID 判断消息是否已被消费,以保证幂等性。

解决方式:
生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Publishes "send email" notifications to RabbitMQ, tagging every message with a unique id
 * so that the consumer can recognise redeliveries of the same message.
 */
@Service
public class FanoutProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Builds the JSON payload for {@code msg}, wraps it in an AMQP {@code Message} carrying a
     * freshly generated message id, and sends it with the routing key {@code "email_queue"}.
     *
     * @param msg free-form text placed in the payload's {@code msg} field
     */
    public void send(String msg) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "404914989@qq.com");
        jsonObject.put("msg", msg);
        jsonObject.put("Time", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);

        // Give the message a unique id. The id is generated once per send, so every
        // redelivery of this message carries the same id.
        // The bytes are encoded explicitly as UTF-8 to match the declared content encoding.
        Message message = MessageBuilder
                .withBody(jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "")
                .build();

        // Send the built Message, not the raw string; otherwise the message id is lost.
        amqpTemplate.convertAndSend("email_queue", message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Handles one delivery: reads the message id and the UTF-8 body, prints both, and calls the
 * email service with the {@code email} field of the JSON body. Messages without a message id
 * are ignored.
 *
 * @param message the raw AMQP message
 * @throws Exception if the email service returns no body; failures raised by the HTTP call
 *                   or by decoding the body also propagate to the caller
 */
public void process(Message message) throws Exception {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(), "UTF-8");
    System.out.println("messageId:" + messageId + ",消息内容:" + msg);

    if (messageId != null) {
        String email = JSONObject.parseObject(msg).getString("email");
        // NOTE(review): this snippet targets 127.0.0.1 while the other consumer versions
        // in this document use 192.168.6.238 - confirm which host is intended.
        String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
        JSONObject result = restTemplate.getForObject(emailUrl, JSONObject.class);
        if (result == null) {
            throw new Exception("调用接口失败!");
        }
        System.out.println("执行结束....");
    }
}

Spring Boot 中还可以通过 AOP 拦截自动完成重试。
如果消费者不告诉服务器消息已经消费成功,服务器就不会删除该消息;只有确认消费成功后才会删除。
消费者的yml加入:acknowledge-mode: manual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Consumer configuration with listener retry and manual acknowledgement.
spring:
  rabbitmq:
    host: 192.168.6.238
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:
          # Enable retry when the consumer throws an exception.
          enabled: true
          # Maximum number of delivery attempts.
          max-attempts: 5
          # Interval between attempts, in milliseconds.
          initial-interval: 2000
        # Enable manual ack: the listener must acknowledge each message itself.
        acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Handles one delivery in manual-acknowledge mode: calls the email service with the
 * {@code email} field of the JSON body and acknowledges the delivery once that succeeded.
 *
 * <p>Messages without a message id are not processed, but they are still acknowledged so
 * that they do not stay unacknowledged on the channel.
 *
 * @param message the raw AMQP message
 * @param headers message headers; the delivery tag is read from them
 * @param channel channel the message arrived on, used to acknowledge it
 * @throws Exception if the email service returns no body; failures raised by the HTTP call,
 *                   by decoding the body or by the acknowledgement also propagate
 */
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
    // The delivery tag identifies this delivery on the channel; every ack needs it.
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

    // Read the message id and body.
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(), "UTF-8");
    System.out.println("messageId:" + messageId + ",消息内容:" + msg);
    if (messageId == null) {
        // Nothing to do for a message without id, but with manual ack it must still be
        // acknowledged; otherwise the broker keeps it as unacknowledged.
        channel.basicAck(deliveryTag, false);
        return;
    }

    JSONObject jsonObject = JSONObject.parseObject(msg);
    String email = jsonObject.getString("email");
    String emailUrl = "http://192.168.6.238:8083/sendEmail?email=" + email;
    JSONObject result = restTemplate.getForObject(emailUrl, JSONObject.class);
    if (result == null) {
        throw new Exception("调用接口失败!");
    }

    // Consumed successfully: tell RabbitMQ the message can be removed.
    channel.basicAck(deliveryTag, false);
}

这样,消息只有正常消费后才会删除。由程序控制。

回调

生产者中加入配置:

1
2
3
4
5
6
7
8
9
# Producer configuration with publisher callbacks enabled.
spring:
  rabbitmq:
    host: 192.168.6.238
    port: 5672
    username: guest
    password: guest
    # Enable publisher callbacks (confirms and returns).
    publisher-confirms: true
    publisher-returns: true

类修改,即可有回调信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Publishes "send email" notifications to RabbitMQ and receives the publisher confirm
 * callback for each of them.
 */
@Service
public class FanoutProducer implements RabbitTemplate.ConfirmCallback {

    // Injected once through the constructor; no additional field injection is needed.
    private final RabbitTemplate amqpTemplate;

    /**
     * Stores the template and registers this object as its confirm callback.
     *
     * @param rabbitTemplate template used for sending
     */
    @Autowired
    public FanoutProducer(RabbitTemplate rabbitTemplate) {
        this.amqpTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Builds the JSON payload for {@code msg}, wraps it in an AMQP {@code Message} and sends
     * it with the routing key {@code "email_queue"}. The same generated UUID is used as the
     * message id and as the correlation id passed to {@link #confirm}.
     *
     * @param msg free-form text placed in the payload's {@code msg} field
     */
    public void send(String msg) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "404914989@qq.com");
        jsonObject.put("msg", msg);
        jsonObject.put("Time", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);

        // Give the message a unique id. The id is generated once per send, so every
        // redelivery of this message carries the same id.
        String uuid = UUID.randomUUID().toString();
        // The bytes are encoded explicitly as UTF-8 to match the declared content encoding.
        Message message = MessageBuilder
                .withBody(jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(uuid)
                .build();
        CorrelationData correlationData = new CorrelationData(uuid);
        amqpTemplate.convertAndSend("email_queue", message, correlationData);
    }

    /**
     * Confirm callback: prints the correlation id together with the two other arguments.
     *
     * @param correlationData correlation data given at send time; may be {@code null}
     * @param b               acknowledgement flag reported by the callback
     * @param s               cause text reported by the callback
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (correlationData != null) {
            System.out.println("confirm: " + correlationData.getId() + ",s=" + s + ",b:" + b);
        }
        System.out.println("回调");
    }

    /*
     * Alternative callback that reacts to a failed confirm.
     * NOTE(review): send(orderId) would publish the id as the message text, not the
     * original payload; a real retry needs the original message.
     *
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String orderId = correlationData.getId();
        System.out.println("消息id:" + correlationData.getId());
        if (ack) { // message was confirmed
            System.out.println("消息发送确认成功");
        } else {
            // retry
            send(orderId);
            System.out.println("消息发送确认失败:" + cause);
        }
    }*/
}

或者在send方法中加入:

1
2
// The producer class stores its RabbitTemplate in the field named amqpTemplate.
this.amqpTemplate.setMandatory(true);
// Register this object as the confirm callback of the template.
this.amqpTemplate.setConfirmCallback(this);

demo见:https://github.com/huingsn/tech-point-record