Parcourir la source

新增消息队列功能

liukx@elab il y a 5 ans
Parent
commit
ff2fcf2bdc

+ 1 - 1
elab-rocketMQ/REDERME.md

@@ -27,7 +27,7 @@
 rocketmq:
     name-server: 192.168.0.24:9876;192.168.0.25:9876
     producer:
-        group: ${profiles.active}-${spring.application.name}
+        group: ${spring.application.name}-${profiles.active}
         sendMessageTimeout: 300000
 ```
 

+ 2 - 1
elab-rocketMQ/src/main/java/com/elab/mq/rocket/listener/DefaultRocketMQListener.java

@@ -23,7 +23,7 @@ import java.util.Date;
  * @author : liukx
  * @time : 2019/7/18 - 13:54
  */
-@RocketMQMessageListener(topic = "topic-${rocketmq.producer.group}", consumerGroup = "group-${rocketmq.producer.group}")
+@RocketMQMessageListener(topic = "${rocketmq.producer.group}-topic", consumerGroup = "${rocketmq.producer.group}-group")
 public class DefaultRocketMQListener implements RocketMQListener<MessageExt> {
 
     private Logger logger = LoggerFactory.getLogger(getClass());
@@ -102,6 +102,7 @@ public class DefaultRocketMQListener implements RocketMQListener<MessageExt> {
             }
         }
     }
+
     private void saveData(RmqConsumerEntity entity, boolean isInsert) throws Exception {
         // 如果没有异常发生
         entity.setConsumerStatus(MsgConstants.MSG_OK);

+ 0 - 17
elab-rocketMQ/src/main/java/com/elab/mq/rocket/producer/DefaultMQProducerExt.java

@@ -1,17 +0,0 @@
-package com.elab.mq.rocket.producer;
-
-/**
- * 拓展消息发送者
- *
- * @author : liukx
- * @time : 2019/7/18 - 16:21
- */
-public class DefaultMQProducerExt {
-
-
-
-
-
-
-
-}

+ 22 - 6
elab-rocketMQ/src/main/java/com/elab/mq/rocket/template/DefaultMQProducerExt.java

@@ -1,11 +1,15 @@
 package com.elab.mq.rocket.template;
 
+import com.elab.mq.rocket.utils.MsgConstants;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
 
 /**
  * 消息发送模版拓展
@@ -15,8 +19,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
  */
 public class DefaultMQProducerExt {
 
+    private Logger logger = LoggerFactory.getLogger(DefaultMQProducerExt.class);
+
     private DefaultMQProducer defaultMQProducer;
 
+    @Value("${spring.profiles.active:dev}")
+    private String profiles;
+
+
     public DefaultMQProducerExt(DefaultMQProducer defaultMQProducer) {
         this.defaultMQProducer = defaultMQProducer;
     }
@@ -25,23 +35,29 @@ public class DefaultMQProducerExt {
         return defaultMQProducer;
     }
 
-    public SendResult send(String topic, String tag, String key, byte[] body, int delayTimeLevel) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+    public SendResult send(String project, String tag, String key, byte[] body, int delayTimeLevel) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
         Message message = new Message();
         message.setBody(body);
-        message.setTopic(topic);
+        message.setTopic(getTopic(project));
         message.setTags(tag);
         message.setKeys(key);
         message.setDelayTimeLevel(delayTimeLevel);
         return defaultMQProducer.send(message);
     }
 
-    public SendResult send(String topic, String tag, String key, byte[] body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-        return send(topic, tag, key, body, 0);
+    public SendResult send(String project, String tag, String key, byte[] body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        return send(project, tag, key, body, 0);
     }
 
-    public SendResult send(String topic, String key, byte[] body) throws InterruptedException, RemotingException,
+    public SendResult send(String project, String key, byte[] body) throws InterruptedException, RemotingException,
             MQClientException, MQBrokerException {
-        return send(topic, "*", key, body, 0);
+        return send(project, "*", key, body, 0);
+    }
+
+    public String getTopic(String project) {
+        String topic = project + "-" + profiles + "-" + MsgConstants.TOPIC_SUFFIX;
+        logger.info(" topic : " + topic);
+        return topic;
     }
 
 }

+ 4 - 0
elab-rocketMQ/src/main/java/com/elab/mq/rocket/utils/MsgConstants.java

@@ -21,5 +21,9 @@ public class MsgConstants {
      */
     public static int MSG_WAIT = 0;
 
+    /**
+     * topic 后缀
+     */
+    public static String TOPIC_SUFFIX = "TOPIC";
 
 }