Kafka Streams配置

原创
半兽人 发表于: 2017-04-02   最后更新时间: 2017-10-31 23:13:37  
{{totalSubscript}} 订阅, 20,475 游览

3.6 Kafka Streams配置

Kafka Stream客户端库配置(注意,窗口可拖动)。

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
application.id 流处理应用程序标识。必须在Kafka集群中是独一无二的。 1)默认客户端ID前缀,2)成员资格管理的group-id,3)changgelog的topic前缀 string high
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口列表。 客户端将会连接所有服务器,跟指定哪些服务器无关 - 通过指定的服务器列表会自动发现全部的服务器。此列表格式host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员(可能会动态更改),所以此列表不需要包含完整集 的服务器(您可能需要多个服务器,以防指定的服务器关闭)。 list high
replication.factor 流处理程序创建更改日志topic和重新分配topic的副本数 int 1 high
state.dir 状态存储的目录地址。 string /tmp/kafka-streams high
cache.max.bytes.buffering 用于缓冲所有线程的最大内存字节数 long 10485760 [0,...] low
client.id 发出请求时传递给服务器的id字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来追踪请求源的ip/port。 string "" high
default.key.serde 用于实现Serde接口的key的默认序列化器/解串器类。 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde medium
default.timestamp.extractor 实现TimestampExtractor接口的默认时间戳提取器类。 class org.apache.kafka.streams.processor.FailOnInvalidTimestamp medium
default.value.serde 用于实现Serde接口的值的默认serializer / deserializer类。 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde medium
num.standby.replicas 每个任务的备用副本数。 int 0 low
num.stream.threads 执行流处理的线程数。 int 1 low
processing.guarantee 应使用的加工保证。可能的值为at_least_once(默认)和exact_once。 string at_least_once [at_least_once, exactly_once] medium
security.protocol 用于与broker沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT medium
application.server host:port指向用户嵌入定义的末端,可用于发现单个KafkaStreams应用程序中状态存储的位置 string "" low
buffered.records.per.partition 每个分区缓存的最大记录数。 int 1000 low
commit.interval.ms 用于保存process位置的频率。 注意,如果'processing.guarantee'设置为'exact_once',默认值为100,否则默认值为30000。 long 30000 low
connections.max.idle.ms 关闭闲置的连接时间(以毫秒为单位)。 long 540000 medium
key.serde 用于实现Serde接口的key的Serializer/deserializer类.此配置已被弃用,请改用default.key.serde class null low
metadata.max.age.ms 即使我们没有看到任何分区leader发生变化,主动发现新的broker或分区,强制更新元数据时间(以毫秒为单位)。 long 300000 [0,...] low
metric.reporters metric reporter的类列表。实现MetricReporter接口,JmxReporter始终包含在注册JMX统计信息中。 list "" low
metrics.num.samples 保持的样本数以计算度量。 int 2 [1,...] low
metrics.recording.level 日志级别。 string INFO [INFO, DEBUG] low
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,...] low
partition.grouper 实现PartitionGrouper接口的Partition grouper类。 class org.apache
.kafka.streams
.processor
.DefaultPartitionGrouper
medium
poll.ms 阻塞输入等待的时间(以毫秒为单位)。 long 100 low
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [0,...] medium
reconnect.backoff.max.ms 因故障无法重新连接broker,重新连接的等待的最大时间(毫秒)。如果提供,每个主机会连续增加,直到达到最大值。随机递增20%的随机抖动以避免连接风暴。 long 1000 [0,...] low
reconnect.backoff.ms 尝试重新连接之前等待的时间。避免在高频繁的重复连接服务器。 这种backoff适用于消费者向broker发送的所有请求。 long 50 [0,...] low
request.timeout.ms 控制客户端等待请求响应的最长时间。如果在配置时间内未收到响应,客户端将在需要时重新发送请求,如果重试耗尽,则请求失败。 int 40000 [0,...] low
retry.backoff.ms 尝试重试失败请求之前等待的时间。以避免了在某些故障情况下,在频繁重复发送请求。 long 100 [0,...] low
rocksdb.config.setter 一个Rocks DB配置setter类,或实现RocksDBConfigSetter接口的类名 null low
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 [0,...] low
state.cleanup.delay.ms 在分区迁移删除状态之前等待的时间(毫秒)。 long 60000 low
timestamp.extractor 实现TimestampExtractor接口的Timestamp抽取器类。此配置已弃用,请改用default.timestamp.extractor class null low
windowstore.changelog.additional.retention.ms 添加到Windows维护管理器以确保数据不会从日志中过早删除。默认为1天 long 86400000 low
zookeeper.connect Zookeeper连接字符串,用于Kafka主题管理。此配置已被弃用,将被忽略,因为Streams API不再使用Zookeeper。 string "" low
更新于 2017-10-31
在线,1小时前登录

光年 2年前

Kafka Stream 要配置 auto.offset.reset=latest 可以使用:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

吗?

半兽人 -> 光年 2年前

不行,StreamsConfig.xxxx的才可以

光年 -> 半兽人 2年前

请问有相关的配置方法吗?

半兽人 -> 光年 2年前

默认的呀,不需要设置吧

光年 -> 半兽人 2年前

我使用的版本的2.5.1 ,默认配置应该是没有记录偏移量就从头开始消费

半兽人 -> 光年 2年前

确认一下 告诉我答案,我去看看。

光年 -> 半兽人 2年前

这是在 org.apache.kafka.streams.StreamsConfig 下的部分代码:

private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
static {
    final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
    tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
    tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
    CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
static {
    final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
    tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.name().toLowerCase(Locale.ROOT));
    CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章