Procházet zdrojové kódy

优化消息队列重试异常

刘凯雄 před 2 roky
rodič
revize
1ad05569b9

+ 13 - 0
elab-mq/src/main/java/com/elab/mq/exception/ConsumerWaitException.java

@@ -0,0 +1,13 @@
+package com.elab.mq.exception;
+
+/**
+ * 消费数据等待异常
+ * @author liukaixiong
+ * @date 2022/9/8 - 13:53
+ */
+public class ConsumerWaitException extends RuntimeException{
+
+    public ConsumerWaitException(String message) {
+        super(message);
+    }
+}

+ 7 - 2
elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java

@@ -6,6 +6,7 @@ import com.aliyun.openservices.ons.api.Message;
 import com.aliyun.openservices.ons.api.MessageListener;
 import com.dianping.cat.message.Transaction;
 import com.elab.log.utils.CatCrossProcess;
+import com.elab.mq.exception.ConsumerWaitException;
 import com.elab.mq.exception.MsgRollBackException;
 import com.elab.mq.model.MessageModel;
 import org.slf4j.Logger;
@@ -91,14 +92,18 @@ public abstract class AbstractMessageListener implements MessageListener {
             }
         } catch (Exception e) {
             t.setStatus(e);
-            logger.error("消息处理异常 : ", e);
+            if (e instanceof ConsumerWaitException) {
+                logger.warn("消息处理等待下一次重试...");
+            } else {
+                logger.error("消息处理异常 : ", e);
+            }
             messageModel.setInvokeTime(System.currentTimeMillis() - start);
             this.consumerInterceptor.error(messageModel, groupName, e);
             return Action.ReconsumeLater;
         } finally {
             t.complete();
         }
-        logger.debug("消息处理结果 : " + action);
+        logger.debug("消息处理结果 : {}" , action);
         return action;
     }
 

+ 6 - 5
elab-spring/src/main/java/com/elab/spring/intercept/RMQCacheConsumerInterceptor.java

@@ -5,6 +5,7 @@ import com.elab.mq.listener.ConsumerInterceptor;
 import com.elab.mq.model.MessageModel;
 import com.elab.mq.utils.RocketMonitorUtils;
 import com.elab.redis.config.ElabRedisProperties;
+import com.elab.mq.exception.ConsumerWaitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -80,27 +81,27 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
              * - 2.2 : 确实消费慢,那key超时了没关系,只要maxWaitTime小于RMQ超时确认时间
              */
             redisTemplate.opsForValue().set(key, MqConstants.MSG_WAIT, maxWaitTime, TimeUnit.MILLISECONDS);
-            logger.debug("缓存幂等校验通过:未发现值:" + key);
+            logger.debug("缓存幂等校验通过:未发现值: {}", key);
             return true;
         } else {
             Integer status = Integer.valueOf(statusObject.toString());
             if (MqConstants.MSG_OK.equals(status)) {
-                /**
+                /*
                  * 到达这里的话,只有可能是出现超时重试的情况。
                  * 实际消费成功了,但是没有确认给RMQ。
                  *
                  * 一旦出现重试,则说明出现了故障问题,则将有效时间进行延长
                  */
-                logger.info("该数据已经被消费过了 : " + key);
+                logger.info("该数据已经被消费过了 : {}", key);
                 redisTemplate.opsForValue().set(key, MqConstants.MSG_OK, retryWaitTime, TimeUnit.MILLISECONDS);
                 return false;
             } else if (MqConstants.MSG_WAIT.equals(status)) {
                 // 几乎不可能触发.因为同一时刻只可能有一个消费者消费,不会出现并发情况。
-                throw new RuntimeException("等待中未消费完成的数据");
+                throw new ConsumerWaitException("等待中未消费完成的数据");
             } else {
                 Long count = redisTemplate.opsForValue().increment(key, 1);
                 Boolean expire = redisTemplate.expire(key, retryWaitTime, TimeUnit.MILLISECONDS);
-                logger.debug("缓存幂等校验通过 -> 重试 " + expire + ": " + count);
+                logger.debug("缓存幂等校验通过 -> 重试 {}", expire + ": " + count);
                 return true;
             }
         }