Skip to content

Commit 7840554

Browse files
committed
Add auto-configuration capability to ServerAdapters
1 parent 7e548de commit 7840554

6 files changed

Lines changed: 178 additions & 4 deletions

File tree

docs/features/Examples.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Sometimes code is worth a thousand words. Here are some pointers to working exam
1414
- [JAX-RS and JDBC applicaiton using a PostgreSQL database](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/jdbc-app)
1515
- [JAX-RS application secured with MP JWT](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/jaxrs-mpjwt)
1616
- [JAX-RS and MongoDB application that depends on an external REST service](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/everything-app)
17+
- [Application using Apache Kafka messaging](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/kafka-app)
1718
- [Application with no Dockerfile using OpenLiberty adapter](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/liberty-app)
1819

1920
## Runtime examples:

docs/features/KafkaMessaging.md

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
---
2+
layout: post
3+
title: "Kafka Messaging"
4+
order: 22
5+
---
6+
7+
MicroShed Testing provides integration with applications using [Apache Kafka](https://kafka.apache.org/) for messaging. Apache Kafka is
8+
a messaging engine that is commonly used with Java microservice applications, and also is commonly used with [MicroProfile Reactive Messaging(https://github.com/eclipse/microprofile-reactive-messaging).
9+
10+
## Sending and receiving messages from tests
11+
12+
If an application purely uses Kafka Messaging for communication, a true-to-production way of testing is to also have the test client driving requests
13+
on the application via message passing. To do this, MicroShed Testing offers two annotations: `@KafkaConsumerConfig` and `@KafkaProducerConfig`
14+
15+
### Example setup
16+
17+
To begin using Kafka with MicroShed Testing, define a `KafkaContainer` in the test environment:
18+
19+
```java
20+
import org.testcontainers.containers.KafkaContainer;
21+
import org.testcontainers.containers.Network;
22+
// other imports ...
23+
24+
public class AppContainerConfig implements SharedContainerConfiguration {
25+
26+
private static Network network = Network.newNetwork();
27+
28+
@Container
29+
public static KafkaContainer kafka = new KafkaContainer()
30+
.withNetwork(network);
31+
32+
@Container
33+
public static ApplicationContainer app = new ApplicationContainer()
34+
.withNetwork(network)
35+
.dependsOn(kafka);
36+
}
37+
```
38+
39+
Runtimes such as OpenLiberty and Quarkus will be automatically configured if a `KafkaContainer` is present
40+
in the test environment. For other runtimes, you can link the containers together by using `kafka.withNetworkAlias("kafka")`
41+
and `app.withEnv("<runtime-specific kafka bootstrap servers property>", "kafka:9092")`.
42+
43+
44+
### Example usage
45+
46+
```java
47+
import org.apache.kafka.clients.consumer.KafkaConsumer;
48+
import org.apache.kafka.clients.producer.KafkaProducer;
49+
import org.apache.kafka.common.serialization.StringDeserializer;
50+
import org.apache.kafka.common.serialization.StringSerializer;
51+
import org.microshed.testing.kafka.KafkaConsumerConfig;
52+
import org.microshed.testing.kafka.KafkaProducerConfig;
53+
// other imports ...
54+
55+
@MicroShedTest
56+
@SharedContainerConfig(AppContainerConfig.class)
57+
public class KitchenEndpointIT {
58+
59+
@KafkaProducerConfig(keySerializer = StringSerializer.class, // (1)
60+
valueSerializer = StringSerializer.class)
61+
public static KafkaProducer<String, String> producer;
62+
63+
@KafkaConsumerConfig(keyDeserializer = StringDeserializer.class,
64+
valueDeserializer = StringDeserializer.class,
65+
groupId = "update-status",
66+
topics = "statusTopic") // (2)
67+
public static KafkaConsumer<String, String> consumer;
68+
69+
@Test
70+
public void myTest() {
71+
// Use the producer to send messages
72+
producer.send(...);
73+
74+
// Use the consumer to poll for records
75+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
76+
// ...
77+
}
78+
}
79+
```
80+
81+
1. Each `@KafkaProducerConfig` and `@KafkaConsumerConfig` must define a set of key/value [de]serializers
82+
that correspond to the key/value types defined in the `KafkaProducer` and `KafkaConsumer`.
83+
2. For `@KafkaConsumerConfig` zero or more `topics` may be specified to automatically subscribe the
84+
injected `consumer` to the specified `topics`.
85+
86+
87+
## Additional resources
88+
89+
- [Example application using Apache Kafka messaging](https://github.com/MicroShed/microshed-testing/tree/master/sample-apps/kafka-app)
90+
- [OpenLiberty blog on using MicroProfile Reactive Messaging](https://openliberty.io/blog/2019/09/13/microprofile-reactive-messaging.html)
91+
- [Quarkus guide on using Apache Kafka with Reactive Messaging](https://quarkus.io/guides/kafka)

modules/liberty/src/main/java/org/testcontainers/containers/liberty/LibertyAdapter.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,24 @@
2626
import java.util.ArrayList;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
2931

32+
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
33+
import org.microshed.testing.ApplicationEnvironment;
34+
import org.microshed.testing.testcontainers.ApplicationContainer;
35+
import org.microshed.testing.testcontainers.config.HollowTestcontainersConfiguration;
36+
import org.microshed.testing.testcontainers.config.TestcontainersConfiguration;
3037
import org.microshed.testing.testcontainers.spi.ServerAdapter;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
import org.testcontainers.containers.GenericContainer;
3141
import org.testcontainers.images.builder.ImageFromDockerfile;
3242

3343
public class LibertyAdapter implements ServerAdapter {
3444

45+
static final Logger LOG = LoggerFactory.getLogger(LibertyAdapter.class);
46+
3547
private static String BASE_DOCKER_IMAGE = "openliberty/open-liberty:full-java8-openj9-ubi";
3648
private static final String CONFIG_FILE_PROP = "MICROSHED_TEST_LIBERTY_CONFIG_FILE";
3749

@@ -126,4 +138,52 @@ public ImageFromDockerfile getDefaultImage(File appFile) {
126138
return image;
127139
}
128140

141+
@Override
142+
public void configure(Set<GenericContainer<?>> containers) {
143+
configureKafka(containers);
144+
}
145+
146+
private void configureKafka(Set<GenericContainer<?>> containers) {
147+
Class<?> KafkaContainer = tryLoad("org.testcontainers.containers.KafkaContainer");
148+
if (KafkaContainer == null)
149+
return;
150+
Set<GenericContainer<?>> kafkaContainers = containers.stream()
151+
.filter(c -> KafkaContainer.isAssignableFrom(c.getClass()))
152+
.collect(Collectors.toSet());
153+
if (kafkaContainers.size() != 1)
154+
return;
155+
156+
// At this point we have found exactly 1 kafka container
157+
GenericContainer<?> kafka = kafkaContainers.iterator().next();
158+
159+
// Configure app container with bootstrap server
160+
String bootstrapProperty = "MP_MESSAGING_CONNECTOR_LIBERTY_KAFKA_BOOTSTRAP_SERVERS";
161+
String bootstrapServer = null;
162+
if (ApplicationEnvironment.Resolver.isSelected(TestcontainersConfiguration.class)) {
163+
if (kafka.getNetworkAliases().size() == 0)
164+
throw new ExtensionConfigurationException("Unable to configure kafka bootstrap server because no network alias is defined");
165+
bootstrapServer = kafka.getNetworkAliases().get(kafka.getNetworkAliases().size() - 1) + ":9092";
166+
} else if (ApplicationEnvironment.Resolver.isSelected(HollowTestcontainersConfiguration.class)) {
167+
bootstrapServer = "localhost:9093";
168+
} else {
169+
return;
170+
}
171+
String finalBootstrapServers = bootstrapServer;
172+
LOG.info("Auto-configuring ApplicationContainer instances with " + bootstrapProperty + "=" + finalBootstrapServers);
173+
containers.stream()
174+
.filter(c -> ApplicationContainer.class.isAssignableFrom(c.getClass()))
175+
.filter(c -> c.getNetwork().equals(kafka.getNetwork()))
176+
.filter(c -> !c.getEnvMap().containsKey(bootstrapProperty))
177+
.filter(c -> !c.getEnvMap().containsKey(bootstrapProperty.toLowerCase()))
178+
.forEach(c -> c.withEnv(bootstrapProperty, finalBootstrapServers));
179+
}
180+
181+
private static Class<?> tryLoad(String clazz) {
182+
try {
183+
return Class.forName(clazz, false, LibertyAdapter.class.getClassLoader());
184+
} catch (ClassNotFoundException | LinkageError e) {
185+
return null;
186+
}
187+
}
188+
129189
}

modules/testcontainers/src/main/java/org/microshed/testing/testcontainers/config/TestcontainersConfiguration.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.lang.reflect.Field;
2222
import java.lang.reflect.Modifier;
2323
import java.util.ArrayList;
24+
import java.util.Collections;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Set;
@@ -91,6 +92,14 @@ public void applyConfiguration(Class<?> testClass) {
9192
unsharedContainers.forEach(c -> c.setNetwork(Network.SHARED));
9293
}
9394

95+
// Give ServerAdapters a chance to do some auto-wiring between containers
96+
allContainers().stream()
97+
.filter(c -> ApplicationContainer.class.isAssignableFrom(c.getClass()))
98+
.findFirst()
99+
.ifPresent(c -> {
100+
((ApplicationContainer) c).getServerAdapter().configure(allContainers());
101+
});
102+
94103
if (isJwtNeeded()) {
95104
allContainers().stream()
96105
.filter(c -> ApplicationContainer.class.isAssignableFrom(c.getClass()))
@@ -238,7 +247,7 @@ protected Set<GenericContainer<?>> discoverContainers(Class<?> clazz) {
238247
protected Set<GenericContainer<?>> allContainers() {
239248
Set<GenericContainer<?>> all = new HashSet<>(unsharedContainers);
240249
all.addAll(sharedContainers);
241-
return all;
250+
return Collections.unmodifiableSet(all);
242251
}
243252

244253
private static Class<?> tryLoad(String clazz) {

modules/testcontainers/src/main/java/org/microshed/testing/testcontainers/spi/ServerAdapter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import java.io.File;
2222
import java.util.Map;
2323
import java.util.Optional;
24+
import java.util.Set;
2425

26+
import org.microshed.testing.testcontainers.ApplicationContainer;
27+
import org.testcontainers.containers.GenericContainer;
2528
import org.testcontainers.images.builder.ImageFromDockerfile;
2629

2730
/**
@@ -103,4 +106,17 @@ default ImageFromDockerfile getDefaultImage(File appFile) {
103106
default Optional<String> getReadinessPath() {
104107
return Optional.empty();
105108
}
109+
110+
/**
111+
* An optional hook that may be implemented for the purposes of auto-wiring multiple
112+
* containers the the test environment together.
113+
* <p>
114+
* For example, the <code>LibertyAdapter</code> detects the presence of a KafkaContainer
115+
* and will automatically configure the ApplicationContainer to communicate with it by
116+
* calling {@link ApplicationContainer#withEnv(String, String)}
117+
*
118+
* @param allContainers An unmodifiable set of the containers discovered in the test environment
119+
*/
120+
default void configure(Set<GenericContainer<?>> allContainers) {
121+
}
106122
}

sample-apps/kafka-app/src/test/java/org/example/app/KitchenEndpointIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222

2323
import java.time.Duration;
2424

25-
import javax.json.bind.Jsonb;
26-
import javax.json.bind.JsonbBuilder;
27-
2825
import org.apache.kafka.clients.consumer.ConsumerConfig;
2926
import org.apache.kafka.clients.consumer.ConsumerRecord;
3027
import org.apache.kafka.clients.consumer.ConsumerRecords;

0 commit comments

Comments
 (0)