1.背景

在上一章:《RocketMq通用生产和消费方法改造》里我们已经极大的简化了消息的发送和消费机制,但是对于我们逻辑上的消费失败以及异常情况下的消息重试我们需要自定义重试逻辑,在学习本章前,请保证自己已经完成了上一章的内容,本文便不在赘述。

2.改造CommonConsumer

在上一章中我们只简单的实现了通用消费的逻辑:

@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
    public void onMessage(MessageDTO message) {
        log.info("收到延迟消息成功,消息体:{}", message);
        doConsumerProcess(message);
    }

    public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
}

但是对于异常以及MsgRetryStatus的返回的状态我们也未处理
在CommonConsumer中新增doRetrytConsumerProcess方法,该方法用于处理异常以及MsgRetryStatus对应的重试状态

@Slf4j
@Service
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
    @Resource
    MessageProduct messageProduct;

    public void onMessage(MessageDTO message) {
        try {
            // 处理消息的逻辑
            log.info("收到延迟消息成功,消息体:{}", message);
            MsgRetryStatus msgRetryStatus = doConsumerProcess(message);
            if (MsgRetryStatus.RETRY.equals(msgRetryStatus)
                    || MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {
                this.doRetryConsumerProcess(message);
            }
        } catch (Exception e) {
            // 记录错误日志
            log.error("消费异常,messageInfo:{}", JSON.toJSONString(message));
            // 可以选择将失败消息发送到指定Topic
            this.doRetryConsumerProcess(message);
        }
    }

    @Idempotent
    public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);

    /**
     * 消息重试
     *
     * @param messageDTO
     */
    private void doRetryConsumerProcess(MessageDTO messageDTO) {
        messageProduct.SendMessage(messageDTO, MQConsumer.BOOT_RETRY_MQ_CONSUMER_TOPIC, ThreadLocalRandom.current().nextInt(0, 3));
    }
}

新增一个常量配置,用于维护topic列表

public interface MQConsumer {
    /**
     * 重试消息
     */
    String BOOT_RETRY_MQ_CONSUMER_TOPIC = "boot-mq-retry-topic";
}

新增重试消费的实现类

@Service
@RocketMQMessageListener(topic = MQConsumer.BOOT_RETRY_MQ_CONSUMER_TOPIC, consumerGroup = "boot_retry_group_1")
@Slf4j
public class BootRetryMqConsumer extends CommonConsumer {
    @Resource
    MessageProduct messageProduct;

    @Override
    public MsgRetryStatus doConsumerProcess(MessageDTO messageDTO) {
        //发送消息进行消费
        if (messageDTO == null || StringUtils.isBlank(messageDTO.getTopic())) {
            return MsgRetryStatus.SUCCEED;
        }
        log.info("执行消费逻辑,topic:{}", MQConsumer.BOOT_RETRY_MQ_CONSUMER_TOPIC);
        messageProduct.SendMessage(messageDTO, messageDTO.getTopic(), null);
        return MsgRetryStatus.SUCCEED;
    }
}

值得注意的是:BootRetryMqConsumer和我们之前实现的BootMqConsumer不能是同一个消费者组

因为RocketMQ 中的消费者组是区分消费者的唯一标识。
如果两个消费者使用相同的消费者组,那么它们可能会竞争消费消息,或者其中一个消费者可能不会收到消息

为了验证重试机制是否可以正常生效,我们改一下BootMqConsumer的消费结果
PS:BootMqConsumer

@Service
@RocketMQMessageListener(topic = "boot-mq-topic", consumerGroup = "boot_group_1")
@Slf4j
public class BootMqConsumer extends CommonConsumer {
    @Resource
    MessageService messageService;

    @Override
    public MsgRetryStatus doConsumerProcess(MessageDTO messageDTO) {
        log.info("执行消费逻辑,topic:{}", messageDTO.getTopic());
        return MsgRetryStatus.RETRY;
    }
}

测试:

    @GetMapping("/send/msg4")
    public String sendMsg4() {
        try {
            // 构建消息主体,此处可以用对象代替,为了方便演示,使用map
            User user = User.builder()
                    .id(1)
                    .name("ninesun")
                    .build();
            MessageDTO<User> messageDTO = MessageDTO.<User>builder()
                    .data(user)
                    .delayTime(3)
                    .topic("boot-mq-topic")
                    .key(String.valueOf(UUID.randomUUID()))
                    .build();
            messageProduct.SendMessage(messageDTO);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK";
    }

在这里插入图片描述

但是从结果可以看出如果我们的业务一直是重试或者消费失败,会一直无限次重试下去,所以我们还需要限制重试次数,所以还需要改造一下我们的doRetrytConsumerProcess重试方法。

由于我们现在都是分布式的服务,所以无法在本地去存储重试次数,所以需要新增redis去缓存重试次数

  • Redis工具类
public final class RedisUtil {

    private static final int defaultExpire = 60;//默认过期时间
    private static final String REMOTEURL = "192.168.1.2";//redis远程连接
    private static Jedis redis = new Jedis(REMOTEURL);//设置远程连接

    private RedisUtil() {
        //
    }

    /**
     * 自增
     *
     * @param key    redis key
     * @param expire 过期时间,单位秒
     */
    public static long incr(String key, long expire) {
        long res = redis.incr(key);
        redis.expire(key, expire);
        return res;
    }
}

  • doRetryConsumerProcess方法改造
   /**
     * 消息重试
     *
     * @param messageDTO
     */
    private void doRetryConsumerProcess(MessageDTO messageDTO) {
        //此处的过期时间和最大重试次数可以放在配置中心(nacos or Apollo)
        long retryTimes = RedisUtil.incr(getCacheKey(messageDTO), 60);
        if (retryTimes > 3) {
            log.warn("消息重试次数超过阈值,已重试:{}次,messageInfo:{}", retryTimes - 1, JSON.toJSONString(messageDTO));
            return;
        }
        messageProduct.SendMessage(messageDTO, MQConsumer.BOOT_RETRY_MQ_CONSUMER_TOPIC, ThreadLocalRandom.current().nextInt(0, 3));
    }

    public String getCacheKey(MessageDTO messageDTO) {
        if (messageDTO == null) {
            return null;
        }
        return messageDTO.getTopic() + "-" + messageDTO.getKey();
    }

再次测试:
在这里插入图片描述

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部