Kafka Connect配置

原创
半兽人 发表于: 2017-04-04   最后更新时间: 2021-01-14 23:13:58  
{{totalSubscript}} 订阅, 26,327 游览

3.5 Kafka Connect Configs

Kafka Connect框架的相关配置(注意,窗口可向右拉动)。

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
config.storage.topic kafka topic仓库配置 string high
group.id 唯一的字符串,用于标识此worker所属的Connect集群组。 string high
key.converter 用于Kafka Connect和写入到Kafka的序列化消息的之间格式转换的转换器类。 这可以控制写入或从kafka读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 class high
offset.storage.topic 连接器的offset存储到哪个topic中 string high
status.storage.topic 追踪连接器和任务状态存储到哪个topic中 string high
value.converter 用于Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 class high
internal.key.converter 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 这可以控制写入或从Kafka读取的消息中的key的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 class low
internal.value.converter 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 class low
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口列表。此列表用来发现完整服务器集的初始主机。 该列表的格式应为host1:port1,host2:port2,....由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此,不需要包含完整的服务器(尽管如此,你需要多配置几个,以防止配置的宕机)。 list localhost:9092 high
heartbeat.interval.ms 心跳间隔时间。心跳用于确保会话保持活动,并在新成员加入或离开组时进行重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 int 3000 high
rebalance.timeout.ms 限制所有组中消费者的任务处理数据和提交offset所需的时间。如果超时,那么woker将从组中删除,这也将导致offset提交失败。 int 60000 high
session.timeout.ms 用于察觉worker故障的超时时间。worker定时发送心跳以表明自己是活着的。如果broker在会话超时时间到期之前没有接收到心跳,那么broker将从分组中移除该worker,并启动重新平衡。注意,该值必须在group.min.session.timeout.msgroup.max.session.timeout.ms范围内。 int 10000 high
ssl.key.password 密钥存储文件中私钥的密码。 这对于客户端是可选的。 password null high
ssl.keystore.location 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向身份验证。 string null high
ssl.keystore.password 密钥存储文件的存储密码。 客户端是可选的,只有配置了ssl.keystore.location才需要。 password null high
ssl.truststore.location 信任存储文件的位置。 string null high
ssl.truststore.password 信任存储文件的密码。 password null high
connections.max.idle.ms 多少毫秒之后关闭空闲的连接。 long 540000 medium
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [0,...] medium
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 int 40000 [0,...] medium
sasl.jaas.config 用于JAAS配置文件的SASL连接的JAAS登录上下文参数格式。这里描述了JAAS配置文件的格式。该值的格式为:' (=)*;' password null medium
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。 可以在Kafka的JAAS配置或Kafka的配置中定义。 string null medium
sasl.mechanism 用户客户端连接的SASL机制。可以提供者任何安全机制。 GSSAPI是默认机制。 string GSSAPI medium
security.protocol 用于和broker通讯的策略。有效的值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 string PLAINTEXT medium
send.buffer.bytes 发送数据时使用TCP发送缓冲区(SO_SNDBUF)的大小。如果值为-1,则将使用OS默认。 int 131072 [-1,...] medium
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1
.1,TLSv1
medium
ssl.keystore.type 密钥存储文件的文件格式。 对于客户端是可选的。 string JKS medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任存储文件的文件格式。 string JKS medium
worker.sync.timeout.ms 当worker与其他worker不同步并需要重新同步配置时,需等待一段时间才能离开组,然后才能重新加入。 int 3000 medium
worker.unsync.backoff.ms 当worker与其他worker不同步,并且无法在worker.sync.timeout.ms 期间追赶上,在重新连接之前,退出Connect集群的时间。 int 300000 medium
access.control.allow.methods 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。 Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。 string "" low
access.control.allow.origin 将Access-Control-Allow-Origin标头设置为REST API请求。要启用跨源访问,请将其设置为应该允许访问API的应用程序的域,或者 *" 以允许从任何的。 默认值只允许从REST API的域访问。 string "" low
client.id 在发出请求时传递给服务器的id字符串。这样做的目的是通过允许逻辑应用程序名称包含在请求消息中,来跟踪请求来源。而不仅仅是ip/port string "" low
config.storage.replication.factor 当创建配置仓库topic时的副本数 short 3 [1,...] low
metadata.max.age.ms 在没有任何分区leader改变,主动地发现新的broker或分区的时间。 long 300000 [0,...] low
metric.reporters A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. list "" low
metrics.num.samples 保留计算metrics的样本数(译者不清楚是做什么的) int 2 [1,...] low
metrics.sample.window.ms The window of time a metrics sample is computed over. long 30000 [0,...] low
offset.flush.interval.ms 尝试提交任务偏移量的间隔。 long 60000 low
offset.flush.timeout.ms 在取消进程并恢复要在之后尝试提交的offset数据之前,等待消息刷新并分配要提交到offset仓库的offset数据的最大毫秒数。 long 5000 low
offset.storage.partitions 创建offset仓库topic的分区数 int 25 [1,...] low
offset.storage.replication.factor 创建offset仓库topic的副本数 short 3 [1,...] low
plugin.path 包含插件(连接器,转换器,转换)逗号(,)分隔的路径列表。该列表应包含顶级目录,其中包括以下任何组合:a)包含jars与插件及其依赖关系的目录 b)具有插件及其依赖项的uber-jars c)包含插件类的包目录结构的目录及其依赖关系,注意:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors list null low
reconnect.backoff.max.ms 无法连接broker时等待的最大时间(毫秒)。如果设置,则每个host的将会持续的增加,直到达到最大值。计算增加后,再增加20%的随机抖动,以避免高频的反复连接。 long 1000 [0,...] low
reconnect.backoff.ms 尝试重新连接到主机之前等待的时间。 避免了高频率反复的连接主机。 这种机制适用于消费者向broker发送的所有请求。 long 50 [0,...] low
rest.advertised.host.name 如果设置,其他wokers将通过这个hostname进行连接。 string null low
rest.advertised.port 如果设置,其他的worker将通过这个端口进行连接。 int null low
rest.host.name REST API的主机名。 如果设置,它将只绑定到这个接口。 string null low
rest.port 用于监听REST API的端口 int 8083 low
retry.backoff.ms 失败请求重新尝试之前的等待时间,避免了在某些故障的情况下,频繁的重复发送请求。 long 100 [0,...] low
sasl.kerberos.kinit.cmd Kerberos kinit命令路径. string /usr/bin/kinit low
sasl.kerberos.min.time.before.relogin 尝试refresh之间登录线程的休眠时间. long 60000 low
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将休眠,直到从上次刷新ticket到期,此时将尝试续订ticket。 double 0.8 low
ssl.cipher.suites 密码套件列表。用于TLS或SSL网络协议协商网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 末端识别算法使用服务器证书验证服务器主机名。 string null low
ssl.keymanager.algorithm 用于SSL连接的key管理工厂的算法,默认值是Java虚拟机配置的密钥管理工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 用于SSL连接的信任管理仓库算法。默认值是Java虚拟机配置的信任管理器工厂算法。 string PKIX low
status.storage.partitions 用于创建状态仓库topic的分区数 int 5 [1,...] low
status.storage.replication.factor 用于创建状态仓库topic的副本数 short 3 [1,...] low
task.shutdown.graceful.timeout.ms 等待任务正常关闭的时间,这是总时间,不是每个任务,所有任务触发关闭,然后依次等待。 long 5000 low

kafka >= 2.0.0

名称 描述 类型 默认 有效值 重要程度
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler class null 中间
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin class null 中间

kafka >= 2.7

名称 描述 类型 默认 有效值 重要程度
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。默认的SSL引擎工厂只支持带X.509证书的PEM格式。 password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 long 127000 (127 seconds) 中间
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。如果在超时之前没有建立连接,客户端将关闭socket通道。 long 10000 (10 seconds) 中间
更新于 2021-01-14

吼吼嘿哈 2年前

您好,我的消费日志中出现大量commit of offsets timed out.而且消费数据很慢造成了数据积压。这个超时时间是由offset.flush.timeout.ms控制的吗?如果是,我增大它的值有什么约束条件吗?

半兽人 -> 吼吼嘿哈 2年前

是的,该错误表明,有很多信息被缓冲,在超时之前无法刷新。为了解决这个问题,你可以

  • 增加Kafka Connect Worker Configs中的offset.flush.timeout.ms配置参数。
  • 或者你可以通过减少Kafka Connect Worker Configs中的producer.buffer.memory来减少被缓冲的数据量。

当你有相当大的消息时,这将是最好的选择。

Curtain 2年前

您好,如何在docker-compose.yml里面配置让kafka启动后自动执行指定的connector

半兽人 -> Curtain 2年前

只能通过编写cmd实现:docker的cmd和entrypoint的区别

Curtain -> 半兽人 2年前

谢谢,我试下

Curtain -> 半兽人 2年前

您好,我按照你的方式在Dockerfile里面的最后一行加了启动连接器的命令。当我运行容器的时候,查看日志,日志报:

[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:766)

这个问题之前也遇到过。之前的场景是这样的:我刚启动kafka容器,然后立马进入容器,执行启动连接器的命令就会出现这个问题。如果是启动kafka容器后,隔一段时间再进入容器执行命令,就正常。

因此我猜测,这个问题可不可能是因为kafka还没完全启动我就执行了启动连接器的命令?如果是的话,请问一下,怎么在dockerfile里面确保kafka完全启动后,在执行启动连接器命令?

半兽人 -> Curtain 2年前

加个sleep,或者做个while。

脾气比较躁 3年前

你好,在开启kerberos的kafka集群,通过kafka客户端模式如何启动kafka connect的standalone模式?kerberos认证信息要写在哪个配置文件中?是connect-standalone.properties这个文件吗?有样例吗?

脾气比较躁 3年前

你好,我现在用的kafka的版本0.10.2.0,要用kafka connect的standalone模式采集采集oracle数据,执行命令:

./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/OracleSourceConnector.properties

然后写入kafka中的topic就会报:

[2021-04-13 16:13:48,274] WARN Bootstrap broker mytest403:9092 disconnected (org.apache.kafka.clients.NetworkClient:694)

猜测是因为kerberos的问题:

[2021-04-13 16:13:18,722] INFO ProducerConfig values:     
sasl.kerberos.service.name = null
sasl.kerberos.service.principal.instance = null
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT

以上参数怎么修改?启动的时候读取的默认配置又是那里?实在是不清楚,求大佬指教

你要到kafka日志中找具体的报错原因。

现在集群是TDH的,我这边是利用的客户端模式,执行的,是要去看kafka集群的吗?还是看客户端的?
以下是客户端执行报错:

[2021-04-13 17:59:11,746] WARN Bootstrap broker mytest403:9092 disconnected (org.apache.kafka.clients.NetworkClient:694)
[2021-04-13 17:59:11,782] ERROR Failed to flush WorkerSourceTask{id=oracle-kafka-connector-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)
[2021-04-13 17:59:11,783] ERROR Failed to commit offsets for WorkerSourceTask{id=oracle-kafka-connector-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)

去看看kafka集群日志的报错。

lq0317。 6年前
[2018-12-14 19:45:51,840] INFO Got user-level KeeperException when processing sessionid:0x1007d8b3ca30000 type:create cxid:0x1 zxid:0x20 txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-12-14 19:45:51,849] INFO Got user-level KeeperException when processing sessionid:0x1007d8b3ca30000 type:create cxid:0x2 zxid:0x21 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-12-14 19:45:51,851] INFO Got user-level KeeperException when processing sessionid:0x1007d8b3ca30000 type:create cxid:0x3 zxid:0x22 txntype:-1 reqpath:n/a Error Path:/brokers/topics Error:KeeperErrorCode = NodeExists for /brokers/topics (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-12-14 19:45:51,852] INFO Got user-level KeeperException when processing sessionid:0x1007d8b3ca30000 type:create cxid:0x4 zxid:0x23 txntype:-1 reqpath:n/a Error Path:/config/changes Error:KeeperErrorCode = NodeExists for /config/changes (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-12-14 19:45:51,854] INFO Got user-level KeeperException when processing sessionid:0x1007d8b3ca30000 type:create cxid:0x5 zxid:0x24 txntype:-1 reqpath:n/a Error Path:/admin/delete_topics Error:KeeperErrorCode = NodeExists for /admin/delete_topics (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-12-14 19:45:51,855] INFO Got user-level KeeperException when processing sessionid:0x1007d8b3ca30000 type:create cxid:0x6 zxid:0x25 txntype:-1 reqpath:n/a Error Path:/brokers/seqid Error:KeeperErrorCode = NodeExists for /brokers/seqid (org.apache.zookeeper.server.PrepRequestProcessor)

启动zk 然后启动kafka报错 这是什么导致的

matteo 6年前

conncet读文件(10万行左右),过一段时间总是会自动停止往topic发送数据,必须重新启动才会再次执行,不知道是什么原因。

__xiao_# -> matteo 5年前

请问 这个问题后来你解决了吗

布布猛猛 7年前

看完了,感谢作者的无私奉献

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章