Browse Source

版本分支提交

liukaixiong 4 years ago
parent
commit
105c2663d5

+ 5 - 4
elab-mq/src/main/java/com/elab/mq/listener/MessageListenerWrapper.java

@@ -16,10 +16,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 /**
  * 订阅组件适配器
@@ -137,6 +134,10 @@ public class MessageListenerWrapper implements MessageListener {
             mqDataDTO.setDataId(data.getProducerId());
         }
 
+        if (mqDataDTO.getSourceTime() == null && data.getConsumerTime() != null) {
+            mqDataDTO.setSourceTime(new Date(data.getConsumerTime()));
+        }
+
     }
 
     private Map<String, String> conversionPropertyToMap(Properties userProperties) {

+ 8 - 0
elab-mq/src/main/java/com/elab/mq/model/MessageModel.java

@@ -148,6 +148,14 @@ public class MessageModel<T> extends Message {
         return userProperties;
     }
 
+    public Long getConsumerTime(){
+        String consumeStartTime = getUserProperties("CONSUME_START_TIME");
+        if(StringUtils.isNotEmpty(consumeStartTime)){
+            return Long.valueOf(consumeStartTime);
+        }
+        return null;
+    }
+
     public Long getInvokeTime() {
         return invokeTime;
     }

+ 6 - 0
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgProducerImpl.java

@@ -54,6 +54,7 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
 
             // 提前插入消息,为了让消息编号能够带过去,如果发送失败,则根据编号更改为-1状态。
             // int id = insertRecordProducer(message);
+            long start = System.currentTimeMillis();
             try {
 
                 idMap.forEach((K, V) -> {
@@ -67,11 +68,16 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
                 }
 
                 SendResult result = super.send(message);
+                long time = System.currentTimeMillis() - start;
+                message.setInvokeTime(time);
                 logger.debug(" 消息发送结果 : " + result.toString());
+
                 producerInterceptor.success(message, message.getModuleName(), result);
                 return new SendResultModel(result);
             } catch (Exception e) {
                 logger.error("消息发送异常", e);
+                long time = System.currentTimeMillis() - start;
+                message.setInvokeTime(time);
                 producerInterceptor.error(message, message.getModuleName(), e);
                 throw e;
             }

+ 8 - 2
elab-spring/src/main/java/com/elab/spring/intercept/RMQCacheProducerInterceptor.java

@@ -12,6 +12,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.Date;
+
 /**
  * @Module rmq拦截器
  * @Description 基于缓存实现
@@ -41,13 +43,13 @@ public class RMQCacheProducerInterceptor implements ProducerInterceptor {
 
     @Override
     public void success(MessageModel messageModel, String groupName, SendResult result) {
-         sendMonitorData(messageModel, 1);
+        sendMonitorData(messageModel, 1);
         logger.debug("业务处理成功,携带的消息唯一编号:" + messageModel.getProducerId());
     }
 
     @Override
     public void error(MessageModel messageModel, String groupName, Throwable e) {
-         sendMonitorData(messageModel, -1);
+        sendMonitorData(messageModel, -1);
         logger.debug("业务处理失败,携带的消息唯一编号:" + messageModel.getProducerId());
     }
 
@@ -88,6 +90,10 @@ public class RMQCacheProducerInterceptor implements ProducerInterceptor {
             mqDataDTO.setDataId(data.getProducerId());
         }
 
+        if (mqDataDTO.getSourceTime() == null) {
+            mqDataDTO.setSourceTime(new Date());
+        }
+
     }
 
     private String getCacheKey() {