Kafka Streams配置

半兽人 发表于: 2017-04-02   最后更新时间: 2017-09-25  
  •   133 订阅,2210 游览

3.5 Kafka Streams配置

Kafka Stream客户端库配置(注意,窗口可拖动)。

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
application.id 流处理应用程序标识。必须在Kafka集群中是独一无二的。 1)默认客户端ID前缀,2)成员资格管理的group-id,3)changgelog的topic前缀 string high
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口列表。 客户端将会连接所有服务器,跟指定哪些服务器无关 - 通过指定的服务器列表会自动发现全部的服务器。此列表格式host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员(可能会动态更改),所以此列表不需要包含完整集 的服务器(您可能需要多个服务器,以防指定的服务器关闭)。 list high
client.id 发出请求时传递给服务器的id字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来追踪请求源的ip/port。 string "" high
zookeeper.connect Zookeeper连接字符串,用于Kafka主题管理。string "" high
connections.max.idle.ms 关闭闲置的连接时间(以毫秒为单位)。 long 540000 medium
key.serde 用于实现Serde接口的key的Serializer / deserializer类。 class org.apache
.kafka.common
.serialization
.Serdes.
ByteArraySerde
medium
partition.grouper 实现PartitionGrouper接口的Partition grouper类。 class org.apache
.kafka.streams
.processor
.DefaultPartitionGrouper
medium
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [0,...] medium
replication.factor 流处理应用程序创建的变更日志topic和重新分区topic的复制因子。 int 1 medium
request.timeout.ms 控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在需要时重新发送请求,如果重试耗尽,则客户端将重新发送请求。 int 40000 [0,...] medium
security.protocol 用于与broker沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT medium
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 [0,...] medium
state.dir 状态仓库的目录位置。 string /tmp/kafka-streams medium
timestamp.extractor 实现TimestampExtractor接口的Timestamp抽取器类。 class org.apache.kafka
.streams.processor.
FailOnInvalidTimestamp
medium
value.serde 用于实现Serde接口的值的Serializer / deserializer类。 class org.apache
.kafka.common
.serialization
.Serdes.ByteArraySerde
medium
windowstore.changelog.additional.retention.ms 添加到Windows维护管理器以确保数据不会从日志中过早删除。默认为1天 long 86400000 medium
application.server host:port指向用户嵌入定义的末端,可用于发现单个KafkaStreams应用程序中状态存储的位置 string "" low
buffered.records.per.partition 每个分区缓存的最大记录数。 int 1000 low
cache.max.bytes.buffering 用于缓冲所有线程的最大内存字节数 long 10485760 [0,...] low
commit.interval.ms 用于保存处理器位置的频率。 long 30000 low
metadata.max.age.ms 我们强制更新元数据时间段(以毫秒为单位),即使我们没有看到任何分区领导变化,主动发现新的broker或分区。 long 300000 [0,...] low
metric.reporters metric reporter的类列表。实现MetricReporter接口,JmxReporter始终包含在注册JMX统计信息中。 list "" low
metrics.num.samples 保持的样本数以计算度量。 int 2 [1,...] low
metrics.recording.level 日志级别。 string INFO [INFO, DEBUG] low
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,...] low
num.standby.replicas 每个任务的备用副本数。 int 0 low
num.stream.threads 执行流处理的线程数。 int 1 low
poll.ms 阻塞输入等待的时间(以毫秒为单位)。 long 100 low
reconnect.backoff.ms 尝试重新连接之前等待的时间。避免在高频繁的重复连接服务器。 这种backoff适用于消费者向broker发送的所有请求。 long 50 [0,...] low
retry.backoff.ms 尝试重试topic分区的失败请求之前等待的时间。以避免了在某些故障情况下,在频繁重复发送请求。 long 100 [0,...] low
rocksdb.config.setter 一个Rocks DB配置setter类来实现RocksDBConfigSetter接口类 null low
state.cleanup.delay.ms 在分区迁移删除状态之前等待的时间(毫秒)。 long 60000 low






发表于: 5月前   最后更新时间: 昨天   游览量:2210
上一条: Kafka New Consumer配置
下一条: Kafka Connect配置
评论…

  • 评论…
    • in this conversation
      提问