|
@@ -4,9 +4,6 @@ import com.elab.mq.consts.MqConstants;
|
|
|
import com.elab.mq.listener.ConsumerInterceptor;
|
|
|
import com.elab.mq.model.MessageModel;
|
|
|
import com.elab.redis.config.ElabRedisProperties;
|
|
|
-import com.jay.monitor.data.client.MonitorSendProducer;
|
|
|
-import com.jay.monitor.data.client.utils.MonitorUtils;
|
|
|
-import com.jay.monitor.data.core.model.serializable.MQDataDTO;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -39,14 +36,16 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
|
|
|
/**
|
|
|
* 消费中的时间,避免服务挂掉了,负载均衡到另一台
|
|
|
*/
|
|
|
- private Long maxWaitTime = TimeUnit.SECONDS.toMillis(100);
|
|
|
+ private Long maxWaitTime = TimeUnit.SECONDS.toMillis(20);
|
|
|
+
|
|
|
+ private Long retryWaitTime = TimeUnit.DAYS.toMillis(1);
|
|
|
/**
|
|
|
* 消费完成之后,多久的过期时间
|
|
|
*/
|
|
|
private Long successValidTime = TimeUnit.MINUTES.toMillis(30);
|
|
|
|
|
|
@Override
|
|
|
- public boolean check(MessageModel messageModel) {
|
|
|
+ public boolean check(MessageModel messageModel, String groupName) {
|
|
|
/**
|
|
|
* 0: 消费中
|
|
|
* 1: 消费成功
|
|
@@ -54,8 +53,8 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
|
|
|
*
|
|
|
* 考虑到的情况:
|
|
|
* 1. 消费慢
|
|
|
- * - 过了超时时间,但是没有超过RMQ的ConsumeTimeout确认超时时间。没关系,等提交确认状态就好了,因为同一条消息只会被同一个组的一个线程消费.
|
|
|
- * - 过了超时时间,也超过了RMQ的ConsumeTimeout确认超时时间。
|
|
|
+ * - 过了key超时时间,但是没有超过RMQ的ConsumeTimeout确认超时时间。没关系,等提交确认状态就好了,因为同一条消息只会被同一个组的一个线程消费.
|
|
|
+ * - 过了key超时时间,也超过了RMQ的ConsumeTimeout确认超时时间。
|
|
|
* --- RMQ会执行重试,这个时候就依赖业务这种需求做幂等。(比较极端)
|
|
|
* 2. 服务宕机: 这个时候肯定不会提交ACK状态,直接走重试逻辑就好.
|
|
|
* - 2.1 -> 等待key超时就行了
|
|
@@ -65,7 +64,7 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
|
|
|
* ---
|
|
|
*
|
|
|
*/
|
|
|
- String key = getCacheKey(messageModel);
|
|
|
+ String key = getCacheKey(messageModel, groupName);
|
|
|
Object statusObject = redisTemplate.opsForValue().get(key);
|
|
|
if (statusObject == null) {
|
|
|
// 如果maxWaitTime还没消费完,只有两种情况:1.服务挂掉了, 2. 消费慢
|
|
@@ -75,7 +74,7 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
|
|
|
* - 2.1 : RMQ认为消费超时了,RMQ的消费确认时间肯定大于这个maxWaitTime时间就行.
|
|
|
* - 2.2 : 确实消费慢,那key超时了没关系,只要maxWaitTime小于RMQ超时确认时间
|
|
|
*/
|
|
|
- redisTemplate.opsForValue().set(key, MqConstants.MSG_WAIT, maxWaitTime);
|
|
|
+ redisTemplate.opsForValue().set(key, MqConstants.MSG_WAIT, maxWaitTime, TimeUnit.MILLISECONDS);
|
|
|
logger.debug("缓存幂等校验通过:未发现值:" + key);
|
|
|
return true;
|
|
|
} else {
|
|
@@ -84,58 +83,57 @@ public class RMQCacheConsumerInterceptor implements ConsumerInterceptor {
|
|
|
/**
|
|
|
* 到达这里的话,只有可能是出现超时重试的情况。
|
|
|
* 实际消费成功了,但是没有确认给RMQ。
|
|
|
+ *
|
|
|
+ * 一旦出现重试,则说明出现了故障问题,则将有效时间进行延长
|
|
|
*/
|
|
|
logger.info("该数据已经被消费过了 : " + key);
|
|
|
+ redisTemplate.opsForValue().set(key, MqConstants.MSG_OK, retryWaitTime, TimeUnit.MILLISECONDS);
|
|
|
return false;
|
|
|
} else if (MqConstants.MSG_WAIT.equals(status)) {
|
|
|
- // 几乎不可能触发.
|
|
|
+ // 几乎不可能触发.因为同一时刻只可能有一个消费者消费,不会出现并发情况。
|
|
|
throw new RuntimeException("等待中未消费完成的数据");
|
|
|
} else {
|
|
|
Long count = redisTemplate.opsForValue().increment(key, 1);
|
|
|
- logger.debug("缓存幂等校验通过 -> 重试 : " + count);
|
|
|
+ Boolean expire = redisTemplate.expire(key, retryWaitTime, TimeUnit.MILLISECONDS);
|
|
|
+ logger.debug("缓存幂等校验通过 -> 重试 " + expire + ": " + count);
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private String getCacheKey(MessageModel messageModel) {
|
|
|
+ /**
|
|
|
+ * 目前是根据前缀+组id+redis的自增键去实现
|
|
|
+ * 如果有问题可以使用RMQ里面的MSGID去做唯一键.
|
|
|
+ *
|
|
|
+ * @param messageModel
|
|
|
+ * @param groupName
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String getCacheKey(MessageModel messageModel, String groupName) {
|
|
|
String producerId = messageModel.getProducerId();
|
|
|
String prefixWith = elabRedisProperties.getPrefixWith();
|
|
|
StringBuilder key = new StringBuilder();
|
|
|
key.append(prefixWith).append(":");
|
|
|
key.append(defaultPrefix).append(":");
|
|
|
key.append(groupId).append(":");
|
|
|
+ key.append(groupName).append(":");
|
|
|
key.append(producerId);
|
|
|
return key.toString();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void success(MessageModel messageModel) {
|
|
|
- String key = getCacheKey(messageModel);
|
|
|
- redisTemplate.opsForValue().set(key, MqConstants.MSG_OK, maxWaitTime, TimeUnit.SECONDS);
|
|
|
- sendMonitorData(messageModel, 1, messageModel.getInvokeTime());
|
|
|
+ public void success(MessageModel messageModel, String groupName) {
|
|
|
+ String key = getCacheKey(messageModel, groupName);
|
|
|
+ redisTemplate.opsForValue().set(key, MqConstants.MSG_OK, maxWaitTime, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void error(MessageModel messageModel, Throwable e) {
|
|
|
- String key = getCacheKey(messageModel);
|
|
|
+ public void error(MessageModel messageModel, String groupName, Throwable e) {
|
|
|
+ String key = getCacheKey(messageModel, groupName);
|
|
|
Integer currentStatus = (Integer) redisTemplate.opsForValue().get(key);
|
|
|
if (currentStatus <= 1) {
|
|
|
redisTemplate.opsForValue().set(key, 2);
|
|
|
}
|
|
|
- sendMonitorData(messageModel, -1, messageModel.getInvokeTime());
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 发送监控数据
|
|
|
- *
|
|
|
- * @param data 数据内容
|
|
|
- * @param status 状态
|
|
|
- * @param requestTime 请求时长
|
|
|
- */
|
|
|
- private void sendMonitorData(MessageModel data, Integer status, long requestTime) {
|
|
|
- String content = (String) data.getObject(String.class);
|
|
|
- MQDataDTO mqDataDTO = MonitorUtils.builderConsumerRMQDataDTO(data.getTopic(), data.getKey(), data.getTag(), data.getMsgID(), content, status, requestTime);
|
|
|
- MonitorSendProducer.sendMsg(data.getTopic() + "-" + data.getTag(), mqDataDTO);
|
|
|
- }
|
|
|
}
|