加入maven的依赖
<dependency>
<groupId>com.elab.core</groupId>
<artifactId>elab-mq</artifactId>
<version>${elab.version}</version>
</dependency>
注意: 当你引用这个包的时候,功能已经开启。但是下面的配置也必须填写.
# 必填
elab.mq.AccessKey=
elab.mq.SecretKey=
elab.mq.NAMESRV_ADDR=http://MQ_INST_1819241776271348_Bak3xjk0.mq-internet-access.mq-internet.aliyuncs.com:80
# 非必填
elab.mq.GROUP_ID=GID-producer1 # 默认GID-运行环境-项目名称
elab.mq.SendMsgTimeoutMillis=5000 # 设置发送超时时间,单位毫秒
elab.mq.SuspendTimeMillis=100 # 顺序消息消费者失败进行重试前的等待时间(单位毫秒)
elab.mq.MaxReconsumeTimes=20 # 消费失败的重试次数
@Autowired
private IMsgProducerFacade msgProducerFacade;
public void send(){
// 发送消息
// topic / tag / 全局唯一编号 / 消息内容
JSONObject jsonObject = new JSONObject();
jsonObject.put("xxx", "哈哈");
jsonObject.put("sss", "ggg");
for (int i = 0; i < 10; i++) {
String key = RandomUtils.randomString(10);
MessageModel message = new MessageModel<Map>("T-consumer1", "*", key, jsonObject);
SendResult send = msgProducerFacade.send(message);
System.out.println("发送消息返回结果 : " + send.toString());
}
}
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.elab.mq.listener.AbstractMessageListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*
* @author : liukx
* @time : 2019/5/15 - 14:04
*/
@Component
public class ConsumerServiceImpl extends AbstractMessageListener {
@Override
public String topic() {
return "T-consumer1";
}
// todo 另外强烈建议重写tag方法,一个项目对应一个topic,该项目下的业务用tag区分。
@Override
public Action consume0(Message message, ConsumeContext consumeContext) {
System.out.println("---------------------消费成功------>>" + message.toString());
return Action.CommitMessage;
}
}
生产者生产消息的时候,会去往表【mq_producer】中插入一条数据,字段中包含是否发送成功状态,发送的监控编号【cat_id】等等. 但是如果你要处理这种重复发送同一条消息的时候,没有做区分。正常来讲应该通过key字段去做唯一校验,但是并不是所有业务都会传这个东西。
消费者消费消息的时候会记录表【mq_consumer】,包括重试几次、消费状态、监控编号等等。
目前是强耦合了CAT[elab-log]监控系统、如果你有其他需求可能需要改源码。
如果出现消息发送失败或者消费失败可以去这两张表中找对应的数据,确保数据不会丢失。
-- ----------------------------
-- Table structure for mq_consumer
-- ----------------------------
DROP TABLE IF EXISTS `mq_consumer`;
CREATE TABLE `mq_consumer` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`producer_id` int(11) DEFAULT NULL COMMENT '生产者消息id',
`msg_id` varchar(100) DEFAULT NULL COMMENT 'MQ的消息编号',
`module_name` varchar(300) DEFAULT NULL COMMENT '模块名称',
`module_method` varchar(200) DEFAULT NULL COMMENT '模块的方法名,用来处理一个项目里面同一个topic和tag被多个消费处理',
`house_id` int(11) DEFAULT NULL COMMENT '项目id',
`topic_id` varchar(300) DEFAULT NULL COMMENT 'topic的id',
`tag` varchar(100) DEFAULT NULL COMMENT '标签名',
`content` varchar(4000) DEFAULT NULL COMMENT '消费内容',
`consumer_status` int(11) DEFAULT NULL COMMENT '消费者状态1成功-1失败0消费中',
`retry_count` int(11) DEFAULT NULL COMMENT '重试次数',
`cat_id` varchar(100) DEFAULT NULL COMMENT '消息编号',
`status` int(11) DEFAULT NULL COMMENT '状态1有效-1无效',
`created` datetime DEFAULT NULL COMMENT '创建时间',
`creator` varchar(100) DEFAULT NULL COMMENT '创建人',
`updated` datetime DEFAULT NULL COMMENT '修改时间',
`updator` varchar(100) DEFAULT NULL COMMENT '修改人',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1052 DEFAULT CHARSET=utf8mb4 COMMENT='消费者消息应用状态表';
-- ----------------------------
-- Table structure for mq_producer
-- ----------------------------
DROP TABLE IF EXISTS `mq_producer`;
CREATE TABLE `mq_producer` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`module_name` varchar(300) DEFAULT NULL COMMENT '模块名称',
`house_id` int(11) DEFAULT NULL COMMENT '项目id',
`producer_status` int(11) DEFAULT NULL COMMENT '生产者状态1成功-1失败',
`group_id` varchar(300) DEFAULT NULL COMMENT 'group的id',
`tag` varchar(100) DEFAULT NULL COMMENT '标签名称',
`cat_id` varchar(100) DEFAULT NULL COMMENT '链路编号',
`topic_id` varchar(300) DEFAULT NULL COMMENT 'topic的id',
`content` varchar(4000) DEFAULT NULL COMMENT '发送内容',
`result_content` varchar(1000) DEFAULT NULL COMMENT 'MQ结果参数',
`status` int(11) DEFAULT NULL COMMENT '状态1有效-1无效',
`created` datetime DEFAULT NULL COMMENT '创建时间',
`creator` varchar(100) DEFAULT NULL COMMENT '创建人',
`updated` datetime DEFAULT NULL COMMENT '修改时间',
`updator` varchar(100) DEFAULT NULL COMMENT '修改人',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1050 DEFAULT CHARSET=utf8mb4 COMMENT='生产者消息应用状态表';