REDERME.md 3.2 KB

rocketMQ使用介绍

引入依赖

<dependency>
    <groupId>com.elab.core</groupId>
    <artifactId>elab-rocketMQ</artifactId>
    <version>${elab.version}</version>
</dependency>

触发配置

// 开启rocketmq配置
@EnableRocketMQ

配置文件

rocketmq:
    name-server: 192.168.0.24:9876;192.168.0.25:9876
    producer:
        group: ${spring.application.name}-${profiles.active}
        sendMessageTimeout: 300000

SQL

消息去重

需要将消息进行存储,重试的时候可以方便查看对比

CREATE TABLE `rmq_consumer` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `msg_id` varchar(50) DEFAULT NULL COMMENT '消息编号',
  `msg_key` varchar(50) DEFAULT NULL COMMENT '消息唯一编号',
  `consuemr_group` varchar(300) DEFAULT NULL COMMENT '模块名称',
  `consumer_ip` varchar(20) DEFAULT NULL COMMENT '生产者ip',
  `topic` varchar(300) DEFAULT NULL COMMENT 'topic名称',
  `consumer_status` int(11) DEFAULT NULL COMMENT '消费者状态1成功-1失败',
  `retry_count` int(11) DEFAULT NULL COMMENT '重试次数',
  `created` datetime DEFAULT NULL COMMENT '创建时间',
  `updated` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='消费者消息应用状态表';

CREATE TABLE `rmq_producer` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `producer_status` int(11) DEFAULT NULL COMMENT '生产者状态1成功-1失败',
  `producer_group` varchar(50) DEFAULT NULL COMMENT '生产者组',
  `producer_ip` varchar(20) DEFAULT NULL COMMENT '生产者ip',
  `topic` varchar(100) DEFAULT NULL COMMENT 'topic',
  `tags` varchar(100) DEFAULT NULL COMMENT '标签',
  `content` varchar(1000) DEFAULT NULL COMMENT '发送内容',
  `created` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='生产者消息应用状态表';

消费者

@Service
public class StringConsumer implements TagRocketMQListener {
    private Logger logger = LoggerFactory.getLogger(StringConsumer.class);

    @Override
    public String tag() {
        return "*"; // 这里订阅你关注的tag消息,控制器会根据消息发送过来的tag转发给你定义的这个处理类中。
    }

    @Override
    public void processMessage(String message) throws Exception {
        logger.info("--------------------消费完成-->>>>" + message);
    }
}

这里需要注意两点:

  1. 如果没有抛出异常,那么MQ会认为消费成功了。
  2. 如果失败会执行重试机制,业务如果特别重要可以自行在验证一次。尽管抽象类中已经做了一次。

生产者

@Autowired
private DefaultMQProducerExt rocketMQTemplate;

SONObject jsonObject = new JSONObject();
jsonObject.put("success", "true");
jsonObject.put("msg", "my name is lkx");
Message message = new Message();
message.setBody(jsonObject.toJSONString().getBytes());
message.setTopic(topic);
message.setTags("");
SendResult sendResult = rocketMQTemplate.send(topic, "tag", RandomUtils.randomString(5),
                                              jsonObject.toJSONString().getBytes());