Browse Source

消息队列的链路监控

liukx@elab 5 years ago
parent
commit
4bbf6ea5d2

+ 70 - 0
elab-core/src/test/java/com/elab/test/utils/GuavaUtilsCase.java

@@ -0,0 +1,70 @@
+package com.elab.test.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.*;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 工具类研究
+ *
+ * @author : liukx
+ * @time : 2020/4/9 - 19:02
+ */
+public class GuavaUtilsCase {
+
+
+    @Test
+    public void ListTest() {
+        // 普通集合
+        ArrayList<Object> objects = Lists.newArrayList("a", "b", "c", "d", "e", "f");
+        // 数据分片
+        List<List<Object>> partition = Lists.partition(objects, 2);
+        System.out.println("");
+
+        String str = "1-2-3-4-5-6";
+        List<String> list = Splitter.on("-").splitToList(str);
+    }
+
+    @Test
+    public void MapTest() {
+        /**
+         * 允许一个key对应多个值,相当于Map<String,List<String>>
+         */
+        ArrayListMultimap<Object, Object> objectObjectArrayListMultimap = ArrayListMultimap.create();
+        objectObjectArrayListMultimap.put("aaa", "a");
+        objectObjectArrayListMultimap.put("aaa", "b");
+        objectObjectArrayListMultimap.put("aaa", "c");
+        System.out.println("===");
+
+        HashBiMap<String, String> objectObjectHashBiMap = HashBiMap.create();
+        objectObjectHashBiMap.put("a", "b");
+        objectObjectHashBiMap.put("c", "d");
+        objectObjectHashBiMap.put("e", "e");
+        System.out.println("====");
+
+        Table<String, String, Integer> tables = HashBasedTable.create();
+        tables.put("l", "k", 1);
+
+        Map<String, Integer> map = Maps.newHashMap();
+        map.put("xiaoming", 12);
+        map.put("xiaohong", 13);
+        String result = Joiner.on(",").withKeyValueSeparator("=").join(map);
+
+        String str = "xiaoming=11,xiaohong=23";
+        Map<String, String> strMap = Splitter.on(",").withKeyValueSeparator("=").split(str);
+
+        Preconditions.checkArgument(true, "asdfasdfd");
+    }
+
+    @Test
+    public void strTest() {
+        String text = "a,b,c,d";
+        Joiner joiner1 = Joiner.on(",").skipNulls();
+    }
+}

+ 18 - 4
elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java

@@ -4,6 +4,8 @@ import com.aliyun.openservices.ons.api.Action;
 import com.aliyun.openservices.ons.api.ConsumeContext;
 import com.aliyun.openservices.ons.api.Message;
 import com.aliyun.openservices.ons.api.MessageListener;
+import com.dianping.cat.Cat;
+import com.dianping.cat.message.Transaction;
 import com.elab.core.utils.DateUtils;
 import com.elab.core.utils.ObjectUtils;
 import com.elab.mq.dao.IConsumerDao;
@@ -32,6 +34,8 @@ public abstract class AbstractMessageListener implements MessageListener {
     @Value("${spring.application.name}")
     private String moduleName;
 
+    private final String MQ_CONSUMER = "MQ_CONSUMER";
+
 
     /**
      * 客户端关注的topic消息,实现了该方法,消息监听被触发时会与之匹配。
@@ -61,9 +65,7 @@ public abstract class AbstractMessageListener implements MessageListener {
         String topic = message.getTopic();
         String tag = message.getTag();
         ConsumerEntity oldConsumerEntity = new ConsumerEntity();
-        logger.info("进入消费者验证...");
         ConsumerEntity consumerEntity = new ConsumerEntity();
-
         consumerEntity.setProducerId(producerId);
         consumerEntity.setTopicId(topic);
         consumerEntity.setModuleName(moduleName);
@@ -77,12 +79,14 @@ public abstract class AbstractMessageListener implements MessageListener {
             // throw new BusinessException("请不要重复消费,key:" + message);
             return oldConsumerEntity;
         }
-        logger.info("消费者验证通过,消费者消费的数据匹配上生产者数据,");
+        logger.debug("消费者验证通过,消费者消费的数据匹配上生产者数据,");
         if (oldConsumerEntity != null) {
             oldConsumerEntity.setRetryCount(oldConsumerEntity.getRetryCount() + 1);
             oldConsumerEntity.setConsumerStatus(-1);
             oldConsumerEntity.setUpdated(DateUtils.getCurrentDateTime());
+            oldConsumerEntity.setCatId(Cat.getCurrentMessageId());
             consumerDao.updateById(oldConsumerEntity);
+            logger.debug("触发消息重试 当前重试次数 : " + oldConsumerEntity.getRetryCount());
         } else {
             oldConsumerEntity = new ConsumerEntity();
             oldConsumerEntity.setProducerId(producerId);
@@ -96,9 +100,11 @@ public abstract class AbstractMessageListener implements MessageListener {
             oldConsumerEntity.setConsumerStatus(0);
             oldConsumerEntity.setMsgId(msgID);
             oldConsumerEntity.setTag(tag);
+            oldConsumerEntity.setCatId(Cat.getCurrentMessageId());
             oldConsumerEntity.setModuleMethod(getClass().getName());
             int id = consumerDao.insert(oldConsumerEntity);
             oldConsumerEntity.setId(id);
+            logger.debug(" 一条全新的消息,消息编号:" + id);
         }
         return oldConsumerEntity;
     }
@@ -115,6 +121,9 @@ public abstract class AbstractMessageListener implements MessageListener {
 
     @Override
     public Action consume(Message message, ConsumeContext consumeContext) {
+        String topic = message.getTopic();
+        String tag = message.getTag();
+        Transaction t = Cat.newTransaction(MQ_CONSUMER, topic + "_" + tag + "_" + getClass().getSimpleName());
         MessageModel messageModel = new MessageModel(message);
         logger.debug("消息处理被触发 : " + message.toString());
         Action action = null;
@@ -123,10 +132,11 @@ public abstract class AbstractMessageListener implements MessageListener {
             //如果已经成功过一次
             if (oldConsumerEntity.getConsumerStatus() == 1) {
                 logger.warn(" 重复消费 过滤掉. 消费编号 : " + oldConsumerEntity.getId());
+                t.setSuccessStatus();
                 return Action.CommitMessage;
             }
 
-            logger.info("更新消费者数据: " + ObjectUtils.objectParseJsonStr(oldConsumerEntity));
+            logger.debug("更新消费者数据: " + ObjectUtils.objectParseJsonStr(oldConsumerEntity));
             action = consume0(messageModel, consumeContext);
 
             if (action == null || Action.ReconsumeLater.equals(action)) {
@@ -138,10 +148,14 @@ public abstract class AbstractMessageListener implements MessageListener {
                 oldConsumerEntity.setUpdated(new Date());
                 consumerDao.updateById(oldConsumerEntity);
             }
+            t.setSuccessStatus();
         } catch (Exception e) {
+            t.setStatus(e);
             logger.debug("消息处理异常 : " + action);
             e.printStackTrace();
             return Action.ReconsumeLater;
+        } finally {
+            t.complete();
         }
         logger.debug("消息处理结果 : " + action);
         return action;

+ 11 - 0
elab-mq/src/main/java/com/elab/mq/model/ConsumerEntity.java

@@ -38,6 +38,9 @@ public class ConsumerEntity {
     @Column(name = "tag")
     @ApiModelProperty(name = "tag", value = "标签 业务区分标识")
     private String tag;
+    @Column(name = "cat_id")
+    @ApiModelProperty(name = "catId", value = "监控链路编号")
+    private String catId;
 
     @Column(name = "house_id")
     @ApiModelProperty(name = "houseId", value = "项目id")
@@ -98,6 +101,14 @@ public class ConsumerEntity {
         return tag;
     }
 
+    public String getCatId() {
+        return catId;
+    }
+
+    public void setCatId(String catId) {
+        this.catId = catId;
+    }
+
     public void setTag(String tag) {
         this.tag = tag;
     }

+ 23 - 0
elab-mq/src/main/java/com/elab/mq/model/ProducerEntity.java

@@ -49,6 +49,13 @@ public class ProducerEntity {
     @ApiModelProperty(name = "resultContent", value = "返回结果内容")
     private String resultContent;
 
+    @Column(name = "cat_id")
+    @ApiModelProperty(name = "catId", value = "链路编号")
+    private String catId;
+    @Column(name = "tag")
+    @ApiModelProperty(name = "tag", value = "标签名称")
+    private String tag;
+
     /**
      * 状态:1  有效  -1  无效
      */
@@ -84,6 +91,22 @@ public class ProducerEntity {
     @ApiModelProperty(name = "updator", value = "修改者")
     private String updator;
 
+    public String getCatId() {
+        return catId;
+    }
+
+    public void setCatId(String catId) {
+        this.catId = catId;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
     public String getResultContent() {
         return resultContent;
     }

+ 18 - 3
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgProducerImpl.java

@@ -2,6 +2,8 @@ package com.elab.mq.msg.impl;
 
 import com.aliyun.openservices.ons.api.SendResult;
 import com.aliyun.openservices.ons.api.bean.ProducerBean;
+import com.dianping.cat.Cat;
+import com.dianping.cat.message.Transaction;
 import com.elab.core.utils.DateUtils;
 import com.elab.core.utils.ObjectUtils;
 import com.elab.core.utils.StringUtils;
@@ -37,6 +39,11 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
     @Value("${spring.application.name}")
     private String moduleName;
 
+    @Value(value = "${elab.mq.GROUP_ID:}")
+    private String groupId;
+
+    private final String MQ_PRODUCER = "MQ_PRODUCER";
+
 
     /**
      * 往生产者表中写入数据
@@ -47,13 +54,15 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
     private int insertRecordProducer(MessageModel message) {
         String content = ObjectUtils.objectParseJsonStr(message.getObject(Object.class));
         ProducerEntity producerEntity = new ProducerEntity();
-
+        String tag = message.getTag();
         if (StringUtils.isEmpty(message.getModuleName())) {
             producerEntity.setModuleName(moduleName);
         } else {
             producerEntity.setModuleName(message.getModuleName());
         }
-
+        producerEntity.setTag(tag);
+        producerEntity.setCatId(Cat.getCurrentMessageId());
+        producerEntity.setGroupId(groupId);
         producerEntity.setHouseId(message.getHouseId());
         producerEntity.setProducerStatus(0);
         producerEntity.setGroupId(message.getGroupId());
@@ -79,7 +88,6 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
      * @return
      */
     private void updateRecordProducer(int id, Integer status, String result) {
-
         try {
             ProducerEntity producerEntity = producerDao.selectById(id + "");
             if (producerEntity != null) {
@@ -101,6 +109,9 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
 
     @Override
     public SendResultModel send(MessageModel message) {
+        String topic = message.getTopic();
+        String tag = message.getTag();
+        Transaction t = Cat.newTransaction(MQ_PRODUCER, topic + "_" + tag + "_" + getClass().getSimpleName());
         logger.debug(" 发送一条消息 " + message.getMsgID() + " -> " + message.toString());
         int id = insertRecordProducer(message);
         try {
@@ -108,10 +119,14 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
             SendResult result = super.send(message);
             updateRecordProducer(id, 1, result.toString());
             logger.debug(" 消息发送结果 : " + result.toString());
+            t.setSuccessStatus();
             return new SendResultModel(result);
         } catch (Exception e) {
             logger.error("消息发送异常", e);
+            t.setStatus(e);
             updateRecordProducer(id, -1, e.getMessage());
+        } finally {
+            t.complete();
         }
         return null;
     }

+ 1 - 1
pom.xml

@@ -343,7 +343,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
-                <!--<version>2.9</version>-->
+                <version>3.0.1</version>
                 <executions>
                     <execution>
                         <id>attach-javadocs</id>