刘凯雄 096d50dcd3 重新定位版本号,并为mongo重新添加监控埋点 2 years ago
..
src b833d5e6c3 版本分支提交 3 years ago
README.md 537ec24cdd 版本分支提交 4 years ago
pom.xml 096d50dcd3 重新定位版本号,并为mongo重新添加监控埋点 2 years ago

README.md

Elab-kafka 使用介绍

Maven 依赖配置

  1. 正常依赖
 <dependency>
     <groupId>com.elab.core</groupId>
     <artifactId>elab-kafka</artifactId>
     <version>${elab.version}</version> <!-- 必须大于等于2.0.5.1-SNAPSHOT -->
</dependency>
  1. 如果出现jar包依赖冲突则可以尝试

需要注意的是目前该版本测试是基于SpringBoot 2.1.6.RELEASE 通过。

 <dependency>
     <groupId>com.elab.core</groupId>
     <artifactId>elab-kafka</artifactId>
     <version>${elab.version}</version>
     <exclusions>
         <exclusion>
             <artifactId>spring-kafka</artifactId>
             <groupId>org.springframework.kafka</groupId>
         </exclusion>
         <exclusion>
             <artifactId>kafka-clients</artifactId>
             <groupId>org.apache.kafka</groupId>
         </exclusion>
     </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>

基于大麦服务notify的解决冲突jar关系


<dependency>
            <groupId>com.elab.core</groupId>
            <artifactId>elab-kafka</artifactId>
            <version>${elab.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-kafka</artifactId>
                    <groupId>org.springframework.kafka</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- 解决ribbon自带的rxNetty版本过低会导致问题 -->
        <dependency>
            <groupId>io.reactivex</groupId>
            <artifactId>rxnetty</artifactId>
            <version>0.4.20</version>
            <scope>runtime</scope>
        </dependency>

过滤掉低版本SpringBoot的自动配置

@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})

配置文件

spring:
  kafka:
    bootstrap-servers: 47.103.15.48:9093,47.103.17.231:9093,47.103.23.79:9093
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
    template:
      default-topic: uat-mysql-dts
    consumer:
      group-id: test-market-db
      max-poll-records: 30
      fetch-min-size: 32000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false

如果涉及到ssl配置:

java:
  security:
    auth:
      login:
        config: E:/temp/java/kafka_client_jaas.conf                     # 基于阿里云提供的
        trust-store-location: E:/temp/java/kafka.client.truststore.jks  # 基于阿里云提供参考 : https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.2.14.4c2a30f0KEj6Av#concept-99958-zh

代码使用:

发送

// 引入kafkaTemplate<String,String> 注意,目前还是请用String json的方式进行数据发送
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;


Map<String, Object> map = new HashMap<>();
map.put("est", "123123");
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(defaultTopic, JSON.toJSONString(map));
SendResult<String, String> stringStringSendResult = send.get();
RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();

// 关于顺序消息,希望某一类消息能够顺序消费掉,请将数据的分区下标指定成同一个
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("订阅主题","分区下标","唯一key","Stringjson");

消费

请先集成AbstractKafkaConsumer,然实现subscribeTopic方法,告诉自己到底关注哪类topic.后续会将该类的topic消息回调给onMessage方法

@Component
public class ConsumerListener extends AbstractKafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public String subscribeTopic() {
        return "uat-mysql-dts";
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        logger.info("------->" + data); 
    }
}

关于监控

所有kafka 的数据都会基于一个监控数据传输器传送到一个数据收集服务端,如果你有需要根据某些特定的参数查询数据的话可以使用以下方式:

实现TopicMonitorRule接口: 以下是参考案例

@Component
public class TestTopicMonitorRule implements TopicMonitorRule {

    @Override
    public Map<String, BiConsumer<String, MQDataDTO>> getRuleMap() {
        Map<String, BiConsumer<String, MQDataDTO>> map = new HashMap<>();
        // key 是对应的topic
        map.put("uat-mysql-dts", (value, mqData) -> {
            // value就是kafka发送的数据,mqData就是监控数据的结构
            JSONObject jsonObject = JSON.parseObject(value);
            mqData.setGroupName(xxx);
            mqData.setGroupKeyName(xxx);
            mqData.setDataId(xxx);
        });
        return map;
    }
}

通常数据采集器会提供三个参数作为搜索条件,你可以利用这三个参数将你感兴趣或者数据的标识记录到这三个参数当中,后续可以利用这三个标识快速定位到数据来了解当时执行情况.

注意该接口每个项目实现一个即可,一个topic的发送数据对应一个结构对象.