3.2 Topic配置
与topic相关的配置,服务器的默认值,也可可选择的覆盖指定的topic。如果没有给出指定topic的配置,则将使用服务器默认值。 可以通过-config选项在topic创建时设置。此示例使用自定义最大消息的大小和刷新率,创建一个名为my-topic
的topic:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
也可以使用alter configs
命令修改或设置。 此示例修改更新my-topic
的最大的消息大小:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
你可以执行以下命令验证结果
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
移除:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
以下是topic级配置。是服务器的默认配置。服务器默认配置值仅适用于topic。
名称 | 描述 | 类型 | 默认 | 有效的值 | 服务器默认属性 | 重要程度 |
---|---|---|---|---|---|---|
cleanup.policy | “delete”或“compact”。指定在旧的日志段的保留策略。默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。 | list | delete | [compact, delete] | log.cleanup.policy | medium |
compression.type | 针对指定的topic设置最终的压缩方式。标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。 | string | producer | [uncompressed, snappy, lz4, gzip, producer] | compression.type | medium |
delete.retention.ms | 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 | long | 86400000 | [0,...] | log.cleaner.delete.retention.ms | medium |
file.delete.delay.ms | 从文件系统中删除文件之前等待的时间 | long | 60000 | [0,...] | log.segment.delete.delay.ms | medium |
flush.messages | 此设置允许指定我们强制fsync写入日志的数据的间隔。例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。一般,我们建议不要设置它,使用复制特性来保持持久性,并允许操作系统的后台刷新功能更高效。可以在每个topic的基础上覆盖此设置(请参阅每个主题的配置部分)。 | medium | long | 9223372036854775807 | [0,...] | log.flush.interval.messages |
flush.ms | 此设置允许我们强制fsync写入日志的数据的时间间隔。例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。 一般,我们建议不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能,因为它更有效率 | long | 9223372036854775807 | [0,...] | log.flush.interval.ms | medium |
follower.replication.throttled.replicas | follower复制限流列表。该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。 | list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | follower.replication.throttled.replicas | medium |
index.interval.bytes | 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。 更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。 | int | 4096 | [0,...] | log.index.interval.bytes | medium |
leader.replication.throttled.replicas | 在leader方面进行限制的副本列表。该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。 | list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | leader.replication.throttled.replicas | medium |
max.message.bytes | kafka允许的最大的消息批次大小。如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。 在最新的消息格式版本中,消息总是分组批量来提高效率。在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。 |
int | 1000012 | [0,...] | message.max.bytes | medium |
message.format.version | 指定消息附加到日志的消息格式版本。该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。 | string | 0.11.0-IV2 | log.message.format.version | medium | |
min.cleanable.dirty.ratio | 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 | double | 0.5 | [0,...,1] | log.cleaner.min.cleanable.ratio | medium |
min.compaction.lag.ms | 消息在日志中保持不压缩的最短时间。仅适用于正在压缩的日志。 | long | 0 | [0,...] | log.cleaner.min.compaction.lag.ms | medium |
min.insync.replicas | 当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend) 当min.insync.replicas和acks强制更大的耐用性时。典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。如果多数副本没有收到写入,这将确保生产者引发异常。 |
int | 1 | [1,...] | min.insync.replicas | medium |
preallocate | 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 | boolean | false | log.preallocate | medium | |
retention.bytes | 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。默认情况下,没有设置大小限制则仅限于时间限制。 | long | -1 | log.retention.bytes | medium | |
retention.ms | 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。这代表SLA消费者必须读取数据的时间长度。 | long | 604800000 | log.retention.ms | medium | |
segment.bytes | 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 | int | 1073741824 | [14,...] | log.segment.bytes | medium |
segment.index.bytes | 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 | int | 10485760 | [0,...] | log.index.size.max.bytes | medium |
segment.jitter.ms | 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 | long | 0 | [0,...] | log.roll.jitter.ms | medium |
segment.ms | 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。 | long | 604800000 | [0,...] | log.roll.ms | medium |
unclean.leader.election.enable | 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。 | boolean | false | unclean.leader.election.enable | medium |
kafka新版本新增的配置
kafka > 2.0
名称 | 描述 | 类型 | 默认 | 有效的值 | 服务器默认属性 | 重要程度 |
---|---|---|---|---|---|---|
message.downconversion.enable | 此配置控制是否启用消息格式的向下转换以满足消费请求。当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION 错误响应。这个配置不适用于复制到followers时可能需要的任何消息格式转换。 |
boolean | false | log.message.downconversion.enable | 低 |
请问一下我的分区默认只能储存100条数据,被消费完还是这样,请问有什么配置可以修改吗
max.message.bytes kafka允许的最大的消息批次大小。如果增加此值,则消费者的版本比0.10.2老,
-----
max.message.bytes kafka允许的最大的消息批次大小。如果增加此值,【并且】消费者的版本比0.10.2老,
赞,已更新。
你好,我的Kafka环境只有一个Broker,但是配了2块硬盘,我创建了一个Topic,请问这个Topic会创建在哪个硬盘上?或者说我怎么才能指定Topic创建在某个硬盘上,谢谢。
你指定的日志存储目录。
跟硬盘无关吧,要看你指定的数据存储目录在文件系统上的哪个目录里了
问下topic在创建成功之后再设置replication-factor是不是不可以,topic更新的内容是不是只能在列表里面的项。
可以,https://www.orchome.com/31
ok,跪谢大神~
消费者,怎么设置解压数据的