Skip to content

Commit 7e548de

Browse files
committed
Injectable kafka consumer/producers with @KafkaConsumerConfig and @KafkaProducerConfig
1 parent f4c6fa6 commit 7e548de

16 files changed

Lines changed: 436 additions & 374 deletions

File tree

core/src/main/java/org/microshed/testing/jupiter/MicroShedTestExtension.java

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import java.lang.reflect.Modifier;
2424
import java.net.URL;
2525
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.Collection;
2628
import java.util.List;
2729
import java.util.Optional;
30+
import java.util.Properties;
2831

2932
import org.junit.jupiter.api.extension.BeforeAllCallback;
3033
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
@@ -36,6 +39,8 @@
3639
import org.microshed.testing.jaxrs.RestClientBuilder;
3740
import org.microshed.testing.jwt.JwtBuilder;
3841
import org.microshed.testing.jwt.JwtConfig;
42+
import org.microshed.testing.kafka.KafkaConsumerConfig;
43+
import org.microshed.testing.kafka.KafkaProducerConfig;
3944
import org.slf4j.Logger;
4045
import org.slf4j.LoggerFactory;
4146

@@ -63,6 +68,7 @@ public void beforeAll(ExtensionContext context) throws Exception {
6368
config.start();
6469
configureRestAssured(config);
6570
injectRestClients(testClass);
71+
injectKafkaClients(testClass);
6672
}
6773

6874
private static void injectRestClients(Class<?> clazz) {
@@ -90,7 +96,96 @@ private static void injectRestClients(Class<?> clazz) {
9096
restClientField.set(null, restClient);
9197
LOG.debug("Injected rest client for " + restClientField);
9298
} catch (Exception e) {
93-
throw new ExtensionConfigurationException("Unable to set field " + restClientField, e);
99+
throw new ExtensionConfigurationException("Unable to inject field " + restClientField, e);
100+
}
101+
}
102+
}
103+
104+
private void injectKafkaClients(Class<?> clazz) {
105+
// Verify kafka-client and testcontainers-kafka is on classpath
106+
Class<?> KafkaProducer = tryLoad("org.apache.kafka.clients.producer.KafkaProducer");
107+
Class<?> KafkaConsumer = tryLoad("org.apache.kafka.clients.consumer.KafkaConsumer");
108+
if (KafkaProducer == null || KafkaConsumer == null)
109+
return;
110+
String globalBootstrapServers = System.getProperty("org.microshed.kafka.bootstrap.servers");
111+
112+
List<Field> kafkaProducerFields = AnnotationSupport.findAnnotatedFields(clazz, KafkaProducerConfig.class);
113+
for (Field producerField : kafkaProducerFields) {
114+
if (!KafkaProducer.isAssignableFrom(producerField.getType())) {
115+
throw new ExtensionConfigurationException("Fields annotated with @KafkaProducerConfig must be of the type " + KafkaProducer.getName());
116+
}
117+
if (!Modifier.isPublic(producerField.getModifiers()) ||
118+
!Modifier.isStatic(producerField.getModifiers()) ||
119+
Modifier.isFinal(producerField.getModifiers())) {
120+
throw new ExtensionConfigurationException("The KafkaProducer field annotated with @KafkaProducerConfig " +
121+
"must be public, static, and non-final: " + producerField);
122+
}
123+
124+
KafkaProducerConfig producerConfig = producerField.getAnnotation(KafkaProducerConfig.class);
125+
Properties properties = new Properties();
126+
String bootstrapServers = producerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : producerConfig.bootstrapServers();
127+
if (bootstrapServers.isEmpty())
128+
throw new ExtensionConfigurationException("To use @KafkaProducerConfig on a KafkaProducer a bootstrap server must be " +
129+
"defined in the @KafkaProducerConfig annotation or using the " +
130+
"'org.microshed.kafka.bootstrap.servers' system property");
131+
properties.put("bootstrap.servers", bootstrapServers);
132+
properties.put("key.serializer", producerConfig.keySerializer().getName());
133+
properties.put("value.serializer", producerConfig.valueSerializer().getName());
134+
for (String prop : producerConfig.properties()) {
135+
int split = prop.indexOf("=");
136+
if (split < 2)
137+
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + producerField + " must be in the format 'key=value'");
138+
properties.put(prop.substring(0, split), prop.substring(split + 1));
139+
}
140+
try {
141+
Object producer = KafkaProducer.getConstructor(Properties.class).newInstance(properties);
142+
producerField.set(null, producer);
143+
LOG.debug("Injected kafka producer for " + producerField + " with config " + producerConfig);
144+
} catch (Exception e) {
145+
throw new ExtensionConfigurationException("Unable to inject field " + producerField, e);
146+
}
147+
}
148+
149+
List<Field> kafkaConsumerFields = AnnotationSupport.findAnnotatedFields(clazz, KafkaConsumerConfig.class);
150+
for (Field consumerField : kafkaConsumerFields) {
151+
if (!KafkaConsumer.isAssignableFrom(consumerField.getType())) {
152+
throw new ExtensionConfigurationException("Fields annotated with @KafkaConsumerConfig must be of the type " + KafkaConsumer.getName());
153+
}
154+
if (!Modifier.isPublic(consumerField.getModifiers()) ||
155+
!Modifier.isStatic(consumerField.getModifiers()) ||
156+
Modifier.isFinal(consumerField.getModifiers())) {
157+
throw new ExtensionConfigurationException("The KafkaProducer field annotated with @KafkaConsumerConfig " +
158+
"must be public, static, and non-final: " + consumerField);
159+
}
160+
161+
KafkaConsumerConfig consumerConfig = consumerField.getAnnotation(KafkaConsumerConfig.class);
162+
Properties properties = new Properties();
163+
String bootstrapServers = consumerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : consumerConfig.bootstrapServers();
164+
if (bootstrapServers.isEmpty())
165+
throw new ExtensionConfigurationException("To use @KafkaConsumerConfig on a KafkaConsumer a bootstrap server must be " +
166+
"defined in the @KafkaConsumerConfig annotation or using the " +
167+
"'org.microshed.kafka.bootstrap.servers' system property");
168+
properties.put("bootstrap.servers", bootstrapServers);
169+
properties.put("group.id", consumerConfig.groupId());
170+
properties.put("key.deserializer", consumerConfig.keyDeserializer().getName());
171+
properties.put("value.deserializer", consumerConfig.valueDeserializer().getName());
172+
for (String prop : consumerConfig.properties()) {
173+
int split = prop.indexOf("=");
174+
if (split < 2)
175+
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + consumerField + " must be in the format 'key=value'");
176+
properties.put(prop.substring(0, split), prop.substring(split + 1));
177+
}
178+
try {
179+
Object consumer = KafkaConsumer.getConstructor(Properties.class).newInstance(properties);
180+
consumerField.set(null, consumer);
181+
LOG.debug("Injected kafka consumer for " + consumerField + " with config " + consumerConfig);
182+
if (consumerConfig.topics().length > 0) {
183+
Collection<String> topics = Arrays.asList(consumerConfig.topics());
184+
KafkaConsumer.getMethod("subscribe", Collection.class).invoke(consumer, topics);
185+
LOG.debug("Subscribed kafka consumer for " + consumerField + " to topics " + topics);
186+
}
187+
} catch (Exception e) {
188+
throw new ExtensionConfigurationException("Unable to inject field " + consumerField, e);
94189
}
95190
}
96191
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2020 IBM Corporation and others
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.microshed.testing.kafka;
20+
21+
import java.lang.annotation.ElementType;
22+
import java.lang.annotation.Retention;
23+
import java.lang.annotation.RetentionPolicy;
24+
import java.lang.annotation.Target;
25+
26+
/**
27+
* Identifies an injection point for a <code>org.apache.kafka.clients.consumer.KafkaConsumer</code>
28+
* The annotated field MUST be <code>public static</code> and non-final.
29+
*
30+
* The injected <code>KafkaConsumer</code> will be auto-configured according the values
31+
* in this annotation.
32+
*/
33+
@Target({ ElementType.FIELD })
34+
@Retention(RetentionPolicy.RUNTIME)
35+
public @interface KafkaConsumerConfig {
36+
37+
/**
38+
* @return Sets the <code>bootstrap.servers</code> property for the injected <code>KafkaConsumer</code>.
39+
* Otherwise, the <code>org.microshed.kafka.bootstrap.servers</code> system property is used if set.
40+
* Otherwise, any <code>org.testcontainers.containers.KafkaContainer</code> discovered in the test
41+
* will be used.
42+
* If none of the previous options are discovered, an error is raised.
43+
*/
44+
String bootstrapServers() default "";
45+
46+
/**
47+
* @return Sets the <code>key.deserializer</code> property for the injected <code>KafkaConsumer</code>.
48+
*/
49+
Class<?> keyDeserializer();
50+
51+
/**
52+
* @return Sets the <code>value.deserializer</code> property for the injected <code>KafkaConsumer</code>.
53+
*/
54+
Class<?> valueDeserializer();
55+
56+
/**
57+
* @return Sets the <code>group.id</code> property for the injected <code>KafkaConsumer</code>.
58+
*/
59+
String groupId();
60+
61+
/**
62+
* @return The topics that the injected <code>KafkaConsumer</code> will be automatically subscribed to.
63+
*/
64+
String[] topics() default {};
65+
66+
/**
67+
* @return An optional array of <code>key=value</code> strings, which will be used as configuration options
68+
* in the injected <code>KafkaConsumer</code>.
69+
*/
70+
String[] properties() default {};
71+
72+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2020 IBM Corporation and others
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.microshed.testing.kafka;
20+
21+
import java.lang.annotation.ElementType;
22+
import java.lang.annotation.Retention;
23+
import java.lang.annotation.RetentionPolicy;
24+
import java.lang.annotation.Target;
25+
26+
/**
27+
* Identifies an injection point for a <code>org.apache.kafka.clients.producer.KafkaProducer</code>
28+
* The annotated field MUST be <code>public static</code> and non-final.
29+
*
30+
* The injected <code>KafkaProducer</code> will be auto-configured according the values
31+
* in this annotation.
32+
*/
33+
@Target({ ElementType.FIELD })
34+
@Retention(RetentionPolicy.RUNTIME)
35+
public @interface KafkaProducerConfig {
36+
37+
/**
38+
* @return Sets the <code>bootstrap.servers</code> property for the injected <code>KafkaProducer</code>.
39+
* Otherwise, the <code>org.microshed.kafka.bootstrap.servers</code> system property is used if set.
40+
* Otherwise, any <code>org.testcontainers.containers.KafkaContainer</code> discovered in the test
41+
* will be used.
42+
* If none of the previous options are discovered, an error is raised.
43+
*/
44+
String bootstrapServers() default "";
45+
46+
/**
47+
* @return Sets the <code>key.serializer</code> property for the injected <code>KafkaProducer</code>.
48+
*/
49+
Class<?> keySerializer();
50+
51+
/**
52+
* @return Sets the <code>value.serializer</code> property for the injected <code>KafkaProducer</code>.
53+
*/
54+
Class<?> valueSerializer();
55+
56+
/**
57+
* @return An optional array of <code>key=value</code> strings, which will be used as configuration options
58+
* in the injected <code>KafkaProducer</code>.
59+
*/
60+
String[] properties() default {};
61+
62+
}

modules/testcontainers/src/main/java/org/microshed/testing/testcontainers/config/TestcontainersConfiguration.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.HashSet;
2525
import java.util.List;
2626
import java.util.Set;
27-
import java.util.stream.Stream;
27+
import java.util.stream.Collectors;
2828

2929
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
3030
import org.junit.platform.commons.support.AnnotationSupport;
@@ -40,6 +40,7 @@
4040
import org.testcontainers.containers.GenericContainer;
4141
import org.testcontainers.containers.Network;
4242
import org.testcontainers.junit.jupiter.Container;
43+
import org.testcontainers.lifecycle.Startables;
4344

4445
public class TestcontainersConfiguration implements ApplicationEnvironment {
4546

@@ -91,7 +92,7 @@ public void applyConfiguration(Class<?> testClass) {
9192
}
9293

9394
if (isJwtNeeded()) {
94-
Stream.concat(unsharedContainers.stream(), sharedContainers.stream())
95+
allContainers().stream()
9596
.filter(c -> ApplicationContainer.class.isAssignableFrom(c.getClass()))
9697
.filter(c -> !c.isRunning())
9798
.filter(c -> !c.getEnvMap().containsKey(JwtBuilder.MP_JWT_PUBLIC_KEY))
@@ -108,6 +109,7 @@ public void applyConfiguration(Class<?> testClass) {
108109
public void start() {
109110
List<GenericContainer<?>> containersToStart = new ArrayList<>();
110111

112+
long start = System.currentTimeMillis();
111113
// Start shared containers first
112114
if (sharedConfigClass != null) {
113115
try {
@@ -125,15 +127,43 @@ public void start() {
125127
containersToStart.addAll(unsharedContainers);
126128
containersToStart.removeIf(c -> c.isRunning());
127129

128-
if (containersToStart.size() == 0)
130+
if (containersToStart.size() > 0) {
131+
LOG.info("Starting containers " + containersToStart + " in parallel for " + currentTestClass);
132+
for (GenericContainer<?> c : containersToStart)
133+
LOG.info(" " + c.getImage());
134+
Startables.deepStart(containersToStart).join();
135+
}
136+
LOG.info("All containers started in " + (System.currentTimeMillis() - start) + "ms");
137+
138+
configureKafka();
139+
}
140+
141+
void configureKafka() {
142+
// If a KafkaContainer is defined, store the bootstrap location
143+
Class<?> KafkaContainer = tryLoad("org.testcontainers.containers.KafkaContainer");
144+
if (KafkaContainer == null)
129145
return;
130146

131-
LOG.info("Starting containers in parallel for " + currentTestClass);
132-
for (GenericContainer<?> c : containersToStart)
133-
LOG.info(" " + c.getImage());
134-
long start = System.currentTimeMillis();
135-
containersToStart.parallelStream().forEach(GenericContainer::start);
136-
LOG.info("All containers started in " + (System.currentTimeMillis() - start) + "ms");
147+
Set<GenericContainer<?>> kafkaContainers = allContainers().stream()
148+
.filter(c -> KafkaContainer.isAssignableFrom(c.getClass()))
149+
.collect(Collectors.toSet());
150+
151+
if (kafkaContainers.size() == 1) {
152+
try {
153+
GenericContainer<?> kafka = kafkaContainers.iterator().next();
154+
String bootstrapServers = (String) KafkaContainer.getMethod("getBootstrapServers").invoke(kafka);
155+
System.setProperty("org.microshed.kafka.bootstrap.servers", bootstrapServers);
156+
LOG.debug("Discovered KafkaContainer with bootstrap.servers=" + bootstrapServers);
157+
} catch (Exception e) {
158+
LOG.warn("Unable to set kafka boostrap server", e);
159+
}
160+
} else if (kafkaContainers.size() > 1) {
161+
if (LOG.isInfoEnabled())
162+
LOG.info("Located multiple KafkaContainer instances. Unable to auto configure kafka clients");
163+
} else {
164+
if (LOG.isDebugEnabled())
165+
LOG.debug("No KafkaContainer instances found in configuration");
166+
}
137167
}
138168

139169
@Override
@@ -211,4 +241,12 @@ protected Set<GenericContainer<?>> allContainers() {
211241
return all;
212242
}
213243

244+
private static Class<?> tryLoad(String clazz) {
245+
try {
246+
return Class.forName(clazz, false, TestcontainersConfiguration.class.getClassLoader());
247+
} catch (ClassNotFoundException | LinkageError e) {
248+
return null;
249+
}
250+
}
251+
214252
}

sample-apps/kafka-app/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ dependencies {
88
providedCompile 'org.eclipse.microprofile.reactive.messaging:microprofile-reactive-messaging-api:1.0'
99
compile 'org.apache.kafka:kafka-clients:2.4.0'
1010
testCompile project(':microshed-testing-testcontainers')
11+
testCompile 'org.awaitility:awaitility:4.0.2'
1112
testCompile 'org.testcontainers:kafka:1.12.5'
1213
testCompile 'org.slf4j:slf4j-log4j12:1.7.29'
1314
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0'

sample-apps/kafka-app/src/main/java/org/example/app/KitchenApplication.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments
 (0)