Skip to content

Commit 28402a3

Browse files
authored
MINOR: Move CreateTopicsRequestTest to clients-integration-tests and rewrite in Java (#22105)
Rewrite `CreateTopicsRequestTest` in Java and move it from core to `clients-integration-tests`. Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 7b5d4ca commit 28402a3

3 files changed

Lines changed: 375 additions & 354 deletions

File tree

Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients;
19+
20+
import org.apache.kafka.common.Uuid;
21+
import org.apache.kafka.common.internals.Topic;
22+
import org.apache.kafka.common.message.CreateTopicsRequestData;
23+
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
24+
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
25+
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
26+
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
27+
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
28+
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
29+
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
30+
import org.apache.kafka.common.protocol.ApiKeys;
31+
import org.apache.kafka.common.protocol.Errors;
32+
import org.apache.kafka.common.requests.ApiError;
33+
import org.apache.kafka.common.requests.CreateTopicsRequest;
34+
import org.apache.kafka.common.requests.CreateTopicsResponse;
35+
import org.apache.kafka.common.requests.MetadataRequest;
36+
import org.apache.kafka.common.requests.MetadataResponse;
37+
import org.apache.kafka.common.test.ClusterInstance;
38+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
39+
import org.apache.kafka.common.test.api.ClusterTest;
40+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
41+
import org.apache.kafka.common.test.api.Type;
42+
import org.apache.kafka.server.IntegrationTestUtils;
43+
import org.apache.kafka.server.config.ReplicationConfigs;
44+
import org.apache.kafka.server.config.ServerLogConfigs;
45+
46+
import java.util.ArrayList;
47+
import java.util.List;
48+
import java.util.Map;
49+
50+
import static org.junit.jupiter.api.Assertions.assertEquals;
51+
import static org.junit.jupiter.api.Assertions.assertFalse;
52+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
53+
import static org.junit.jupiter.api.Assertions.assertNotNull;
54+
import static org.junit.jupiter.api.Assertions.assertTrue;
55+
56+
@ClusterTestDefaults(
57+
types = {Type.KRAFT},
58+
brokers = 3,
59+
serverProperties = {
60+
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
61+
}
62+
)
63+
public class CreateTopicsRequestTest {
64+
65+
@ClusterTest
66+
public void testValidCreateTopicsRequests(ClusterInstance cluster) throws Exception {
67+
// Generated assignments
68+
validateValidCreateTopicsRequests(cluster, topicsReq(topicReq("topic1")));
69+
validateValidCreateTopicsRequests(cluster, topicsReq(topicReq("topic2", null, 3)));
70+
validateValidCreateTopicsRequests(cluster, topicsReq(
71+
topicReq("topic3", 5, 2, Map.of("min.insync.replicas", "2"), null)));
72+
73+
// Manual assignments
74+
validateValidCreateTopicsRequests(cluster, topicsReq(
75+
topicReq("topic4", null, null, null, Map.of(0, List.of(0)))));
76+
validateValidCreateTopicsRequests(cluster, topicsReq(
77+
topicReq("topic5", null, null, Map.of("min.insync.replicas", "2"),
78+
Map.of(0, List.of(0, 1), 1, List.of(1, 0), 2, List.of(1, 2)))));
79+
80+
// Mixed
81+
validateValidCreateTopicsRequests(cluster, topicsReq(
82+
topicReq("topic6"),
83+
topicReq("topic7", 5, 2),
84+
topicReq("topic8", null, null, null,
85+
Map.of(0, List.of(0, 1), 1, List.of(1, 0), 2, List.of(1, 2)))));
86+
validateValidCreateTopicsRequests(cluster, topicsReq(true,
87+
topicReq("topic9"),
88+
topicReq("topic10", 5, 2),
89+
topicReq("topic11", null, null, null,
90+
Map.of(0, List.of(0, 1), 1, List.of(1, 0), 2, List.of(1, 2)))));
91+
92+
// Defaults
93+
validateValidCreateTopicsRequests(cluster, topicsReq(topicReq("topic12", -1, -1)));
94+
validateValidCreateTopicsRequests(cluster, topicsReq(topicReq("topic13", -1, 2)));
95+
validateValidCreateTopicsRequests(cluster, topicsReq(topicReq("topic14", 2, -1)));
96+
}
97+
98+
@ClusterTest
99+
public void testErrorCreateTopicsRequests(ClusterInstance cluster) throws Exception {
100+
String existingTopic = "existing-topic";
101+
cluster.createTopic(existingTopic, 1, (short) 1);
102+
103+
int brokerCount = cluster.brokers().size();
104+
105+
// Basic
106+
validateErrorCreateTopicsRequests(cluster,
107+
topicsReq(topicReq(existingTopic)),
108+
Map.of(existingTopic, new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic 'existing-topic' already exists.")), true);
109+
validateErrorCreateTopicsRequests(cluster,
110+
topicsReq(topicReq("error-partitions", -2, null)),
111+
Map.of("error-partitions", new ApiError(Errors.INVALID_PARTITIONS)), false);
112+
validateErrorCreateTopicsRequests(cluster,
113+
topicsReq(topicReq("error-replication", null, brokerCount + 1)),
114+
Map.of("error-replication", new ApiError(Errors.INVALID_REPLICATION_FACTOR)), false);
115+
validateErrorCreateTopicsRequests(cluster,
116+
topicsReq(topicReq("error-config", null, null, Map.of("not.a.property", "error"), null)),
117+
Map.of("error-config", new ApiError(Errors.INVALID_CONFIG)), false);
118+
validateErrorCreateTopicsRequests(cluster,
119+
topicsReq(topicReq("error-assignment", null, null, null,
120+
Map.of(0, List.of(0, 1), 1, List.of(0)))),
121+
Map.of("error-assignment", new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT)), false);
122+
123+
// Partial
124+
validateErrorCreateTopicsRequests(cluster,
125+
topicsReq(
126+
topicReq(existingTopic),
127+
topicReq("partial-partitions", -2, null),
128+
topicReq("partial-replication", null, brokerCount + 1),
129+
topicReq("partial-assignment", null, null, null,
130+
Map.of(0, List.of(0, 1), 1, List.of(0))),
131+
topicReq("partial-none")),
132+
Map.of(
133+
existingTopic, new ApiError(Errors.TOPIC_ALREADY_EXISTS),
134+
"partial-partitions", new ApiError(Errors.INVALID_PARTITIONS),
135+
"partial-replication", new ApiError(Errors.INVALID_REPLICATION_FACTOR),
136+
"partial-assignment", new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT),
137+
"partial-none", new ApiError(Errors.NONE)),
138+
false);
139+
validateTopicExists(cluster, "partial-none", 1);
140+
}
141+
142+
@ClusterTest
143+
public void testInvalidCreateTopicsRequests(ClusterInstance cluster) throws Exception {
144+
// Partitions/ReplicationFactor and ReplicaAssignment should not both be specified
145+
validateErrorCreateTopicsRequests(cluster,
146+
topicsReq(topicReq("bad-args-topic", 10, 3, null, Map.of(0, List.of(0)))),
147+
Map.of("bad-args-topic", new ApiError(Errors.INVALID_REQUEST)), false);
148+
149+
validateErrorCreateTopicsRequests(cluster,
150+
topicsReq(true, topicReq("bad-args-topic", 10, 3, null, Map.of(0, List.of(0)))),
151+
Map.of("bad-args-topic", new ApiError(Errors.INVALID_REQUEST)), false);
152+
}
153+
154+
@ClusterTest
155+
public void testCreateTopicsRequestVersions(ClusterInstance cluster) throws Exception {
156+
for (short version = ApiKeys.CREATE_TOPICS.oldestVersion(); version <= ApiKeys.CREATE_TOPICS.latestVersion(); version++) {
157+
String topic = "topic_" + version;
158+
CreateTopicsRequestData data = new CreateTopicsRequestData()
159+
.setTimeoutMs(10000)
160+
.setValidateOnly(false)
161+
.setTopics(new CreatableTopicCollection(List.of(
162+
topicReq(topic, 1, 1, Map.of("min.insync.replicas", "2"), null)
163+
).iterator()));
164+
165+
CreateTopicsRequest request = new CreateTopicsRequest.Builder(data).build(version);
166+
CreateTopicsResponse response = sendCreateTopicRequest(cluster, request);
167+
168+
CreatableTopicResult topicResponse = response.data().topics().find(topic);
169+
assertNotNull(topicResponse);
170+
assertEquals(topic, topicResponse.name());
171+
assertEquals(Errors.NONE.code(), topicResponse.errorCode());
172+
173+
if (version >= 5) {
174+
assertEquals(1, topicResponse.numPartitions());
175+
assertEquals(1, topicResponse.replicationFactor());
176+
var config = topicResponse.configs().stream()
177+
.filter(c -> "min.insync.replicas".equals(c.name())).findFirst();
178+
assertTrue(config.isPresent());
179+
assertEquals("2", config.get().value());
180+
} else {
181+
assertEquals(-1, topicResponse.numPartitions());
182+
assertEquals(-1, topicResponse.replicationFactor());
183+
assertTrue(topicResponse.configs().isEmpty());
184+
}
185+
186+
if (version >= 7) {
187+
assertNotEquals(Uuid.ZERO_UUID, topicResponse.topicId());
188+
} else {
189+
assertEquals(Uuid.ZERO_UUID, topicResponse.topicId());
190+
}
191+
}
192+
}
193+
194+
@ClusterTest
195+
public void testCreateClusterMetadataTopic(ClusterInstance cluster) throws Exception {
196+
validateErrorCreateTopicsRequests(cluster,
197+
topicsReq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME)),
198+
Map.of(Topic.CLUSTER_METADATA_TOPIC_NAME,
199+
new ApiError(Errors.INVALID_REQUEST,
200+
"Creation of internal topic " + Topic.CLUSTER_METADATA_TOPIC_NAME + " is prohibited.")),
201+
true);
202+
}
203+
204+
private static CreateTopicsRequest topicsReq(CreatableTopic... topics) {
205+
return topicsReq(false, topics);
206+
}
207+
208+
private static CreateTopicsRequest topicsReq(boolean validateOnly, CreatableTopic... topics) {
209+
var req = new CreateTopicsRequestData()
210+
.setTimeoutMs(10000)
211+
.setTopics(new CreatableTopicCollection(List.of(topics).iterator()))
212+
.setValidateOnly(validateOnly);
213+
return new CreateTopicsRequest.Builder(req).build();
214+
}
215+
216+
private static CreatableTopic topicReq(String name) {
217+
return topicReq(name, null, null, null, null);
218+
}
219+
220+
private static CreatableTopic topicReq(String name, Integer numPartitions, Integer replicationFactor) {
221+
return topicReq(name, numPartitions, replicationFactor, null, null);
222+
}
223+
224+
private static CreatableTopic topicReq(
225+
String name,
226+
Integer numPartitions,
227+
Integer replicationFactor,
228+
Map<String, String> config,
229+
Map<Integer, List<Integer>> assignment
230+
) {
231+
CreatableTopic topic = new CreatableTopic();
232+
topic.setName(name);
233+
if (numPartitions != null) {
234+
topic.setNumPartitions(numPartitions);
235+
} else if (assignment != null) {
236+
topic.setNumPartitions(-1);
237+
} else {
238+
topic.setNumPartitions(1);
239+
}
240+
if (replicationFactor != null) {
241+
topic.setReplicationFactor(replicationFactor.shortValue());
242+
} else if (assignment != null) {
243+
topic.setReplicationFactor((short) -1);
244+
} else {
245+
topic.setReplicationFactor((short) 1);
246+
}
247+
if (config != null) {
248+
var effectiveConfigs = new CreatableTopicConfigCollection();
249+
config.forEach((configName, configValue) ->
250+
effectiveConfigs.add(new CreatableTopicConfig()
251+
.setName(configName)
252+
.setValue(configValue)));
253+
topic.setConfigs(effectiveConfigs);
254+
}
255+
if (assignment != null) {
256+
var effectiveAssignments = new CreatableReplicaAssignmentCollection();
257+
assignment.forEach((partitionIndex, brokerIdList) ->
258+
effectiveAssignments.add(new CreatableReplicaAssignment()
259+
.setPartitionIndex(partitionIndex)
260+
.setBrokerIds(new ArrayList<>(brokerIdList))));
261+
topic.setAssignments(effectiveAssignments);
262+
}
263+
return topic;
264+
}
265+
266+
private static void validateValidCreateTopicsRequests(ClusterInstance cluster, CreateTopicsRequest request) throws Exception {
267+
CreateTopicsResponse response = sendCreateTopicRequest(cluster, request);
268+
269+
assertFalse(response.errorCounts().keySet().stream().anyMatch(e -> e.code() > 0),
270+
"There should be no errors, found " + response.errorCounts().keySet());
271+
272+
for (CreatableTopic topic : request.data().topics()) {
273+
if (!request.data().validateOnly()) {
274+
int partitions = !topic.assignments().isEmpty()
275+
? topic.assignments().size()
276+
: (topic.numPartitions() == -1 ? defaultNumPartitions(cluster) : topic.numPartitions());
277+
cluster.waitTopicCreation(topic.name(), partitions);
278+
}
279+
verifyMetadata(cluster, topic, request.data().validateOnly());
280+
}
281+
}
282+
283+
private static void verifyMetadata(ClusterInstance cluster, CreatableTopic topic,
284+
boolean validateOnly) throws Exception {
285+
MetadataResponse metadataResponse = sendMetadataRequest(cluster,
286+
new MetadataRequest.Builder(List.of(topic.name()), false).build());
287+
MetadataResponse.TopicMetadata metadataForTopic = metadataResponse.topicMetadata().stream()
288+
.filter(t -> topic.name().equals(t.topic())).findFirst().orElse(null);
289+
290+
int partitions = !topic.assignments().isEmpty() ? topic.assignments().size() : topic.numPartitions();
291+
int replication = !topic.assignments().isEmpty() ? topic.assignments().iterator().next().brokerIds().size() : topic.replicationFactor();
292+
293+
if (validateOnly) {
294+
assertNotNull(metadataForTopic);
295+
assertNotEquals(Errors.NONE, metadataForTopic.error(), "Topic " + topic + " should not be created");
296+
assertTrue(metadataForTopic.partitionMetadata().isEmpty(), "The topic should have no partitions");
297+
} else {
298+
assertNotNull(metadataForTopic, "The topic should be created");
299+
assertEquals(Errors.NONE, metadataForTopic.error());
300+
if (partitions == -1) {
301+
assertEquals(defaultNumPartitions(cluster), metadataForTopic.partitionMetadata().size(),
302+
"The topic should have the default number of partitions");
303+
} else {
304+
assertEquals(partitions, metadataForTopic.partitionMetadata().size(),
305+
"The topic should have the correct number of partitions");
306+
}
307+
308+
if (replication == -1) {
309+
assertEquals(defaultReplicationFactor(cluster), metadataForTopic.partitionMetadata().get(0).replicaIds.size(),
310+
"The topic should have the default replication factor");
311+
} else {
312+
assertEquals(replication, metadataForTopic.partitionMetadata().get(0).replicaIds.size(),
313+
"The topic should have the correct replication factor");
314+
}
315+
}
316+
}
317+
318+
private static void validateErrorCreateTopicsRequests(
319+
ClusterInstance cluster,
320+
CreateTopicsRequest request,
321+
Map<String, ApiError> expectedResponse,
322+
boolean checkErrorMessage
323+
) throws Exception {
324+
CreateTopicsResponse response = sendCreateTopicRequest(cluster, request);
325+
assertEquals(expectedResponse.size(), response.data().topics().size(), "The response size should match");
326+
327+
for (var entry : expectedResponse.entrySet()) {
328+
String topicName = entry.getKey();
329+
ApiError expectedError = entry.getValue();
330+
331+
CreatableTopicResult actual = response.data().topics().find(topicName);
332+
if (actual == null) {
333+
throw new RuntimeException("No response data found for topic " + topicName);
334+
}
335+
assertEquals(expectedError.error().code(), actual.errorCode(), "The response error code should match");
336+
if (checkErrorMessage) {
337+
assertEquals(expectedError.message(), actual.errorMessage(), "The response error message should match");
338+
}
339+
// If no error, validate topic exists
340+
if (expectedError.isSuccess() && !request.data().validateOnly()) {
341+
CreatableTopic topic = request.data().topics().find(topicName);
342+
int partitions = !topic.assignments().isEmpty()
343+
? topic.assignments().size()
344+
: (topic.numPartitions() == -1 ? defaultNumPartitions(cluster) : topic.numPartitions());
345+
validateTopicExists(cluster, topicName, partitions);
346+
}
347+
}
348+
}
349+
350+
private static void validateTopicExists(ClusterInstance cluster, String topic, int partitions) throws Exception {
351+
cluster.waitTopicCreation(topic, partitions);
352+
MetadataResponse metadataResponse = sendMetadataRequest(cluster,
353+
new MetadataRequest.Builder(List.of(topic), true).build());
354+
assertTrue(metadataResponse.topicMetadata().stream().anyMatch(p -> topic.equals(p.topic()) && p.error() == Errors.NONE),
355+
"The topic should be created");
356+
}
357+
358+
private static CreateTopicsResponse sendCreateTopicRequest(ClusterInstance cluster, CreateTopicsRequest request) throws Exception {
359+
return IntegrationTestUtils.connectAndReceive(request, cluster.brokerBoundPorts().get(0));
360+
}
361+
362+
private static MetadataResponse sendMetadataRequest(ClusterInstance cluster, MetadataRequest request) throws Exception {
363+
return IntegrationTestUtils.connectAndReceive(request, cluster.brokerBoundPorts().get(0));
364+
}
365+
366+
private static int defaultNumPartitions(ClusterInstance cluster) {
367+
return Integer.parseInt(cluster.config().serverProperties()
368+
.getOrDefault(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1"));
369+
}
370+
371+
private static int defaultReplicationFactor(ClusterInstance cluster) {
372+
return Integer.parseInt(cluster.config().serverProperties()
373+
.getOrDefault(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1"));
374+
}
375+
}

0 commit comments

Comments
 (0)