소스 검색

优化消息队列重试异常

刘凯雄 2 년 전
부모
커밋
5125290a2f

+ 2 - 1
elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java

@@ -91,10 +91,11 @@ public abstract class AbstractMessageListener implements MessageListener {
                 this.consumerInterceptor.success(messageModel, groupName);
             }
         } catch (Exception e) {
-            t.setStatus(e);
             if (e instanceof ConsumerWaitException) {
                 logger.warn("消息处理等待下一次重试...");
+                t.setSuccessStatus();
             } else {
+                t.setStatus(e);
                 logger.error("消息处理异常 : ", e);
             }
             messageModel.setInvokeTime(System.currentTimeMillis() - start);

+ 10 - 4
elab-spring/src/main/java/com/elab/spring/intercept/RMQCacheConsumerInterceptor.java

@@ -1,11 +1,11 @@
 package com.elab.spring.intercept;
 
 import com.elab.mq.consts.MqConstants;
+import com.elab.mq.exception.ConsumerWaitException;
 import com.elab.mq.listener.ConsumerInterceptor;
 import com.elab.mq.model.MessageModel;
 import com.elab.mq.utils.RocketMonitorUtils;
 import com.elab.redis.config.ElabRedisProperties;
-import com.elab.mq.exception.ConsumerWaitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -149,10 +149,16 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
     @Override
     public void error(MessageModel messageModel, String groupName, Throwable e) {
         String key = getCacheKey(messageModel, groupName);
-        Integer currentStatus = (Integer)redisTemplate.opsForValue().get(key);
+//        Integer currentStatus = (Integer)redisTemplate.opsForValue().get(key);
+//
+//        if (currentStatus == null) {
+//            initKey(key);
+//        }
+
+        final Boolean delete = redisTemplate.delete(key);
 
-        if (currentStatus == null) {
-            initKey(key);
+        if (Boolean.FALSE.equals(delete)) {
+            logger.debug("key : {} 删除失败!", key);
         }
 
         RocketMonitorUtils.sendConsumerMonitorData(messageModel, -1, e);