|
@@ -6,6 +6,7 @@ import com.aliyun.openservices.ons.api.Message;
|
|
|
import com.aliyun.openservices.ons.api.MessageListener;
|
|
|
import com.dianping.cat.message.Transaction;
|
|
|
import com.elab.log.utils.CatCrossProcess;
|
|
|
+import com.elab.mq.exception.MsgRollBackException;
|
|
|
import com.elab.mq.model.MessageModel;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -57,7 +58,8 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
String tag = message.getTag();
|
|
|
String groupName = this.getClass().getSimpleName();
|
|
|
|
|
|
- Transaction t = CatCrossProcess.getCrossTransactionMsg(MQ_CONSUMER, topic + "_" + tag + "_" + getClass().getSimpleName(), null);
|
|
|
+ Transaction t = CatCrossProcess
|
|
|
+ .getCrossTransactionMsg(MQ_CONSUMER, topic + "_" + tag + "_" + getClass().getSimpleName(), null);
|
|
|
MessageModel messageModel = new MessageModel(message);
|
|
|
|
|
|
Action action = null;
|
|
@@ -76,12 +78,17 @@ public abstract class AbstractMessageListener implements MessageListener {
|
|
|
action = consume0(messageModel, consumeContext);
|
|
|
|
|
|
long time = System.currentTimeMillis() - start;
|
|
|
-
|
|
|
+ // Manual
|
|
|
messageModel.setInvokeTime(time);
|
|
|
|
|
|
t.setSuccessStatus();
|
|
|
-
|
|
|
- this.consumerInterceptor.success(messageModel, groupName);
|
|
|
+ // 可能出现手动提交回滚状态的情况。
|
|
|
+ if (action == null || action == Action.ReconsumeLater) {
|
|
|
+ // 手动提交或者极端情况返回为null的情况
|
|
|
+ this.consumerInterceptor.error(messageModel, groupName, new MsgRollBackException("手动提交回滚"));
|
|
|
+ } else {
|
|
|
+ this.consumerInterceptor.success(messageModel, groupName);
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
t.setStatus(e);
|
|
|
logger.error("消息处理异常 : ", e);
|