数据传输工具 —— Kafka Connect
1、什么是 kafka connect?
Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出 kafka 的连接器变得简单。
Kafka Connect 可以获取整个数据库或从应用程序服务器收集指标到 kafka 主题,使数据可用于低延迟的流处理。
导出作业可以将数据从 kafka topic 传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。
2、功能
- kafka connector 通用框架,提供统一的集成 API
- 同时支持分布式模式和单机模式
- 自动化的 offset 管理,开发人员不必担心错误处理的影响
- rest 接口,用来查看和管理 kafka connectors
3、概念
Connectors:通过管理任务来处理数据流的高级抽象
Tasks:数据写入 kafka 和从 kafka 读出的实现
Workers:运行 connectors 和 tasks 的进程
Converters:kafka connect 和其他存储系统直接发送和接收数据之间转换数据
Connector 决定了数据要从哪里复制过来以及数据应该写到哪里去,一个 connector 实例是一个需要负责在 kafka 和其他系统之间复制数据的逻辑作业,connector plugin 是 jar 文件,实现了 kafka 定义的一些接口来完成特定的任务。
Task 是 kafka connect 数据模型的主角,每一个 connector 都会协调一系列的 task 去执行任务,connector 可以把一项工作分割成许多的 task,然后再把 task 分发到各个 worker 中去执行(分布式模式下),task 不自己保存自己的状态信息,而是交给特定的 kafka 主题去保存(config.storage.topic 和 status.storage.topic)。在分布式模式下有一个概念叫做任务再平衡(Task Rebalancing),当一个 connector 第一次提交到集群时,所有的 worker 都会做一个 task rebalancing 从而保证每一个 worker 都运行了差不多数量的工作,而不是所有的工作压力都集中在某个 worker 进程中,而当每个进程挂了之后也会执行 task rebalance。
Connectors 和 Tasks 都是逻辑工作单位,必须安排在进程中执行,而在 kafka connect 中,这些进程就是 workers,分别有两种 worker:standalone、distributed。生产中 distributed worker 表现很棒,因为它提供了可扩展性以及自动容错的功能,可以用一个 group.id 来启动很多 worker 进程,在有效的 worker 进程中它们会自动地去协调执行 connector 和 task,如果新加或者挂了一个 worker,其他的 worker 会检测到然后再重新分配 connector 和 task。
Converter 会把 bytes 数据转换为 kafka connect 内部的格式,也可以把 kafka connect 内部存储格式的数据变成 bytes,converter 对 connector 来说是解耦的,所以其他的 connector 都可以重用。例如使用了 avro converter,那么 jdbc connector 可以写 avro 格式的数据到 kafka,同时 hfds connector 也可以从 kafka 中读出 avro 格式的数据。
4、实战
启动 confluent
cd /app/confluent/bin
./confluent local start
使用 standalone 模式启动
# 启动 kafka connect
$CONFLUENT_HOME/bin/connect-standalone /
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
connector1.properties [connector2.properties]
在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件
其中 connect-standalone.properties 是启动 connect 服务组件自身的配置,内容如下:
# kafka 服务
bootstrap.servers=localhost:9092
# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 是否启用转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 偏移量存储文件名
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# 插件路径
plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components
# 默认端口为8083,需要修改端口时启动以下配置
# rest.port=8084
(1)标准 connect
启动一个带 FileSource 的 Connect
connect-file-source.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:
# connect 的名字
name=local-file-source
# 将文件读取到数据流中
connector.class=FileStreamSource
# 工作线程是 1 个
tasks.max=1
# 读取的文件名为 test.txt
file=test.txt
# 复制到的主题为 connect-test
topic=connect-test
启动 connect
$CONFLUENT_HOME/bin/connect-standalone /
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties
结果报错 Java 内存不足
关闭虚拟机,加大内存,重启服务器和 confluent,再次启动 connect,报错 8083 端口已被绑定
修改 connect-standalone.properties 配置中的端口为 8084 再启动,新的报错:不存在 source 配置文件中的指定的文件,在启动路径下创建文件,日志恢复正常
echo -e "foo/nbar/n" > $CONFLUENT_HOME/test.txt
可以通过 kafka tools 看到新增了主题 connect-test,写入了3条数据
往文件中写入数据,会报告又成功提交一次偏移量
# 写数据
/app/confluent# echo -e "foo1/nbar1/n" >> test.txt
# 日志
INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
...
然后可以看到主题中多了3条数据
启动带 FileSource 和 FileSink 的 Connect
connect-file-sink.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:
# connect 的名字
name=local-file-sink
# 从数据流中读取数据到文件中
connector.class=FileStreamSink
# 工作线程是 1 个
tasks.max=1
# 写入的文件是 test.sink.txt
file=test.sink.txt
# 读取数据的主题是 connect-test
topics=connect-test
启动 connect
$CONFLUENT_HOME/bin/connect-standalone /
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties /
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
可以看到自动创建了 test.sink.txt 文件
同时可以看到 consumer 中多了一个 connect-local-file-sink ,偏移量为6(即已将6条数据都 sink 到了文件中)
(2)REST API
使用 Rest API 必须启动分布式模式,通过 Rest API 可以管理集群中的 connect 服务,默认端口是 8083。
GET /connectors - 返回所有正在运行的connector名。
POST /connectors - 新建一个connector;请求体必须是json格式并且需要包含name字段和config字段,name是connectors的名字,config是json格式,必须包含connector的配置信息。
GET /connectors/{name} - 获取指定connector的信息。
GET /connectors/{name}/config - 获取指定connector的配置信息。
在分布式模式下,有两种方式来配置 connector,第一种是类似 standalone 模式一样,写好配置文件,然后在启动时指定
$CONFLUENT_HOME/bin/connect-distributed /
$CONFLUENT_HOME/etc/kafka/connect-distributed.properties /
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties /
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
另外一种方式更加灵活,就是直接通过 Rest API 来对 connector 配置进行增删查。
查看 connectors
添加 connectors
查看某个 connector
这里指定的文件是相对路径,所以要在 $CONFLUENT_HOME/bin 路径下创建一个 test-distributed.txt 文件
cd $CONFLUENT_HOME/bin
echo -e "foo/nbar/n" > test-distributed.txt
可以看到出现了 connect-distributed 主题
添加 sink
从服务器可以看到产生了 sink 文件
删除 connector
再次往 test-distributed.txt 文件中追加数据,可以看到 connect-distributed 主题中的数据增加了,source connector 依然在工作,但是 sink connector 已经停止了,所以 test-distributed.sink.txt 文件中数据不再从主题中复制。
【注意】
如果要在脚本中处理,发起HTTP请求,可以使用 curl 工具,将请求的配置在 json 文件中,如:
curl -d @$CONFLUENT_HOME/connect-file-sink.json /
-H "Content-Type: application/json" /
-X POST http://localhost:8083/connectors
创建带有 Convert 的 connector
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
添加 connector(由于跟上述实验 name 一致,所以需要先删除或者换个 name)
创建 test-transformation.txt 文件,可以看到自动创建了 connect-transformation 主题
添加 sink
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-transformation.sink.txt",
"topics": "connect-transformation"
}
}
可以看到 sink 自动生成了 test-transformation.sink.txt 文件,并且内容不是 source 过来的原始数据,而是经过 convertor 处理后的带格式的数据
(3)MySQL Source、ESSink
演示将数据从 MySQL 复制到 kafka 中,再通过 kafka 将数据下沉到 ElasticSearch。这里 MySQL 是数据源,所以需要支持 MySQL 的 source connector,ES 是目标数据系统,所以需要支持 ES 的 sink connectors,可以从 https://www.confluent.io/hub/ 下载。
MySQL
MySQL 下载插件搜索关键字 "JDBC",可以看到提供了在线安装的脚本和离线安装的包下载。
MySQL 环境准备
# 安装 MySQL
sudo apt-get install mysql-server
# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1
# 将 MySQL 驱动上传到 confluent 目录
# mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
【注意】下载下来的 jdbc connector 插件,在处理 mysql 时需要相应的驱动,而插件不带驱动,实际采集数据时会报错,这时需要将驱动 jar 包拷贝到插件库目录中。
数据准备,创建用户并授权,用该用户创建数据库、表和插入数据
grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);
创建 source 配置文件(connect-mysql-source.properties),内容如下:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表会被采集到的 topic 名前缀,比如表名叫 students,对应的 topic 就为 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-
启动 mysql source connector
$CONFLUENT_HOME/bin/connect-standalone /
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
$CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties
可以看到启动之后,开启了 JDBC source task,然后执行了查询的 SQL,最后提交和刷新的偏移量
与此同时,可以看到 kafka 中新增了一个 topic test-mysql-jdbc-students
里面有一条数据,如果此时往表中再插入两条数据,可以看到数据变成了3条
ElasticSearch
ES 下载插件搜索关键字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector、ElasticSearch Source Connector,注意有些插件是支持 source、sink,有些是分开两个插件。
ES 环境准备
tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch
# 配置环境变量
export ES_HOME=/app/elasticsearch
export PATH=${ES_HOME}/bin:$PATH
# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0
启动 ES
cd /app/elasticsearch
.bin/elasticsearch
报错不能以root用户启动
创建用户用户组es,并修改 es 安装目录所属用户和组
chown -R es:es elasticsearch/
再次启动看到以下日志即正常
配置 sink 配置文件(connect-es-sink.properties),内容如下:
name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students
key.ignore=true
type.name=kafka-connect
启动 ES sink connector
$CONFLUENT_HOME/bin/connect-standalone /
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
$CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties /
$CONFLUENT_HOME/etc/kafka/connect-es-sink.properties
访问 es 9092 端口查询数据,可以查到有三条数据
# 查询命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search
# 查到的结果
{
"took": 121,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "test-mysql-jdbc-students",
"_type": "_doc",
"_id": "test-mysql-jdbc-students+0+0",
"_score": 1,
"_source": {
"rollno": 1,
"name": "James",
"marks": "35"
}
},
{
"_index": "test-mysql-jdbc-students",
"_type": "_doc",
"_id": "test-mysql-jdbc-students+0+1",
"_score": 1,
"_source": {
"rollno": 2,
"name": "James2",
"marks": "36"
}
},
{
"_index": "test-mysql-jdbc-students",
"_type": "_doc",
"_id": "test-mysql-jdbc-students+0+2",
"_score": 1,
"_source": {
"rollno": 3,
"name": "James3",
"marks": "37"
}
}
]
}
}
往数据库插入一条新的数据
insert into students (name, marks) values ('James4', 38);
可以看到 es 侧接收到了这条数据
共有 0 条评论