我自己在本地装了两个centos7的虚拟机, 部署的kafka版本为2.7.0, 现在想把kafka1作为源集群, kafka2作为目标集群, 将kafka1中指定topic的数据和对应groupid的消费偏移量同步到kafka2中相同的topic和groupid上, 实现数据和消费偏移量的实时同步。
配置文件 connect-mirror-maker.properties 内容如下
# 集群配置
clusters = A, B
# 集群A配置
A.bootstrap.servers = kafka1:9092
A->B.enabled = true # 启用从集群A到集群B的复制
A->B.topics = .* # 复制所有的主题从集群A到集群B
A->B.groups = .* # 复制所有的消费者组从集群A到集群B
# 集群B配置
B.bootstrap.servers = kafka2:9092
B->A.enabled = true # 启用从集群B到集群A的复制
B->A.topics = .* # 复制所有的主题从集群B到集群A
B->A.groups = .* # 复制所有的消费者组从集群B到集群A
# 复制策略配置
# replication.policy.class的取值有两个 DefaultReplicationPolicy 和 CustomReplicationPolicy
# replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy # 这是默认值
replication.factor=1 # 复制因子,指定每个主题的复制因子
# 数据检查点配置
checkpoints.topic.replication.factor=1 # 检查点主题的复制因子
heartbeats.topic.replication.factor=1 # 心跳主题的复制因子
offset-syncs.topic.replication.factor=1 # 偏移同步主题的复制因子
offset.storage.replication.factor=1 # 偏移存储的复制因子
status.storage.replication.factor=1 # 状态存储的复制因子
config.storage.replication.factor=1 # 配置存储的复制因子
# 刷新配置
refresh.topics.enabled = true # 启用主题刷新
refresh.topics.interval.seconds = 3 # 刷新主题的时间间隔(秒)
refresh.groups.enabled = true # 启用消费者组刷新
refresh.groups.interval.seconds = 3 # 刷新消费者组的时间间隔(秒)
# 同步配置
sync.topic.configs.enabled = true # 启用主题配置同步
sync.topic.acls.enabled = false # 启用主题ACL同步
sync.group.offsets.enabled = true # 启用消费者组偏移同步
sync.group.offsets.interval.seconds = 3 # 同步消费者组偏移的时间间隔(秒)
replication.policy.class取值有两个, 分别是 DefaultReplicationPolicy 和 CustomReplicationPolicy。
- DefaultReplicationPolicy: 默认取值, 这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,目的是为了避免双向同步的场景出现死循环, 官方的解释是为了避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。
- CustomReplicationPolicy: 使用自定义复制策略类来完成此操作。
上述的配置文件需要在源集群和目标集群中都创建, 并且两个集群都要启动, 启动命令如下:
bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters A
bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters B
我的问题是, kafka官方网站对CustomReplicationPolicy策略写的不多, 而我需要把数据同步到跟源集群topic名一致的topic里, 如何配置配置文件才能实现?
ps: kafka权威指南第一版和第二版对使用MirrorMaker2的描述非常简略, 网上的参考资料需要通过修改kafka源码来实现, 我这边没有部署和开发java的条件(运维平台只有服务器的linux命令行)
从 Kafka 3.0.0 开始,只需设置:
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
也可以通过以下设置,删除前缀:
"replication.policy.separator": "" "source.cluster.alias": "", "target.cluster.alias": "",
kafka3.0版本的这个参数需要三节点同步三节点, 我个人在我笔记本上得搭建六个虚拟机, 目前没带动只能再试试, 删除前缀的操作死循环了, 目标kafka集群的topic不停同步数据, 没成功, 谢谢大佬指点, 我试试3.0怎么实现吧
前缀为了防止无限循环:
去掉前缀就只能A->B,如果是A、B双向复制,那A集群和B集群都会读取topic1,来回同步。
单向我也试了, 一开始发送的数据会连着发很多遍, 我猜测可能同步生成的topic不止一个, 如果都替换掉前缀, 那就相当于多条监控的进程都往一个topic里发, 导致数据发送多次
后来删除一些多余配置后, 配置内容如下:
clusters = A, B A.bootstrap.servers = kafka1:9092 B.bootstrap.servers = kafka2:9092 A->B.enabled = true A->B.topics = .* A->B.groups = .* *B->A.enabled = true *B->A.topics = .* *B->A.groups = .* #replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy replication.policy.separator= source.cluster.alias= target.cluster.alias= replication.factor=1 checkpoints.topic.replication.factor=1 heartbeats.topic.replication.factor=1 offset-syncs.topic.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1 config.storage.replication.factor=1 refresh.topics.enabled = true refresh.topics.interval.seconds = 1 refresh.groups.enabled = true refresh.groups.interval.seconds = 2 sync.topic.configs.enabled = true sync.topic.acls.enabled = false sync.group.offsets.enabled = true sync.group.offsets.interval.seconds = 1
结果一开始同步发现没问题, 但是关闭mm2再重启就会报错, 报错内容如下:
[2023-12-26 17:25:40,327] WARN Could not create topic test_a. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330) org.apache.kafka.common.errors.TopicExistsException: Topic 'test_a' already exists. [2023-12-26 17:25:40,328] WARN Could not create topic heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330) org.apache.kafka.common.errors.TopicExistsException: Topic 'heartbeats' already exists. [2023-12-26 17:25:40,328] WARN Could not create topic test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330) org.apache.kafka.common.errors.TopicExistsException: Topic 'test_mm' already exists. [2023-12-26 17:25:40,328] WARN Could not create topic -test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330) org.apache.kafka.common.errors.TopicExistsException: Topic '-test_mm' already exists. [2023-12-26 17:25:40,328] WARN Could not create topic -heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330) org.apache.kafka.common.errors.TopicExistsException: Topic '-heartbeats' already exists. [2023-12-26 17:25:41,100] INFO refreshing idle consumers group offsets at target cluster took 113 ms (org.apache.kafka.connect.mirror.Scheduler:95) [2023-12-26 17:25:41,102] INFO sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95) [2023-12-26 17:25:41,429] INFO refreshing consumer groups took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95) 已杀死
这就应该是没有开启B->A的问题, 所以单开也不能完全解决这个问题, 我其实是想搞一个灾备的集群, 将A集群的数据同时同步到B集群, 这样就算A集群挂掉也能无缝衔接B集群, 数据和消费偏移量都一致
这是警告,不是报错,验证过数据的一致性了吗?
不是警告, 我发现这个还是因为重启后同步数据结果目标集群消费不到了, 我切回去才看到进程显示已杀死,的确就是没起来. 我猜测其实是重启mm2后程序检测到目标集群对应的topic都有, 无法创建新的topic, 之前没有删除前缀的时候, 每次重启mm2都会在上一个自动创建的topic基础上再创建一个, 比如说第一次创建了A.test, 第二次就会创建A.A.test, 以此类推. 取消前缀后应该是无法创建, 所以进程才会自动挂掉
你的答案