# rocketMQ使用介绍 ## 引入依赖 ```xml com.elab.core elab-rocketMQ ${elab.version} ``` ## 触发配置 ```java // 开启rocketmq配置 @EnableRocketMQ ``` ## 配置文件 ```yaml rocketmq: name-server: 192.168.0.24:9876;192.168.0.25:9876 producer: group: ${profiles.active}-${spring.application.name} sendMessageTimeout: 300000 ``` ## SQL ### 消息去重 需要将消息进行存储,重试的时候可以方便查看对比 ```sql CREATE TABLE `rmq_consumer` ( `id` int(11) NOT NULL AUTO_INCREMENT, `msg_id` varchar(50) DEFAULT NULL COMMENT '消息编号', `msg_key` varchar(50) DEFAULT NULL COMMENT '消息唯一编号', `consuemr_group` varchar(300) DEFAULT NULL COMMENT '模块名称', `consumer_ip` varchar(20) DEFAULT NULL COMMENT '生产者ip', `topic` varchar(300) DEFAULT NULL COMMENT 'topic名称', `consumer_status` int(11) DEFAULT NULL COMMENT '消费者状态1成功-1失败', `retry_count` int(11) DEFAULT NULL COMMENT '重试次数', `created` datetime DEFAULT NULL COMMENT '创建时间', `updated` datetime DEFAULT NULL COMMENT '修改时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='消费者消息应用状态表'; CREATE TABLE `rmq_producer` ( `id` int(11) NOT NULL AUTO_INCREMENT, `producer_status` int(11) DEFAULT NULL COMMENT '生产者状态1成功-1失败', `producer_group` varchar(50) DEFAULT NULL COMMENT '生产者组', `producer_ip` varchar(20) DEFAULT NULL COMMENT '生产者ip', `topic` varchar(100) DEFAULT NULL COMMENT 'topic', `tags` varchar(100) DEFAULT NULL COMMENT '标签', `content` varchar(1000) DEFAULT NULL COMMENT '发送内容', `created` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='生产者消息应用状态表'; ``` ## 消费者 ```java @Service public class StringConsumer implements TagRocketMQListener { private Logger logger = LoggerFactory.getLogger(StringConsumer.class); @Override public String tag() { return "*"; // 这里订阅你关注的tag消息,控制器会根据消息发送过来的tag转发给你定义的这个处理类中。 } @Override public void processMessage(String message) throws Exception { logger.info("--------------------消费完成-->>>>" + message); } } ``` 这里需要注意两点: 1. 如果没有抛出异常,那么MQ会认为消费成功了。 2. 如果失败会执行重试机制,业务如果特别重要可以自行在验证一次。尽管抽象类中已经做了一次。 ## 生产者 ```java @Autowired private DefaultMQProducerExt rocketMQTemplate; SONObject jsonObject = new JSONObject(); jsonObject.put("success", "true"); jsonObject.put("msg", "my name is lkx"); Message message = new Message(); message.setBody(jsonObject.toJSONString().getBytes()); message.setTopic(topic); message.setTags(""); SendResult sendResult = rocketMQTemplate.send(topic, "tag", RandomUtils.randomString(5), jsonObject.toJSONString().getBytes()); ```