Skip to content

Commit dc5f816

Browse files
tvainikaivanyuviktorsomogyi
authored
KAFKA-9914: Fix replication cycle detection (#22079)
Fix replication cycle detection to detect recursive topic cycles if kafka cluster naming differs between multiple MM2 instances with special handling for IdentityReplicationPolicy. Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com> --------- Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io> Co-authored-by: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
1 parent b6d2503 commit dc5f816

2 files changed

Lines changed: 29 additions & 2 deletions

File tree

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,16 @@ boolean isCycle(String topic) {
720720
String source = replicationPolicy.topicSource(topic);
721721
if (source == null) {
722722
return false;
723-
} else if (source.equals(sourceAndTarget.target())) {
723+
}
724+
725+
final boolean condition;
726+
if (replicationPolicy instanceof IdentityReplicationPolicy) {
727+
condition = source.equals(sourceAndTarget.target());
728+
} else {
729+
condition = source.equals(sourceAndTarget.source()) || source.equals(sourceAndTarget.target());
730+
}
731+
732+
if (condition) {
724733
return true;
725734
} else {
726735
String upstreamTopic = replicationPolicy.upstreamTopic(topic);

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,31 @@ public void testReplicatesHeartbeatsWhenDisabledButFilterAllows() {
130130
public void testNoCycles() {
131131
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
132132
new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
133+
assertFalse(connector.shouldReplicateTopic("source.topic1"), "should not allow cycles");
133134
assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
134135
assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
135136
assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
136137
assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles");
137138
assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles");
138139
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
139-
assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
140+
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should allow anything else");
141+
assertTrue(connector.shouldReplicateTopic("othertarget.topic1"), "should allow anything else");
142+
assertTrue(connector.shouldReplicateTopic("other.another.topic1"), "should allow anything else");
143+
144+
final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
145+
final HashMap<String, String> props = new HashMap<>();
146+
props.put("source.cluster.alias", "source");
147+
identityReplicationPolicy.configure(props);
148+
connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
149+
identityReplicationPolicy, x -> true, x -> true);
150+
assertTrue(connector.shouldReplicateTopic("source.topic1"), "should not consider this a cycle");
151+
assertTrue(connector.shouldReplicateTopic("target.topic1"), "should not consider this a cycle");
152+
assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should not consider this a cycle");
153+
assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should not consider this a cycle");
154+
assertTrue(connector.shouldReplicateTopic("topic1"), "should not consider this a cycle");
155+
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should not consider this a cycle");
156+
assertTrue(connector.shouldReplicateTopic("othertarget.topic1"), "should not consider this a cycle");
157+
assertTrue(connector.shouldReplicateTopic("other.another.topic1"), "should not consider this a cycle");
140158
}
141159

142160
@Test

0 commit comments

Comments
 (0)