<dependency>
<groupId>com.elab.core</groupId>
<artifactId>elab-rocketMQ</artifactId>
<version>${elab.version}</version>
</dependency>
// 开启rocketmq配置
@EnableRocketMQ
rocketmq:
name-server: 192.168.0.24:9876;192.168.0.25:9876
producer:
group: ${spring.application.name}-${profiles.active}
sendMessageTimeout: 300000
需要将消息进行存储,重试的时候可以方便查看对比
CREATE TABLE `rmq_consumer` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`msg_id` varchar(50) DEFAULT NULL COMMENT '消息编号',
`msg_key` varchar(50) DEFAULT NULL COMMENT '消息唯一编号',
`consuemr_group` varchar(300) DEFAULT NULL COMMENT '模块名称',
`consumer_ip` varchar(20) DEFAULT NULL COMMENT '生产者ip',
`topic` varchar(300) DEFAULT NULL COMMENT 'topic名称',
`consumer_status` int(11) DEFAULT NULL COMMENT '消费者状态1成功-1失败',
`retry_count` int(11) DEFAULT NULL COMMENT '重试次数',
`created` datetime DEFAULT NULL COMMENT '创建时间',
`updated` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='消费者消息应用状态表';
CREATE TABLE `rmq_producer` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`producer_status` int(11) DEFAULT NULL COMMENT '生产者状态1成功-1失败',
`producer_group` varchar(50) DEFAULT NULL COMMENT '生产者组',
`producer_ip` varchar(20) DEFAULT NULL COMMENT '生产者ip',
`topic` varchar(100) DEFAULT NULL COMMENT 'topic',
`tags` varchar(100) DEFAULT NULL COMMENT '标签',
`content` varchar(1000) DEFAULT NULL COMMENT '发送内容',
`created` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='生产者消息应用状态表';
@Service
public class StringConsumer implements TagRocketMQListener {
private Logger logger = LoggerFactory.getLogger(StringConsumer.class);
@Override
public String tag() {
return "*"; // 这里订阅你关注的tag消息,控制器会根据消息发送过来的tag转发给你定义的这个处理类中。
}
@Override
public void processMessage(String message) throws Exception {
logger.info("--------------------消费完成-->>>>" + message);
}
}
这里需要注意两点:
@Autowired
private DefaultMQProducerExt rocketMQTemplate;
SONObject jsonObject = new JSONObject();
jsonObject.put("success", "true");
jsonObject.put("msg", "my name is lkx");
Message message = new Message();
message.setBody(jsonObject.toJSONString().getBytes());
message.setTopic(topic);
message.setTags("");
SendResult sendResult = rocketMQTemplate.send(topic, "tag", RandomUtils.randomString(5),
jsonObject.toJSONString().getBytes());