RabbitMQ常见数据问题

1、生产者丢失数据

RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction:开启事物(channel.txSelect()),回滚(channel.txRollback()),提交事物(channel.txCommit())。
这种方式缺点:吞吐量下降。

confirm模式:channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID,当消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者,就能知道消息正确发送到队列。
如果rabiitMQ没能处理该消息,则会发送一个Nack消息说明给你,你可以进行重试操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void send(String orderId) {
JSONObject jsonObect = new JSONObject();
jsonObect.put("orderId", orderId);
String msg = jsonObect.toJSONString();
System.out.println("msg:" + msg);
// 封装消息
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(orderId).build();
// 构建回调返回的数据
CorrelationData correlationData = new CorrelationData(orderId);
// 发送消息 回调
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);
}
// 生产消息确认机制 生产者往服务器端发送消息的时候 采用应答机制
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String orderId = correlationData.getId(); //全局ID
System.out.println("消息id:" + correlationData.getId());
System.out.println(cause);
if (ack) { //消息发送成功
System.out.println("消息发送确认成功");
} else {
//重试机制
send(orderId);
System.out.println("消息发送确认失败:" + cause);
}
}

配置文件中加上:

1
2
3
#回调
publisher-confirms: true
publisher-returns: true
3、消费者丢失数据

启用手动确认模式可以解决

  • 自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    spring:
    rabbitmq:
    host: 192.168.6.238
    port: 5672
    username: guest
    password: guest
    listener:
    simple:
    retry:
    #开启消费者(程序出现异常的情况下会)进行重试
    enabled: true
    #最大重试次数
    max-attempts: 5
    #重试间隔次数
    initial-interval: 3000
    #重试次数过了是否丢失消息,false不丢弃时需要写相应代码将该消息加入死信队列
    #生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
    default-requeue-rejected: false
  • 手动确认模式
    指定Acknowledge的模式:

    1
    2
    3
    4
    listener:
    direct:
    #开启手动ack
    acknowledge-mode: manual

    如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的
    如果消费者没有设置手动应答模式,并且设置了重试,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    try {
    int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
    if (insertDistribute > 0) {
    // 手动签收消息,通知mq服务器端删除该消息,是否批量.true:将一次性ack所有小于deliveryTag的消息。
    //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    //默认情况下,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,又继续异常,回滚,反复。所以加到对队尾
    //重新发送消息到队尾
    channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(new Object()));
    }
    } catch (Exception e) {
    e.printStackTrace();
    //丢弃该消息
    channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
    //ack返回false,并重新回到队列
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
  • 不确认模式,acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

2、消息队列丢数据

如果需要持久化,持久化配置可以和confirm配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。如果消息持久化磁盘之前,rabbitMQ挂了,生产者收不到Ack信号,会自动重发。

1
2
3
4
5
6
7
8
9
10
11
12
//声明队列
@Bean
public Queue directCreateOrdersQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 30000);//30秒自动删除
/* 第一个参数:队列名称
第二个参数:是否持久化 durable=true。
第三个参数:排他队列exclusive,如果一个队列被声明为排他队列,连接的时候可见,断开就删除了,即使是持久化队列也一样。
第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,临时队列。*/
Queue queue = new Queue(ORDER_CREATE_QUEUE, true, false, true, arguments);
return queue;
}

发消息的时候

1
2
3
4
5
6
// 封装消息
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).
setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE).//持久化设置
setExpiration("2019-12-10 11:22:33").//设置到期时间
setContentEncoding("utf-8").setMessageId(orderId).build();
this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

发消息也声明持久消息,这样rabbitMQ消息还没有持久化到硬盘时就算挂了,重启后也能通过引入mirrored-queue即镜像队列恢复数据,但不能保证消息百分百不丢失(整个集群都挂掉)