3.6 Kafka Streams配置
Kafka Stream客户端库配置(注意,窗口可拖动)。
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
application.id | 流处理应用程序标识。必须在Kafka集群中是独一无二的。 1)默认客户端ID前缀,2)成员资格管理的group-id,3)changgelog的topic前缀 | string | high | ||
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口列表。 客户端将会连接所有服务器,跟指定哪些服务器无关 - 通过指定的服务器列表会自动发现全部的服务器。此列表格式host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员(可能会动态更改),所以此列表不需要包含完整集 的服务器(您可能需要多个服务器,以防指定的服务器关闭)。 | list | high | ||
replication.factor | 流处理程序创建更改日志topic和重新分配topic的副本数 | int | 1 | high | |
state.dir | 状态存储的目录地址。 | string | /tmp/kafka-streams | high | |
cache.max.bytes.buffering | 用于缓冲所有线程的最大内存字节数 | long | 10485760 | [0,...] | low |
client.id | 发出请求时传递给服务器的id字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来追踪请求源的ip/port。 | string | "" | high | |
default.key.serde | 用于实现Serde接口的key的默认序列化器/解串器类。 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde | medium | |
default.timestamp.extractor | 实现TimestampExtractor接口的默认时间戳提取器类。 | class | org.apache.kafka.streams.processor.FailOnInvalidTimestamp | medium | |
default.value.serde | 用于实现Serde接口的值的默认serializer / deserializer类。 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde | medium | |
num.standby.replicas | 每个任务的备用副本数。 | int | 0 | low | |
num.stream.threads | 执行流处理的线程数。 | int | 1 | low | |
processing.guarantee | 应使用的加工保证。可能的值为at_least_once(默认)和exact_once。 | string | at_least_once | [at_least_once, exactly_once] | medium |
security.protocol | 用于与broker沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 | string | PLAINTEXT | medium | |
application.server | host:port指向用户嵌入定义的末端,可用于发现单个KafkaStreams应用程序中状态存储的位置 | string | "" | low | |
buffered.records.per.partition | 每个分区缓存的最大记录数。 | int | 1000 | low | |
commit.interval.ms | 用于保存process位置的频率。 注意,如果'processing.guarantee'设置为'exact_once',默认值为100,否则默认值为30000。 | long | 30000 | low | |
connections.max.idle.ms | 关闭闲置的连接时间(以毫秒为单位)。 | long | 540000 | medium | |
key.serde | 用于实现Serde接口的key的Serializer/deserializer类.此配置已被弃用,请改用default.key.serde | class | null | low | |
metadata.max.age.ms | 即使我们没有看到任何分区leader发生变化,主动发现新的broker或分区,强制更新元数据时间(以毫秒为单位)。 | long | 300000 | [0,...] | low |
metric.reporters | metric reporter的类列表。实现MetricReporter接口,JmxReporter始终包含在注册JMX统计信息中。 | list | "" | low | |
metrics.num.samples | 保持的样本数以计算度量。 | int | 2 | [1,...] | low |
metrics.recording.level | 日志级别。 | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | [0,...] | low |
partition.grouper | 实现PartitionGrouper接口的Partition grouper类。 | class | org.apache .kafka.streams .processor .DefaultPartitionGrouper |
medium | |
poll.ms | 阻塞输入等待的时间(以毫秒为单位)。 | long | 100 | low | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 32768 | [0,...] | medium |
reconnect.backoff.max.ms | 因故障无法重新连接broker,重新连接的等待的最大时间(毫秒)。如果提供,每个主机会连续增加,直到达到最大值。随机递增20%的随机抖动以避免连接风暴。 | long | 1000 | [0,...] | low |
reconnect.backoff.ms | 尝试重新连接之前等待的时间。避免在高频繁的重复连接服务器。 这种backoff适用于消费者向broker发送的所有请求。 | long | 50 | [0,...] | low |
request.timeout.ms | 控制客户端等待请求响应的最长时间。如果在配置时间内未收到响应,客户端将在需要时重新发送请求,如果重试耗尽,则请求失败。 | int | 40000 | [0,...] | low |
retry.backoff.ms | 尝试重试失败请求之前等待的时间。以避免了在某些故障情况下,在频繁重复发送请求。 | long | 100 | [0,...] | low |
rocksdb.config.setter | 一个Rocks DB配置setter类,或实现RocksDBConfigSetter接口的类名 | null | low | ||
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 131072 | [0,...] | low |
state.cleanup.delay.ms | 在分区迁移删除状态之前等待的时间(毫秒)。 | long | 60000 | low | |
timestamp.extractor | 实现TimestampExtractor接口的Timestamp抽取器类。此配置已弃用,请改用default.timestamp.extractor | class | null | low | |
windowstore.changelog.additional.retention.ms | 添加到Windows维护管理器以确保数据不会从日志中过早删除。默认为1天 | long | 86400000 | low | |
zookeeper.connect | Zookeeper连接字符串,用于Kafka主题管理。此配置已被弃用,将被忽略,因为Streams API不再使用Zookeeper。 | string | "" | low |
Kafka Stream 要配置 auto.offset.reset=latest 可以使用:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
吗?
不行,
StreamsConfig.xxxx
的才可以请问有相关的配置方法吗?
默认的呀,不需要设置吧
我使用的版本的2.5.1 ,默认配置应该是没有记录偏移量就从头开始消费
确认一下 告诉我答案,我去看看。
这是在
org.apache.kafka.streams.StreamsConfig
下的部分代码:private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; static { final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } private static final Map<String, Object> CONSUMER_EOS_OVERRIDES; static { final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES); tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.name().toLowerCase(Locale.ROOT)); CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); }