数据传输工具 —— Kafka Connect

1、什么是 kafka connect?

  Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出 kafka 的连接器变得简单。

  Kafka Connect 可以获取整个数据库或从应用程序服务器收集指标到 kafka 主题,使数据可用于低延迟的流处理。

  导出作业可以将数据从 kafka topic 传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。

2、功能

  • kafka connector 通用框架,提供统一的集成 API
  • 同时支持分布式模式和单机模式
  • 自动化的 offset 管理,开发人员不必担心错误处理的影响
  • rest 接口,用来查看和管理 kafka connectors

3、概念

Connectors:通过管理任务来处理数据流的高级抽象
Tasks:数据写入 kafka 和从 kafka 读出的实现
Workers:运行 connectors 和 tasks 的进程
Converters:kafka connect 和其他存储系统直接发送和接收数据之间转换数据

  Connector 决定了数据要从哪里复制过来以及数据应该写到哪里去,一个 connector 实例是一个需要负责在 kafka 和其他系统之间复制数据的逻辑作业,connector plugin 是 jar 文件,实现了 kafka 定义的一些接口来完成特定的任务。

  Task 是 kafka connect 数据模型的主角,每一个 connector 都会协调一系列的 task 去执行任务,connector 可以把一项工作分割成许多的 task,然后再把 task 分发到各个 worker 中去执行(分布式模式下),task 不自己保存自己的状态信息,而是交给特定的 kafka 主题去保存(config.storage.topicstatus.storage.topic)。在分布式模式下有一个概念叫做任务再平衡(Task Rebalancing),当一个 connector 第一次提交到集群时,所有的 worker 都会做一个 task rebalancing 从而保证每一个 worker 都运行了差不多数量的工作,而不是所有的工作压力都集中在某个 worker 进程中,而当每个进程挂了之后也会执行 task rebalance。

  Connectors 和 Tasks 都是逻辑工作单位,必须安排在进程中执行,而在 kafka connect 中,这些进程就是 workers,分别有两种 worker:standalone、distributed。生产中 distributed worker 表现很棒,因为它提供了可扩展性以及自动容错的功能,可以用一个 group.id 来启动很多 worker 进程,在有效的 worker 进程中它们会自动地去协调执行 connector 和 task,如果新加或者挂了一个 worker,其他的 worker 会检测到然后再重新分配 connector 和 task。

  Converter 会把 bytes 数据转换为 kafka connect 内部的格式,也可以把 kafka connect 内部存储格式的数据变成 bytes,converter 对 connector 来说是解耦的,所以其他的 connector 都可以重用。例如使用了 avro converter,那么 jdbc connector 可以写 avro 格式的数据到 kafka,同时 hfds connector 也可以从 kafka 中读出 avro 格式的数据。

4、实战

  启动 confluent

cd /app/confluent/bin
./confluent local start

  使用 standalone 模式启动

# 启动 kafka connect
$CONFLUENT_HOME/bin/connect-standalone /
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
connector1.properties [connector2.properties]

  在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件

  其中 connect-standalone.properties 是启动 connect 服务组件自身的配置,内容如下:

# kafka 服务
bootstrap.servers=localhost:9092

# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 是否启用转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 偏移量存储文件名
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# 插件路径
plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components

# 默认端口为8083,需要修改端口时启动以下配置
# rest.port=8084

(1)标准 connect

启动一个带 FileSource 的 Connect

  connect-file-source.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-source
# 将文件读取到数据流中
connector.class=FileStreamSource
# 工作线程是 1 个
tasks.max=1
# 读取的文件名为 test.txt
file=test.txt
# 复制到的主题为 connect-test
topic=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone /
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties

  结果报错 Java 内存不足

  关闭虚拟机,加大内存,重启服务器和 confluent,再次启动 connect,报错 8083 端口已被绑定

  修改 connect-standalone.properties 配置中的端口为 8084 再启动,新的报错:不存在 source 配置文件中的指定的文件,在启动路径下创建文件,日志恢复正常

echo -e "foo/nbar/n" > $CONFLUENT_HOME/test.txt

  可以通过 kafka tools 看到新增了主题 connect-test,写入了3条数据

  往文件中写入数据,会报告又成功提交一次偏移量

# 写数据
/app/confluent# echo -e "foo1/nbar1/n" >> test.txt

# 日志
INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
...

  然后可以看到主题中多了3条数据

启动带 FileSource 和 FileSink 的 Connect

  connect-file-sink.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-sink
# 从数据流中读取数据到文件中
connector.class=FileStreamSink
# 工作线程是 1 个
tasks.max=1
# 写入的文件是 test.sink.txt
file=test.sink.txt
# 读取数据的主题是 connect-test
topics=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone /
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties /
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  可以看到自动创建了 test.sink.txt 文件

  同时可以看到 consumer 中多了一个 connect-local-file-sink ,偏移量为6(即已将6条数据都 sink 到了文件中)

(2)REST API

  使用 Rest API 必须启动分布式模式,通过 Rest API 可以管理集群中的 connect 服务,默认端口是 8083。

GET /connectors - 返回所有正在运行的connector名。
POST /connectors - 新建一个connector;请求体必须是json格式并且需要包含name字段和config字段,name是connectors的名字,config是json格式,必须包含connector的配置信息。
GET /connectors/{name} - 获取指定connector的信息。
GET /connectors/{name}/config - 获取指定connector的配置信息。

  在分布式模式下,有两种方式来配置 connector,第一种是类似 standalone 模式一样,写好配置文件,然后在启动时指定

$CONFLUENT_HOME/bin/connect-distributed /
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties /
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties /
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  另外一种方式更加灵活,就是直接通过 Rest API 来对 connector 配置进行增删查。

  查看 connectors

  添加 connectors

  查看某个 connector

  这里指定的文件是相对路径,所以要在 $CONFLUENT_HOME/bin 路径下创建一个 test-distributed.txt 文件

cd $CONFLUENT_HOME/bin
echo -e "foo/nbar/n" > test-distributed.txt

  可以看到出现了 connect-distributed 主题

  添加 sink

  从服务器可以看到产生了 sink 文件

  删除 connector

  再次往 test-distributed.txt 文件中追加数据,可以看到 connect-distributed 主题中的数据增加了,source connector 依然在工作,但是 sink connector 已经停止了,所以 test-distributed.sink.txt 文件中数据不再从主题中复制。

【注意】

  如果要在脚本中处理,发起HTTP请求,可以使用 curl 工具,将请求的配置在 json 文件中,如:

curl -d @$CONFLUENT_HOME/connect-file-sink.json /
  -H "Content-Type: application/json" /
  -X POST http://localhost:8083/connectors

创建带有 Convert 的 connector

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

  添加 connector(由于跟上述实验 name 一致,所以需要先删除或者换个 name)

  创建 test-transformation.txt 文件,可以看到自动创建了 connect-transformation 主题

  添加 sink

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-transformation.sink.txt",
        "topics": "connect-transformation"
    }
}

  可以看到 sink 自动生成了 test-transformation.sink.txt 文件,并且内容不是 source 过来的原始数据,而是经过 convertor 处理后的带格式的数据

(3)MySQL Source、ESSink

  演示将数据从 MySQL 复制到 kafka 中,再通过 kafka 将数据下沉到 ElasticSearch。这里 MySQL 是数据源,所以需要支持 MySQL 的 source connector,ES 是目标数据系统,所以需要支持 ES 的 sink connectors,可以从 https://www.confluent.io/hub/ 下载。

MySQL

  MySQL 下载插件搜索关键字 "JDBC",可以看到提供了在线安装的脚本和离线安装的包下载。

  MySQL 环境准备

# 安装 MySQL
sudo apt-get install mysql-server

# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1

# 将 MySQL 驱动上传到 confluent 目录
# mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib

【注意】下载下来的 jdbc connector 插件,在处理 mysql 时需要相应的驱动,而插件不带驱动,实际采集数据时会报错,这时需要将驱动 jar 包拷贝到插件库目录中。

  数据准备,创建用户并授权,用该用户创建数据库、表和插入数据

grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);

  创建 source 配置文件(connect-mysql-source.properties),内容如下:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表会被采集到的 topic 名前缀,比如表名叫 students,对应的 topic 就为 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-

  启动 mysql source connector

$CONFLUENT_HOME/bin/connect-standalone /
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties

  可以看到启动之后,开启了 JDBC source task,然后执行了查询的 SQL,最后提交和刷新的偏移量

  与此同时,可以看到 kafka 中新增了一个 topic test-mysql-jdbc-students

  里面有一条数据,如果此时往表中再插入两条数据,可以看到数据变成了3条

ElasticSearch

  ES 下载插件搜索关键字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector、ElasticSearch Source Connector,注意有些插件是支持 source、sink,有些是分开两个插件。

  ES 环境准备

tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch

# 配置环境变量
export ES_HOME=/app/elasticsearch
export PATH=${ES_HOME}/bin:$PATH

# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0

  启动 ES

cd /app/elasticsearch
.bin/elasticsearch

  报错不能以root用户启动

  创建用户用户组es,并修改 es 安装目录所属用户和组

chown -R es:es elasticsearch/

  再次启动看到以下日志即正常

  配置 sink 配置文件(connect-es-sink.properties),内容如下:

name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students 
key.ignore=true
type.name=kafka-connect

  启动 ES sink connector

$CONFLUENT_HOME/bin/connect-standalone /
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties /
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties /
  $CONFLUENT_HOME/etc/kafka/connect-es-sink.properties

  访问 es 9092 端口查询数据,可以查到有三条数据

# 查询命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search

# 查到的结果
{
    "took": 121,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1,
        "hits": [
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+0",
                "_score": 1,
                "_source": {
                    "rollno": 1,
                    "name": "James",
                    "marks": "35"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+1",
                "_score": 1,
                "_source": {
                    "rollno": 2,
                    "name": "James2",
                    "marks": "36"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+2",
                "_score": 1,
                "_source": {
                    "rollno": 3,
                    "name": "James3",
                    "marks": "37"
                }
            }
        ]
    }
}

  往数据库插入一条新的数据

insert into students (name, marks) values ('James4', 38);

  可以看到 es 侧接收到了这条数据

版权声明:
作者:admin
链接:https://www.techfm.club/p/42215.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>