# Elab-kafka 使用介绍
## Maven 依赖配置
1. 正常依赖
```xml
com.elab.core
elab-kafka
${elab.version}
```
2. 如果出现jar包依赖冲突则可以尝试
需要注意的是目前该版本测试是基于SpringBoot 2.1.6.RELEASE 通过。
```xml
com.elab.core
elab-kafka
${elab.version}
spring-kafka
org.springframework.kafka
kafka-clients
org.apache.kafka
org.springframework.kafka
spring-kafka
2.2.7.RELEASE
kafka-clients
org.apache.kafka
org.apache.kafka
kafka-clients
2.0.1
```
### 基于大麦服务notify的解决冲突jar关系
```xml
com.elab.core
elab-kafka
${elab.version}
spring-kafka
org.springframework.kafka
kafka-clients
org.apache.kafka
org.springframework.kafka
spring-kafka
2.2.0.RELEASE
kafka-clients
org.apache.kafka
org.apache.kafka
kafka-clients
2.0.1
io.reactivex
rxnetty
0.4.20
runtime
```
过滤掉低版本SpringBoot的自动配置
```java
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
```
## 配置文件
```yaml
spring:
kafka:
bootstrap-servers: 47.103.15.48:9093,47.103.17.231:9093,47.103.23.79:9093
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
template:
default-topic: uat-mysql-dts
consumer:
group-id: test-market-db
max-poll-records: 30
fetch-min-size: 32000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
```
如果涉及到ssl配置:
```yaml
java:
security:
auth:
login:
config: E:/temp/java/kafka_client_jaas.conf # 基于阿里云提供的
trust-store-location: E:/temp/java/kafka.client.truststore.jks # 基于阿里云提供参考 : https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.2.14.4c2a30f0KEj6Av#concept-99958-zh
```
## 代码使用:
### 发送
```java
// 引入kafkaTemplate 注意,目前还是请用String json的方式进行数据发送
@Autowired
private KafkaTemplate kafkaTemplate;
Map map = new HashMap<>();
map.put("est", "123123");
ListenableFuture> send = kafkaTemplate.send(defaultTopic, JSON.toJSONString(map));
SendResult stringStringSendResult = send.get();
RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
// 关于顺序消息,希望某一类消息能够顺序消费掉,请将数据的分区下标指定成同一个
ListenableFuture> send = kafkaTemplate.send("订阅主题","分区下标","唯一key","Stringjson");
```
### 消费
请先集成`AbstractKafkaConsumer`,然实现**subscribeTopic**方法,告诉自己到底关注哪类topic.后续会将该类的topic消息回调给onMessage方法
```java
@Component
public class ConsumerListener extends AbstractKafkaConsumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String subscribeTopic() {
return "uat-mysql-dts";
}
@Override
public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
logger.info("------->" + data);
}
}
```
## 关于监控
所有kafka 的数据都会基于一个监控数据传输器传送到一个数据收集服务端,如果你有需要根据某些特定的参数查询数据的话可以使用以下方式:
实现`TopicMonitorRule`接口: 以下是参考案例
```java
@Component
public class TestTopicMonitorRule implements TopicMonitorRule {
@Override
public Map> getRuleMap() {
Map> map = new HashMap<>();
// key 是对应的topic
map.put("uat-mysql-dts", (value, mqData) -> {
// value就是kafka发送的数据,mqData就是监控数据的结构
JSONObject jsonObject = JSON.parseObject(value);
mqData.setGroupName(xxx);
mqData.setGroupKeyName(xxx);
mqData.setDataId(xxx);
});
return map;
}
}
```
通常数据采集器会提供三个参数作为搜索条件,你可以利用这三个参数将你**感兴趣或者数据的标识**记录到这三个参数当中,后续可以利用这三个标识快速定位到数据来了解当时执行情况.
> 注意该接口每个项目实现一个即可,一个topic的发送数据对应一个结构对象.