It's not just a warning. I only noticed this because after the restart, the target cluster could no longer consume the synced data; when I switched back I saw the process print "Killed" — it really wasn't running. My guess: after mm2 restarts, it detects that all the corresponding topics already exist on the target cluster and cannot create new ones. Before I removed the prefix, every mm2 restart would create a new topic on top of the previously auto-created one — e.g. the first run creates A.test, the second run creates A.A.test, and so on. With the prefix removed, creation presumably fails outright, so the process dies.
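The prefix stacking described above can be sketched like this (a simplified Python illustration of how DefaultReplicationPolicy names remote topics, not the actual MM2 code; `format_remote_topic` is a hypothetical stand-in for its formatting logic):

```python
# Simplified illustration: MM2's DefaultReplicationPolicy prepends
# "<source alias><separator>" to a topic name on every replication hop.
def format_remote_topic(source_alias: str, separator: str, topic: str) -> str:
    return f"{source_alias}{separator}{topic}"

# First sync: "test" on cluster A becomes "A.test" on cluster B.
first = format_remote_topic("A", ".", "test")
# If a replicated topic is picked up again as a source topic,
# the prefix stacks: "A.A.test", and so on with each restart.
second = format_remote_topic("A", ".", first)
print(first, second)   # A.test A.A.test

# With the alias and separator blanked out (as in the config below),
# the remote name collapses back to the original topic name:
print(format_remote_topic("", "", "test"))  # test
```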
That is a warning, not an error. Have you verified data consistency?
I also tried one-way replication. The data sent at the start gets delivered many times in a row. My guess is that the sync generates more than one topic, and if the prefix is stripped from all of them, it's as if multiple mirroring flows are all producing into the same topic, so the data ends up sent multiple times.
After deleting some redundant settings, the config looks like this:
clusters = A, B
A.bootstrap.servers = kafka1:9092
B.bootstrap.servers = kafka2:9092
A->B.enabled = true
A->B.topics = .*
A->B.groups = .*
#B->A.enabled = true
#B->A.topics = .*
#B->A.groups = .*
#replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
refresh.topics.enabled = true
refresh.topics.interval.seconds = 1
refresh.groups.enabled = true
refresh.groups.interval.seconds = 2
sync.topic.configs.enabled = true
sync.topic.acls.enabled = false
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 1
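For what it's worth, Kafka 3.0+ ships a built-in policy intended for exactly this prefix-less, one-way case, which avoids blanking the separator and aliases by hand (assuming your Kafka version includes it):

```properties
# Kafka >= 3.0: replicate topics under their original names (no alias prefix).
# Meant for one-way flows; do not combine with active bidirectional A<->B.
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
```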
At first the sync worked fine, but shutting mm2 down and restarting it produces errors like the following:
[2023-12-26 17:25:40,327] WARN Could not create topic test_a. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic 'test_a' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic 'heartbeats' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic 'test_mm' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic -test_mm. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic '-test_mm' already exists.
[2023-12-26 17:25:40,328] WARN Could not create topic -heartbeats. (org.apache.kafka.connect.mirror.MirrorSourceConnector:330)
org.apache.kafka.common.errors.TopicExistsException: Topic '-heartbeats' already exists.
[2023-12-26 17:25:41,100] INFO refreshing idle consumers group offsets at target cluster took 113 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2023-12-26 17:25:41,102] INFO sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2023-12-26 17:25:41,429] INFO refreshing consumer groups took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
Killed
So this should indeed be caused by B->A not being enabled, which means running a single direction doesn't fully solve it either. What I actually want is a disaster-recovery cluster: continuously replicate cluster A's data to cluster B, so that if A goes down we can switch to B seamlessly, with both the data and the consumer offsets consistent.
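On the offset side, MM2's checkpoint mechanism records (upstream offset, downstream offset) pairs in the offset-syncs topic and uses them to translate source-cluster consumer offsets for the target cluster. A rough sketch of that translation idea (a simplified, conservative illustration, not the actual MirrorCheckpointConnector code):

```python
# Toy sketch of MM2 checkpoint-style offset translation.
# offset_syncs holds (upstream_offset, downstream_offset) pairs recorded
# as MM2 mirrors records; a consumer offset from the source cluster is
# translated via the most recent sync point at or below it.
def translate_offset(offset_syncs, upstream_consumer_offset):
    """Conservative translation: may re-deliver records on failover,
    but never skips any (the real connector does extra bookkeeping)."""
    best = None
    for upstream, downstream in offset_syncs:
        if upstream <= upstream_consumer_offset and (best is None or upstream > best[0]):
            best = (upstream, downstream)
    return best[1] if best else 0  # no sync yet: start from the beginning

# Downstream offsets can drift from upstream ones (e.g. compacted topics).
syncs = [(0, 0), (100, 100), (200, 195)]
print(translate_offset(syncs, 150))  # 100 (falls back to the sync at 100)
print(translate_offset(syncs, 210))  # 195
```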
The prefix exists to prevent an infinite loop:
If you remove the prefix, you can only do A->B. With bidirectional A/B replication, cluster A and cluster B would each read topic1 and sync it back and forth forever.
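The loop-prevention role of the prefix can be sketched like this (a toy Python simulation, not MM2's actual cycle detection): with a prefix, each flow can recognize topics that originated from its target cluster and skip them; with the separator blanked, nothing marks a topic as remote, so topic1 bounces back and forth.

```python
# Toy simulation of why the source-alias prefix breaks replication loops.
def should_replicate(topic: str, target_alias: str, separator: str) -> bool:
    """Skip topics already carrying the target cluster's alias prefix,
    i.e. topics that originally came FROM the target cluster."""
    if not separator:
        return True  # no prefix: remote topics are indistinguishable from local ones
    return not topic.startswith(target_alias + separator)

# With the default prefix, the B->A flow skips "A.topic1" (it came from A): no loop.
print(should_replicate("A.topic1", "A", "."))   # False
# With the separator blanked, "topic1" looks local everywhere: infinite ping-pong.
print(should_replicate("topic1", "A", ""))      # True
```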
I haven't used CDH. This config file is generated by default, and its content is simple — it just takes what you would otherwise pass as command-line options and fixes it in a file, so you don't have to supply all those parameters every time. You can download an official Kafka release to get it.
The default content of producer.properties is as follows:
cat config/producer.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=