生产和消费消息重试及处理

2021-07-31 消息队列RocketMQ中间件SpringBoot

由于MQ经常处于复杂的分布式系统中,考虑⽹络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的⼀个关键点。如果没有消息重试,就可能产⽣消息丢失的问题,可能对系统产⽣很⼤的影响。所以,秉承宁可多发消息,也不可丢失消息的原则。 MQ消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

生产者Producer重试(异步和SendOneWay下配置无效)

  • 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
  • 如果网络情况比较差,或者跨集群则建改多几次
// com.example.rocketmq.jms.PayProducer
@Component
public class PayProducer {

    private String producerGroup = "pay_group";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        // 生产者投递消息重试次数
        producer.setRetryTimesWhenSendFailed(3);

        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        start();

    }
}

com.example.rocketmq.controller.PayController

@RestController
@RequestMapping("/api/pay")
public class PayController {

    @Autowired
    private PayProducer payProducer;

    @RequestMapping("/pay_cb")
    public Object callback(String text) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        // key是唯一的,一般是订单号等,这里仅做测试,生产者根据key进行消息重投,默认次数为2
        Message message = new Message(JmsConfig.TOPIC, "tag1","1234", ("Hello rocketmq = " + text).getBytes());
        // 发送消息
        SendResult sendResult = payProducer.getProducer().send(message, 10000);
        System.out.println(sendResult);
        return new HashMap<>();
    }

}

消费端重试原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等问题。

注意: 重试间隔时间配置 ,默认每条消息最多重试 16 次 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • 超过重试次数人工补偿
  • 消费端去重
  • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
  • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,

com.example.rocketmq.jms.PayConsumer

上次更新: 5 个月前