浏览代码

新增功能:
优化代码

liukx@elab 6 年之前
父节点
当前提交
e670ab5b7f
共有 63 个文件被更改,包括 2209 次插入556 次删除
  1. 1 1
      elab-alert/pom.xml
  2. 1 1
      elab-annotation/pom.xml
  3. 1 1
      elab-cache/pom.xml
  4. 1 1
      elab-core/pom.xml
  5. 47 0
      elab-core/src/main/java/com/elab/core/exception/CheckException.java
  6. 31 0
      elab-core/src/main/java/com/elab/core/utils/GzipUtils.java
  7. 19 0
      elab-core/src/main/java/com/elab/core/utils/StringUtils.java
  8. 312 0
      elab-db/REDEME.md
  9. 1 1
      elab-db/pom.xml
  10. 20 0
      elab-db/src/main/java/com/elab/core/aop/annotations/EnableElabDB.java
  11. 1 1
      elab-db/src/main/java/com/elab/core/spring/method/DefaultSQLBuilderSupport.java
  12. 12 0
      elab-db/src/test/java/com.db.service/ITrainItemService.java
  13. 14 0
      elab-db/src/test/java/com.db.service/ITrainUserService.java
  14. 14 0
      elab-db/src/test/java/com.db.service/dao/ITrainItemDao.java
  15. 14 0
      elab-db/src/test/java/com.db.service/dao/ITrainUserDao.java
  16. 26 0
      elab-db/src/test/java/com.db.service/impl/TrainItemServiceImpl.java
  17. 25 0
      elab-db/src/test/java/com.db.service/impl/TrainUserServiceImpl.java
  18. 107 0
      elab-db/src/test/java/com.db.service/main/HxCase.java
  19. 143 0
      elab-db/src/test/java/com.db.service/main/RecommandCase.java
  20. 56 0
      elab-db/src/test/java/com.db.service/model/TrainItem.java
  21. 91 0
      elab-db/src/test/java/com.db.service/model/TrainUser.java
  22. 4 4
      elab-log/pom.xml
  23. 2 1
      elab-log/src/main/java/com/elab/log/asepct/CatAspect.java
  24. 4 9
      elab-log/src/main/java/com/elab/log/asepct/CatDaoAscept.java
  25. 12 10
      elab-log/src/main/java/com/elab/log/asepct/LogResponseBodyAdvice.java
  26. 5 5
      elab-log/src/main/java/com/elab/log/filter/HttpCatCrossFliter.java
  27. 1 1
      elab-mongodb/pom.xml
  28. 16 3
      elab-mongodb/src/main/java/com/elab/mongodb/BaseMongodb.java
  29. 82 0
      elab-mq/README.md
  30. 38 17
      elab-mq/pom.xml
  31. 114 0
      elab-mq/src/main/java/com/elab/mq/config/RocketMQConfiguration.java
  32. 0 120
      elab-mq/src/main/java/com/elab/mq/config/RocketMqConfigBean.java
  33. 14 0
      elab-mq/src/main/java/com/elab/mq/dao/IConsumerDao.java
  34. 15 0
      elab-mq/src/main/java/com/elab/mq/dao/IProducerDao.java
  35. 132 0
      elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java
  36. 0 80
      elab-mq/src/main/java/com/elab/mq/listener/DefaultMessageListener.java
  37. 43 0
      elab-mq/src/main/java/com/elab/mq/listener/MessageListenerWrapper.java
  38. 195 0
      elab-mq/src/main/java/com/elab/mq/model/ConsumerEntity.java
  39. 0 34
      elab-mq/src/main/java/com/elab/mq/model/MessageExtModel.java
  40. 90 2
      elab-mq/src/main/java/com/elab/mq/model/MessageModel.java
  41. 184 0
      elab-mq/src/main/java/com/elab/mq/model/ProducerEntity.java
  42. 0 35
      elab-mq/src/main/java/com/elab/mq/msg/IConsumeProcessService.java
  43. 0 16
      elab-mq/src/main/java/com/elab/mq/msg/ILocalTransactionExecuter.java
  44. 0 14
      elab-mq/src/main/java/com/elab/mq/msg/IMessageListener.java
  45. 0 13
      elab-mq/src/main/java/com/elab/mq/msg/IMsgConsumerFacade.java
  46. 7 0
      elab-mq/src/main/java/com/elab/mq/msg/IMsgProducerFacade.java
  47. 0 23
      elab-mq/src/main/java/com/elab/mq/msg/IMsgTransactionFacade.java
  48. 0 12
      elab-mq/src/main/java/com/elab/mq/msg/ISendCallbackFacade.java
  49. 0 13
      elab-mq/src/main/java/com/elab/mq/msg/ITransactionCheckListener.java
  50. 1 1
      elab-mq/src/main/java/com/elab/mq/msg/adptor/LocalTransactionExecuterAdaptor.java
  51. 0 25
      elab-mq/src/main/java/com/elab/mq/msg/adptor/MessageListenerAdaptor.java
  52. 0 25
      elab-mq/src/main/java/com/elab/mq/msg/adptor/TransactionCheckListenerAdaptor.java
  53. 0 15
      elab-mq/src/main/java/com/elab/mq/msg/impl/MsgConsumerImpl.java
  54. 66 0
      elab-mq/src/main/java/com/elab/mq/msg/impl/MsgProducerImpl.java
  55. 0 39
      elab-mq/src/main/java/com/elab/mq/msg/impl/MsgTransactionImpl.java
  56. 1 0
      elab-mq/src/main/resources/META-INF/spring.factories
  57. 4 0
      elab-mq/src/test/resources/application.properties
  58. 14 1
      elab-spring/pom.xml
  59. 28 22
      elab-spring/src/main/java/com/elab/spring/exception/CommonException.java
  60. 104 0
      elab-spring/src/main/java/com/elab/spring/intercept/CheckAnnotationProcess.java
  61. 41 8
      elab-spring/src/main/java/com/elab/spring/utils/RestTemplateUtils.java
  62. 54 1
      elab-spring/src/test/java/com/elab/spring/utils/RestTemplateUtilsTest.java
  63. 1 1
      pom.xml

+ 1 - 1
elab-alert/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>elab-parent</artifactId>
         <groupId>com.elab.core</groupId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>elab-alert</artifactId>

+ 1 - 1
elab-annotation/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>elab-parent</artifactId>
         <groupId>com.elab.core</groupId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
elab-cache/pom.xml

@@ -6,7 +6,7 @@
     <parent>
         <groupId>com.elab.core</groupId>
         <artifactId>elab-parent</artifactId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <groupId>com.elab.cache</groupId>
     <artifactId>elab-cache</artifactId>

+ 1 - 1
elab-core/pom.xml

@@ -7,7 +7,7 @@
     <parent>
         <groupId>com.elab.core</groupId>
         <artifactId>elab-parent</artifactId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
 
     <groupId>com.elab.core</groupId>

+ 47 - 0
elab-core/src/main/java/com/elab/core/exception/CheckException.java

@@ -0,0 +1,47 @@
+package com.elab.core.exception;
+
+/**
+ * 检测异常标识
+ *
+ * @author liuhx on 2016/12/8 15:57
+ * @version V1.0
+ * @email liuhx@elab-plus.com
+ */
+public class CheckException extends RuntimeException {
+
+    private String errorCode;
+
+    public CheckException() {
+        super();
+    }
+
+    public CheckException(String message) {
+        super(message);
+    }
+
+    public CheckException(String errorCode, String message) {
+        super(message);
+        this.errorCode = errorCode;
+    }
+
+    public CheckException(Throwable cause) {
+        super(cause);
+    }
+
+    public CheckException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    @Override
+    public String getMessage() {
+        return super.getMessage();
+    }
+
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    public void setErrorCode(String errorCode) {
+        this.errorCode = errorCode;
+    }
+}

+ 31 - 0
elab-core/src/main/java/com/elab/core/utils/GzipUtils.java

@@ -0,0 +1,31 @@
+package com.elab.core.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * @describe :压缩解压工具类
+ * @Author : liukx
+ * @time : 2019/2/26 - 16:33
+ */
+public class GzipUtils {
+
+    public static String unZip(byte[] b) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayInputStream in;
+        GZIPInputStream gunzip = null;
+
+        in = new ByteArrayInputStream(b);
+        gunzip = new GZIPInputStream(in);
+        byte[] buffer = new byte[256];
+        int n;
+        while ((n = gunzip.read(buffer)) >= 0) {
+            out.write(buffer, 0, n);
+        }
+        return out.toString();
+    }
+
+
+}

+ 19 - 0
elab-core/src/main/java/com/elab/core/utils/StringUtils.java

@@ -20,6 +20,7 @@ public class StringUtils {
     private static String localIp;
     private static DecimalFormat df = new DecimalFormat("##,###.00");
     private static NumberFormat nf = NumberFormat.getInstance();
+    private final static Integer maxStringLength = 500;
 
     public StringUtils() {
     }
@@ -620,4 +621,22 @@ public class StringUtils {
         return null;
     }
 
+    /**
+     * 字符串输出规则
+     *
+     * @param str
+     * @return
+     */
+    public static String logOut(String str) {
+        if (isEmpty(str)) {
+            return "null";
+        }
+
+        if (str.length() > maxStringLength) {
+            return str.substring(0, maxStringLength) + " 省略...";
+        }
+
+        return str;
+    }
+
 }

+ 312 - 0
elab-db/REDEME.md

@@ -0,0 +1,312 @@
+# Elab-db 使用介绍
+
+## 更新介绍:
+
+版本号: 2.0.4.10
+
+- 新增批量新增功能
+- 批量修改,参考`IBaseDaoSupport`
+- 新增SQL检测功能
+  - join 表超过3张会提示警告。
+  - union 超过3张会提示警告。
+
+#### 版本号 : 2.0.4.4
+- 增加通用的service业务处理
+	- 接口继承 : ICommonService<?>
+	- 实现继承 : CommonServiceAdaptor<?>
+**业务特殊可以通过重写来覆盖**
+
+- 查询分页结果的时候,可以指定order by
+```
+pageModel.setOrderby("order by id desc");
+```
+
+- 如果涉及多个库的增删改查操作如何指定库?
+**通过catalog来指定,效果是在生成的sql语句中表名会变成 mvp.file 查询**
+```
+@Table(name = "file",catalog = "mvp")
+```
+
+
+修复若干BUG.
+
+
+## 功能描述
+- 单表的增删改差不需要写单独的SQL
+- 动态参数通过$指定,但前提是必须要有值
+- DAO只需要接口,就能够找到对应的sql文件中的sql
+- 实现基于一对多的关系映射
+- 参数可以通过对象或者Map的方式指定,SQL文件中的参数可以自由指定,没有限制
+
+### 配置
+基于注解配置:
+```java
+@Configuration
+public class JdbcBeanConfig {
+
+    @Bean
+  public JdbcTemplate jdbcTemplate(@Autowired DataSource dataSource) {
+	 JdbcTemplate jdbcTemplate = new JdbcTemplate();
+	 jdbcTemplate.setDataSource(dataSource);
+	 return jdbcTemplate;
+  }
+
+    @Bean
+  public BasicBaseDao basicBaseDao(@Autowired JdbcTemplate jdbcTemplate) {
+	 BasicBaseDao basicBaseDao = new BasicBaseDao();
+	 basicBaseDao.setJdbcTemplate(jdbcTemplate);
+	 return basicBaseDao;
+  }
+
+    @Bean
+  public ConfigurableFactory getConfigurableFactory() {
+	ConfigurableFactory configurableFactory = new ConfigurableFactory();
+	// 配置文件的文件位置
+	configurableFactory.setSqlConfigurableLocations("sql");
+    return configurableFactory;
+  }
+
+    @Bean
+  public DaoScannerConfigurer daoScannerConfigurer(@Autowired ConfigurableFactory configurableFactory) {
+	  DaoScannerConfigurer daoScannerConfigurer = new DaoScannerConfigurer();
+	  // db层要扫描的包
+	  daoScannerConfigurer.setBasePackage("com.elab.service.business.daos.*");
+	  // 持久层操作对象
+	  daoScannerConfigurer.setBasicBaseDaoName("basicBaseDao");
+	  daoScannerConfigurer.setConfigurableFactory(configurableFactory);
+	 return daoScannerConfigurer;
+  }
+
+}
+
+```
+
+基于配置文件配置:
+```xml
+
+<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
+        <property name="dataSource" ref="mysqlDataSource"/>
+    </bean>
+
+    <bean id="basicBaseDao" class="com.elab.core.dao.BasicBaseDao">
+        <property name="jdbcTemplate" ref="jdbcTemplate"/>
+    </bean>
+
+    <!-- 构建一个配置工厂 -->
+    <bean id="configurableFactory" class="com.elab.core.sql.ConfigurableFactory">
+        <property name="sqlConfigurableLocations" value="sql"/>
+    </bean>
+
+    <!-- 扫描包 -->
+    <bean id="daoScannerConfigurer" class="com.elab.core.spring.DaoScannerConfigurer">
+        <property name="basePackage" value="com.db.service.dao"/>
+        <property name="basicBaseDaoName" value="basicBaseDao"/>
+        <property name="configurableFactory" ref="configurableFactory"/>
+    </bean>
+```
+
+
+### 强制操作
+#### 接口层
+1. 必须实现IBaseDaoSupport接口,这样才会受框架的扫描
+```java
+// 指定配置文件的前缀路径 , 注意这里的配置文件为test-sql.xml , <TTest>构造参数对应的就是数据库的实体
+@XmlGroupName("test")
+public interface ITestDao extends IBaseDaoSupport<TTest> {
+
+   public TTest getTestObject(String id);
+
+   public List getTestList(TTest id);
+
+}
+
+```
+2. SQL文件通常放在resources/sql下面
+
+SQL文件的规范参考
+
+```xml
+<!-- 这里面的name对应dao的接口层指定的XmlGroupName的值 -->
+<sqlGroup name="test">
+    <!-- 对应dao的接口层的方法 -->
+    <sql id="getTestObject">
+        select
+        id
+        ,username,name,sex,status,created,time,test_id,love_name
+        from t_test
+        where
+        id = :id
+    </sql>
+</sqlGroup>
+```
+
+- 规范类似Mybatis.
+- 增删改不需要指定,框架会通过语句识别执行sql类型
+- 简单的增删改查不需要写SQL,复杂的增删改查需要手动写。不用写的sql在IBaseDaoSupport已经写明了。(Mybatis类似)
+
+#### 实体层
+
+- @Table 指定表名
+- @Id : 指定主键, 默认以id字段为主键
+- @Column 当数据库列名和实体属性名不一致时,可以使用该注解指定
+- @Ignore 忽略当前实体的字段,非数据库字段可以通过这个注解指定
+- @JoinTable 表关联操作
+	- schema 对应的sql.xml配置文件中的,sqlGroup的name加上sql的id , 参考test.selectByExample
+	- joinColumns 关联对象的属性名和数据库列名对应
+		- name 属性名称
+		- referencedColumnName  -> sql语句中的条件名称
+
+
+参考实体 : **这里的一对多关系会根据你的属性对象来确定是返回一个或者多个**
+```java
+
+@Table(name = "t_test")
+public class TTest {
+ // 表字段 : t_test.id//    @javax.persistence.Column(name="ids")
+  @Id
+  private Integer id;
+
+ // 表字段 : t_test.test_id  @Column(name = "test_id")
+  @Column(name = "test_id")
+  private String testId;
+
+  @Ignore
+  private String girlName;
+
+// 对应的sql文件中的编号,参数就是JoinColumn中需要将实体和sql中的参数对应
+  @JoinTable(schema = "test.selectByExample", joinColumns = {
+            @JoinColumn(name = "id", referencedColumnName = "id"),
+  @JoinColumn(name = "status", referencedColumnName = "status")
+    })
+    private TTest test;
+
+  @JoinTable(schema = "test.selectByExample", joinColumns = {
+            @JoinColumn(name = "testId", referencedColumnName = "test_id")
+    })
+    private List testList;
+```
+
+## 常用操作
+
+下面是集成自IBaseDaoSupport的操作
+
+1. 添加操作
+```java
+@Test
+public void testInserCase() throws Exception {
+    TTest test = new TTest();
+	test.setStatus("1");
+	test.setUsername("某某某xx111");
+	// 这里是非手写的语句,集成自IBaseDaoSupport操作
+	int insert = testDao.insert(test);
+	System.out.println(insert);
+}
+```
+2. 修改
+```java
+
+@Test
+public void testUpdateCase() throws Exception {
+	TTest test = new TTest();
+	test.setId(1);
+	test.setStatus("1");
+	test.setUsername("某某某xx33333333");
+	int insert = testDao.updateById(test);
+	System.out.println(insert);
+}
+```
+
+3. 查询
+
+```java
+@Test
+  public void testQueryCase() throws Exception {
+        TTest test = new TTest();
+	test.setId(1123);
+	test.setStatus("1");
+  //        test.setUsername("某某某xx2121212");
+	List tTests = testDao.selectByList(test);
+	test.setId(1);
+	TTest test1 = testDao.selectByObject(test);
+	System.out.println(test1.toString());
+	System.out.println(tTests.size());
+  }
+
+```
+
+
+
+## 特殊操作
+
+### 动态参数
+1. 例如动态表名,或者动态字段
+```xml
+ 1. select * from $table
+ 2. selct * from table order  by $column
+```
+1. 传值的时候,就传table这个字段
+2. 传值的时候就传column -> id asc 等等
+
+## bean对象手动设置
+1. 实现RowMapper接口
+```
+// 该类已经是最底层的赋值方法了,可以根据具体业务去实现不同的处理
+public class TestRowMapper implements RowMapper {
+
+    @Override
+  public TTest mapRow(ResultSet resultSet, int i) throws SQLException {
+	  TTest test = new TTest();
+	  int id = resultSet.getInt("id");
+	  String username = resultSet.getString("username");
+	  String name = resultSet.getString("name");
+	  test.setId(id);
+	  test.setUsername(username);
+	  test.setName(name);
+	  return test;
+  }
+}
+
+// 然后接口层
+可以通过公用的selectByMapper(参数,new TestRowMapper());
+
+TTest test = new TTest();
+test.setStatus("1");
+TestRowMapper mapper = new TestRowMapper();
+List tTests1 = testDao.selectByList(test);
+List tTests = testDao.selectByMapper(test, mapper);
+
+```
+
+### in like 操作
+如果是in like 等函数:
+sql.xml中
+```java
+// sql
+select * from table where name like :name
+// java
+model.setName("%mmm%");
+
+// sql
+select * from table where id in :idList
+// java
+List<String> list = new ArrayList();
+list.add("1");
+list.add("2");
+list.add("3");
+// 上面最好用对象包这这个list
+
+```
+
+# 使用建议
+1. 如果出现(columnA = :A or columnB = :B) 这种括号条件的话....希望传值不能为空!
+2. 传值的时候参数必须带有一个以上!
+
+## 加速开发
+
+### 自动生成代码
+
+[内部生成代码网站](http://192.168.0.25:8889/share;JSESSIONID=aa82cbce-e6d8-4816-af7f-fe0e01eb77f5)
+
+> 帐号: admin 密码:111111
+
+选择表结构生成特定的内部代码结构。

+ 1 - 1
elab-db/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <groupId>com.elab.core</groupId>
         <artifactId>elab-parent</artifactId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>elab-db</artifactId>

+ 20 - 0
elab-db/src/main/java/com/elab/core/aop/annotations/EnableElabDB.java

@@ -0,0 +1,20 @@
+package com.elab.core.aop.annotations;
+
+import com.elab.core.spring.config.DataSourceConfigBean;
+import com.elab.core.spring.config.JdbcBeanConfig;
+import com.elab.core.spring.config.TransactionConfigBean;
+import org.springframework.context.annotation.Import;
+import org.springframework.core.annotation.AliasFor;
+
+import java.lang.annotation.*;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@Documented
+@Import({DataSourceConfigBean.class, JdbcBeanConfig.class, TransactionConfigBean.class})
+public @interface EnableElabDB {
+
+    @AliasFor("basePackages")
+    String[] scanBasePackages() default {};
+
+}

+ 1 - 1
elab-db/src/main/java/com/elab/core/spring/method/DefaultSQLBuilderSupport.java

@@ -75,7 +75,7 @@ public class DefaultSQLBuilderSupport implements ISQLBuliderSupport {
         String id = entityOperation.pkField.getName();
         StringBuffer sb = new StringBuffer();
         // 这里默认会将主键放在第一位
-        sb.append(" select " + id + "," + allColumn + " from " + tableName + " where " + id + " = ? ");
+        sb.append(" select " + allColumn + " from " + tableName + " where " + id + " = ? ");
         return sb.toString();
     }
 

+ 12 - 0
elab-db/src/test/java/com.db.service/ITrainItemService.java

@@ -0,0 +1,12 @@
+package com.db.service;
+
+import com.db.service.model.TrainItem;
+import com.elab.core.services.ICommonService;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:15
+ * @email liuhx@elab-plus.com
+ **/
+public interface ITrainItemService extends ICommonService<TrainItem> {
+}

+ 14 - 0
elab-db/src/test/java/com.db.service/ITrainUserService.java

@@ -0,0 +1,14 @@
+package com.db.service;
+
+import com.db.service.model.TrainUser;
+import com.elab.core.services.ICommonService;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:14
+ * @email liuhx@elab-plus.com
+ **/
+public interface ITrainUserService extends ICommonService<TrainUser> {
+
+
+}

+ 14 - 0
elab-db/src/test/java/com.db.service/dao/ITrainItemDao.java

@@ -0,0 +1,14 @@
+package com.db.service.dao;
+
+import com.db.service.model.TrainItem;
+import com.elab.core.aop.annotations.XmlGroupName;
+import com.elab.core.dao.IBaseDaoSupport;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:36
+ * @email liuhx@elab-plus.com
+ **/
+@XmlGroupName("trainItem")
+public interface ITrainItemDao extends IBaseDaoSupport<TrainItem> {
+}

+ 14 - 0
elab-db/src/test/java/com.db.service/dao/ITrainUserDao.java

@@ -0,0 +1,14 @@
+package com.db.service.dao;
+
+import com.db.service.model.TrainUser;
+import com.elab.core.aop.annotations.XmlGroupName;
+import com.elab.core.dao.IBaseDaoSupport;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:24
+ * @email liuhx@elab-plus.com
+ **/
+@XmlGroupName("TrainUser")
+public interface ITrainUserDao extends IBaseDaoSupport<TrainUser> {
+}

+ 26 - 0
elab-db/src/test/java/com.db.service/impl/TrainItemServiceImpl.java

@@ -0,0 +1,26 @@
+package com.db.service.impl;
+
+import com.db.service.ITrainItemService;
+import com.db.service.dao.ITrainItemDao;
+import com.db.service.model.TrainItem;
+import com.elab.core.dao.IBaseDaoSupport;
+import com.elab.core.services.impl.CommonServiceAdaptor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:35
+ * @email liuhx@elab-plus.com
+ **/
+@Service
+public class TrainItemServiceImpl extends CommonServiceAdaptor<TrainItem> implements ITrainItemService {
+
+    @Autowired
+    private ITrainItemDao trainItemDao;
+
+    @Override
+    protected IBaseDaoSupport<TrainItem> getBaseDaoSupport() {
+        return trainItemDao;
+    }
+}

+ 25 - 0
elab-db/src/test/java/com.db.service/impl/TrainUserServiceImpl.java

@@ -0,0 +1,25 @@
+package com.db.service.impl;
+
+import com.db.service.ITrainUserService;
+import com.db.service.dao.ITrainUserDao;
+import com.db.service.model.TrainUser;
+import com.elab.core.dao.IBaseDaoSupport;
+import com.elab.core.services.impl.CommonServiceAdaptor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:23
+ * @email liuhx@elab-plus.com
+ **/
+@Service
+public class TrainUserServiceImpl extends CommonServiceAdaptor<TrainUser> implements ITrainUserService {
+
+    @Autowired
+    private ITrainUserDao trainUserDao;
+    @Override
+    protected IBaseDaoSupport<TrainUser> getBaseDaoSupport() {
+        return trainUserDao;
+    }
+}

+ 107 - 0
elab-db/src/test/java/com.db.service/main/HxCase.java

@@ -0,0 +1,107 @@
+package com.db.service.main;
+
+import com.db.service.ITrainItemService;
+import com.db.service.ITrainUserService;
+import com.db.service.model.TrainUser;
+import com.elab.core.utils.ObjectUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * @author liuhx
+ * @create 2019/04/02 16:22
+ * @email liuhx@elab-plus.com
+ **/
+@RunWith(SpringJUnit4ClassRunner.class)  //使用junit4进行测试
+@ContextConfiguration
+        ({"classpath:applicationContext-*.xml",})
+public class HxCase {
+    @Autowired
+    private ITrainUserService trainUserService;
+    @Autowired
+    private ITrainItemService trainItemService;
+
+    public List<TrainUser> findByUserList() throws Exception {
+        TrainUser user = new TrainUser();
+        List<TrainUser> list = trainUserService.selectByList(user);
+        return list;
+    }
+
+    @Test
+    public void getUserItemSize() throws Exception {
+        List<TrainUser> resultList = findByUserList();
+        //建立物品到用户的倒排表 eg: a A B
+        Map<Integer, List<TrainUser>> itemList = resultList.stream()
+                .filter(trainUser -> ObjectUtils.isNotEmpty(trainUser.getItemId()))
+                .collect(Collectors.groupingBy(TrainUser::getItemId));
+
+        Map<Integer, List<TrainUser>> userList = resultList.stream()
+                .filter(trainUser -> ObjectUtils.isNotEmpty(trainUser.getUserId()))
+                .collect(Collectors.groupingBy(TrainUser::getUserId));
+
+
+        System.out.println(itemList.size());
+        int userSize = userList.size();
+        /*建立用户稀疏矩阵,用于用户相似度计算【相似度矩阵】*/
+        int[][] sparseMatrix = new int[userSize][userSize];
+
+        Map<Integer, Integer> userItemLength = new HashMap<>();//存储每一个用户ID对应的不同物品总数
+        Map<Integer, Integer> userID = new HashMap<>();//辅助存储每一个用户的用户ID映射
+        Map<Integer, Integer> idUser = new HashMap<>();//辅助存储每一个ID对应的用户映射
+        Set<Integer> items = new HashSet<>();//辅助存储物品集合,这里对应存的是物品的id
+
+        AtomicInteger count = new AtomicInteger();
+        userList.entrySet().iterator().forEachRemaining(user->{
+            userItemLength.put(user.getKey(), user.getValue().size());
+            userID.put(user.getKey(), count.intValue());//用户ID与稀疏矩阵建立对应关系
+            idUser.put(count.intValue(), user.getKey());
+            count.addAndGet(1);
+        });
+        itemList.entrySet().iterator().forEachRemaining(item->{
+            items.add(item.getKey());
+        });
+
+        //计算相似度矩阵【稀疏】
+        Set<Map.Entry<Integer, List<TrainUser>>> entrySet = itemList.entrySet();
+        Iterator<Map.Entry<Integer, List<TrainUser>>> iterator = entrySet.iterator();
+        while(iterator.hasNext()) {
+            List<TrainUser> commonUsers = iterator.next().getValue();
+            for (TrainUser user_u : commonUsers) {
+                for (TrainUser user_v : commonUsers) {
+                    if(user_u.getUserId().equals(user_v.getUserId())){
+                        continue;
+                    }
+                    sparseMatrix[userID.get(user_u.getUserId())][userID.get(user_v.getUserId())] += 1;//计算用户u与用户v都有正反馈的物品总数
+                }
+            }
+        }
+
+        int recommendUser = 1;
+        //计算用户之间的相似度【余弦相似性】
+        int recommendUserId = userID.get(recommendUser);
+        for (int j = 0;j < sparseMatrix.length; j++) {
+            if(j != recommendUserId){
+                System.out.println(idUser.get(recommendUserId)+"--"+idUser.get(j)+"相似度:"+sparseMatrix[recommendUserId][j]/Math.sqrt(userItemLength.get(idUser.get(recommendUserId))*userItemLength.get(idUser.get(j))));
+            }
+        }
+        //计算指定用户recommendUser的物品推荐度
+        for(Integer item: items){//遍历每一件物品
+            List<TrainUser> users = itemList.get(item);//得到购买当前物品的所有用户集合
+            List<Integer> integers = users.stream().filter(trainUser -> ObjectUtils.isNotEmpty(trainUser.getUserId())).map(TrainUser::getUserId).collect(Collectors.toList());
+            if(!integers.contains(recommendUser)){//如果被推荐用户没有购买当前物品,则进行推荐度计算
+                double itemRecommendDegree = 0.0;
+                for(TrainUser user: users){
+                    itemRecommendDegree += sparseMatrix[userID.get(recommendUser)][userID.get(user.getUserId())]/Math.sqrt(userItemLength.get(recommendUser)*userItemLength.get(user.getUserId()));//推荐度计算
+                }
+                System.out.println("The item "+item+" for "+recommendUser +"'s recommended degree:"+itemRecommendDegree);
+            }
+        }
+    }
+}

+ 143 - 0
elab-db/src/test/java/com.db.service/main/RecommandCase.java

@@ -0,0 +1,143 @@
+/*
+package com.db.service.main;
+
+import com.db.service.ITrainItemService;
+import com.db.service.ITrainUserService;
+import com.db.service.model.TrainItem;
+import com.db.service.model.TrainUser;
+import com.elab.core.utils.ObjectUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+*/
+/**
+ * @author liuhx
+ * @create 2019/04/02 10:09
+ * @email liuhx@elab-plus.com
+ **//*
+
+@RunWith(SpringJUnit4ClassRunner.class)  //使用junit4进行测试
+@ContextConfiguration
+        ({"classpath:applicationContext-*.xml",})
+public class RecommandCase {
+
+    @Autowired
+    private ITrainUserService trainUserService;
+    @Autowired
+    private ITrainItemService trainItemService;
+
+    public List<TrainUser> findByUserList() throws Exception {
+        TrainUser user = new TrainUser();
+        List<TrainUser> list = trainUserService.selectByList(user);
+        return list;
+    }
+
+    public List<TrainItem> findByItemList() throws Exception {
+        TrainItem item = new TrainItem();
+        List<TrainItem> list = trainItemService.selectByList(item);
+        return list;
+    }
+
+    @Test
+    public void getUserItemSize() throws Exception {
+        List<TrainUser> userList = findByUserList();
+        Map<Integer, List<TrainUser>> resultList = userList.stream()
+                .filter(trainUser -> ObjectUtils.isNotEmpty(trainUser.getItemId()))
+                .collect(Collectors.groupingBy(TrainUser::getItemId));
+        System.out.println(resultList.size());
+        int userSize = resultList.size();
+        */
+/*建立用户稀疏矩阵,用于用户相似度计算【相似度矩阵】*//*
+
+        int[][] sparseMatrix = new int[userSize][userSize];
+        Map<Integer, Integer> userItemLength = new HashMap<>();//存储每一个用户对应的不同物品总数  eg: A 3
+        Map<Integer, Set<Integer>> itemUserCollection = new HashMap<>();//建立物品到用户的倒排表 eg: a A B
+        Set<Integer> items = new HashSet<>();//辅助存储物品集合
+        Map<Integer, Integer> userID = new HashMap<>();//辅助存储每一个用户的用户ID映射
+        Map<Integer, Integer> idUser = new HashMap<>();//辅助存储每一个ID对应的用户映射
+
+        for(int i = 0; i < userSize ; i++) { */
+/*依次处理userSize个用户 *//*
+
+            int userId = resultList.entrySet().stream().collect(Collectors.toList()).get(i).getKey();
+            Object[] objects = resultList.entrySet().stream().collect(Collectors.toList()).get(i).getValue().toArray();
+            int length = objects.length;
+            userItemLength.put(userId, length-1);//eg: A 3
+//            String[] user_item = scanner.nextLine().split(" ");
+//            int length = user_item.length;
+//            userItemLength.put(user_item[0], length-1);//eg: A 3
+            userID.put(userId, i);//用户ID与稀疏矩阵建立对应关系
+            idUser.put(i, userId);
+//            //建立物品--用户倒排表
+            for (int j = 1; j < length; j ++) {
+                int itemId = ((TrainUser)objects[j]).getItemId();
+                if(items.contains(itemId)){//如果已经包含对应的物品--用户映射,直接添加对应的用户
+                    itemUserCollection.get(itemId).add(userId);
+                }else{//否则创建对应物品--用户集合映射
+                    items.add(itemId);
+                    itemUserCollection.put(itemId, new HashSet<Integer>());//创建物品--用户倒排关系
+                    itemUserCollection.get(itemId).add(userId);
+                }
+            }
+
+
+
+
+//            //计算指定用户recommendUser的物品推荐度
+//            for(String item: items){//遍历每一件物品
+//                Set<String> users = itemUserCollection.get(item);//得到购买当前物品的所有用户集合
+//                if(!users.contains(recommendUser)){//如果被推荐用户没有购买当前物品,则进行推荐度计算
+//                    double itemRecommendDegree = 0.0;
+//                    for(String user: users){
+//                        itemRecommendDegree += sparseMatrix[userID.get(recommendUser)][userID.get(user)]/Math.sqrt(userItemLength.get(recommendUser)*userItemLength.get(user));//推荐度计算
+//                    }
+//                    System.out.println("The item "+item+" for "+recommendUser +"'s recommended degree:"+itemRecommendDegree);
+//                }
+//            }
+
+        }
+
+        //计算相似度矩阵【稀疏】
+        Set<Map.Entry<Integer, Set<Integer>>> entrySet = itemUserCollection.entrySet();
+        Iterator<Map.Entry<Integer, Set<Integer>>> iterator = entrySet.iterator();
+        while(iterator.hasNext()) {
+            Set<Integer> commonUsers = iterator.next().getValue();
+            for (Integer user_u : commonUsers) {
+                for (Integer user_v : commonUsers) {
+                    if(user_u.equals(user_v)){
+                        continue;
+                    }
+                    sparseMatrix[userID.get(user_u)][userID.get(user_v)] += 1;//计算用户u与用户v都有正反馈的物品总数
+                }
+            }
+        }
+
+        int recommendUser = 1;
+        //计算用户之间的相似度【余弦相似性】
+        int recommendUserId = userID.get(recommendUser);
+        for (int j = 0;j < sparseMatrix.length; j++) {
+            if(j != recommendUserId){
+                System.out.println(idUser.get(recommendUserId)+"--"+idUser.get(j)+"相似度:"+sparseMatrix[recommendUserId][j]/Math.sqrt(userItemLength.get(idUser.get(recommendUserId))*userItemLength.get(idUser.get(j))));
+            }
+        }
+
+        //计算指定用户recommendUser的物品推荐度
+        for(Integer item: items){//遍历每一件物品
+            Set<Integer> users = itemUserCollection.get(item);//得到购买当前物品的所有用户集合
+            if(!users.contains(recommendUser)){//如果被推荐用户没有购买当前物品,则进行推荐度计算
+                double itemRecommendDegree = 0.0;
+                for(Integer user: users){
+                    itemRecommendDegree += sparseMatrix[userID.get(recommendUser)][userID.get(user)]/Math.sqrt(userItemLength.get(recommendUser)*userItemLength.get(user));//推荐度计算
+                }
+                System.out.println("The item "+item+" for "+recommendUser +"'s recommended degree:"+itemRecommendDegree);
+            }
+        }
+    }
+}
+*/

+ 56 - 0
elab-db/src/test/java/com.db.service/model/TrainItem.java

@@ -0,0 +1,56 @@
+package com.db.service.model;
+
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+/**
+ * @author liuhx
+ * @create 2019/04/01 18:48
+ * @email liuhx@elab-plus.com
+ **/
+@Table(name = "tianchi_fresh_comp_train_item_2w")
+public class TrainItem {
+    @Id
+    private Integer id;
+
+    @javax.persistence.Column(name="item_id")
+    private Integer itemId;
+
+    @javax.persistence.Column(name="item_geohash")
+    private String itemGeohash;
+
+    @javax.persistence.Column(name="item_category")
+    private String itemCategory;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+
+    public Integer getItemId() {
+        return itemId;
+    }
+
+    public void setItemId(Integer itemId) {
+        this.itemId = itemId;
+    }
+
+    public String getItemGeohash() {
+        return itemGeohash;
+    }
+
+    public void setItemGeohash(String itemGeohash) {
+        this.itemGeohash = itemGeohash;
+    }
+
+    public String getItemCategory() {
+        return itemCategory;
+    }
+
+    public void setItemCategory(String itemCategory) {
+        this.itemCategory = itemCategory;
+    }
+}

+ 91 - 0
elab-db/src/test/java/com.db.service/model/TrainUser.java

@@ -0,0 +1,91 @@
+package com.db.service.model;
+
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+/**
+ * @author liuhx
+ * @create 2019/04/01 18:52
+ * @email liuhx@elab-plus.com
+ **/
+@Table(name = "tianchi_fresh_comp_train_user_2w")
+public class TrainUser {
+
+    @Id
+    private Integer id;
+
+    @javax.persistence.Column(name="user_id")
+    private Integer userId;
+
+    @javax.persistence.Column(name="item_id")
+    private Integer itemId;
+
+    @javax.persistence.Column(name="behavior_type")
+    private String behaviorType;
+
+    @javax.persistence.Column(name="user_geohash")
+    private String userGeohash;
+
+    @javax.persistence.Column(name="item_category")
+    private String itemCategory;
+
+    @javax.persistence.Column(name="time")
+    private String time;
+
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+
+    public Integer getUserId() {
+        return userId;
+    }
+
+    public void setUserId(Integer userId) {
+        this.userId = userId;
+    }
+
+    public Integer getItemId() {
+        return itemId;
+    }
+
+    public void setItemId(Integer itemId) {
+        this.itemId = itemId;
+    }
+
+    public String getBehaviorType() {
+        return behaviorType;
+    }
+
+    public void setBehaviorType(String behaviorType) {
+        this.behaviorType = behaviorType;
+    }
+
+    public String getUserGeohash() {
+        return userGeohash;
+    }
+
+    public void setUserGeohash(String userGeohash) {
+        this.userGeohash = userGeohash;
+    }
+
+    public String getItemCategory() {
+        return itemCategory;
+    }
+
+    public void setItemCategory(String itemCategory) {
+        this.itemCategory = itemCategory;
+    }
+
+    public String getTime() {
+        return time;
+    }
+
+    public void setTime(String time) {
+        this.time = time;
+    }
+}

+ 4 - 4
elab-log/pom.xml

@@ -7,7 +7,7 @@
     <parent>
         <groupId>com.elab.core</groupId>
         <artifactId>elab-parent</artifactId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
 
     <groupId>com.elab.log</groupId>
@@ -41,9 +41,9 @@
         </dependency>
 
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+    </dependency>
         <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
         <dependency>
             <groupId>org.slf4j</groupId>

+ 2 - 1
elab-log/src/main/java/com/elab/log/asepct/CatAspect.java

@@ -8,6 +8,7 @@ import com.elab.core.bean.Info;
 import com.elab.core.exception.BusinessException;
 import com.elab.core.exception.CoreException;
 import com.elab.core.utils.ObjectUtils;
+import com.elab.core.utils.StringUtils;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
@@ -59,7 +60,7 @@ public class CatAspect {
             }
         }
         Transaction t = Cat.newTransaction(value, fullMethodName);
-        logger.info(" 开始执行方法 " + method.getName() + " 参数:" + JSON.toJSONString(pjp.getArgs()));
+        logger.info(" 开始执行方法 " + method.getName() + " 参数:" + StringUtils.logOut(JSON.toJSONString(pjp.getArgs())));
         try {
             proceed = pjp.proceed();
             t.setStatus(Transaction.SUCCESS);

+ 4 - 9
elab-log/src/main/java/com/elab/log/asepct/CatDaoAscept.java

@@ -3,13 +3,13 @@ package com.elab.log.asepct;
 import com.alibaba.fastjson.JSON;
 import com.dianping.cat.Cat;
 import com.dianping.cat.message.Transaction;
+import com.elab.core.utils.StringUtils;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Method;
-import java.util.List;
 
 /**
  * DAO层执行拦截器
@@ -35,17 +35,12 @@ public class CatDaoAscept {
             Method objMethod = classTarget.getMethod(methodName, par);
 
             transaction = Cat.getProducer().newTransaction("SQL.Method", classTarget.getName().concat(".").concat(objMethod.getName()));
-            logger.debug(" 开始执行方法 " + method.getName() + " 参数:" + JSON.toJSONString(pjp.getArgs()));
+            logger.debug(" 开始执行方法 " + method.getName() + " 参数:" + StringUtils.logOut(JSON.toJSONString(pjp.getArgs())));
             proceed = pjp.proceed();
             if (proceed != null) {
-                if (proceed instanceof List) {
-                    List retValueList = (List) proceed;
-                    logger.debug("结束执行方法 " + method.getName() + " list 大小 : " + retValueList.size());
-                } else {
-                    logger.debug("结束执行方法 " + method.getName() + " 参数值 : " + JSON.toJSONString(proceed));
-                }
+                logger.debug("结束执行方法 " + method.getName() + " 参数值 : " + StringUtils.logOut(JSON.toJSONString(proceed)));
             } else {
-                logger.debug(" 结束执行方法 ... ");
+                logger.debug(" 结束执行方法 ... " + method.getName());
             }
             transaction.setStatus(Transaction.SUCCESS);
         } finally {

+ 12 - 10
elab-log/src/main/java/com/elab/log/asepct/LogResponseBodyAdvice.java

@@ -61,17 +61,19 @@ public class LogResponseBodyAdvice implements ResponseBodyAdvice {
     @Override
     public Object beforeBodyWrite(Object body, MethodParameter returnType, MediaType selectedContentType, Class selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) {
         try {
-            if (body instanceof Info) {
-                Info info = (Info) body;
-                String currentMessageId = Cat.getCurrentMessageId();
-                Map<String, Object> extension = info.getExtension();
-                if (extension == null) {
-                    extension = new HashMap<String, Object>(8);
-                    info.setExtension(extension);
+            if (body != null) {
+                if (body instanceof Info) {
+                    Info info = (Info) body;
+                    String currentMessageId = Cat.getCurrentMessageId();
+                    Map<String, Object> extension = info.getExtension();
+                    if (extension == null) {
+                        extension = new HashMap<String, Object>(8);
+                        info.setExtension(extension);
+                    }
+                    info.getExtension().put(this.logId, currentMessageId);
+                } else {
+                    BeanUtils.setProperty(body, this.logId, Cat.getCurrentMessageId());
                 }
-                info.getExtension().put(this.logId, currentMessageId);
-            } else {
-                BeanUtils.setProperty(body, this.logId, Cat.getCurrentMessageId());
             }
         } catch (Exception e) {
             logger.error("===========赋值参数出现故障==============", e);

+ 5 - 5
elab-log/src/main/java/com/elab/log/filter/HttpCatCrossFliter.java

@@ -45,12 +45,12 @@ public class HttpCatCrossFliter implements Filter {
         HttpServletRequest request = (HttpServletRequest) req;
         String requestURI = request.getRequestURI();
         String contentType = request.getContentType();
-        String method = request.getMethod();
+//        String method = request.getMethod();
         // 过滤非匹配的拦截请求
-        if (!"GET".equals(method) && !includeContentType(contentType)) {
-            filterChain.doFilter(req, resp);
-            return;
-        }
+//        if (!"GET".equals(method) && !includeContentType(contentType)) {
+////            filterChain.doFilter(req, resp);
+////            return;
+////        }
 
 
         String rootId = request.getHeader(Cat.Context.ROOT);

+ 1 - 1
elab-mongodb/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>elab-parent</artifactId>
         <groupId>com.elab.core</groupId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 16 - 3
elab-mongodb/src/main/java/com/elab/mongodb/BaseMongodb.java

@@ -95,12 +95,25 @@ public abstract class BaseMongodb<T> {
      * @throws Exception
      */
     public Long count(T entity) throws Exception {
+
+        return count(entity, getCollectionName());
+    }
+
+    /**
+     * 根据条件查询总数
+     *
+     * @param entity         查询对象
+     * @param collectionName 查询collection名称
+     * @return
+     * @throws Exception
+     */
+    public Long count(T entity, String collectionName) throws Exception {
         Query query = beanForQuery(entity);
         long count = 0;
-        if (getCollectionName() == null) {
-            count = getMongoTemplate().count(query, getEntity());
+        if (collectionName == null) {
+            count = getMongoTemplate().count(query, getEntity(), collectionName);
         } else {
-            count = getMongoTemplate().count(query, getEntity(), getCollectionName());
+            count = getMongoTemplate().count(query, getEntity(), collectionName);
         }
         return count;
     }

+ 82 - 0
elab-mq/README.md

@@ -0,0 +1,82 @@
+# 消息队列使用方式
+
+## 引入项目
+
+加入maven的依赖
+
+```xml
+<dependency>
+    <groupId>com.elab.core</groupId>
+    <artifactId>elab-mq</artifactId>
+    <version>${elab.version}</version>
+</dependency>
+```
+
+## 属性配置
+
+```properties
+# 必填
+elab.mq.AccessKey=                 
+elab.mq.SecretKey= 
+elab.mq.NAMESRV_ADDR=http://MQ_INST_1819241776271348_Bak3xjk0.mq-internet-access.mq-internet.aliyuncs.com:80 
+# 非必填
+elab.mq.GROUP_ID=GID-producer1  # 默认GID-运行环境-项目名称
+elab.mq.SendMsgTimeoutMillis=5000 # 设置发送超时时间,单位毫秒
+elab.mq.SuspendTimeMillis=100 # 顺序消息消费者失败进行重试前的等待时间(单位毫秒)
+elab.mq.MaxReconsumeTimes=20 #  消费失败的重试次数
+
+```
+
+### 生产者
+
+```java
+
+@Autowired
+private IMsgProducerFacade msgProducerFacade;
+
+public void send(){
+    // 发送消息
+    // topic / tag / 全局唯一编号 / 消息内容
+   JSONObject jsonObject = new JSONObject();
+    jsonObject.put("xxx", "哈哈");
+    jsonObject.put("sss", "ggg");
+    for (int i = 0; i < 10; i++) {
+        String key = RandomUtils.randomString(10);
+        MessageModel message = new MessageModel<Map>("T-consumer1", "*", key, jsonObject);
+        SendResult send = msgProducerFacade.send(message);
+        System.out.println("发送消息返回结果 : " + send.toString());
+    }
+}
+```
+
+### 消费者
+
+```java
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.elab.mq.listener.AbstractMessageListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * 消费者
+ *
+ * @author : liukx
+ * @time : 2019/5/15 - 14:04
+ */
+@Component
+public class ConsumerServiceImpl extends AbstractMessageListener {
+
+    @Override
+    public String topic() {
+        return "T-consumer1";
+    } 
+
+    @Override
+    public Action consume0(Message message, ConsumeContext consumeContext) {
+        System.out.println("---------------------消费成功------>>" + message.toString());
+        return Action.CommitMessage;
+    }
+}
+```
+

+ 38 - 17
elab-mq/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>elab-parent</artifactId>
         <groupId>com.elab.core</groupId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
@@ -17,10 +17,30 @@
             <artifactId>elab-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.elab.core</groupId>
+            <artifactId>elab-spring</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.elab.core</groupId>
+            <artifactId>elab-db</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.aliyun.openservices</groupId>
             <artifactId>ons-client</artifactId>
-            <version>1.7.0.Final</version>
+            <version>1.8.0.Final</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
     <build>
@@ -40,25 +60,26 @@
             </plugin>
 
             <!-- 打包javadoc插件 -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
+            <!--<plugin>-->
+                <!--<groupId>org.apache.maven.plugins</groupId>-->
+                <!--<artifactId>maven-javadoc-plugin</artifactId>-->
                 <!--<version>2.9</version>-->
-                <executions>
-                    <execution>
-                        <id>attach-javadocs</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                        <configuration>
-                            <additionalOptions>-Xdoclint:none</additionalOptions>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
+                <!--<executions>-->
+                    <!--<execution>-->
+                        <!--<id>attach-javadocs</id>-->
+                        <!--<goals>-->
+                            <!--<goal>jar</goal>-->
+                        <!--</goals>-->
+                        <!--<configuration>-->
+                            <!--<additionalOptions>-Xdoclint:none</additionalOptions>-->
+                        <!--</configuration>-->
+                    <!--</execution>-->
+                <!--</executions>-->
+            <!--</plugin>-->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.1</version>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>

+ 114 - 0
elab-mq/src/main/java/com/elab/mq/config/RocketMQConfiguration.java

@@ -0,0 +1,114 @@
+package com.elab.mq.config;
+
+import com.aliyun.openservices.ons.api.Consumer;
+import com.aliyun.openservices.ons.api.ONSFactory;
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.elab.mq.listener.AbstractMessageListener;
+import com.elab.mq.listener.MessageListenerWrapper;
+import com.elab.mq.msg.IMsgProducerFacade;
+import com.elab.mq.msg.impl.MsgProducerImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.core.env.Environment;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * rocketMQ消息队列的配置
+ *
+ * @author : liukx
+ * @time : 2019/4/19 - 16:41
+ */
+@Configuration
+//@PropertySource(value = {"classpath:application.properties"})
+@Import({MessageListenerWrapper.class})
+public class RocketMQConfiguration {
+
+    private final String prefix = "elab.mq";
+
+    @Bean
+    public IMsgProducerFacade producerBean(@Autowired Environment env) {
+        IMsgProducerFacade producerBean = new MsgProducerImpl();
+        producerBean.setProperties(mqProperties(env));
+        producerBean.start();
+        return producerBean;
+    }
+
+    @Bean
+    public Consumer consumerBean(@Autowired Environment env, @Autowired MessageListenerWrapper
+            messageListenerWrapper, @Autowired List<AbstractMessageListener> messageListeners) {
+        Consumer consumer = ONSFactory.createConsumer(mqProperties(env));
+
+        Set<String> existTopic = new HashSet<>();
+        if (messageListeners.size() > 0) {
+            for (int i = 0; i < messageListeners.size(); i++) {
+                AbstractMessageListener abstractMessageListener = messageListeners.get(i);
+                String topic = abstractMessageListener.topic();
+                boolean exist = existTopic.add(topic);
+                if (!exist) {
+                    throw new RuntimeException(" mq topic : " + topic + " Can not repeat !");
+                }
+                consumer.subscribe(topic, "*", messageListenerWrapper);
+            }
+            consumer.start();
+        }
+        return consumer;
+    }
+
+    /**
+     * 通用的参数属性
+     *
+     * @param env
+     * @return
+     */
+    public Properties mqProperties(Environment env) {
+        Properties properties = new Properties();
+        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
+        properties.put(PropertyKeyConst.AccessKey, env.getProperty(getPrefix(PropertyKeyConst.AccessKey)));
+        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
+        properties.put(PropertyKeyConst.SecretKey, env.getProperty(getPrefix(PropertyKeyConst.SecretKey)));
+        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
+        properties.put(PropertyKeyConst.NAMESRV_ADDR, env.getProperty(getPrefix(PropertyKeyConst.NAMESRV_ADDR)));
+        properties.put(PropertyKeyConst.GROUP_ID, groupId(env));
+        //设置发送超时时间,单位毫秒
+        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, env.getProperty(getPrefix(PropertyKeyConst.SendMsgTimeoutMillis),
+                "5000"));
+        properties.put(PropertyKeyConst.OnsChannel, "ALIYUN");
+        // 顺序消息消费者失败进行重试前的等待时间(单位毫秒)
+        properties.put(PropertyKeyConst.SuspendTimeMillis, env.getProperty(getPrefix(PropertyKeyConst
+                .SuspendTimeMillis), "100"));
+        // 消费失败的重试次数
+        properties.put(PropertyKeyConst.MaxReconsumeTimes, env.getProperty(getPrefix(PropertyKeyConst
+                .MaxReconsumeTimes), "20"));
+        return properties;
+    }
+
+    /**
+     * 默认的配置文件前缀
+     *
+     * @param val
+     * @return
+     */
+    private String getPrefix(String val) {
+        return prefix + "." + val;
+    }
+
+    /**
+     * 默认的groupId是运行环境+项目名称为一组
+     *
+     * @param env
+     * @return
+     */
+    private String groupId(Environment env) {
+        String defaultGroupId = "GID-" + env.getProperty("spring.profiles.active") + "-" + env.getProperty("spring" +
+                ".application.name");
+        return env.getProperty(getPrefix(PropertyKeyConst.GROUP_ID), defaultGroupId);
+    }
+
+
+}

+ 0 - 120
elab-mq/src/main/java/com/elab/mq/config/RocketMqConfigBean.java

@@ -1,120 +0,0 @@
-package com.elab.mq.config;
-
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.aliyun.openservices.ons.api.PropertyKeyConst;
-import com.aliyun.openservices.ons.api.PropertyValueConst;
-import com.aliyun.openservices.ons.api.bean.Subscription;
-import com.elab.mq.listener.DefaultMessageListener;
-import com.elab.mq.msg.IConsumeProcessService;
-import com.elab.mq.msg.IMsgConsumerFacade;
-import com.elab.mq.msg.IMsgProducerFacade;
-import com.elab.mq.msg.IMsgTransactionFacade;
-import com.elab.mq.msg.adptor.MessageListenerAdaptor;
-import com.elab.mq.msg.adptor.TransactionCheckListenerAdaptor;
-import com.elab.mq.msg.impl.MsgConsumerImpl;
-import com.elab.mq.msg.impl.MsgProducerImpl;
-import com.elab.mq.msg.impl.MsgTransactionImpl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class RocketMqConfigBean {
-
-    private String accessKey;
-    private String secretKey;
-    private String onsAddr;
-    private String producerId;
-    private String consumerId;
-    private String sendMsgTimeoutMillis;
-
-    public RocketMqConfigBean(String accessKey, String secretKey, String onsAddr, String producerId, String consumerId, String sendMsgTimeoutMillis) {
-        this.accessKey = accessKey;
-        this.secretKey = secretKey;
-        this.onsAddr = onsAddr;
-        this.producerId = producerId;
-        this.consumerId = consumerId;
-        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
-    }
-
-    public Properties getProperties() {
-        Properties properties = new Properties();
-        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
-        properties.put(PropertyKeyConst.AccessKey, this.accessKey);
-        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
-        properties.put(PropertyKeyConst.SecretKey, this.secretKey);
-        //设置发送超时时间,单位毫秒
-        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
-        // 设置 TCP 接入域名(此处以公共云生产环境为例)
-        properties.put(PropertyKeyConst.ONSAddr, this.onsAddr);
-        return properties;
-    }
-
-    /**
-     * 创建一个生产者
-     *
-     * @return
-     */
-    public IMsgProducerFacade createProducer() {
-        //您在控制台创建的 Producer ID
-        Properties properties = getProperties();
-        properties.put(PropertyKeyConst.ProducerId, this.producerId);
-        MsgProducerImpl producer = new MsgProducerImpl();
-        producer.setProperties(properties);
-        return producer;
-    }
-
-    /**
-     * 创建一个消费者
-     *
-     * @param topicName       话题名称
-     * @param tag             标签
-     * @param messageListener 回调监听接口
-     * @param isSubscription  是否为订阅类型
-     * @return
-     */
-    public IMsgConsumerFacade createConsumer(String topicName, String tag, MessageListenerAdaptor messageListener, boolean isSubscription) {
-        Properties properties = getProperties();
-        properties.put(PropertyKeyConst.ConsumerId, this.consumerId);
-        if (isSubscription) {
-            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
-        }
-        MsgConsumerImpl consumer = new MsgConsumerImpl();
-        consumer.setProperties(properties);
-        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
-        Subscription subscription = new Subscription();
-        subscription.setTopic(topicName);
-        subscription.setExpression(tag);
-        subscriptionTable.put(subscription, messageListener);
-        consumer.setSubscriptionTable(subscriptionTable);
-        return consumer;
-    }
-
-    /**
-     * 创建一个默认的消费监听适配者
-     *
-     * @param consumeProcessServices 消费执行者列表
-     * @return
-     */
-    public MessageListenerAdaptor getMessageListenerAdaptor(List<IConsumeProcessService> consumeProcessServices) {
-        DefaultMessageListener defaultMessageListener = new DefaultMessageListener();
-        defaultMessageListener.setConsumeProcessServiceList(consumeProcessServices);
-        defaultMessageListener.init();
-        return defaultMessageListener;
-    }
-
-    /**
-     * 创建一个事务消息发送类
-     *
-     * @param transactionCheckListener 事务消息回调
-     * @return
-     */
-    public IMsgTransactionFacade createTransactionProducer(TransactionCheckListenerAdaptor transactionCheckListener) {
-        Properties properties = getProperties();
-        properties.put(PropertyKeyConst.ProducerId, this.producerId);
-        MsgTransactionImpl transactionProducer = new MsgTransactionImpl(properties, transactionCheckListener);
-        return transactionProducer;
-    }
-
-}

+ 14 - 0
elab-mq/src/main/java/com/elab/mq/dao/IConsumerDao.java

@@ -0,0 +1,14 @@
+package com.elab.mq.dao;
+
+import com.elab.core.aop.annotations.XmlGroupName;
+import com.elab.core.dao.IBaseDaoSupport;
+import com.elab.mq.model.ConsumerEntity;
+
+/**
+ * @author liuhx
+ * @create 2019/05/17 18:36
+ * @email liuhx@elab-plus.com
+ **/
+@XmlGroupName("consumer")
+public interface IConsumerDao extends IBaseDaoSupport<ConsumerEntity> {
+}

+ 15 - 0
elab-mq/src/main/java/com/elab/mq/dao/IProducerDao.java

@@ -0,0 +1,15 @@
+package com.elab.mq.dao;
+
+import com.elab.core.aop.annotations.XmlGroupName;
+import com.elab.core.dao.IBaseDaoSupport;
+import com.elab.mq.model.ProducerEntity;
+
+/**
+ * @author liuhx
+ * @create 2019/05/16 19:42
+ * @email liuhx@elab-plus.com
+ **/
+@XmlGroupName("producer")
+public interface IProducerDao extends IBaseDaoSupport<ProducerEntity> {
+
+}

+ 132 - 0
elab-mq/src/main/java/com/elab/mq/listener/AbstractMessageListener.java

@@ -0,0 +1,132 @@
+package com.elab.mq.listener;
+
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.aliyun.openservices.ons.api.MessageListener;
+import com.elab.core.exception.BusinessException;
+import com.elab.core.utils.DateUtils;
+import com.elab.core.utils.ObjectUtils;
+import com.elab.mq.dao.IConsumerDao;
+import com.elab.mq.dao.IProducerDao;
+import com.elab.mq.model.ConsumerEntity;
+import com.elab.mq.model.MessageModel;
+import com.elab.mq.model.ProducerEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * 抽象成客户端能够使用的方法
+ *
+ * @author : liukx
+ * @time : 2019/4/22 - 18:02
+ */
+public abstract class AbstractMessageListener implements MessageListener {
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Autowired
+    private IProducerDao producerDao;
+
+    @Autowired
+    private IConsumerDao consumerDao;
+
+
+    /**
+     * 客户端关注的topic消息,实现了该方法,消息监听被触发时会与之匹配。
+     *
+     * @return
+     */
+    public abstract String topic();
+
+    /**
+     * 客户端关注的topic下的tag子类标签
+     *
+     * @return
+     */
+    public String tag() {
+        return "*";
+    }
+
+
+    private ConsumerEntity init(String producerId) throws Exception {
+        ConsumerEntity oldConsumerEntity = new ConsumerEntity();
+        ProducerEntity producerEntity = producerDao.selectById(producerId);
+        logger.info("获取生产者的信息:" + ObjectUtils.objectParseJsonStr(producerEntity));
+        if (producerEntity != null) {
+            logger.info("进入消费者验证...");
+            ConsumerEntity consumerEntity = new ConsumerEntity();
+            consumerEntity.setProducerId(producerEntity.getId());
+            consumerEntity.setTopicId(producerEntity.getTopicId());
+            consumerEntity.setStatus(1);
+            oldConsumerEntity = consumerDao.selectByObject(consumerEntity);
+            if (oldConsumerEntity != null && oldConsumerEntity.getConsumerStatus().equals(1)) {
+                logger.info("消费者验证失败,已经消费成功过一次,不允许重复消费");
+                throw new BusinessException("请不要重复消费,key:" + producerId);
+            }
+            logger.info("消费者验证通过,消费者消费的数据匹配上生产者数据,");
+            if (oldConsumerEntity != null) {
+                oldConsumerEntity.setRetryCount(oldConsumerEntity.getRetryCount() + 1);
+                oldConsumerEntity.setConsumerStatus(-1);
+                oldConsumerEntity.setUpdated(DateUtils.getCurrentDateTime());
+                consumerDao.updateById(oldConsumerEntity);
+            } else {
+                oldConsumerEntity = new ConsumerEntity();
+                oldConsumerEntity.setProducerId(producerEntity.getId());
+                oldConsumerEntity.setContent(producerEntity.getContent());
+                oldConsumerEntity.setHouseId(producerEntity.getHouseId());
+                oldConsumerEntity.setModuleName(producerEntity.getModuleName());
+                oldConsumerEntity.setStatus(1);
+                oldConsumerEntity.setCreated(DateUtils.getCurrentDateTime());
+                oldConsumerEntity.setRetryCount(0);
+                oldConsumerEntity.setTopicId(producerEntity.getTopicId());
+                oldConsumerEntity.setConsumerStatus(0);
+                int id = consumerDao.insert(oldConsumerEntity);
+                oldConsumerEntity.setId(id);
+            }
+        } else {
+            logger.warn("生产者为空,无法匹配消费者");
+        }
+        return oldConsumerEntity;
+//        ConsumerEntity consumerEntity = new ConsumerEntity();
+
+    }
+
+
+    /**
+     * 客户端的业务逻辑实现
+     *
+     * @param message        消息对象
+     * @param consumeContext 消息上下文
+     * @return
+     */
+    public abstract Action consume0(MessageModel message, ConsumeContext consumeContext) throws Exception;
+
+    @Override
+    public Action consume(Message message, ConsumeContext consumeContext) {
+        MessageModel messageModel = new MessageModel(message);
+        logger.debug("消息处理被触发 : " + message.toString());
+        Action action = null;
+        try {
+            ConsumerEntity oldConsumerEntity = init(message.getKey());
+            logger.info("更新消费者数据: " + ObjectUtils.objectParseJsonStr(oldConsumerEntity));
+            action = consume0(messageModel, consumeContext);
+
+            if (Action.ReconsumeLater.equals(action)) {
+                oldConsumerEntity.setConsumerStatus(-1);
+                consumerDao.updateById(oldConsumerEntity);
+            } else {
+                oldConsumerEntity.setConsumerStatus(1);
+                consumerDao.updateById(oldConsumerEntity);
+            }
+        } catch (Exception e) {
+            logger.debug("消息处理异常 : " + action);
+            e.printStackTrace();
+            return Action.ReconsumeLater;
+        }
+        logger.debug("消息处理结果 : " + action);
+        return action;
+    }
+
+}

+ 0 - 80
elab-mq/src/main/java/com/elab/mq/listener/DefaultMessageListener.java

@@ -1,80 +0,0 @@
-package com.elab.mq.listener;
-
-import com.elab.mq.api.Action;
-import com.elab.mq.api.ConsumeContext;
-import com.elab.mq.model.MessageModel;
-import com.elab.mq.msg.IConsumeProcessService;
-import com.elab.mq.msg.adptor.MessageListenerAdaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * 订阅消息监听
- * <p>
- * 默认的消费者
- */
-public class DefaultMessageListener extends MessageListenerAdaptor {
-
-    private Logger logger = LoggerFactory.getLogger(getClass());
-
-    private ConcurrentMap<String, IConsumeProcessService> tagMap = new ConcurrentHashMap<String, IConsumeProcessService>();
-
-    private List<IConsumeProcessService> consumeProcessServiceList;
-
-    public void setConsumeProcessServiceList(List<IConsumeProcessService> consumeProcessServiceList) {
-        this.consumeProcessServiceList = consumeProcessServiceList;
-    }
-
-    public void init() {
-        if (this.consumeProcessServiceList.size() > 0) {
-            for (int i = 0; i < consumeProcessServiceList.size(); i++) {
-                IConsumeProcessService consumeProcessService = consumeProcessServiceList.get(i);
-                String tag = consumeProcessService.tag();
-                if (tag == null) {
-                    logger.error(" 消息监听tag为Null异常 : " + consumeProcessService.toString() + " tag方法返回为null !!!");
-                    continue;
-                }
-                IConsumeProcessService tagService = tagMap.get(tag);
-                if (tagService != null) {
-                    logger.error(" 消息监听tag重复异常 : " + consumeProcessService.toString() + " tag方法匹配有重复 !!! 重复类 :  " + tagService.toString());
-                    continue;
-                }
-                logger.debug(" 注册消费监听器 : key - " + tag + " value - " + consumeProcessService.toString());
-                tagMap.putIfAbsent(tag, consumeProcessService);
-            }
-        }
-    }
-
-    @Override
-    public Action consume(MessageModel messageModel, ConsumeContext consumeContext) {
-        String tag = messageModel.getTag();
-        try {
-            logger.debug(" 获取到的消息对象 : " + messageModel.toString());
-            Thread.sleep(3000);
-            IConsumeProcessService consumeProcessService = tagMap.get(tag);
-            if (consumeProcessService == null) {
-                logger.error(" 消息消费失败 ... 找不到对应的消费监听处理执行类 关注Tag = " + tag);
-                return Action.ReconsumeLater;
-            }
-
-            boolean process = consumeProcessService.process(messageModel, consumeContext);
-
-            if (!process) {
-                logger.error(" 消息消费失败 : 消息内容 " + messageModel.toString() + " 等待重试..");
-                return Action.ReconsumeLater;
-            }
-            logger.debug(" 消息消费成功 :  " + messageModel.getMsgID());
-            return Action.CommitMessage;
-        } catch (Exception e) {
-            logger.error(" 事务消息执行重试操作 ... " + e.getMessage(), e);
-            //消费失败
-            return Action.ReconsumeLater;
-        }
-    }
-
-
-}

+ 43 - 0
elab-mq/src/main/java/com/elab/mq/listener/MessageListenerWrapper.java

@@ -0,0 +1,43 @@
+package com.elab.mq.listener;
+
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.aliyun.openservices.ons.api.MessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * 订阅组件适配器
+ *
+ * @author : liukx
+ * @time : 2019/4/22 - 17:59
+ */
+@Component
+public class MessageListenerWrapper implements MessageListener {
+
+    @Autowired(required = false)
+    private List<AbstractMessageListener> messageListeners;
+
+    @Override
+    public Action consume(Message message, ConsumeContext consumeContext) {
+        if (messageListeners != null) {
+            try {
+                for (int i = 0; i < messageListeners.size(); i++) {
+                    AbstractMessageListener abstractMessageListener = messageListeners.get(i);
+                    String currentTopic = message.getTopic();
+                    String topic = abstractMessageListener.topic();
+                    if (topic.equals(currentTopic)) {
+                        abstractMessageListener.consume(message, consumeContext);
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                return Action.ReconsumeLater;
+            }
+        }
+        return Action.CommitMessage;
+    }
+}

+ 195 - 0
elab-mq/src/main/java/com/elab/mq/model/ConsumerEntity.java

@@ -0,0 +1,195 @@
+package com.elab.mq.model;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.persistence.Column;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import java.util.Date;
+
+/**
+ * @author liuhx
+ * @create 2019/05/16 19:32
+ * @email liuhx@elab-plus.com
+ **/
+@Table(name = "mq_consumer", catalog = "elab_db")
+@ApiModel(description = "消息队列生产者 实体")
+public class ConsumerEntity {
+
+    @Id
+    @ApiModelProperty(name = "id", value = "主键")
+    private Integer id;
+
+    @Column(name = "producer_id")
+    @ApiModelProperty(name = "producer_id", value = "生产者id")
+    private Integer producerId;
+
+    @Column(name = "module_name")
+    @ApiModelProperty(name = "moduleName", value = "微服务模块名")
+    private String moduleName;
+
+    @Column(name = "house_id")
+    @ApiModelProperty(name = "houseId", value = "项目id")
+    private Integer houseId;
+
+    @Column(name = "topic_id")
+    @ApiModelProperty(name = "topicId", value = "队列通道id")
+    private String topicId;
+
+    @Column(name = "content")
+    @ApiModelProperty(name = "content", value = "内容")
+    private String content;
+
+    @Column(name = "consumer_status")
+    @ApiModelProperty(name = "consumerStatus", value = "消费者状态 1成功-1失败")
+    private Integer consumerStatus;
+
+    @Column(name = "retry_count")
+    @ApiModelProperty(name = "retryCount", value = "失败重试次数")
+    private Integer retryCount;
+
+    /**
+     * 状态:1  有效  -1  无效
+     *
+     */
+    @Column(name = "status")
+    @ApiModelProperty(name = "status", value = "状态:1  有效  -1  无效")
+    private Integer status;
+
+    /**
+     * 创建时间
+     *
+     */
+    @Column(name = "created")
+    @ApiModelProperty(name = "created", value = "创建时间")
+    private Date created;
+
+    /**
+     * 修改时间
+     *
+     */
+    @Column(name = "updated")
+    @ApiModelProperty(name = "updated", value = "修改时间")
+    private Date updated;
+
+    /**
+     * 创建者
+     *
+     */
+    @Column(name = "creator")
+    @ApiModelProperty(name = "creator", value = "创建者")
+    private String creator;
+
+    /**
+     * 修改者
+     *
+     */
+    @Column(name = "updator")
+    @ApiModelProperty(name = "updator", value = "修改者")
+    private String updator;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+
+    public Integer getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(Integer producerId) {
+        this.producerId = producerId;
+    }
+
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    public void setModuleName(String moduleName) {
+        this.moduleName = moduleName;
+    }
+
+    public Integer getHouseId() {
+        return houseId;
+    }
+
+    public void setHouseId(Integer houseId) {
+        this.houseId = houseId;
+    }
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public Integer getConsumerStatus() {
+        return consumerStatus;
+    }
+
+    public void setConsumerStatus(Integer consumerStatus) {
+        this.consumerStatus = consumerStatus;
+    }
+
+    public Integer getRetryCount() {
+        return retryCount;
+    }
+
+    public void setRetryCount(Integer retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    public Integer getStatus() {
+        return status;
+    }
+
+    public void setStatus(Integer status) {
+        this.status = status;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public Date getUpdated() {
+        return updated;
+    }
+
+    public void setUpdated(Date updated) {
+        this.updated = updated;
+    }
+
+    public String getCreator() {
+        return creator;
+    }
+
+    public void setCreator(String creator) {
+        this.creator = creator;
+    }
+
+    public String getUpdator() {
+        return updator;
+    }
+
+    public void setUpdator(String updator) {
+        this.updator = updator;
+    }
+}

+ 0 - 34
elab-mq/src/main/java/com/elab/mq/model/MessageExtModel.java

@@ -1,34 +0,0 @@
-package com.elab.mq.model;
-
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
-
-/**
- * 消息封装实体拓展
- *
- * @author Liukx
- * @create 2018-04-10 20:12
- * @email liukx@elab-plus.com
- **/
-public class MessageExtModel extends MessageExt {
-
-    public MessageExtModel(MessageExt ext) {
-        setTopic(ext.getTopic());
-        setFlag(ext.getFlag());
-        setBody(ext.getBody());
-        setQueueId(ext.getQueueId());
-        setStoreSize(ext.getStoreSize());
-        setQueueOffset(ext.getQueueOffset());
-        setSysFlag(ext.getSysFlag());
-        setBornTimestamp(ext.getBornTimestamp());
-        setBornHost(ext.getBornHost());
-        setStoreTimestamp(ext.getStoreTimestamp());
-        setStoreHost(ext.getStoreHost());
-        setMsgId(ext.getMsgId());
-        setCommitLogOffset(ext.getCommitLogOffset());
-        setBodyCRC(ext.getBodyCRC());
-        setReconsumeTimes(ext.getReconsumeTimes());
-        setPreparedTransactionOffset(ext.getPreparedTransactionOffset());
-    }
-
-
-}

+ 90 - 2
elab-mq/src/main/java/com/elab/mq/model/MessageModel.java

@@ -1,6 +1,8 @@
 package com.elab.mq.model;
 
+import com.alibaba.fastjson.JSON;
 import com.aliyun.openservices.ons.api.Message;
+import com.elab.core.utils.StringUtils;
 
 /**
  * 消息容器承装实体
@@ -9,10 +11,41 @@ import com.aliyun.openservices.ons.api.Message;
  * @create 2018-04-09 18:38
  * @email liukx@elab-plus.com
  **/
-public class MessageModel extends Message {
+public class MessageModel<T> extends Message {
 
+    private T object;
+    private String groupId;
+    private Integer houseId;
+    private String moduleName;
 
-    public MessageModel() {
+
+    /**
+     * 使用该model传递参数
+     *
+     * @param topic 主题
+     * @param tags  区分
+     * @param key   全局唯一编号
+     * @param body  消息内容
+     */
+    public MessageModel(String topic, String tags, String key, T body) {
+        setObject(body);
+        setKey(key);
+        setTopic(topic);
+        setTag(tags);
+    }
+
+    /**
+     * 使用该model必须传递的参数
+     *
+     * @param topic
+     * @param key
+     * @param body
+     */
+    public MessageModel(String topic, String key, T body) {
+        setObject(body);
+        setKey(key);
+        setTopic(topic);
+        setTag("*");
     }
 
     public MessageModel(Message message) {
@@ -28,4 +61,59 @@ public class MessageModel extends Message {
         setUserProperties(message.getUserProperties());
     }
 
+    /**
+     * 获取对应的转换对象,切记必须为JSON格式的
+     *
+     * @param cls
+     * @return
+     */
+    public T getObject(Class<T> cls) {
+        byte[] body = getBody();
+        if (body != null) {
+            String bodyString = new String(body);
+            if (StringUtils.isNotEmpty(bodyString)) {
+                return JSON.parseObject(bodyString, cls);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + " \t key : " + getKey();
+    }
+
+    /**
+     * 设置传输对象
+     *
+     * @param object 必须为符合JSON转换的请求
+     */
+    public void setObject(Object object) {
+        String s = JSON.toJSONString(object);
+        setBody(s.getBytes());
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public Integer getHouseId() {
+        return houseId;
+    }
+
+    public void setHouseId(Integer houseId) {
+        this.houseId = houseId;
+    }
+
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    public void setModuleName(String moduleName) {
+        this.moduleName = moduleName;
+    }
 }

+ 184 - 0
elab-mq/src/main/java/com/elab/mq/model/ProducerEntity.java

@@ -0,0 +1,184 @@
+package com.elab.mq.model;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.persistence.Column;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import java.util.Date;
+
+/**
+ * @author liuhx
+ * @create 2019/05/16 19:32
+ * @email liuhx@elab-plus.com
+ **/
+@Table(name = "mq_producer", catalog = "elab_db")
+@ApiModel(description = "消息队列生产者 实体")
+public class ProducerEntity {
+
+    @Id
+    @ApiModelProperty(name = "id", value = "主键")
+    private Integer id;
+
+    @Column(name = "module_name")
+    @ApiModelProperty(name = "moduleName", value = "微服务模块名")
+    private String moduleName;
+
+    @Column(name = "house_id")
+    @ApiModelProperty(name = "houseId", value = "项目id")
+    private Integer houseId;
+
+    @Column(name = "producer_status")
+    @ApiModelProperty(name = "producerStatus", value = "生产者状态 1成功-1失败")
+    private Integer producerStatus;
+
+    @Column(name = "group_id")
+    @ApiModelProperty(name = "groupId", value = "队列的groupId")
+    private String groupId;
+
+    @Column(name = "topic_id")
+    @ApiModelProperty(name = "topicId", value = "队列通道id")
+    private String topicId;
+
+    @Column(name = "content")
+    @ApiModelProperty(name = "content", value = "内容")
+    private String content;
+
+
+    /**
+     * 状态:1  有效  -1  无效
+     *
+     */
+    @Column(name = "status")
+    @ApiModelProperty(name = "status", value = "状态:1  有效  -1  无效")
+    private Integer status;
+
+    /**
+     * 创建时间
+     *
+     */
+    @Column(name = "created")
+    @ApiModelProperty(name = "created", value = "创建时间")
+    private Date created;
+
+    /**
+     * 修改时间
+     *
+     */
+    @Column(name = "updated")
+    @ApiModelProperty(name = "updated", value = "修改时间")
+    private Date updated;
+
+    /**
+     * 创建者
+     *
+     */
+    @Column(name = "creator")
+    @ApiModelProperty(name = "creator", value = "创建者")
+    private String creator;
+
+    /**
+     * 修改者
+     *
+     */
+    @Column(name = "updator")
+    @ApiModelProperty(name = "updator", value = "修改者")
+    private String updator;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+
+    public String getModuleName() {
+        return moduleName;
+    }
+
+    public void setModuleName(String moduleName) {
+        this.moduleName = moduleName;
+    }
+
+    public Integer getHouseId() {
+        return houseId;
+    }
+
+    public void setHouseId(Integer houseId) {
+        this.houseId = houseId;
+    }
+
+    public Integer getProducerStatus() {
+        return producerStatus;
+    }
+
+    public void setProducerStatus(Integer producerStatus) {
+        this.producerStatus = producerStatus;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getTopicId() {
+        return topicId;
+    }
+
+    public void setTopicId(String topicId) {
+        this.topicId = topicId;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public Integer getStatus() {
+        return status;
+    }
+
+    public void setStatus(Integer status) {
+        this.status = status;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public Date getUpdated() {
+        return updated;
+    }
+
+    public void setUpdated(Date updated) {
+        this.updated = updated;
+    }
+
+    public String getCreator() {
+        return creator;
+    }
+
+    public void setCreator(String creator) {
+        this.creator = creator;
+    }
+
+    public String getUpdator() {
+        return updator;
+    }
+
+    public void setUpdator(String updator) {
+        this.updator = updator;
+    }
+}

+ 0 - 35
elab-mq/src/main/java/com/elab/mq/msg/IConsumeProcessService.java

@@ -1,35 +0,0 @@
-package com.elab.mq.msg;/**
- * Created by liukx on 2018/4/11.
- */
-
-import com.elab.mq.api.ConsumeContext;
-import com.elab.mq.model.MessageModel;
-
-/**
- * 消费者业务监听模式
- *
- * @author Liukx
- * @create 2018-04-11 14:59
- * @email liukx@elab-plus.com
- **/
-public interface IConsumeProcessService {
-
-    /**
-     * 业务方关注的Tag标识
-     *
-     * @return
-     */
-    String tag();
-
-    /**
-     * 根据上面的tag方法匹配后,具体执行的业务操作
-     *
-     * @param messageModel   消息内容
-     * @param consumeContext 消费者上下文
-     * @return
-     * @throws Exception
-     */
-    boolean process(MessageModel messageModel, ConsumeContext consumeContext) throws Exception;
-
-
-}

+ 0 - 16
elab-mq/src/main/java/com/elab/mq/msg/ILocalTransactionExecuter.java

@@ -1,16 +0,0 @@
-//package com.elab.mq.msg;/**
-// * Created by liukx on 2018/4/9.
-// */
-//
-//import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
-//
-///**
-// * 事务消息接口回调
-// *
-// * @author Liukx
-// * @create 2018-04-09 18:55
-// * @email liukx@elab-plus.com
-// **/
-//public interface ILocalTransactionExecuter extends LocalTransactionExecuter {
-////    TransactionStatusEnum executeCallback(MessageModel message, Object o);
-//}

+ 0 - 14
elab-mq/src/main/java/com/elab/mq/msg/IMessageListener.java

@@ -1,14 +0,0 @@
-//package com.elab.mq.msg;
-//
-//import com.aliyun.openservices.ons.api.MessageListener;
-//
-///**
-// * 消息接收监听器
-// *
-// * @author Liukx
-// * @create 2018-04-10 19:43
-// * @email liukx@elab-plus.com
-// **/
-//public interface IMessageListener extends MessageListener {
-//
-//}

+ 0 - 13
elab-mq/src/main/java/com/elab/mq/msg/IMsgConsumerFacade.java

@@ -1,13 +0,0 @@
-package com.elab.mq.msg;
-
-import com.aliyun.openservices.ons.api.Consumer;
-
-/**
- * 消费者门面
- *
- * @author Liukx
- * @create 2018-04-09 18:29
- * @email liukx@elab-plus.com
- **/
-public interface IMsgConsumerFacade extends Consumer {
-}

+ 7 - 0
elab-mq/src/main/java/com/elab/mq/msg/IMsgProducerFacade.java

@@ -5,6 +5,7 @@ import com.elab.mq.model.SendResultModel;
 import com.elab.mq.msg.adptor.SendCallbackAdaptor;
 
 import java.util.Date;
+import java.util.Properties;
 
 /**
  * 消息生产门面类
@@ -59,4 +60,10 @@ public interface IMsgProducerFacade {
      * @return
      */
     SendResultModel sendTimingMsg(MessageModel messageModel, Date datetime);
+
+    /**
+     * 设置属性
+     * @param properties
+     */
+    public void setProperties(Properties properties);
 }

+ 0 - 23
elab-mq/src/main/java/com/elab/mq/msg/IMsgTransactionFacade.java

@@ -1,23 +0,0 @@
-package com.elab.mq.msg;/**
- * Created by liukx on 2018/4/9.
- */
-
-import com.elab.mq.model.MessageModel;
-import com.elab.mq.model.SendResultModel;
-import com.elab.mq.msg.adptor.LocalTransactionExecuterAdaptor;
-
-/**
- * 事务消息门面
- *
- * @author Liukx
- * @create 2018-04-09 18:33
- * @email liukx@elab-plus.com
- **/
-public interface IMsgTransactionFacade  {
-
-    void start();
-
-    void shutdown();
-
-    SendResultModel send(MessageModel messageModel, LocalTransactionExecuterAdaptor localTransactionExecuter, Object obj);
-}

+ 0 - 12
elab-mq/src/main/java/com/elab/mq/msg/ISendCallbackFacade.java

@@ -1,12 +0,0 @@
-//package com.elab.mq.msg;
-//
-//import com.aliyun.openservices.ons.api.SendCallback;
-//
-///**
-// * @author Liukx
-// * @create 2018-04-09 18:48
-// * @email liukx@elab-plus.com
-// **/
-//public interface ISendCallbackFacade extends SendCallback {
-//
-//}

+ 0 - 13
elab-mq/src/main/java/com/elab/mq/msg/ITransactionCheckListener.java

@@ -1,13 +0,0 @@
-//package com.elab.mq.msg;
-//
-//import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-//
-///**
-// * 事务状态回调监听接口
-// *
-// * @author Liukx
-// * @create 2018-04-10 10:48
-// * @email liukx@elab-plus.com
-// **/
-//public interface ITransactionCheckListener extends TransactionCheckListener {
-//}

+ 1 - 1
elab-mq/src/main/java/com/elab/mq/msg/adptor/LocalTransactionExecuterAdaptor.java

@@ -30,7 +30,7 @@ public abstract class LocalTransactionExecuterAdaptor implements LocalTransactio
 
     @Override
     public final TransactionStatus execute(Message message, Object o) {
-        MessageModel messageModel = new MessageModel();
+        MessageModel messageModel = new MessageModel(message.getTopic(), message.getTag(), o);
         logger.debug(" 事务消息适配层 : " + message.toString());
         TransactionStatusEnum transactionStatusEnum = executeCallback(messageModel, o);
         logger.debug(" 事务消息执行结果 : " + transactionStatusEnum.name());

+ 0 - 25
elab-mq/src/main/java/com/elab/mq/msg/adptor/MessageListenerAdaptor.java

@@ -1,25 +0,0 @@
-package com.elab.mq.msg.adptor;
-
-import com.aliyun.openservices.ons.api.Message;
-import com.aliyun.openservices.ons.api.MessageListener;
-import com.elab.mq.api.Action;
-import com.elab.mq.api.ConsumeContext;
-import com.elab.mq.model.MessageModel;
-
-/**
- * 消息监听适配器
- *
- * @author Liukx
- * @create 2018-04-10 19:50
- * @email liukx@elab-plus.com
- **/
-public abstract class MessageListenerAdaptor implements MessageListener {
-
-    public abstract Action consume(MessageModel messageModel, ConsumeContext consumeContext);
-
-    @Override
-    public final com.aliyun.openservices.ons.api.Action consume(Message message, com.aliyun.openservices.ons.api.ConsumeContext consumeContext) {
-        Action consume = consume(new MessageModel(message), new ConsumeContext(consumeContext));
-        return Action.getAction(consume);
-    }
-}

+ 0 - 25
elab-mq/src/main/java/com/elab/mq/msg/adptor/TransactionCheckListenerAdaptor.java

@@ -1,25 +0,0 @@
-package com.elab.mq.msg.adptor;
-
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.LocalTransactionState;
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
-import com.elab.mq.api.transaction.LocalTransactionStateEnum;
-import com.elab.mq.model.MessageExtModel;
-
-/**
- * 事务消息监听适配器
- *
- * @author Liukx
- * @create 2018-04-10 20:13
- * @email liukx@elab-plus.com
- **/
-public abstract class TransactionCheckListenerAdaptor implements TransactionCheckListener {
-
-    public abstract LocalTransactionStateEnum checkLocalTransactionState(MessageExtModel var1);
-
-    @Override
-    public final LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
-        LocalTransactionStateEnum localTransactionStateEnum = checkLocalTransactionState(new MessageExtModel(messageExt));
-        return LocalTransactionStateEnum.getAction(localTransactionStateEnum);
-    }
-}

+ 0 - 15
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgConsumerImpl.java

@@ -1,15 +0,0 @@
-package com.elab.mq.msg.impl;
-
-import com.aliyun.openservices.ons.api.bean.ConsumerBean;
-import com.elab.mq.msg.IMsgConsumerFacade;
-
-/**
- * 消费者默认实现类
- *
- * @author Liukx
- * @create 2018-04-09 18:30
- * @email liukx@elab-plus.com
- **/
-public class MsgConsumerImpl extends ConsumerBean implements IMsgConsumerFacade {
-
-}

+ 66 - 0
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgProducerImpl.java

@@ -2,14 +2,20 @@ package com.elab.mq.msg.impl;
 
 import com.aliyun.openservices.ons.api.SendResult;
 import com.aliyun.openservices.ons.api.bean.ProducerBean;
+import com.elab.core.utils.DateUtils;
+import com.elab.core.utils.ObjectUtils;
+import com.elab.mq.dao.IProducerDao;
 import com.elab.mq.model.MessageModel;
+import com.elab.mq.model.ProducerEntity;
 import com.elab.mq.model.SendResultModel;
 import com.elab.mq.msg.IMsgProducerFacade;
 import com.elab.mq.msg.adptor.SendCallbackAdaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Date;
+import java.util.Properties;
 
 /**
  * 消息生产包装类
@@ -20,18 +26,73 @@ import java.util.Date;
  **/
 public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade {
 
+
     private Logger logger = LoggerFactory.getLogger(MsgProducerImpl.class);
 
+    @Autowired
+    private IProducerDao producerDao;
+
+
+    /**
+     * 往生产者表中写入数据
+     * @param message
+     * @return
+     */
+    private int insertRecordProducer(MessageModel message) {
+        String content = ObjectUtils.objectParseJsonStr(message.getObject(Object.class));
+        ProducerEntity producerEntity = new ProducerEntity();
+        producerEntity.setModuleName(message.getModuleName());
+        producerEntity.setHouseId(message.getHouseId());
+        producerEntity.setProducerStatus(0);
+        producerEntity.setGroupId(message.getGroupId());
+        producerEntity.setTopicId(message.getTopic());
+        producerEntity.setContent(content);
+        producerEntity.setStatus(1);
+        producerEntity.setCreated(DateUtils.getCurrentDateTime());
+        int id = 0;
+        try {
+            id = producerDao.insert(producerEntity);
+            message.setKey(id + "");
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.error("往生产者表中插入记录失败:,入参:" + ObjectUtils.objectParseJsonStr(message), e);
+        }
+        return id;
+    }
+
+    /**
+     * 修改生产者数据
+     * @param id
+     * @return
+     */
+    private void updateRecordProducer(int id) {
+
+        try {
+            ProducerEntity producerEntity = producerDao.selectById(id + "");
+            if (producerEntity != null) {
+                producerEntity.setProducerStatus(1);
+                producerDao.updateById(producerEntity);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.error("修改生产者表中记录失败:,入参:" + id, e);
+        }
+    }
+
     @Override
     public void sendAsync(MessageModel message, SendCallbackAdaptor sendCallback) {
         logger.debug(" 发送一条异步消息 " + message + " -> " + message.toString() + " 回调类 : " + sendCallback.getClass().getName());
         super.sendAsync(message, sendCallback);
+
+
     }
 
     @Override
     public SendResultModel send(MessageModel message) {
         logger.debug(" 发送一条消息 " + message.getMsgID() + " -> " + message.toString());
+        int id = insertRecordProducer(message);
         SendResult result = super.send(message);
+        updateRecordProducer(id);
         logger.debug(" 消息发送结果 : " + result.toString());
         return new SendResultModel(result);
     }
@@ -56,4 +117,9 @@ public class MsgProducerImpl extends ProducerBean implements IMsgProducerFacade
         messageModel.setStartDeliverTime(datetime.getTime());
         return send(messageModel);
     }
+
+    @Override
+    public void setProperties(Properties properties) {
+        super.setProperties(properties);
+    }
 }

+ 0 - 39
elab-mq/src/main/java/com/elab/mq/msg/impl/MsgTransactionImpl.java

@@ -1,39 +0,0 @@
-package com.elab.mq.msg.impl;
-
-import com.aliyun.openservices.ons.api.SendResult;
-import com.aliyun.openservices.ons.api.impl.rocketmq.TransactionProducerImpl;
-import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.elab.mq.model.MessageModel;
-import com.elab.mq.model.SendResultModel;
-import com.elab.mq.msg.IMsgTransactionFacade;
-import com.elab.mq.msg.adptor.LocalTransactionExecuterAdaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * 事务消息实现
- *
- * @author Liukx
- * @create 2018-04-09 18:33
- * @email liukx@elab-plus.com
- **/
-//@Service
-public class MsgTransactionImpl extends TransactionProducerImpl implements IMsgTransactionFacade {
-
-    private Logger logger = LoggerFactory.getLogger(MsgTransactionImpl.class);
-
-    public MsgTransactionImpl(Properties properties, TransactionCheckListener transactionCheckListener) {
-        super(properties, transactionCheckListener);
-    }
-
-
-    @Override
-    public SendResultModel send(MessageModel messageModel, LocalTransactionExecuterAdaptor executer, Object obj) {
-        logger.info("执行一条事务消息 [" + messageModel.getMsgID() + "]: " + messageModel.toString() + " 执行器 : " + executer.getClass() + " 对象 ;" + obj.toString());
-        SendResult send = super.send(messageModel, executer, obj);
-        logger.info("事务消息执行结束 [" + messageModel + "]: ");
-        return new SendResultModel(send);
-    }
-}

+ 1 - 0
elab-mq/src/main/resources/META-INF/spring.factories

@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.elab.mq.config.RocketMQConfiguration

+ 4 - 0
elab-mq/src/test/resources/application.properties

@@ -0,0 +1,4 @@
+elab.mq.AccessKey=LTAImNZed054h0YV
+elab.mq.SecretKey=8hmhlhiQ2ikmVeLKujwMNWsktFpSzm
+elab.mq.NAMESRV_ADDR=http://MQ_INST_1819241776271348_Bak3xjk0.mq-internet-access.mq-internet.aliyuncs.com:80
+elab.mq.GROUP_ID=GID-producer1

+ 14 - 1
elab-spring/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>elab-parent</artifactId>
         <groupId>com.elab.core</groupId>
-        <version>2.0.4.10-SNAPSHOT</version>
+        <version>2.0.4.11-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
@@ -145,4 +145,17 @@
         <!--</dependency>-->
     </dependencies>
 
+    <distributionManagement>
+        <!--<repository>-->
+        <!--<id>releases</id>-->
+        <!--<name>Nexus Release Repository</name>-->
+        <!--<url>http://192.168.0.11:8081/nexus/content/repositories/elab/</url>-->
+        <!--</repository>-->
+        <snapshotRepository>
+            <id>snapshots</id>
+            <name>User Project SNAPSHOTS</name>
+            <url>http://192.168.0.11:8081/nexus/content/repositories/snapshots/</url>
+        </snapshotRepository>
+    </distributionManagement>
+
 </project>

+ 28 - 22
elab-spring/src/main/java/com/elab/spring/exception/CommonException.java

@@ -8,8 +8,10 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.converter.HttpMessageNotReadableException;
 import org.springframework.validation.FieldError;
 import org.springframework.validation.ObjectError;
+import org.springframework.web.HttpMediaTypeNotSupportedException;
 import org.springframework.web.HttpRequestMethodNotSupportedException;
 import org.springframework.web.bind.MethodArgumentNotValidException;
 import org.springframework.web.bind.annotation.ControllerAdvice;
@@ -33,6 +35,7 @@ public class CommonException {
     private Logger logger = LoggerFactory.getLogger(CommonException.class);
     private String defaultSystemError = "CORE.ERROR.001";
     private String defaultParamsError = "CORE.PARAMS.001";
+    private String defaultNotFoundError = "CORE.NOT_FOUND";
     private String defaultMessage = "系统异常";
     @Autowired
     private Environment environment;
@@ -42,10 +45,7 @@ public class CommonException {
     @ResponseStatus(code = HttpStatus.INTERNAL_SERVER_ERROR)
     public Object exception(Exception ex, WebRequest request) {
         logger.error(ex.getMessage(), ex);
-        Info info = new Info();
-        info.setSuccess(false);
-        info.setErrorCode(environment.getProperty("mvc.common.exception.code", defaultSystemError));
-        info.setMessage(environment.getProperty("mvc.common.exception.message", defaultMessage));
+        Info info = getInfo(environment.getProperty("mvc.common.exception.code", defaultSystemError), environment.getProperty("mvc.common.exception.message", defaultMessage));
         return info;
     }
 
@@ -62,10 +62,7 @@ public class CommonException {
                 sb.append(objectError.getField() + " 字段值 " + objectError.getRejectedValue() + " - " + objectError.getDefaultMessage());
             }
         }
-        Info info = new Info();
-        info.setSuccess(false);
-        info.setErrorCode(defaultParamsError);
-        info.setMessage(sb.toString());
+        Info info = getInfo(defaultParamsError, sb.toString());
         return info;
     }
 
@@ -73,10 +70,7 @@ public class CommonException {
     @ResponseBody
     public Object coreException(CoreException ex) {
         logger.debug(ex.getMessage(), ex);
-        Info info = new Info();
-        info.setSuccess(false);
-        info.setErrorCode(ex.getErrorCode());
-        info.setMessage(ex.getMessage());
+        Info info = getInfo(ex.getErrorCode(), ex.getMessage());
         return info;
     }
 
@@ -84,21 +78,33 @@ public class CommonException {
     @ResponseBody
     public Object businessException(BusinessException bx) {
         logger.debug(bx.getMessage(), bx);
-        Info info = new Info();
-        info.setSuccess(false);
-        info.setErrorCode(bx.getErrorCode());
-        info.setMessage(bx.getMessage());
+        Info info = getInfo(bx.getErrorCode(), bx.getMessage());
         return info;
     }
 
-    @ExceptionHandler(HttpRequestMethodNotSupportedException.class)
+    private Info getInfo(String errorCode, String message) {
+        return Info.NO(errorCode, message);
+    }
+
+    @ExceptionHandler(value = {HttpRequestMethodNotSupportedException.class, HttpMediaTypeNotSupportedException.class})
     @ResponseBody
-    @ResponseStatus(value = HttpStatus.NOT_FOUND)
+    @ResponseStatus(value = HttpStatus.METHOD_NOT_ALLOWED)
     public Object notFondMethod(Exception ex) {
-        Info info = new Info();
-        info.setSuccess(false);
-        info.setErrorCode(defaultSystemError);
-        info.setMessage("not fond");
+        logger.info(" 全局notFondMethod异常捕获 .." + ex.getMessage());
+        // 将匹配不到方法的异常状态定位404。
+        Info info = getInfo(defaultNotFoundError, "not fond");
+        return info;
+    }
+
+    @ExceptionHandler(HttpMessageNotReadableException.class)
+    @ResponseBody
+    @ResponseStatus(value = HttpStatus.NOT_FOUND)
+    public Object notReadable(Exception ex) {
+        logger.info(" 全局notFondMethod异常捕获 .." + ex.getMessage());
+        // 将参数有异常的数据拦截返回
+        Info info = getInfo(defaultNotFoundError, "请检查请求数据..");
         return info;
     }
+
+
 }

+ 104 - 0
elab-spring/src/main/java/com/elab/spring/intercept/CheckAnnotationProcess.java

@@ -0,0 +1,104 @@
+package com.elab.spring.intercept;
+
+import com.elab.core.aop.annotations.ExceptionHandle;
+import com.elab.core.exception.CheckException;
+import com.elab.core.utils.ObjectUtils;
+import com.elab.core.utils.StringUtils;
+import io.swagger.annotations.ApiOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.support.AopUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.core.env.Environment;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.lang.reflect.Method;
+
+/**
+ * @describe :代码检测
+ * @Author : liukx
+ * @time : 2019/3/6 - 11:44
+ */
+public class CheckAnnotationProcess implements BeanPostProcessor, BeanFactoryPostProcessor {
+
+    private Logger logger = LoggerFactory.getLogger(com.elab.spring.intercept.CheckAnnotationProcess.class);
+    private ConfigurableListableBeanFactory beanFactory;
+
+    private Environment environment;
+    private boolean isException = false;
+    /*排除不包含的包名*/
+    private String excludePackage;
+
+    public CheckAnnotationProcess(Environment environment) {
+        this.environment = environment;
+        this.isException = ObjectUtils.isNotEmpty(environment.getProperty("elab.isException")) ? Boolean.parseBoolean(environment.getProperty("elab.isException")): false;
+        this.excludePackage = environment.getProperty("elab.excludePackage");
+    }
+
+    public void setException(boolean exception) {
+        isException = exception;
+    }
+
+    @Override
+    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
+        this.beanFactory = beanFactory;
+    }
+
+    @Override
+    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+        return bean;
+    }
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        String[] checkPackageList = excludePackage.split(",");
+        if (checkPackageList.length > 0) {
+            String exclude = "com.elab.marketing.auth.dao";
+            String proxyName = "com.sun.proxy";
+            if (AopUtils.isAopProxy(bean)) {
+                Class<?> targetClass = AopUtils.getTargetClass(bean);
+                String name = targetClass.getName();
+
+
+                for(String singlePackage : checkPackageList) {
+                    if (!name.startsWith(singlePackage) && !name.startsWith(proxyName)) {
+                        Method[] declaredMethods = targetClass.getDeclaredMethods();
+                        for (int i = 0; i < declaredMethods.length; i++) {
+                            Method declaredMethod = declaredMethods[i];
+                            String author = "";
+                            Transactional annotation = declaredMethod.getAnnotation(Transactional.class);
+                            ApiOperation apiOperation = declaredMethod.getAnnotation(ApiOperation.class);
+                            if (apiOperation != null && StringUtils.isNotEmpty(apiOperation.nickname())) {
+                                author = apiOperation.nickname();
+                            }
+                            if (annotation != null || apiOperation != null) {
+                                ExceptionHandle exceptionHandle = declaredMethod.getAnnotation(ExceptionHandle.class);
+                                if (exceptionHandle != null) {
+                                    if (StringUtils.isEmpty(exceptionHandle.username())) {
+                                        String message = "请在" + name + " \t " + declaredMethod.getName() + " --> " + author + " 方法头的@ExceptionHandle 加上username属性";
+                                        notifyWarning(message);
+                                    }
+                                } else {
+                                    String message = "请在" + name + " \t " + declaredMethod.getName() + " --> " + author + " 方法头加上 @ExceptionHandle(username = '')";
+                                    notifyWarning(message);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return bean;
+    }
+
+    private void notifyWarning(String message) {
+        if (isException) {
+            throw new CheckException(message);
+        } else {
+            logger.error(message);
+        }
+    }
+}

+ 41 - 8
elab-spring/src/main/java/com/elab/spring/utils/RestTemplateUtils.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.dianping.cat.Cat;
 import com.dianping.cat.message.Transaction;
 import com.elab.core.utils.ObjectUtils;
+import com.elab.core.utils.StringUtils;
 import com.elab.log.utils.CatMsgConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -12,6 +13,8 @@ import org.springframework.http.*;
 import org.springframework.http.client.SimpleClientHttpRequestFactory;
 import org.springframework.web.client.RestTemplate;
 
+import java.io.InputStream;
+
 /**
  * http调用工具
  *
@@ -70,22 +73,34 @@ public class RestTemplateUtils {
      * @param <T>
      * @return
      */
-    public <T> T post(String url, Object reqParam, Class<T> clazz)  {
+    public <T> T post(String url, Object reqParam, Class<T> clazz) {
+        return post(url, headers, reqParam, clazz);
+    }
+
+    /**
+     * 执行post请求
+     *
+     * @param url         url
+     * @param httpHeaders head参数
+     * @param reqParam    请求参数
+     * @param clazz       类
+     * @param <T>
+     * @return
+     */
+    public <T> T post(String url, HttpHeaders httpHeaders, Object reqParam, Class<T> clazz) {
         String newUrl = getUrl(url);
         Transaction t = Cat.getProducer().newTransaction(CatMsgConstants.THIRD_PARTY, newUrl);
         logger.info(" URL : " + url);
         if (reqParam != null) {
-            logger.info(" RequestData : " + logData(reqParam));
+            logger.info(" RequestData : " + StringUtils.logOut(logData(reqParam)));
         }
         T responseData = null;
         try {
             //利用容器实现数据封装,发送
-            HttpEntity<Object> entity = new HttpEntity<Object>(reqParam, headers);
+            HttpEntity<Object> entity = new HttpEntity<Object>(reqParam, httpHeaders);
             responseData = restTemplate.postForObject(url, entity, clazz);
             t.setStatus(Transaction.SUCCESS);
-            if (responseData != null) {
-                logger.info(" ResponseData : " + logData(responseData));
-            }
+            logResponse(responseData);
         } catch (Exception e) {
             logger.error("------ 第三方接口调用失败 : ", e);
             t.setStatus(e.getClass().getSimpleName());
@@ -95,6 +110,20 @@ public class RestTemplateUtils {
         return responseData;
     }
 
+    /**
+     * 通用的返回日志
+     *
+     * @param responseData
+     * @param <T>
+     */
+    private <T> void logResponse(T responseData) {
+        if (dontCareType(responseData)) {
+            logger.info(" 不关心的类型 ... ");
+        } else {
+            logger.info(" ResponseData : " + StringUtils.logOut(logData(responseData)));
+        }
+    }
+
     /**
      * get请求发送
      *
@@ -113,7 +142,7 @@ public class RestTemplateUtils {
             responseData = restTemplate.getForObject(url, clazz);
             t.setStatus(Transaction.SUCCESS);
             if (responseData != null) {
-                logger.info(" ResponseData : " + logData(responseData));
+                logResponse((T) responseData);
             }
         } catch (Exception e) {
             logger.error("------ 第三方接口调用失败 : ", e);
@@ -124,6 +153,10 @@ public class RestTemplateUtils {
         return responseData;
     }
 
+    private <T> boolean dontCareType(T responseData) {
+        return responseData instanceof byte[] || responseData instanceof InputStream;
+    }
+
     /**
      * 文件上传接口
      *
@@ -146,7 +179,7 @@ public class RestTemplateUtils {
             ResponseEntity<T> exchange = restTemplate.exchange(url, HttpMethod.POST, entity, clazz);
             responseData = exchange.getBody();
             if (exchange != null && responseData != null) {
-                logger.info(" ResponseData : " + logData(responseData));
+                logger.info(" ResponseData : " + StringUtils.logOut(logData(responseData)));
             }
             t.setStatus(Transaction.SUCCESS);
         } catch (Exception e) {

+ 54 - 1
elab-spring/src/test/java/com/elab/spring/utils/RestTemplateUtilsTest.java

@@ -1,12 +1,21 @@
 package com.elab.spring.utils;
 
+import com.elab.core.utils.GzipUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.springframework.web.client.RestTemplate;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
 
 public class RestTemplateUtilsTest {
 
     private RestTemplateUtils restTemplateUtils = new RestTemplateUtils();
 
+    private RestTemplate restTemplate = new RestTemplate();
+
     @Test
     public void post() {
         String url = "http://106.14.133.2:5555/elab-marketing-authentication//ipAddr/getIpAddr";
@@ -23,7 +32,51 @@ public class RestTemplateUtilsTest {
         Assert.assertNotNull(get);
     }
 
-    public void postUpload() {
+    public static String conventFromGzip(String str) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayInputStream in;
+        GZIPInputStream gunzip = null;
+
+        in = new ByteArrayInputStream(str.getBytes("ISO-8859-1"));
+        gunzip = new GZIPInputStream(in);
+        byte[] buffer = new byte[256];
+        int n;
+        while ((n = gunzip.read(buffer)) >= 0) {
+            out.write(buffer, 0, n);
+        }
+        return out.toString();
+    }
+
+    @Test
+    public void getInputstream() throws IOException {
+        String url = "http://wthrcdn.etouch.cn/weather_mini?city=上海";
+        byte[] bytes = restTemplateUtils.get(url, byte[].class);
+        String s1 = GzipUtils.unZip(bytes);
+        System.out.println(s1);
+
+    }
+
+    @Test
+    public void test1() {
+        String array = "2610\n" +
+                "5477\n" +
+                "6421\n" +
+                "14457\n" +
+                "11405\n" +
+                "15684\n" +
+                "5793\n";
 
+        String[] split = array.split("\n");
+        Integer result = 0;
+        for (int i = 0; i < split.length; i++) {
+            Integer count = Integer.valueOf(split[i]);
+            if (i == 0) {
+                result = count;
+                System.out.println(count);
+            } else {
+                result = result + count ;
+                System.out.println(result);
+            }
+        }
     }
 }

+ 1 - 1
pom.xml

@@ -7,7 +7,7 @@
     <groupId>com.elab.core</groupId>
     <artifactId>elab-parent</artifactId>
     <packaging>pom</packaging>
-    <version>2.0.4.10-SNAPSHOT</version>
+    <version>2.0.4.11-SNAPSHOT</version>
     <modules>
         <module>elab-core</module>
         <module>elab-cache</module>