分布式环境用,可以使用补偿机制实现解决一些问题,如通过下面发送邮件例子:
创建消息生产者
创建工程,具体如下:
1 |
|
配置:
1 | spring: |
1 |
|
rabbitmq配置
1 |
|
发消息到消息队列,说明发邮件
1 |
|
控制器
1 |
|
消费消息
消费消息,远程调邮件服务发邮件。
1 |
|
1 | spring: |
1 |
|
获取发邮件消息,之后调用邮件服务器发邮件。
1 |
|
邮件服务
下面模拟邮件服务
1 |
|
1 | @RestController |
启动邮件服务和生产者服务,浏览器中输入:http://192.168.6.238:8080/sendFanout?msg=0001
可以看到消息队列中有消息:
启动消费者服务:
服务消息被消费,并且消费者服务日志已经获取消息,并且调用邮件服务来发送邮件,而且邮件服务也有了日志信息。
1 | INFO 14684 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#3b152928:0/SimpleConnection@61b9a318 [delegate=amqp://guest@192.168.6.238:5672/, localPort= 59727] |
1 | 开始发送邮件:404914989@qq.com |
假如邮件服务器未启动,消费调用接口失败 会一直重试 重试五次,再次之间,如果邮件服务器启动成功 则重试成功 不再重试 不再进行补偿机制。
停止邮件服务器,http://192.168.6.238:8080/sendFanout?msg=456541再发送一次消息,可以看到消费者日志:
1
2
3
4
5
6邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
邮件消费者获取生产者消息msg:{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}
WARN 14684 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"msg":"456541","Time":1564126234535,"email":"404914989@qq.com"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=email_queue, deliveryTag=4, consumerTag=amq.ctag-y3aGTKHoCsx2ydYRm4fflA, consumerQueue=email_queue])
服务器一直没有启动,重复消费了5次,之后不再补偿,并且有了异常信息。
网络延迟传输中,或者消费出现异常或者是消费延迟,会造成进行MQ重试进行重试补偿机制,在重试过程中,可能会造成重复消费。
如果不需要被重复消费: 使用全局MessageID判断消费,解决幂等性。
解决方式:
生产者:
1 |
|
1 | public void process(Message message) throws Exception { |
Spring boot 中还可以进行 AOP拦截 自动帮助做重试
如果不告诉服务器已经消费成功,则服务器不会删除 消息。告诉消费成功了才会删除。
消费者的yml加入:acknowledge-mode: manual
1 | spring: |
1 | public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { |
这样,消息只有正常消费后才会删除。由程序控制。
回调
生产者中加入配置:
1 | spring: |
类修改,即可有回调信息:
1 |
|
或者在send方法中加入:
1 | this.rabbitTemplate.setMandatory(true); |