Kafka Connect用户指南

原创
半兽人 发表于: 2016-08-22   最后更新时间: 2021-03-30 10:37:30  
{{totalSubscript}} 订阅, 31,158 游览

8.2 用户指南

提供了一个快速入门的例子,运行一个单机版的Kafka Connect。本节更详细的介绍如何配置,运行和管理Kafka Connect。

运行Kafka Connect

Kafka Connect目前支持两种执行模式:独立(单进程)和分布式。

在独立模式下,所有的工作都在一个单进程中进行的。这样易于配置,在一些情况下,只有一个在工作是好的(例如,收集日志文件),但它不会从kafka Connection的功能受益,如容错。通过下面的命令开始一个单进程的例子:

> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个参数是worker(工人)的配置,这包括 Kafka连接的参数设置,序列化格式,以及频繁地提交offset(偏移量)。本节提供的例子用的是默认的配置 conf/server.properties。其余的参数是connector(连接器)配置文件。你可以配置你需要的,但是所有的执行都在同一个进程(在不同的线程)。分布式的模式会自动平衡。允许你动态的扩展(或缩减),并在执行任务期间和配置、偏移量提交中提供容错保障,非常类似于独立模式:

bin/connect-distributed.sh config/connect-distributed.properties

在不同的类中,配置参数定义了Kafka Connect如何处理,哪里存储配置,如何分配work,哪里存储offset和任务状态。在分布式模式中,Kafka Connect在topic中存储offset,配置和任务状态。建议手动创建offset的topic,可以自己来定义需要的分区数和副本数。如果启动Kafka Connect时还没有创建topic,那么topic将自动创建(使用默认的分区和副本),这可能不是最合适的(因为kafka可不知道业务需要,只能根据默认参数创建)。特别是以下配置参数尤为关键,启动集群之前设置:

  • group.id (默认connect-cluster) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。

  • config.storage.topic (默认connect-configs) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以确保是单个partition(自动创建的可能会有多个partition)。

  • offset.storage.topic (默认 connect-offsets) - topic用于存储offsets;这个topic应该配置多个partition和副本。

  • status.storage.topic (默认 connect-status) - topic 用于存储状态;这个topic 可以有多个partitions和副本

注意,在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。

配置连接器(connector)

Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项:

  • name - 连接器唯一的名称,不能重复。
  • connector.calss - 连接器的Java类。
  • tasks.max - 连接器创建任务的最大数。
  • connector.class配置支持多种格式:全名或连接器类的别名。比如连接器是org.apache.kafka.connect.file.FileStreamSinkConnector,你可以指定全名,也可以使用FileStreamSinkFileStreamSinkConnector。Sink connector也有一个额外的选项来控制它们的输入:
  • topics - 作为连接器的输入的topic列表。

对于其他的选项,你可以查看连接器的文档。

REST API

由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083。以下是当前支持的终端入口:

  • GET /connectors - 返回活跃的connector列表
  • POST /connectors - 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。
  • GET /connectors/{name} - 获取指定connector的信息
  • GET /connectors/{name}/config - 获取指定connector的配置参数
  • PUT /connectors/{name}/config - 更新指定connector的配置参数
  • GET /connectors/{name}/status - 获取connector的当前状态,包括它是否正在运行,失败,暂停等。
  • GET /connectors/{name}/tasks - 获取当前正在运行的connector的任务列表。
  • GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
  • PUT /connectors/{name}/pause - 暂停连接器和它的任务,停止消息处理,直到connector恢复。
  • PUT /connectors/{name}/resume - 恢复暂停的connector(如果connector没有暂停,则什么都不做)
  • POST /connectors/{name}/restart - 重启connector(connector已故障)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重启单个任务 (通常这个任务已失败)
  • DELETE /connectors/{name} - 删除connector, 停止所有的任务并删除其配置

Kafka Connector还提供了获取有关connector plugins信息的REST API:

  • GET /connector-plugins- 返回已在Kafka Connect集群安装的connector plugin列表。请注意,API仅验证处理请求的worker的connector。这以为着你可能看不不一致的结果,特别是在滚动升级的时候(添加新的connector jar)

  • PUT /connector-plugins/{connector-type}/config/validate - 对提供的配置值进行验证,执行对每个配置验证,返回验证的建议值和错误信息。

更新于 2021-03-30

木木&很呆 1年前

我这边尝试把 zabbix 数据库 postgresql 数据导出来,数据是能够导出来了,单数原本value是数字的却变成了字符串
这个问题也没有找他那里可以配置的方案,求解

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"itemid"},{"type":"int32","optional":false,"default":0,"field":"clock"},{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"16"},"default":"AA==","field":"value"},{"type":"int32","optional":false,"default":0,"field":"ns"}],"optional":true,"name":"postgres_zabbix.public.history.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"itemid"},{"type":"int32","optional":false,"default":0,"field":"clock"},{"type":"bytes","optional":false,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"16"},"default":"AA==","field":"value"},{"type":"int32","optional":false,"default":0,"field":"ns"}],"optional":true,"name":"postgres_zabbix.public.history.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres_zabbix.public.history.Envelope"},"payload":{"before":null,"after":{"itemid":159373,"clock":1653528253,"value":"FdMc","ns":449032405},"source":{"version":"1.7.1.Final","connector":"postgresql","name":"postgres-zabbix","ts_ms":1655792312919,"snapshot":"true","db":"zabbix","sequence":"[null,\"222247444242032\"]","schema":"public","table":"history","txId":561854628,"lsn":222247444242032,"xmin":null},"op":"r","ts_ms":1655792312919,"transaction":null}}

connect-distributed.properties的配置:

bootstrap.servers=x.x.x.x:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
heartbeat.interval.ms=1000

connectors 信息:

{
  "name": "postgres-connect-kafka",
  "config": {
    "name": "postgres-connect-kafka",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "x.x.x.x",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "sssssss",
    "database.dbname" : "zabbix",
    "table.whitelist": "public.history",
    "database.server.name": "postgres-zabbix",
    "plugin.name": "pgoutput"
  }
}
查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章