技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • kafka

    • kafka-2.7.0 基本概念
    • Kafka-2.7.0 搭建及参数解析
    • kafka-2.7.0 spring boot 集成 kafka
    • kafka-2.7.0 kafka Connect
      • 简述
      • 单机模式
        • 系统文件读写
        • connect-standalone. properties
        • connect-file-source.properties
        • connect-file-sink.properties
        • 启动
        • 改变发送前的消息
        • 自定义 Connector
    • kafka-2.7.0 Kafka Streams 流处理
  • RabbitMQ

    • rabbitmq 简介
  • RocketMQ

    • RocketMQ 基础概念
    • RocketMQ 搭建
    • RocketMQ 整合spring boot
  • redis

    • Redis 介绍及安装
    • Redis 命令介绍
    • Redis 分布式锁介绍
    • Redis 事务介绍
    • Redis 的key失效通知介绍
    • Redis 配置文件解读
    • Redis 记一次宕机排查
    • Redis 高可用(一) 主从理论
    • Redis 高可用(二) 哨兵理论
    • Redis 高可用(三) 搭建
    • Redis 集群搭建
  • zookeeper

    • Zookeeper 介绍及安装
    • Zookeeper 做为锁使用
  • nginx

    • nginx-1.18.0 安装
    • nginx 常见问题总结
    • nginx 高可用
  • 数据库套件

    • MyCat 1.6.7(一)MySQL高可用及分库分表
    • MyCat 1.6.7(二)高可用及权限
    • shardingsphere 4.x(一)Sharding-JDBC使用
    • shardingsphere 4.x(二)Sharding-Proxy使用
目录

kafka-2.7.0 kafka Connect

# 简述

Kafka Connect 是一个高伸缩性、高可靠性的数据集成工具,用于在 Apache Kafka 与其他系统间进行数据搬运以及执行 ETL 操作,比如 Kafka Connect 能够将文件系统中某些文件的内容全部灌入 Kafka topic 中或者是把 Kafka topic 中的消息导出到外部的数据库系统。

Kafka Connect 主要由 source connector 和 sink connector 组成,乎大部分的 ETL 框架都是由这两大类逻辑组件组成的,source connector 负责把输入数据从外部系统中导入到 Kafka 中,而 sink connector 则负责把输出数据导出到其他外部系统

一个 ETL 框架或 connector 系统是否好用的主要标志之一就是,看 source connector 和 sink connector 的种类是否丰富。默认提供的 connector 越多,我们就能集成越多的外部系统,免去了用户自行开发的成本。更多的 connector 可以在 github 上去搜索,例如 kafka connector mysql,kafka connector mongodb 等,也支持自行开发。

# 单机模式

# 系统文件读写

Kafka Connect standalone 模式下通常有 3 类配置文件: connect 配置文件,若干 source connector 配置文件和若干 sink connector 配置文件。由于本例分别启动一个 source connector 读取 test.txt 剧和一个 sink connector 写入 test.sink.txt ,故 source 和 sink 配置文件都只有一个,所以总共有如下 3 个配置文件

  • connect-standalone.properties: connect standalone 模式下的配置文件
  • connect-file-source.properties: file source connector 配置文件
  • connect-file-sink.properties: file sink connector 配置文件

# connect-standalone. properties

我们首先来编辑 connect-standalone. properties 文件。实际上, Kafka 己经在 config 目录下为
我们提供了一个该文件的模板。我们直接使用该模板井修改对应的宇段即可,如下:

# 我这里一定是本地ip,原因看kafka(一)搭建
bootstrap.servers=192.168.81.62:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true 
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets 
offset.flush.interval.ms=10000
1
2
3
4
5
6
7
8
  • key value.converter:设置 Kafka 消息 key/value 的格式转化类,本例使用 JsonConverter, 即把每条 Kafka 消息转化成一个 JSON 格式
  • key/value.converter.schemas.enable:设置是否需要把数据看成纯 JSON 字符串或者 JSON 格式的对象。本例设置为 true ,即把数据转换成 JSON 对象
  • offset.storage.file.filename:connector 会定期地将状态写入底层存储中 。该参数设定了状态要被写入的底层存储文件的路径。本例使用 /tmp/connect.offsets 保存 connector 的状态。
  • offset.flush.interval.ms:保存 connector 运行中 offset 到 topic 的频率

# connect-file-source.properties

下面编辑 connect-file-source.properties,它在 Kafka 的 config 目录下也有一份模板,本例直接在该模板的基础上进行修改

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
1
2
3
4
5
  • name:设置该 file source connector 的名称
  • connector.class:设置 source connector 类的全限定名 。有时候设置为类名 也是可以的,Kafka Connect 可以在 classpath 中自动搜寻该类并加载
  • tasks.max:每个 connector 下会创建若干个任务( task )执行 connector 逻辑以期望增加并行度,但对于从单个文件读 / 写数据这样的操作,任意时刻只能有一个 task 访问文件,故这里设置最大任务数为 1
  • file:输入文件全路径名。即表示该文件位于 Kafka 目录下 。实际使用时最好使用绝对路径。
  • topic:设置 source connector 把数据导入到 Kafka 的哪个 topic ,若该 topic 之前不存在,则 source connector 会自动创建。最好提前手工创建出该 topic 。

# connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
1
2
3
4
5
  • name:设置该 sink connector 名称。
  • connector.class:设置 sink connector 类的全限定名。有时候设置为类名也是可以的,Kafka Connect 可以在 classpath 中自动搜寻该类井加载。
  • tasks.max:依然设置为 1,原理与 source connector 中配置设置相同
  • file::输出文件全路径名,即表示该文件位于 Kafka 目录下。 实际使用时最好使用绝对路径
  • topics:设置 sink connector 导出 Kafka 中的哪个 topic 的数据。

# 启动

启动 kafka 和 zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh -daemon config/server.properties 
1

启动 connect-standalone

bin/connect-standalone.sh config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties
1
2
3

在 kafka 目录下输入测试

echo 'hello' >> ./test.txt
1

接着多出一下文件 test.sink.txt

[root@localhost kafka_2.13-2.7.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs  test.sink.txt  test.txt
1
2
[root@localhost kafka_2.13-2.7.0]# cat test.sink.txt 
hello
1
2

在 spring boot 项目监听 connect-test 一样可以收到消息,如下
connect-test {"schema":{"type":"string","optional":false},"payload":"hello"}
这里的消息实际上都是 JSON 格式的对象,这就是上面 key value.converter.schemas.enable=true 的缘故

# 改变发送前的消息

上面的例子只涉及 ETL 中的 E 和 L ,即数据抽取( extract )与加载 (load )。作为 ETL 框架, Kafka Connect 也支持相当程度的数据转换操作 下面演示在将文件数据导出到目 标文件之前为每条消息增加一个 IP 字段 如果要插入四静态字段,我们必须修改 source connector 的配置文件,增加以下这些行:

transforms=WrapMap,InsertHost
transforms.WrapMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.WrapMap.field=line
transforms.InsertHost.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertHost.static.field=ip
transforms.InsertHost.static.value=com.connector.machinel 
1
2
3
4
5
6

测试如下

[root@localhost kafka_2.13-2.7.0]# cat test.sink.txt 
hello
[root@localhost kafka_2.13-2.7.0]# echo 'add ip test' >> test.txt 
[root@localhost kafka_2.13-2.7.0]# cat test.sink.txt 
hello
Struct{line=add ip test,ip=com.connector.machinel}
1
2
3
4
5
6

显然,新增的数据被封装成一个结构体 (Struct ),并增加了 ip 字段。这就是上面 WrapMap,InsertHost 的作用。 Kafka Connect 还提供了其他的转换操作,完整用法参见 https://kafka.apache.org/documentation/#connect_transforms (opens new window)

# 自定义 Connector

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
</dependency>
1
2
3
4
package com.example.demo.source;

import com.example.demo.task.FileStreamSourceTask;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author big uncle
 * @date 2021/2/26 13:44
 * @module
 **/
public class FileStreamSourceConnector extends SourceConnector {

    private String filename;
    private String topic;

    /**
     *这些都是配置文件里面的 key
    **/
    public static final String FILE_CONFIG = "file";
    public static final String TOPIC_CONFIG = "topic";

    @Override
    public void start(Map<String, String> map) {
        filename = map.get(FILE_CONFIG);
        topic = map.get(TOPIC_CONFIG);
        System.out.println("FileStreamSourceConnector.start:"+map.toString());
    }

    @Override
    public Class<? extends Task> taskClass() {
        return FileStreamSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        ArrayList<Map<String, String>> configs =new ArrayList<>();
        //添加一个输入流
        Map<String, String> config = new HashMap<>() ;
        if(filename != null){
            config.put(FILE_CONFIG, filename);
        }
        config.put(TOPIC_CONFIG, topic);
        configs.add(config);
        System.out.println("FileStreamSourceConnector.taskConfigs:"+config.toString());
        return configs;
    }

    @Override
    public void stop() {
        // 在该方法中关闭 connector 用到的外部资源
    }

    @Override
    public ConfigDef config() {
        System.out.println("FileStreamSourceConnector.config:");
        return new ConfigDef();
    }

    @Override
    public String version() {
        return null;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.example.demo.task;

import com.example.demo.source.FileStreamSourceConnector;
import lombok.SneakyThrows;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * @author big uncle
 * @date 2021/2/26 13:56
 * @module
 **/
public class FileStreamSourceTask extends SourceTask {

    String filename;
    FileInputStream stream;
    String topic;
    BufferedReader reader;

    @Override
    public String version() {
        return null;
    }

    @SneakyThrows
    @Override
    public void start(Map<String, String> map) {
        filename = map.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = new FileInputStream(filename) ;
        topic= map.get(FileStreamSourceConnector.TOPIC_CONFIG);
        reader = new BufferedReader(new InputStreamReader(stream));
        System.out.println("FileStreamSourceTask.start");
    }

    /**
     * poll是一个线程,会一直执行的
     * @author big uncle
     * @date 2021/2/26 17:54
    **/
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try{
            ArrayList<SourceRecord> records= new ArrayList<>();
            String str = "";
            while ((str = reader.readLine()) != null && records.isEmpty()) {
                // sourcePartition表示记录来自的单个输入sourcePartition(例如,文件名,表名或主题分区)
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                // sourceOffset表示该sourcePartition中的位置,可用于恢复数据使用。
                Map<String, Object> sourceOffset = Collections.singletonMap("position", 1);
                // 做一个 SourceRecord 对象
                SourceRecord sourceRecord = new SourceRecord(sourcePartition, sourceOffset,topic, Schema.STRING_SCHEMA,str);
                // 添加到 records
                records.add(sourceRecord);
                System.out.println("FileStreamSourceTask.poll");
            }
            return records;
        }catch(Exception e){
            e.printStackTrace();
        }
        return null;
    }

    @SneakyThrows
    @Override
    public void stop() {
        if(stream != null) {
            stream.close();
        }
        if(reader!=null) {
            reader.close();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

我的是一个 maven 项目,整个项目就以上两个文件,打成 jar 之后,上传到 /kafka/libs 目录,然后可以重新写一个配置文件,也可以使用 connect-file-source.properties 直接修改

name=local-file-source
connector.class=com.example.demo.source.FileStreamSourceConnector
tasks.max=1
file=test.txt
topic=ct
1
2
3
4
5

启动方式和以上一样

--------------------------------------- 启动日志开始 ------------------------------------------

[2021-02-26 15:47:12,592] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319)
[2021-02-26 15:47:12,592] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
FileStreamSourceConnector.config:
FileStreamSourceConnector.config:
1
2
3
4
[2021-02-26 15:47:12,685] INFO Instantiated connector local-file-source with version null of type class com.example.demo.source.FileStreamSourceConnector (org.apache.kafka.connect.runtime.Worker:284)
[2021-02-26 15:47:12,687] INFO Finished creating connector local-file-source (org.apache.kafka.connect.runtime.Worker:310)
FileStreamSourceConnector.start:{connector.class=com.example.demo.source.FileStreamSourceConnector, file=test.txt, tasks.max=1, name=local-file-source, topic=ct}
1
2
3
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
FileStreamSourceConnector.taskConfigs:{file=test.txt, topic=ct}
[2021-02-26 15:47:12,705] INFO Creating task local-file-source-0 (org.apache.kafka.connect.runtime.Worker:509)
1
2
3
FileStreamSourceTask.start
[2021-02-26 15:47:12,876] INFO WorkerSourceTask{id=local-file-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2021-02-26 15:47:12,877] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:112)
FileStreamSourceTask.poll
1
2
3
4

--------------------------------------- 启动日志结束 ------------------------------------------

[root@localhost kafka_2.13-2.7.0]# echo 'dsdsa' >> test.txt 
[root@localhost kafka_2.13-2.7.0]# echo 'dsdsa' >> test.txt
1
2
[2021-02-26 15:50:41,049] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
[2021-02-26 15:50:41,050] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
[2021-02-26 15:50:51,051] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
[2021-02-26 15:50:51,052] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
FileStreamSourceTask.poll
FileStreamSourceTask.poll
[2021-02-26 15:51:01,052] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
1
2
3
4
5
6
7

spring boot 接收到消息

@KafkaListener(id = "myId2", topics = "ct")
public void listen2(String in) {
  System.out.println("connect-test"+in);
}
1
2
3
4
connect-test{"schema":{"type":"string","optional":false},"payload":"hello"},{"schema":{"type":"string","optional":false},"payload":"hello"},{"schema":{"type":"string","optional":false},"payload":"dsdsa"},{"schema":{"type":"string","optional":false},"payload":"dsdsa"},{"schema":{"type":"string","optional":false},"payload":"dsdsa"}
1

每次重启都会消费一次,遗留问题

上次更新: 6/11/2025, 4:10:30 PM
kafka-2.7.0 spring boot 集成 kafka
kafka-2.7.0 Kafka Streams 流处理

← kafka-2.7.0 spring boot 集成 kafka kafka-2.7.0 Kafka Streams 流处理→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式