Browse Source

新增功能:
优化RocketMq代码

liukx@elab 6 years ago
parent
commit
6ee5bb9774
27 changed files with 244 additions and 532 deletions
  1. 82 0
      elab-mq/README.md
  2. 5 0
      elab-mq/pom.xml
  3. 14 8
      elab-mq/src/main/java/com/elab/mq/config/RocketMQConfiguration.java
  4. 0 124
      elab-mq/src/main/java/com/elab/mq/config/RocketMqConfigBean.java
  5. 14 3
      elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java
  6. 0 80
      elab-mq/src/main/java/com/elab/mq/listener/DefaultMessageListener.java
  7. 13 13
      elab-mq/src/main/java/com/elab/mq/listener/MessageListenerWrapper.java
  8. 0 34
      elab-mq/src/main/java/com/elab/mq/model/MessageExtModel.java
  9. 62 2
      elab-mq/src/main/java/com/elab/mq/model/MessageModel.java
  10. 0 35
      elab-mq/src/main/java/com/elab/mq/msg/IConsumeProcessService.java
  11. 0 16
      elab-mq/src/main/java/com/elab/mq/msg/ILocalTransactionExecuter.java
  12. 0 14
      elab-mq/src/main/java/com/elab/mq/msg/IMessageListener.java
  13. 0 13
      elab-mq/src/main/java/com/elab/mq/msg/IMsgConsumerFacade.java
  14. 7 0
      elab-mq/src/main/java/com/elab/mq/msg/IMsgProducerFacade.java
  15. 0 23
      elab-mq/src/main/java/com/elab/mq/msg/IMsgTransactionFacade.java
  16. 0 12
      elab-mq/src/main/java/com/elab/mq/msg/ISendCallbackFacade.java
  17. 0 13
      elab-mq/src/main/java/com/elab/mq/msg/ITransactionCheckListener.java
  18. 1 1
      elab-mq/src/main/java/com/elab/mq/msg/adptor/LocalTransactionExecuterAdaptor.java
  19. 0 25
      elab-mq/src/main/java/com/elab/mq/msg/adptor/MessageListenerAdaptor.java
  20. 0 25
      elab-mq/src/main/java/com/elab/mq/msg/adptor/TransactionCheckListenerAdaptor.java
  21. 0 15
      elab-mq/src/main/java/com/elab/mq/msg/impl/MsgConsumerImpl.java
  22. 6 0
      elab-mq/src/main/java/com/elab/mq/msg/impl/MsgProducerImpl.java
  23. 0 39
      elab-mq/src/main/java/com/elab/mq/msg/impl/MsgTransactionImpl.java
  24. 1 0
      elab-mq/src/main/resources/META-INF/spring.factories
  25. 15 37
      elab-mq/src/test/java/com/elab/mq/config/RocketMQConfigurationTest.java
  26. 4 0
      elab-mq/src/test/resources/application.properties
  27. 20 0
      elab-mq/src/test/resources/applicationContext.xml

+ 82 - 0
elab-mq/README.md

@@ -0,0 +1,82 @@
+# 消息队列使用方式
+
+## 引入项目
+
+加入maven的依赖
+
+```xml
+<dependency>
+    <groupId>com.elab.core</groupId>
+    <artifactId>elab-mq</artifactId>
+    <version>${elab.version}</version>
+</dependency>
+```
+
+## 属性配置
+
+```properties
+# 必填
+elab.mq.AccessKey=                 
+elab.mq.SecretKey= 
+elab.mq.NAMESRV_ADDR=http://MQ_INST_1819241776271348_Bak3xjk0.mq-internet-access.mq-internet.aliyuncs.com:80 
+# 非必填
+elab.mq.GROUP_ID=GID-producer1  # 默认GID-运行环境-项目名称
+elab.mq.SendMsgTimeoutMillis=5000 # 设置发送超时时间,单位毫秒
+elab.mq.SuspendTimeMillis=100 # 顺序消息消费者失败进行重试前的等待时间(单位毫秒)
+elab.mq.MaxReconsumeTimes=20 #  消费失败的重试次数
+
+```
+
+### 生产者
+
+```java
+
+@Autowired
+private IMsgProducerFacade msgProducerFacade;
+
+public void send(){
+    // 发送消息
+    // topic / tag / 全局唯一编号 / 消息内容
+   JSONObject jsonObject = new JSONObject();
+    jsonObject.put("xxx", "哈哈");
+    jsonObject.put("sss", "ggg");
+    for (int i = 0; i < 10; i++) {
+        String key = RandomUtils.randomString(10);
+        MessageModel message = new MessageModel<Map>("T-consumer1", "*", key, jsonObject);
+        SendResult send = msgProducerFacade.send(message);
+        System.out.println("发送消息返回结果 : " + send.toString());
+    }
+}
+```
+
+### 消费者
+
+```java
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.elab.mq.listener.AbstractMessageListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * 消费者
+ *
+ * @author : liukx
+ * @time : 2019/5/15 - 14:04
+ */
+@Component
+public class ConsumerServiceImpl extends AbstractMessageListener {
+
+    @Override
+    public String topic() {
+        return "T-consumer1";
+    } 
+
+    @Override
+    public Action consume0(Message message, ConsumeContext consumeContext) {
+        System.out.println("---------------------消费成功------>>" + message.toString());
+        return Action.CommitMessage;
+    }
+}
+```
+

+ 5 - 0
elab-mq/pom.xml

@@ -32,6 +32,11 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

+ 14 - 8
elab-mq/src/main/java/com/elab/mq/config/RocketMQConfiguration.java

@@ -3,16 +3,20 @@ package com.elab.mq.config;
 import com.aliyun.openservices.ons.api.Consumer;
 import com.aliyun.openservices.ons.api.ONSFactory;
 import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import com.aliyun.openservices.ons.api.bean.ProducerBean;
 import com.elab.mq.listener.AbstractMessageListener;
 import com.elab.mq.listener.MessageListenerWrapper;
+import com.elab.mq.msg.IMsgProducerFacade;
+import com.elab.mq.msg.impl.MsgProducerImpl;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
 import org.springframework.core.env.Environment;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * rocketMQ消息队列的配置
@@ -21,17 +25,15 @@ import java.util.Properties;
  * @time : 2019/4/19 - 16:41
  */
 @Configuration
+//@PropertySource(value = {"classpath:application.properties"})
+@Import({MessageListenerWrapper.class})
 public class RocketMQConfiguration {
 
-
     private final String prefix = "elab.mq";
 
-    private final String NAMESRV_ADDR = "http://MQ_INST_1819241776271348_Bak3xjk0.mq-internet-access.mq-internet" +
-            ".aliyuncs.com:80";
-
     @Bean
-    public ProducerBean producerBean(@Autowired Environment env) {
-        ProducerBean producerBean = new ProducerBean();
+    public IMsgProducerFacade producerBean(@Autowired Environment env) {
+        IMsgProducerFacade producerBean = new MsgProducerImpl();
         producerBean.setProperties(mqProperties(env));
         producerBean.start();
         return producerBean;
@@ -42,14 +44,18 @@ public class RocketMQConfiguration {
             messageListenerWrapper, @Autowired List<AbstractMessageListener> messageListeners) {
         Consumer consumer = ONSFactory.createConsumer(mqProperties(env));
 
+        Set<String> existTopic = new HashSet<>();
         if (messageListeners.size() > 0) {
             for (int i = 0; i < messageListeners.size(); i++) {
                 AbstractMessageListener abstractMessageListener = messageListeners.get(i);
                 String topic = abstractMessageListener.topic();
+                boolean exist = existTopic.add(topic);
+                if (!exist) {
+                    throw new RuntimeException(" mq topic : " + topic + " Can not repeat !");
+                }
                 consumer.subscribe(topic, "*", messageListenerWrapper);
             }
             consumer.start();
-
         }
         return consumer;
     }

+ 0 - 124
elab-mq/src/main/java/com/elab/mq/config/RocketMqConfigBean.java

@@ -1,124 +0,0 @@
-package com.elab.mq.config;
-
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import com.aliyun.openservices.ons.api.PropertyValueConst;
-import com.aliyun.openservices.ons.api.bean.Subscription;
-import com.elab.mq.listener.DefaultMessageListener;
-import com.elab.mq.msg.IConsumeProcessService;
-import com.elab.mq.msg.IMsgConsumerFacade;
-import com.elab.mq.msg.IMsgProducerFacade;
-import com.elab.mq.msg.IMsgTransactionFacade;
-import com.elab.mq.msg.adptor.MessageListenerAdaptor;
-import com.elab.mq.msg.adptor.TransactionCheckListenerAdaptor;
-import com.elab.mq.msg.impl.MsgConsumerImpl;
-import com.elab.mq.msg.impl.MsgProducerImpl;
-import com.elab.mq.msg.impl.MsgTransactionImpl;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-@Configuration
-public class RocketMqConfigBean {
-
-    private String accessKey;
-    private String secretKey;
-    private String onsAddr;
-    private String producerId;
-    private String consumerId;
-    private String sendMsgTimeoutMillis;
-
-    public RocketMqConfigBean(String accessKey, String secretKey, String onsAddr, String producerId, String consumerId, String sendMsgTimeoutMillis) {
-        this.accessKey = accessKey;
-        this.secretKey = secretKey;
-        this.onsAddr = onsAddr;
-        this.producerId = producerId;
-        this.consumerId = consumerId;
-        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
-    }
-
-    public Properties getProperties() {
-        Properties properties = new Properties();
-        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
-        properties.put(PropertyKeyConst.AccessKey, this.accessKey);
-        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
-        properties.put(PropertyKeyConst.SecretKey, this.secretKey);
-        //设置发送超时时间,单位毫秒
-        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
-        // 设置 TCP 接入域名(此处以公共云生产环境为例)
-        properties.put(PropertyKeyConst.ONSAddr, this.onsAddr);
-        return properties;
-    }
-
-    /**
-     * 创建一个生产者
-     *
-     * @return
-     */
-    public IMsgProducerFacade createProducer() {
-        //您在控制台创建的 Producer ID
-        Properties properties = getProperties();
-        properties.put(PropertyKeyConst.ProducerId, this.producerId);
-        MsgProducerImpl producer = new MsgProducerImpl();
-        producer.setProperties(properties);
-        return producer;
-    }
-
-    /**
-     * 创建一个消费者
-     *
-     * @param topicName       话题名称
-     * @param tag             标签
-     * @param messageListener 回调监听接口
-     * @param isSubscription  是否为订阅类型
-     * @return
-     */
-    public IMsgConsumerFacade createConsumer(String topicName, String tag, MessageListenerAdaptor messageListener, boolean isSubscription) {
-        Properties properties = getProperties();
-        properties.put(PropertyKeyConst.ConsumerId, this.consumerId);
-        if (isSubscription) {
-            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
-        }
-        MsgConsumerImpl consumer = new MsgConsumerImpl();
-        consumer.setProperties(properties);
-        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
-        Subscription subscription = new Subscription();
-        subscription.setTopic(topicName);
-        subscription.setExpression(tag);
-        subscriptionTable.put(subscription, messageListener);
-        consumer.setSubscriptionTable(subscriptionTable);
-        return consumer;
-    }
-
-    /**
-     * 创建一个默认的消费监听适配者
-     *
-     * @param consumeProcessServices 消费执行者列表
-     * @return
-     */
-    public MessageListenerAdaptor getMessageListenerAdaptor(List<IConsumeProcessService> consumeProcessServices) {
-        DefaultMessageListener defaultMessageListener = new DefaultMessageListener();
-        defaultMessageListener.setConsumeProcessServiceList(consumeProcessServices);
-        defaultMessageListener.init();
-        return defaultMessageListener;
-    }
-
-    /**
-     * 创建一个事务消息发送类
-     *
-     * @param transactionCheckListener 事务消息回调
-     * @return
-     */
-    public IMsgTransactionFacade createTransactionProducer(TransactionCheckListenerAdaptor transactionCheckListener) {
-        Properties properties = getProperties();
-        properties.put(PropertyKeyConst.ProducerId, this.producerId);
-        properties.put(PropertyKeyConst.GROUP_ID, this.producerId);
-
-        MsgTransactionImpl transactionProducer = new MsgTransactionImpl(properties, transactionCheckListener);
-        return transactionProducer;
-    }
-
-}

+ 14 - 3
elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java

@@ -4,6 +4,7 @@ import com.aliyun.openservices.ons.api.Action;
 import com.aliyun.openservices.ons.api.ConsumeContext;
 import com.aliyun.openservices.ons.api.Message;
 import com.aliyun.openservices.ons.api.MessageListener;
+import com.elab.mq.model.MessageModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,7 +30,9 @@ public abstract class AbstractMessageListener implements MessageListener {
      *
      * @return
      */
-    public abstract String tag();
+    public String tag() {
+        return "*";
+    }
 
     /**
      * 客户端的业务逻辑实现
@@ -38,12 +41,20 @@ public abstract class AbstractMessageListener implements MessageListener {
      * @param consumeContext 消息上下文
      * @return
      */
-    public abstract Action consume0(Message message, ConsumeContext consumeContext);
+    public abstract Action consume0(MessageModel message, ConsumeContext consumeContext) throws Exception;
 
     @Override
     public Action consume(Message message, ConsumeContext consumeContext) {
+        MessageModel messageModel = new MessageModel(message);
         logger.debug("消息处理被触发 : " + message.toString());
-        Action action = consume0(message, consumeContext);
+        Action action = null;
+        try {
+            action = consume0(messageModel, consumeContext);
+        } catch (Exception e) {
+            logger.debug("消息处理异常 : " + action);
+            e.printStackTrace();
+            return Action.ReconsumeLater;
+        }
         logger.debug("消息处理结果 : " + action);
         return action;
     }

+ 0 - 80
elab-mq/src/main/java/com/elab/mq/listener/DefaultMessageListener.java

@@ -1,80 +0,0 @@
-package com.elab.mq.listener;
-
-import com.elab.mq.api.Action;
-import com.elab.mq.api.ConsumeContext;
-import com.elab.mq.model.MessageModel;
-import com.elab.mq.msg.IConsumeProcessService;
-import com.elab.mq.msg.adptor.MessageListenerAdaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * 订阅消息监听
- * <p>
- * 默认的消费者
- */
-public class DefaultMessageListener extends MessageListenerAdaptor {
-
-    private Logger logger = LoggerFactory.getLogger(getClass());
-
-    private ConcurrentMap<String, IConsumeProcessService> tagMap = new ConcurrentHashMap<String, IConsumeProcessService>();
-
-    private List<IConsumeProcessService> consumeProcessServiceList;
-
-    public void setConsumeProcessServiceList(List<IConsumeProcessService> consumeProcessServiceList) {
-        this.consumeProcessServiceList = consumeProcessServiceList;
-    }
-
-    public void init() {
-        if (this.consumeProcessServiceList.size() > 0) {
-            for (int i = 0; i < consumeProcessServiceList.size(); i++) {
-                IConsumeProcessService consumeProcessService = consumeProcessServiceList.get(i);
-                String tag = consumeProcessService.tag();
-                if (tag == null) {
-                    logger.error(" 消息监听tag为Null异常 : " + consumeProcessService.toString() + " tag方法返回为null !!!");
-                    continue;
-                }
-                IConsumeProcessService tagService = tagMap.get(tag);
-                if (tagService != null) {
-                    logger.error(" 消息监听tag重复异常 : " + consumeProcessService.toString() + " tag方法匹配有重复 !!! 重复类 :  " + tagService.toString());
-                    continue;
-                }
-                logger.debug(" 注册消费监听器 : key - " + tag + " value - " + consumeProcessService.toString());
-                tagMap.putIfAbsent(tag, consumeProcessService);
-            }
-        }
-    }
-
-    @Override
-    public Action consume(MessageModel messageModel, ConsumeContext consumeContext) {
-        String tag = messageModel.getTag();
-        try {
-            logger.debug(" 获取到的消息对象 : " + messageModel.toString());
-            Thread.sleep(3000);
-            IConsumeProcessService consumeProcessService = tagMap.get(tag);
-            if (consumeProcessService == null) {
-                logger.error(" 消息消费失败 ... 找不到对应的消费监听处理执行类 关注Tag = " + tag);
-                return Action.ReconsumeLater;
-            }
-
-            boolean process = consumeProcessService.process(messageModel, consumeContext);
-
-            if (!process) {
-                logger.error(" 消息消费失败 : 消息内容 " + messageModel.toString() + " 等待重试..");
-                return Action.ReconsumeLater;
-            }
-            logger.debug(" 消息消费成功 :  " + messageModel.getMsgID());
-            return Action.CommitMessage;
-        } catch (Exception e) {
-            logger.error(" 事务消息执行重试操作 ... " + e.getMessage(), e);
-            //消费失败
-            return Action.ReconsumeLater;
-        }
-    }
-
-
-}

+ 13 - 13
elab-mq/src/main/java/com/elab/mq/listener/MessageListenerWrapper.java

@@ -18,26 +18,26 @@ import java.util.List;
 @Component
 public class MessageListenerWrapper implements MessageListener {
 
-    @Autowired
+    @Autowired(required = false)
     private List<AbstractMessageListener> messageListeners;
 
     @Override
     public Action consume(Message message, ConsumeContext consumeContext) {
-        try {
-            for (int i = 0; i < messageListeners.size(); i++) {
-                AbstractMessageListener abstractMessageListener = messageListeners.get(i);
-                String currentTopic = message.getTopic();
-                String topic = abstractMessageListener.topic();
-                if (topic.equals(currentTopic)) {
-                    abstractMessageListener.consume(message, consumeContext);
+        if (messageListeners != null) {
+            try {
+                for (int i = 0; i < messageListeners.size(); i++) {
+                    AbstractMessageListener abstractMessageListener = messageListeners.get(i);
+                    String currentTopic = message.getTopic();
+                    String topic = abstractMessageListener.topic();
+                    if (topic.equals(currentTopic)) {
+                        abstractMessageListener.consume(message, consumeContext);
+                    }
                 }
+            } catch (Exception e) {
+                e.printStackTrace();
+                return Action.ReconsumeLater;
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            return Action.ReconsumeLater;
         }
         return Action.CommitMessage;
     }
-
-
 }

+ 0 - 34
elab-mq/src/main/java/com/elab/mq/model/MessageExtModel.java

@@ -1,34 +0,0 @@
-package com.elab.mq.model;
-
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
-
-/**
- * 消息封装实体拓展
- *
- * @author Liukx
- * @create 2018-04-10 20:12
- * @email liukx@elab-plus.com
- **/
-public class MessageExtModel extends MessageExt {
-
-    public MessageExtModel(MessageExt ext) {
-        setTopic(ext.getTopic());
-        setFlag(ext.getFlag());
-        setBody(ext.getBody());
-        setQueueId(ext.getQueueId());
-        setStoreSize(ext.getStoreSize());
-        setQueueOffset(ext.getQueueOffset());
-        setSysFlag(ext.getSysFlag());
-        setBornTimestamp(ext.getBornTimestamp());
-        setBornHost(ext.getBornHost());
-        setStoreTimestamp(ext.getStoreTimestamp());
-        setStoreHost(ext.getStoreHost());
-        setMsgId(ext.getMsgId());
-        setCommitLogOffset(ext.getCommitLogOffset());
-        setBodyCRC(ext.getBodyCRC());
-        setReconsumeTimes(ext.getReconsumeTimes());
-        setPreparedTransactionOffset(ext.getPreparedTransactionOffset());
-    }
-
-
-}

+ 62 - 2
elab-mq/src/main/java/com/elab/mq/model/MessageModel.java

@@ -1,6 +1,8 @@
 package com.elab.mq.model;
 
+import com.alibaba.fastjson.JSON;
 import com.aliyun.openservices.ons.api.Message;
+import com.elab.core.utils.StringUtils;
 
 /**
  * 消息容器承装实体
@@ -9,10 +11,37 @@ import com.aliyun.openservices.ons.api.Message;
  * @create 2018-04-09 18:38
  * @email liukx@elab-plus.com
  **/
-public class MessageModel extends Message {
+public class MessageModel<T> extends Message {
 
+    private T object;
 
-    public MessageModel() {
+    /**
+     * 使用该model传递参数
+     *
+     * @param topic 主题
+     * @param tags  区分
+     * @param key   全局唯一编号
+     * @param body  消息内容
+     */
+    public MessageModel(String topic, String tags, String key, T body) {
+        setObject(body);
+        setKey(key);
+        setTopic(topic);
+        setTag(tags);
+    }
+
+    /**
+     * 使用该model必须传递的参数
+     *
+     * @param topic
+     * @param key
+     * @param body
+     */
+    public MessageModel(String topic, String key, T body) {
+        setObject(body);
+        setKey(key);
+        setTopic(topic);
+        setTag("*");
     }
 
     public MessageModel(Message message) {
@@ -28,4 +57,35 @@ public class MessageModel extends Message {
         setUserProperties(message.getUserProperties());
     }
 
+    /**
+     * 获取对应的转换对象,切记必须为JSON格式的
+     *
+     * @param cls
+     * @return
+     */
+    public T getObject(Class<T> cls) {
+        byte[] body = getBody();
+        if (body != null) {
+            String bodyString = new String(body);
+            if (StringUtils.isNotEmpty(bodyString)) {
+                return JSON.parseObject(bodyString, cls);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + " \t key : " + getKey();
+    }
+
+    /**
+     * 设置传输对象
+     *
+     * @param object 必须为符合JSON转换的请求
+     */
+    public void setObject(Object object) {
+        String s = JSON.toJSONString(object);
+        setBody(s.getBytes());
+    }
 }

+ 0 - 35
elab-mq/src/main/java/com/elab/mq/msg/IConsumeProcessService.java

@@ -1,35 +0,0 @@
-package com.elab.mq.msg;/**
- * Created by liukx on 2018/4/11.
- */
-
-import com.elab.mq.api.ConsumeContext;
-import com.elab.mq.model.MessageModel;
-
-/**
- * 消费者业务监听模式
- *
- * @author Liukx
- * @create 2018-04-11 14:59
- * @email liukx@elab-plus.com
- **/
-public interface IConsumeProcessService {
-
-    /**
-     * 业务方关注的Tag标识
-     *
-     * @return
-     */
-    String tag();
-
-    /**
-     * 根据上面的tag方法匹配后,具体执行的业务操作
-     *
-     * @param messageModel   消息内容
-     * @param consumeContext 消费者上下文
-     * @return
-     * @throws Exception
-     */
-    boolean process(MessageModel messageModel, ConsumeContext consumeContext) throws Exception;
-
-
-}

+ 0 - 16
elab-mq/src/main/java/com/elab/mq/msg/ILocalTransactionExecuter.java

@@ -1,16 +0,0 @@
-//package com.elab.mq.msg;/**
-// * Created by liukx on 2018/4/9.
-// */
-//
-//import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
-//
-///**
-// * 事务消息接口回调
-// *
-// * @author Liukx
-// * @create 2018-04-09 18:55
-// * @email liukx@elab-plus.com
-// **/
-//public interface ILocalTransactionExecuter extends LocalTransactionExecuter {
-////    TransactionStatusEnum executeCallback(MessageModel message, Object o);
-//}

+ 0 - 14
elab-mq/src/main/java/com/elab/mq/msg/IMessageListener.java

@@ -1,14 +0,0 @@
-//package com.elab.mq.msg;
-//
-//import com.aliyun.openservices.ons.api.MessageListener;
-//
-///**
-// * 消息接收监听器
-// *
-// * @author Liukx
-// * @create 2018-04-10 19:43
-// * @email liukx@elab-plus.com
-// **/
-//public interface IMessageListener extends MessageListener {
-//
-//}

+ 0 - 13
elab-mq/src/main/java/com/elab/mq/msg/IMsgConsumerFacade.java

@@ -1,13 +0,0 @@
-package com.elab.mq.msg;
-
-import com.aliyun.openservices.ons.api.Consumer;
-
-/**
- * 消费者门面
- *
- * @author Liukx
- * @create 2018-04-09 18:29
- * @email liukx@elab-plus.com
- **/
-public interface IMsgConsumerFacade extends Consumer {
-}

+ 7 - 0
elab-mq/src/main/java/com/elab/mq/msg/IMsgProducerFacade.java

@@ -5,6 +5,7 @@ import com.elab.mq.model.SendResultModel;
 import com.elab.mq.msg.adptor.SendCallbackAdaptor;
 
 import java.util.Date;
+import java.util.Properties;
 
 /**
  * 消息生产门面类
@@ -59,4 +60,10 @@ public interface IMsgProducerFacade {
      * @return
      */
     SendResultModel sendTimingMsg(MessageModel messageModel, Date datetime);
+
+    /**
+     * 设置属性
+     * @param properties
+     */
+    public void setProperties(Properties properties);
 }

+ 0 - 23
elab-mq/src/main/java/com/elab/mq/msg/IMsgTransactionFacade.java

@@ -1,23 +0,0 @@
-package com.elab.mq.msg;/**
- * Created by liukx on 2018/4/9.
- */
-
-import com.elab.mq.model.MessageModel;
-import com.elab.mq.model.SendResultModel;
-import com.elab.mq.msg.adptor.LocalTransactionExecuterAdaptor;
-
-/**
- * 事务消息门面
- *
- * @author Liukx
- * @create 2018-04-09 18:33
- * @email liukx@elab-plus.com
- **/
-public interface IMsgTransactionFacade  {
-
-    void start();
-
-    void shutdown();
-
-    SendResultModel send(MessageModel messageModel, LocalTransactionExecuterAdaptor localTransactionExecuter, Object obj);
-}

+ 0 - 12
elab-mq/src/main/java/com/elab/mq/msg/ISendCallbackFacade.java

@@ -1,12 +0,0 @@
-//package com.elab.mq.msg;
-//
-//import com.aliyun.openservices.ons.api.SendCallback;
-//
-///**
-// * @author Liukx
-// * @create 2018-04-09 18:48
-// * @email liukx@elab-plus.com
-// **/
-//public interface ISendCallbackFacade extends SendCallback {
-//
-//}

+ 0 - 13
elab-mq/src/main/java/com/elab/mq/msg/ITransactionCheckListener.java

@@ -1,13 +0,0 @@
-//package com.elab.mq.msg;
-//
-//import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-//
-///**
-// * 事务状态回调监听接口
-// *
-// * @author Liukx
-// * @create 2018-04-10 10:48
-// * @email liukx@elab-plus.com
-// **/
-//public interface ITransactionCheckListener extends TransactionCheckListener {
-//}

+ 1 - 1
elab-mq/src/main/java/com/elab/mq/msg/adptor/LocalTransactionExecuterAdaptor.java

@@ -30,7 +30,7 @@ public abstract class LocalTransactionExecuterAdaptor implements LocalTransactio
 
     @Override
     public final TransactionStatus execute(Message message, Object o) {
-        MessageModel messageModel = new MessageModel();
+        MessageModel messageModel = new MessageModel(message.getTopic(), message.getTag(), o);
         logger.debug(" 事务消息适配层 : " + message.toString());
         TransactionStatusEnum transactionStatusEnum = executeCallback(messageModel, o);
         logger.debug(" 事务消息执行结果 : " + transactionStatusEnum.name());

+ 0 - 25
elab-mq/src/main/java/com/elab/mq/msg/adptor/MessageListenerAdaptor.java

@@ -1,25 +0,0 @@
-package com.elab.mq.msg.adptor;
-
-import com.aliyun.openservices.ons.api.Message;
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.elab.mq.api.Action;
-import com.elab.mq.api.ConsumeContext;
-import com.elab.mq.model.MessageModel;
-
-/**
- * 消息监听适配器
- *
- * @author Liukx
- * @create 2018-04-10 19:50
- * @email liukx@elab-plus.com
- **/
-public abstract class MessageListenerAdaptor implements MessageListener {
-
-    public abstract Action consume(MessageModel messageModel, ConsumeContext consumeContext);
-
-    @Override
-    public final com.aliyun.openservices.ons.api.Action consume(Message message, com.aliyun.openservices.ons.api.ConsumeContext consumeContext) {
-        Action consume = consume(new MessageModel(message), new ConsumeContext(consumeContext));
-        return Action.getAction(consume);
-    }
-}

+ 0 - 25
elab-mq/src/main/java/com/elab/mq/msg/adptor/TransactionCheckListenerAdaptor.java

@@ -1,25 +0,0 @@
-package com.elab.mq.msg.adptor;
-
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.LocalTransactionState;
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
-import com.elab.mq.api.transaction.LocalTransactionStateEnum;
-import com.elab.mq.model.MessageExtModel;
-
-/**
- * 事务消息监听适配器
- *
- * @author Liukx
- * @create 2018-04-10 20:13
- * @email liukx@elab-plus.com
- **/
-public abstract class TransactionCheckListenerAdaptor implements TransactionCheckListener {
-
-    public abstract LocalTransactionStateEnum checkLocalTransactionState(MessageExtModel var1);
-
-    @Override
-    public final LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
-        LocalTransactionStateEnum localTransactionStateEnum = checkLocalTransactionState(new MessageExtModel(messageExt));
-        return LocalTransactionStateEnum.getAction(localTransactionStateEnum);
-    }
-}

+ 0 - 15
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgConsumerImpl.java

@@ -1,15 +0,0 @@
-package com.elab.mq.msg.impl;
-
-import com.aliyun.openservices.ons.api.bean.ConsumerBean;
-import com.elab.mq.msg.IMsgConsumerFacade;
-
-/**
- * 消费者默认实现类
- *
- * @author Liukx
- * @create 2018-04-09 18:30
- * @email liukx@elab-plus.com
- **/
-public class MsgConsumerImpl extends ConsumerBean implements IMsgConsumerFacade {
-
-}

+ 6 - 0
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgProducerImpl.java

@@ -10,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
+import java.util.Properties;
 
 /**
  * 消息生产包装类
@@ -56,4 +57,9 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
         messageModel.setStartDeliverTime(datetime.getTime());
         return send(messageModel);
     }
+
+    @Override
+    public void setProperties(Properties properties) {
+        super.setProperties(properties);
+    }
 }

+ 0 - 39
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgTransactionImpl.java

@@ -1,39 +0,0 @@
-package com.elab.mq.msg.impl;
-
-import com.aliyun.openservices.ons.api.SendResult;
-import com.aliyun.openservices.ons.api.impl.rocketmq.TransactionProducerImpl;
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.elab.mq.model.MessageModel;
-import com.elab.mq.model.SendResultModel;
-import com.elab.mq.msg.IMsgTransactionFacade;
-import com.elab.mq.msg.adptor.LocalTransactionExecuterAdaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * 事务消息实现
- *
- * @author Liukx
- * @create 2018-04-09 18:33
- * @email liukx@elab-plus.com
- **/
-//@Service
-public class MsgTransactionImpl extends TransactionProducerImpl implements IMsgTransactionFacade {
-
-    private Logger logger = LoggerFactory.getLogger(MsgTransactionImpl.class);
-
-    public MsgTransactionImpl(Properties properties, TransactionCheckListener transactionCheckListener) {
-        super(properties, transactionCheckListener);
-    }
-
-
-    @Override
-    public SendResultModel send(MessageModel messageModel, LocalTransactionExecuterAdaptor executer, Object obj) {
-        logger.info("执行一条事务消息 [" + messageModel.getMsgID() + "]: " + messageModel.toString() + " 执行器 : " + executer.getClass() + " 对象 ;" + obj.toString());
-        SendResult send = super.send(messageModel, executer, obj);
-        logger.info("事务消息执行结束 [" + messageModel + "]: ");
-        return new SendResultModel(send);
-    }
-}

+ 1 - 0
elab-mq/src/main/resources/META-INF/spring.factories

@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.elab.mq.config.RocketMQConfiguration

+ 15 - 37
elab-mq/src/test/java/com/elab/mq/config/RocketMQConfigurationTest.java

@@ -1,58 +1,36 @@
 package com.elab.mq.config;
 
 
+import com.alibaba.fastjson.JSONObject;
 import com.aliyun.openservices.ons.api.*;
-import com.aliyun.openservices.ons.api.bean.ProducerBean;
 import com.elab.core.utils.RandomUtils;
+import com.elab.mq.model.MessageModel;
+import com.elab.mq.msg.IMsgProducerFacade;
+import com.elab.mq.msg.impl.MsgProducerImpl;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 
+//@RunWith(SpringJUnit4ClassRunner.class)
+//@ContextConfiguration(classes = RocketMQConfiguration.class)
+//@WebAppConfiguration("classpath:application.properties")
+//@TestPropertySource("classpath:application.properties")
 public class RocketMQConfigurationTest {
     String topic = "T_consumer2";
 
     @Test
     public void mqProperties() {
-        Properties properties = getProperties();
-        ProducerBean producerBean = new ProducerBean();
-        producerBean.setProperties(properties);
-//        producerBean.setCallbackExecutor(new AbstractExecutorService() {
-//            @Override
-//            public void shutdown() {
-//                System.out.println("-->shutdown");
-//            }
-//
-//            @Override
-//            public List<Runnable> shutdownNow() {
-//                return null;
-//            }
-//
-//            @Override
-//            public boolean isShutdown() {
-//                return false;
-//            }
-//
-//            @Override
-//            public boolean isTerminated() {
-//                return false;
-//            }
-//
-//            @Override
-//            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-//                return false;
-//            }
-//
-//            @Override
-//            public void execute(Runnable command) {
-//                System.out.println("execute ... ");
-//            }
-//        });
-        String text = "大家好 ... ";
+        IMsgProducerFacade producerBean = new MsgProducerImpl();
+        producerBean.setProperties(getProperties());
         producerBean.start();
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("xxx", "哈哈");
+        jsonObject.put("sss", "ggg");
         for (int i = 0; i < 10; i++) {
             String key = RandomUtils.randomString(10);
-            Message message = new Message("T-consumer1", "*", key, text.getBytes());
+            MessageModel message = new MessageModel<Map>("T-consumer1", "*", key, jsonObject);
             SendResult send = producerBean.send(message);
             System.out.println("发送消息返回结果 : " + send.toString());
         }

+ 4 - 0
elab-mq/src/test/resources/application.properties

@@ -0,0 +1,4 @@
+elab.mq.AccessKey=LTAImNZed054h0YV
+elab.mq.SecretKey=8hmhlhiQ2ikmVeLKujwMNWsktFpSzm
+elab.mq.NAMESRV_ADDR=http://MQ_INST_1819241776271348_Bak3xjk0.mq-internet-access.mq-internet.aliyuncs.com:80
+elab.mq.GROUP_ID=GID-producer1

+ 20 - 0
elab-mq/src/test/resources/applicationContext.xml

@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
+    http://www.springframework.org/schema/context
+    http://www.springframework.org/schema/context/spring-context.xsd
+    ">
+    <!--<context:property-placeholder location="classpath:application.properties" />-->
+
+    <context:component-scan base-package="com.elab.mq">
+        <context:include-filter type="annotation" expression="org.springframework.stereotype.Service"/>
+        <context:include-filter type="annotation" expression="org.springframework.stereotype.Repository"/>
+        <context:include-filter type="annotation" expression="org.springframework.stereotype.Component"/>
+        <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
+    </context:component-scan>
+
+
+</beans>