8.2 User Guide
The quickstart provides a brief example of running a standalone instance of Kafka Connect. This section describes in more detail how to configure, run, and manage Kafka Connect.
Running Kafka Connect
Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.
In standalone mode all work is performed in a single process. This is easier to configure, and it is a good fit when only a single worker makes sense (for example, collecting log files), but it does not benefit from some features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
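For reference, a minimal worker configuration for standalone mode might look like the following sketch (the broker address and file paths are illustrative):

```properties
# Kafka brokers to connect to (placeholder address)
bootstrap.servers=localhost:9092
# Serialization format for record keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Standalone mode stores offsets in a local file, flushed periodically
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```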
The first parameter is the worker configuration. This includes settings such as the Kafka connection parameters, the serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running the default configuration in config/server.properties. The remaining parameters are connector configuration files. You may include as many as you want, but all of them will execute within the same process (on different threads). Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both for the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:
> bin/connect-distributed.sh config/connect-distributed.properties
The difference lies in the class that is started and in the configuration parameters, which determine how Kafka Connect processes work, where it stores configurations, how it assigns work, and where it stores offsets and task statuses. In distributed mode, Kafka Connect stores offsets, configurations, and task statuses in Kafka topics. It is recommended to create these topics manually so that you can choose the number of partitions and the replication factor yourself. If the topics have not been created when Kafka Connect starts, they will be auto-created with default partition and replication settings, which may not suit your needs (Kafka cannot know your requirements and can only apply its defaults). In particular, the following configuration parameters are critical and should be set before starting the cluster:
group.id (default connect-cluster) - a unique name for the Connect cluster group; note that this must not conflict with consumer group IDs.
config.storage.topic (default connect-configs) - the topic used to store connector and task configurations; note that this should be a single-partition, replicated topic. You may need to create it manually to guarantee a single partition, since an auto-created topic may end up with multiple partitions.
offset.storage.topic (default connect-offsets) - the topic used to store offsets; it should have multiple partitions and replicas.
status.storage.topic (default connect-status) - the topic used to store statuses; it can have multiple partitions and replicas.
Note that in distributed mode the connector configurations cannot be passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.
Configuring Connectors
Connector configurations are simple key-value mappings. In standalone mode these are defined in a properties file and passed to the Connect process on the command line. In distributed mode they are part of the JSON payload of the request that creates (or modifies) the connector. Most configurations are connector dependent, but there are a few common options:
name - a unique name for the connector; names must not be reused.
connector.class - the Java class for the connector.
tasks.max - the maximum number of tasks that should be created for this connector.
The connector.class configuration supports several formats: the full name of the connector class or an alias. For example, if the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector instead. Sink connectors also have one additional option to control their input:
- topics - a list of topics to use as input for this connector.
For any other options, consult the documentation for the connector.
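For example, a standalone-mode properties file for the file sink connector mentioned above might look like this sketch (the topic name and file path are illustrative):

```properties
name=local-file-sink
# Alias for org.apache.kafka.connect.file.FileStreamSinkConnector
connector.class=FileStreamSink
tasks.max=1
# Sink-specific option: the input topics
topics=connect-test
# FileStreamSink's own option: where to write the records
file=/tmp/test.sink.txt
```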
REST API
Since Kafka Connect is intended to run as a service, it also provides a REST API for managing connectors. By default this service listens on port 8083. The currently supported endpoints are:
GET /connectors - returns a list of active connectors.
POST /connectors - creates a new connector; the request body is a JSON object containing a string name field and an object config field with the connector's configuration parameters.
GET /connectors/{name} - gets information about a specific connector.
GET /connectors/{name}/config - gets the configuration parameters for a specific connector.
PUT /connectors/{name}/config - updates the configuration parameters for a specific connector.
GET /connectors/{name}/status - gets the current status of the connector, including whether it is running, failed, paused, etc.
GET /connectors/{name}/tasks - gets the list of tasks currently running for the connector.
GET /connectors/{name}/tasks/{taskid}/status - gets the current status of a task, including whether it is running, failed, paused, etc.
PUT /connectors/{name}/pause - pauses the connector and its tasks, which stops message processing until the connector is resumed.
PUT /connectors/{name}/resume - resumes a paused connector (does nothing if the connector is not paused).
POST /connectors/{name}/restart - restarts a connector (typically because it has failed).
POST /connectors/{name}/tasks/{taskId}/restart - restarts an individual task (typically because it has failed).
DELETE /connectors/{name} - deletes a connector, halting all its tasks and deleting its configuration.
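As a sketch, assuming a worker listening on the default port 8083, a hypothetical file sink connector could be created and inspected with curl (the connector name and config values are illustrative):

```shell
# Create a connector; the body is a JSON object with "name" and "config"
curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{"name": "local-file-sink",
       "config": {"connector.class": "FileStreamSink",
                  "tasks.max": "1",
                  "topics": "connect-test",
                  "file": "/tmp/test.sink.txt"}}'

# Inspect its status, then delete it
curl -s http://localhost:8083/connectors/local-file-sink/status
curl -s -X DELETE http://localhost:8083/connectors/local-file-sink
```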
Kafka Connect also provides a REST API for getting information about connector plugins:
GET /connector-plugins - returns a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade when new connector jars are being added.
PUT /connector-plugins/{connector-type}/config/validate - validates the provided configuration values against the configuration definition, performing per-config validation and returning suggested values and error messages.
I am trying to export data from a Zabbix PostgreSQL database. The data does get exported, but fields whose value was originally numeric come out as strings.
I have not found where this could be configured - any help would be appreciated.
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"itemid"},{"type":"int32","optional":false,"default":0,"field":"clock"},{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"16"},"default":"AA==","field":"value"},{"type":"int32","optional":false,"default":0,"field":"ns"}],"optional":true,"name":"postgres_zabbix.public.history.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"itemid"},{"type":"int32","optional":false,"default":0,"field":"clock"},{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"16"},"default":"AA==","field":"value"},{"type":"int32","optional":false,"default":0,"field":"ns"}],"optional":true,"name":"postgres_zabbix.public.history.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres_zabbix.public.history.Envelope"},"payload":{"before":null,"after":{"itemid":159373,"clock":1653528253,"value":"FdMc","ns":449032405},"source":{"version":"1.7.1.Final","connector":"postgresql","name":"postgres-zabbix","ts_ms":1655792312919,"snapshot":"true","db":"zabbix","sequence":"[null,\"222247444242032\"]","schema":"public","table":"history","txId":561854628,"lsn":222247444242032,"xmin":null},"op":"r","ts_ms":1655792312919,"transaction":null}}
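The "value":"FdMc" above is not an arbitrary string: according to the schema, it is the base64-encoded big-endian unscaled bytes of a Kafka Connect logical Decimal with scale 4, which is how the JsonConverter serializes Decimal values. A decoding sketch, assuming a non-negative value that fits in shell integer arithmetic:

```shell
# base64 -> raw bytes -> hex string (big-endian unscaled integer)
hex=$(printf 'FdMc' | base64 -d | od -An -tx1 | tr -d ' \n')
unscaled=$((16#$hex))   # 0x15d31c = 1430300
# Apply scale 4, i.e. divide by 10^4
printf '%d.%04d\n' $((unscaled / 10000)) $((unscaled % 10000))   # prints 143.0300
```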
The connect-distributed.properties configuration:
bootstrap.servers=x.x.x.x:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
heartbeat.interval.ms=1000
The connector definition:
{
  "name": "postgres-connect-kafka",
  "config": {
    "name": "postgres-connect-kafka",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "x.x.x.x",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "sssssss",
    "database.dbname": "zabbix",
    "table.whitelist": "public.history",
    "database.server.name": "postgres-zabbix",
    "plugin.name": "pgoutput"
  }
}
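One commonly used way to make Debezium emit plain numbers instead of the encoded Decimal is its decimal.handling.mode connector option: the default precise produces the logical Decimal shown above, while double (potentially lossy for very large or very precise values) or string produce plain JSON values. A sketch of the extra key to add to the "config" object above:

```json
{ "decimal.handling.mode": "double" }
```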