# Elab-kafka 使用介绍 ## Maven 依赖配置 1. 正常依赖 ```xml com.elab.core elab-kafka ${elab.version} ``` 2. 如果出现jar包依赖冲突则可以尝试 需要注意的是目前该版本测试是基于SpringBoot 2.1.6.RELEASE 通过。 ```xml com.elab.core elab-kafka ${elab.version} spring-kafka org.springframework.kafka kafka-clients org.apache.kafka org.springframework.kafka spring-kafka 2.2.7.RELEASE kafka-clients org.apache.kafka org.apache.kafka kafka-clients 2.0.1 ``` ### 基于大麦服务notify的解决冲突jar关系 ```xml com.elab.core elab-kafka ${elab.version} spring-kafka org.springframework.kafka kafka-clients org.apache.kafka org.springframework.kafka spring-kafka 2.2.0.RELEASE kafka-clients org.apache.kafka org.apache.kafka kafka-clients 2.0.1 io.reactivex rxnetty 0.4.20 runtime ``` 过滤掉低版本SpringBoot的自动配置 ```java @SpringBootApplication(exclude = {KafkaAutoConfiguration.class}) ``` ## 配置文件 ```yaml spring: kafka: bootstrap-servers: 47.103.15.48:9093,47.103.17.231:9093,47.103.23.79:9093 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer template: default-topic: uat-mysql-dts consumer: group-id: test-market-db max-poll-records: 30 fetch-min-size: 32000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer enable-auto-commit: false ``` 如果涉及到ssl配置: ```yaml java: security: auth: login: config: E:/temp/java/kafka_client_jaas.conf # 基于阿里云提供的 trust-store-location: E:/temp/java/kafka.client.truststore.jks # 基于阿里云提供参考 : https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.2.14.4c2a30f0KEj6Av#concept-99958-zh ``` ## 代码使用: ### 发送 ```java // 引入kafkaTemplate 注意,目前还是请用String json的方式进行数据发送 @Autowired private KafkaTemplate kafkaTemplate; Map map = new HashMap<>(); map.put("est", "123123"); ListenableFuture> send = kafkaTemplate.send(defaultTopic, JSON.toJSONString(map)); SendResult stringStringSendResult = send.get(); RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata(); // 关于顺序消息,希望某一类消息能够顺序消费掉,请将数据的分区下标指定成同一个 ListenableFuture> send = kafkaTemplate.send("订阅主题","分区下标","唯一key","Stringjson"); ``` ### 消费 请先集成`AbstractKafkaConsumer`,然实现**subscribeTopic**方法,告诉自己到底关注哪类topic.后续会将该类的topic消息回调给onMessage方法 ```java @Component public class ConsumerListener extends AbstractKafkaConsumer { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public String subscribeTopic() { return "uat-mysql-dts"; } @Override public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { logger.info("------->" + data); } } ``` ## 关于监控 所有kafka 的数据都会基于一个监控数据传输器传送到一个数据收集服务端,如果你有需要根据某些特定的参数查询数据的话可以使用以下方式: 实现`TopicMonitorRule`接口: 以下是参考案例 ```java @Component public class TestTopicMonitorRule implements TopicMonitorRule { @Override public Map> getRuleMap() { Map> map = new HashMap<>(); // key 是对应的topic map.put("uat-mysql-dts", (value, mqData) -> { // value就是kafka发送的数据,mqData就是监控数据的结构 JSONObject jsonObject = JSON.parseObject(value); mqData.setGroupName(xxx); mqData.setGroupKeyName(xxx); mqData.setDataId(xxx); }); return map; } } ``` 通常数据采集器会提供三个参数作为搜索条件,你可以利用这三个参数将你**感兴趣或者数据的标识**记录到这三个参数当中,后续可以利用这三个标识快速定位到数据来了解当时执行情况. > 注意该接口每个项目实现一个即可,一个topic的发送数据对应一个结构对象.