1、生产者丢失数据
RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction:开启事物(channel.txSelect()),回滚(channel.txRollback()),提交事物(channel.txCommit())。
这种方式缺点:吞吐量下降。
confirm模式:channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID,当消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者,就能知道消息正确发送到队列。
如果rabiitMQ没能处理该消息,则会发送一个Nack消息说明给你,你可以进行重试操作。
1 | private void send(String orderId) { |
配置文件中加上:
1 | #回调 |
3、消费者丢失数据
启用手动确认模式可以解决
自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18spring:
rabbitmq:
host: 192.168.6.238
port: 5672
username: guest
password: guest
listener:
simple:
retry:
#开启消费者(程序出现异常的情况下会)进行重试
enabled: true
#最大重试次数
max-attempts: 5
#重试间隔次数
initial-interval: 3000
#重试次数过了是否丢失消息,false不丢弃时需要写相应代码将该消息加入死信队列
#生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
default-requeue-rejected: false手动确认模式
指定Acknowledge的模式:1
2
3
4listener:
direct:
#开启手动ack
acknowledge-mode: manual如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的
如果消费者没有设置手动应答模式,并且设置了重试,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19try {
int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
if (insertDistribute > 0) {
// 手动签收消息,通知mq服务器端删除该消息,是否批量.true:将一次性ack所有小于deliveryTag的消息。
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
//默认情况下,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,又继续异常,回滚,反复。所以加到对队尾
//重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(new Object()));
}
} catch (Exception e) {
e.printStackTrace();
//丢弃该消息
channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
//ack返回false,并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}不确认模式,acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
2、消息队列丢数据
如果需要持久化,持久化配置可以和confirm配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。如果消息持久化磁盘之前,rabbitMQ挂了,生产者收不到Ack信号,会自动重发。
1 | //声明队列 |
发消息的时候
1 | // 封装消息 |
发消息也声明持久消息,这样rabbitMQ消息还没有持久化到硬盘时就算挂了,重启后也能通过引入mirrored-queue即镜像队列恢复数据,但不能保证消息百分百不丢失(整个集群都挂掉)