<dependency>
<groupId>com.elab.core</groupId>
<artifactId>elab-kafka</artifactId>
<version>${elab.version}</version> <!-- must be >= 2.0.5.1-SNAPSHOT -->
</dependency>
Note that this version has currently been verified against Spring Boot 2.1.6.RELEASE. If you need to pin the spring-kafka and kafka-clients versions yourself, exclude the transitive dependencies and declare them explicitly:
<dependency>
<groupId>com.elab.core</groupId>
<artifactId>elab-kafka</artifactId>
<version>${elab.version}</version>
<exclusions>
<exclusion>
<artifactId>spring-kafka</artifactId>
<groupId>org.springframework.kafka</groupId>
</exclusion>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
Alternatively, the same exclusions with spring-kafka 2.2.0.RELEASE:
<dependency>
<groupId>com.elab.core</groupId>
<artifactId>elab-kafka</artifactId>
<version>${elab.version}</version>
<exclusions>
<exclusion>
<artifactId>spring-kafka</artifactId>
<groupId>org.springframework.kafka</groupId>
</exclusion>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
<!-- Fixes problems caused by the outdated rxNetty version bundled with ribbon -->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxnetty</artifactId>
<version>0.4.20</version>
<scope>runtime</scope>
</dependency>
Exclude the built-in Kafka auto-configuration of the lower Spring Boot version:
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
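For reference, a minimal entry class with the exclusion applied (the class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

// Exclude Spring Boot's built-in Kafka auto-configuration so the
// explicitly pinned spring-kafka setup above takes effect
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}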
Then configure the Kafka connection in application.yml:

spring:
  kafka:
    bootstrap-servers: 47.103.15.48:9093,47.103.17.231:9093,47.103.23.79:9093
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: uat-mysql-dts
    consumer:
      group-id: test-market-db
      max-poll-records: 30
      fetch-min-size: 32000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
If SSL configuration is involved:

java:
  security:
    auth:
      login:
        config: E:/temp/java/kafka_client_jaas.conf # JAAS file provided by Alibaba Cloud
trust-store-location: E:/temp/java/kafka.client.truststore.jks # provided by Alibaba Cloud; reference: https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.2.14.4c2a30f0KEj6Av#concept-99958-zh
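The java.security.auth.login.config entry is the standard JAAS system property; if the configuration-file route is inconvenient, it can also be set programmatically before any Kafka client is created (a sketch, not something elab-kafka itself documents):

// Point the JVM at the JAAS login configuration supplied by Alibaba Cloud;
// must run before the first Kafka client is instantiated, e.g. at the top of main()
System.setProperty("java.security.auth.login.config", "E:/temp/java/kafka_client_jaas.conf");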
// Inject KafkaTemplate<String, String>. Note: for now, please send data as a JSON string
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
Map<String, Object> map = new HashMap<>();
map.put("est", "123123");
// Blocks until the broker acknowledges the send
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(defaultTopic, JSON.toJSONString(map));
SendResult<String, String> sendResult = send.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
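send.get() blocks the calling thread until the broker responds; if that is not acceptable, the returned ListenableFuture also accepts callbacks. A minimal non-blocking sketch, assuming an SLF4J logger is in scope:

kafkaTemplate.send(defaultTopic, JSON.toJSONString(map)).addCallback(
        result -> logger.info("sent, partition=" + result.getRecordMetadata().partition()),
        ex -> logger.error("send failed", ex));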
// Ordered messages: if you want a class of messages consumed in order, send them all with the same partition index
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("topic-to-publish", partitionIndex, "unique-key", "json-string"); // partitionIndex is an Integer
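For example (topic and key here are illustrative), pinning one class of messages to partition 0 keeps them in consumption order:

// Messages sent with the same fixed partition index are consumed in send order
int partitionIndex = 0;
kafkaTemplate.send("uat-mysql-dts", partitionIndex, "order-1001", JSON.toJSONString(map));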
First extend AbstractKafkaConsumer, then implement the subscribeTopic method to declare which topic you subscribe to; messages from that topic will subsequently be delivered to your onMessage callback:
@Component
public class ConsumerListener extends AbstractKafkaConsumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String subscribeTopic() {
return "uat-mysql-dts";
}
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
logger.info("------->" + data);
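// With enable-auto-commit: false above, commit the offset manually here
// if the container runs in manual ack mode (an assumption about this setup):
// acknowledgment.acknowledge();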
}
}
All Kafka data is shipped through a monitoring data transmitter to a data-collection server. If you need to look up that data by specific parameters, implement the TopicMonitorRule interface. A reference example:
@Component
public class TestTopicMonitorRule implements TopicMonitorRule {
@Override
public Map<String, BiConsumer<String, MQDataDTO>> getRuleMap() {
Map<String, BiConsumer<String, MQDataDTO>> map = new HashMap<>();
// The key is the corresponding topic
map.put("uat-mysql-dts", (value, mqData) -> {
// value is the payload sent to Kafka; mqData is the monitoring-data structure
JSONObject jsonObject = JSON.parseObject(value);
mqData.setGroupName(xxx);
mqData.setGroupKeyName(xxx);
mqData.setDataId(xxx);
});
return map;
}
}
The data collector typically provides three parameters as search criteria. Record the identifiers you care about in these three fields; later you can use them to locate a message quickly and see how it was handled at the time.
Note that one implementation of this interface per project is sufficient, and each topic's payload corresponds to one structure object.
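A hypothetical fill-in for a MySQL DTS payload; the JSON field names below are assumptions for illustration, not part of the library:

map.put("uat-mysql-dts", (value, mqData) -> {
    JSONObject jsonObject = JSON.parseObject(value);
    // Assumed field names: pick identifiers you will want to search by later
    mqData.setGroupName(jsonObject.getString("tableName"));    // which table changed
    mqData.setGroupKeyName(jsonObject.getString("eventType")); // e.g. INSERT / UPDATE
    mqData.setDataId(jsonObject.getString("id"));              // primary key of the affected row
});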