|
@@ -4,21 +4,12 @@ import com.aliyun.openservices.ons.api.Action;
|
|
|
import com.aliyun.openservices.ons.api.ConsumeContext;
|
|
|
import com.aliyun.openservices.ons.api.Message;
|
|
|
import com.aliyun.openservices.ons.api.MessageListener;
|
|
|
-import com.dianping.cat.Cat;
|
|
|
import com.dianping.cat.message.Transaction;
|
|
|
-import com.elab.core.utils.DateUtils;
|
|
|
import com.elab.log.utils.CatCrossProcess;
|
|
|
-import com.elab.mq.dao.IConsumerDao;
|
|
|
-import com.elab.mq.model.ConsumerEntity;
|
|
|
import com.elab.mq.model.MessageModel;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
-
|
|
|
-import java.util.LinkedHashMap;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Properties;
|
|
|
|
|
|
|
|
|
* 抽象成客户端能够使用的方法
|
|
@@ -30,15 +21,11 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- @Autowired
|
|
|
- private IConsumerDao consumerDao;
|
|
|
-
|
|
|
- @Value("${spring.application.name}")
|
|
|
- private String moduleName;
|
|
|
+ @Autowired(required = false)
|
|
|
+ private ConsumerInterceptor consumerInterceptor = new LoggerConsumerInterceptor();
|
|
|
|
|
|
private final String MQ_CONSUMER = "MQ_CONSUMER";
|
|
|
|
|
|
-
|
|
|
|
|
|
* 客户端关注的topic消息,实现了该方法,消息监听被触发时会与之匹配。
|
|
|
*
|
|
@@ -55,67 +42,6 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
return "*";
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- private ConsumerEntity init(MessageModel message) throws Exception {
|
|
|
- String msgID = message.getMsgID();
|
|
|
- Integer producerId = 0;
|
|
|
-
|
|
|
- if (message.getProducerId() != null) {
|
|
|
- producerId = Integer.valueOf(message.getProducerId());
|
|
|
- }
|
|
|
-
|
|
|
- String topic = message.getTopic();
|
|
|
- String tag = message.getTag();
|
|
|
- ConsumerEntity consumerEntity = new ConsumerEntity();
|
|
|
- consumerEntity.setProducerId(producerId);
|
|
|
- consumerEntity.setTopicId(topic);
|
|
|
- consumerEntity.setModuleName(moduleName);
|
|
|
- consumerEntity.setStatus(1);
|
|
|
- consumerEntity.setMsgId(msgID);
|
|
|
- consumerEntity.setTag(tag);
|
|
|
- consumerEntity.setModuleMethod(getClass().getName());
|
|
|
- ConsumerEntity oldConsumerEntity = consumerDao.selectByObject(consumerEntity);
|
|
|
- if (oldConsumerEntity != null && oldConsumerEntity.getConsumerStatus().equals(1)) {
|
|
|
- logger.warn("消费者验证失败,已经消费成功过一次,不允许重复消费 消费编号 : " + oldConsumerEntity.getId());
|
|
|
-
|
|
|
- return oldConsumerEntity;
|
|
|
- }
|
|
|
- logger.debug("消费者验证通过,消费者消费的数据匹配上生产者数据,");
|
|
|
- if (oldConsumerEntity != null) {
|
|
|
- oldConsumerEntity.setRetryCount(oldConsumerEntity.getRetryCount() + 1);
|
|
|
- oldConsumerEntity.setConsumerStatus(-1);
|
|
|
- oldConsumerEntity.setUpdated(DateUtils.getCurrentDateTime());
|
|
|
- oldConsumerEntity.setCatId(Cat.getCurrentMessageId());
|
|
|
- consumerDao.updateById(oldConsumerEntity);
|
|
|
- logger.debug("触发消息重试 当前重试次数 : " + oldConsumerEntity.getRetryCount());
|
|
|
- } else {
|
|
|
- oldConsumerEntity = new ConsumerEntity();
|
|
|
- oldConsumerEntity.setProducerId(producerId);
|
|
|
- String content = new String(message.getBody());
|
|
|
- if (content.length() < 5000) {
|
|
|
- oldConsumerEntity.setContent(content);
|
|
|
- } else {
|
|
|
- oldConsumerEntity.setContent(content.substring(0, 5000));
|
|
|
- }
|
|
|
- oldConsumerEntity.setHouseId(message.getHouseId());
|
|
|
- oldConsumerEntity.setModuleName(moduleName);
|
|
|
- oldConsumerEntity.setStatus(1);
|
|
|
- oldConsumerEntity.setCreated(DateUtils.getCurrentDateTime());
|
|
|
- oldConsumerEntity.setRetryCount(0);
|
|
|
- oldConsumerEntity.setTopicId(topic);
|
|
|
- oldConsumerEntity.setConsumerStatus(0);
|
|
|
- oldConsumerEntity.setMsgId(msgID);
|
|
|
- oldConsumerEntity.setTag(tag);
|
|
|
- oldConsumerEntity.setCatId(Cat.getCurrentMessageId());
|
|
|
- oldConsumerEntity.setModuleMethod(getClass().getName());
|
|
|
- int id = consumerDao.insert(oldConsumerEntity);
|
|
|
- oldConsumerEntity.setId(id);
|
|
|
- logger.debug(" 一条全新的消息,消息编号:" + id);
|
|
|
- }
|
|
|
- return oldConsumerEntity;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
|
|
|
* 客户端的业务逻辑实现
|
|
|
*
|
|
@@ -130,39 +56,35 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
String topic = message.getTopic();
|
|
|
String tag = message.getTag();
|
|
|
|
|
|
- Map<String, String> msgMap = conversionPropertyToMap(message.getUserProperties());
|
|
|
-
|
|
|
- Transaction t = CatCrossProcess.getCrossTransactionMsg(MQ_CONSUMER, topic + "_" + tag + "_" + getClass().getSimpleName(), msgMap);
|
|
|
-
|
|
|
+ Transaction t = CatCrossProcess.getCrossTransactionMsg(MQ_CONSUMER, topic + "_" + tag + "_" + getClass().getSimpleName(), null);
|
|
|
MessageModel messageModel = new MessageModel(message);
|
|
|
+
|
|
|
logger.debug("消息处理被触发 : " + message.toString());
|
|
|
+
|
|
|
Action action = null;
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
try {
|
|
|
- ConsumerEntity oldConsumerEntity = init(messageModel);
|
|
|
+
|
|
|
+
|
|
|
+ boolean isConsumer = consumerInterceptor.check(messageModel);
|
|
|
|
|
|
- if (oldConsumerEntity.getConsumerStatus() == 1) {
|
|
|
- logger.warn(" 重复消费 过滤掉. 消费编号 : " + oldConsumerEntity.getId());
|
|
|
+ if (!isConsumer) {
|
|
|
t.setSuccessStatus();
|
|
|
return Action.CommitMessage;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
action = consume0(messageModel, consumeContext);
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ long time = System.currentTimeMillis() - start;
|
|
|
+ messageModel.setInvokeTime(time);
|
|
|
t.setSuccessStatus();
|
|
|
+
|
|
|
+ this.consumerInterceptor.success(messageModel);
|
|
|
} catch (Exception e) {
|
|
|
t.setStatus(e);
|
|
|
logger.error("消息处理异常 : ", e);
|
|
|
- e.printStackTrace();
|
|
|
+ messageModel.setInvokeTime(System.currentTimeMillis() - start);
|
|
|
+ this.consumerInterceptor.error(messageModel, e);
|
|
|
+
|
|
|
return Action.ReconsumeLater;
|
|
|
} finally {
|
|
|
t.complete();
|
|
@@ -171,16 +93,5 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
return action;
|
|
|
}
|
|
|
|
|
|
- private Map<String, String> conversionPropertyToMap(Properties userProperties) {
|
|
|
- if (userProperties == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- Map<String, String> msgMap = new LinkedHashMap<>();
|
|
|
- userProperties.stringPropertyNames().forEach((key) -> {
|
|
|
- msgMap.put(key, userProperties.getProperty(key));
|
|
|
- });
|
|
|
- return msgMap;
|
|
|
- }
|
|
|
|
|
|
}
|