|
@@ -4,17 +4,17 @@ import com.aliyun.openservices.ons.api.Action;
|
|
|
import com.aliyun.openservices.ons.api.ConsumeContext;
|
|
|
import com.aliyun.openservices.ons.api.Message;
|
|
|
import com.aliyun.openservices.ons.api.MessageListener;
|
|
|
-import com.elab.core.exception.BusinessException;
|
|
|
import com.elab.core.utils.DateUtils;
|
|
|
import com.elab.core.utils.ObjectUtils;
|
|
|
import com.elab.mq.dao.IConsumerDao;
|
|
|
-import com.elab.mq.dao.IProducerDao;
|
|
|
import com.elab.mq.model.ConsumerEntity;
|
|
|
import com.elab.mq.model.MessageModel;
|
|
|
-import com.elab.mq.model.ProducerEntity;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+
|
|
|
+import java.util.Date;
|
|
|
|
|
|
/**
|
|
|
* 抽象成客户端能够使用的方法
|
|
@@ -26,12 +26,12 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- @Autowired
|
|
|
- private IProducerDao producerDao;
|
|
|
-
|
|
|
@Autowired
|
|
|
private IConsumerDao consumerDao;
|
|
|
|
|
|
+ @Value("${spring.application.name}")
|
|
|
+ private String moduleName;
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* 客户端关注的topic消息,实现了该方法,消息监听被触发时会与之匹配。
|
|
@@ -50,47 +50,57 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
}
|
|
|
|
|
|
|
|
|
- private ConsumerEntity init(String producerId) throws Exception {
|
|
|
+ private ConsumerEntity init(MessageModel message) throws Exception {
|
|
|
+ String msgID = message.getMsgID();
|
|
|
+ Integer producerId = 0;
|
|
|
+
|
|
|
+ if (message.getProducerId() != null) {
|
|
|
+ producerId = Integer.valueOf(message.getProducerId());
|
|
|
+ }
|
|
|
+
|
|
|
+ String topic = message.getTopic();
|
|
|
+ String tag = message.getTag();
|
|
|
ConsumerEntity oldConsumerEntity = new ConsumerEntity();
|
|
|
- ProducerEntity producerEntity = producerDao.selectById(producerId);
|
|
|
- logger.info("获取生产者的信息:" + ObjectUtils.objectParseJsonStr(producerEntity));
|
|
|
- if (producerEntity != null) {
|
|
|
- logger.info("进入消费者验证...");
|
|
|
- ConsumerEntity consumerEntity = new ConsumerEntity();
|
|
|
- consumerEntity.setProducerId(producerEntity.getId());
|
|
|
- consumerEntity.setTopicId(producerEntity.getTopicId());
|
|
|
- consumerEntity.setStatus(1);
|
|
|
- oldConsumerEntity = consumerDao.selectByObject(consumerEntity);
|
|
|
- if (oldConsumerEntity != null && oldConsumerEntity.getConsumerStatus().equals(1)) {
|
|
|
- logger.info("消费者验证失败,已经消费成功过一次,不允许重复消费");
|
|
|
- throw new BusinessException("请不要重复消费,key:" + producerId);
|
|
|
- }
|
|
|
- logger.info("消费者验证通过,消费者消费的数据匹配上生产者数据,");
|
|
|
- if (oldConsumerEntity != null) {
|
|
|
- oldConsumerEntity.setRetryCount(oldConsumerEntity.getRetryCount() + 1);
|
|
|
- oldConsumerEntity.setConsumerStatus(-1);
|
|
|
- oldConsumerEntity.setUpdated(DateUtils.getCurrentDateTime());
|
|
|
- consumerDao.updateById(oldConsumerEntity);
|
|
|
- } else {
|
|
|
- oldConsumerEntity = new ConsumerEntity();
|
|
|
- oldConsumerEntity.setProducerId(producerEntity.getId());
|
|
|
- oldConsumerEntity.setContent(producerEntity.getContent());
|
|
|
- oldConsumerEntity.setHouseId(producerEntity.getHouseId());
|
|
|
- oldConsumerEntity.setModuleName(producerEntity.getModuleName());
|
|
|
- oldConsumerEntity.setStatus(1);
|
|
|
- oldConsumerEntity.setCreated(DateUtils.getCurrentDateTime());
|
|
|
- oldConsumerEntity.setRetryCount(0);
|
|
|
- oldConsumerEntity.setTopicId(producerEntity.getTopicId());
|
|
|
- oldConsumerEntity.setConsumerStatus(0);
|
|
|
- int id = consumerDao.insert(oldConsumerEntity);
|
|
|
- oldConsumerEntity.setId(id);
|
|
|
- }
|
|
|
+ logger.info("进入消费者验证...");
|
|
|
+ ConsumerEntity consumerEntity = new ConsumerEntity();
|
|
|
+
|
|
|
+ consumerEntity.setProducerId(producerId);
|
|
|
+ consumerEntity.setTopicId(topic);
|
|
|
+ consumerEntity.setModuleName(moduleName);
|
|
|
+ consumerEntity.setStatus(1);
|
|
|
+ consumerEntity.setMsgId(msgID);
|
|
|
+ consumerEntity.setTag(tag);
|
|
|
+ consumerEntity.setModuleMethod(getClass().getName());
|
|
|
+ oldConsumerEntity = consumerDao.selectByObject(consumerEntity);
|
|
|
+ if (oldConsumerEntity != null && oldConsumerEntity.getConsumerStatus().equals(1)) {
|
|
|
+ logger.warn("消费者验证失败,已经消费成功过一次,不允许重复消费 消费编号 : " + oldConsumerEntity.getId());
|
|
|
+ // throw new BusinessException("请不要重复消费,key:" + message);
|
|
|
+ return oldConsumerEntity;
|
|
|
+ }
|
|
|
+ logger.info("消费者验证通过,消费者消费的数据匹配上生产者数据,");
|
|
|
+ if (oldConsumerEntity != null) {
|
|
|
+ oldConsumerEntity.setRetryCount(oldConsumerEntity.getRetryCount() + 1);
|
|
|
+ oldConsumerEntity.setConsumerStatus(-1);
|
|
|
+ oldConsumerEntity.setUpdated(DateUtils.getCurrentDateTime());
|
|
|
+ consumerDao.updateById(oldConsumerEntity);
|
|
|
} else {
|
|
|
- logger.warn("生产者为空,无法匹配消费者");
|
|
|
+ oldConsumerEntity = new ConsumerEntity();
|
|
|
+ oldConsumerEntity.setProducerId(producerId);
|
|
|
+ oldConsumerEntity.setContent(new String(message.getBody()));
|
|
|
+ oldConsumerEntity.setHouseId(message.getHouseId());
|
|
|
+ oldConsumerEntity.setModuleName(moduleName);
|
|
|
+ oldConsumerEntity.setStatus(1);
|
|
|
+ oldConsumerEntity.setCreated(DateUtils.getCurrentDateTime());
|
|
|
+ oldConsumerEntity.setRetryCount(0);
|
|
|
+ oldConsumerEntity.setTopicId(topic);
|
|
|
+ oldConsumerEntity.setConsumerStatus(0);
|
|
|
+ oldConsumerEntity.setMsgId(msgID);
|
|
|
+ oldConsumerEntity.setTag(tag);
|
|
|
+ oldConsumerEntity.setModuleMethod(getClass().getName());
|
|
|
+ int id = consumerDao.insert(oldConsumerEntity);
|
|
|
+ oldConsumerEntity.setId(id);
|
|
|
}
|
|
|
return oldConsumerEntity;
|
|
|
-// ConsumerEntity consumerEntity = new ConsumerEntity();
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
@@ -109,15 +119,23 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
logger.debug("消息处理被触发 : " + message.toString());
|
|
|
Action action = null;
|
|
|
try {
|
|
|
- ConsumerEntity oldConsumerEntity = init(message.getKey());
|
|
|
+ ConsumerEntity oldConsumerEntity = init(messageModel);
|
|
|
+ //如果已经成功过一次
|
|
|
+ if (oldConsumerEntity.getConsumerStatus() == 1) {
|
|
|
+ logger.warn(" 重复消费 过滤掉. 消费编号 : " + oldConsumerEntity.getId());
|
|
|
+ return Action.CommitMessage;
|
|
|
+ }
|
|
|
+
|
|
|
logger.info("更新消费者数据: " + ObjectUtils.objectParseJsonStr(oldConsumerEntity));
|
|
|
action = consume0(messageModel, consumeContext);
|
|
|
|
|
|
- if (Action.ReconsumeLater.equals(action)) {
|
|
|
+ if (action == null || Action.ReconsumeLater.equals(action)) {
|
|
|
oldConsumerEntity.setConsumerStatus(-1);
|
|
|
+ oldConsumerEntity.setUpdated(new Date());
|
|
|
consumerDao.updateById(oldConsumerEntity);
|
|
|
} else {
|
|
|
oldConsumerEntity.setConsumerStatus(1);
|
|
|
+ oldConsumerEntity.setUpdated(new Date());
|
|
|
consumerDao.updateById(oldConsumerEntity);
|
|
|
}
|
|
|
} catch (Exception e) {
|