Skip to content

Commit 30969d2

Browse files
authored
Merge pull request #161 from aguibert/kafka-serializers-optional
Make Kafka key/value [de]serializers optional with auto-detection
2 parents b2338ed + da141a0 commit 30969d2

6 files changed

Lines changed: 166 additions & 50 deletions

File tree

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright (c) 2020 IBM Corporation and others
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.microshed.testing.jupiter;
20+
21+
import java.lang.reflect.Field;
22+
import java.lang.reflect.ParameterizedType;
23+
import java.lang.reflect.Type;
24+
import java.nio.ByteBuffer;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Properties;
28+
import java.util.UUID;
29+
30+
import org.junit.jupiter.api.extension.ExtensionConfigurationException;
31+
import org.microshed.testing.kafka.KafkaConsumerConfig;
32+
import org.microshed.testing.kafka.KafkaProducerConfig;
33+
34+
class KafkaConfigAnnotationProcessor {
35+
36+
private final Map<Class<?>, String> defaultSerailizers = new HashMap<>();
37+
private final Map<Class<?>, String> defaultDeserailizers = new HashMap<>();
38+
private final String globalBootstrapServers = System.getProperty("org.microshed.kafka.bootstrap.servers");
39+
40+
KafkaConfigAnnotationProcessor() {
41+
defaultSerailizers.put(byte[].class, "org.apache.kafka.common.serialization.ByteArraySerializer");
42+
defaultSerailizers.put(ByteBuffer.class, "org.apache.kafka.common.serialization.ByteBufferSerializer");
43+
defaultSerailizers.put(Double.class, "org.apache.kafka.common.serialization.DoubleSerializer");
44+
defaultSerailizers.put(Float.class, "org.apache.kafka.common.serialization.FloatSerializer");
45+
defaultSerailizers.put(Integer.class, "org.apache.kafka.common.serialization.IntegerSerializer");
46+
defaultSerailizers.put(Long.class, "org.apache.kafka.common.serialization.LongSerializer");
47+
defaultSerailizers.put(Short.class, "org.apache.kafka.common.serialization.ShortSerializer");
48+
defaultSerailizers.put(String.class, "org.apache.kafka.common.serialization.StringSerializer");
49+
defaultSerailizers.put(UUID.class, "org.apache.kafka.common.serialization.UUIDSerializer");
50+
51+
defaultDeserailizers.put(byte[].class, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
52+
defaultDeserailizers.put(ByteBuffer.class, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
53+
defaultDeserailizers.put(Double.class, "org.apache.kafka.common.serialization.DoubleDeserializer");
54+
defaultDeserailizers.put(Float.class, "org.apache.kafka.common.serialization.FloatDeserializer");
55+
defaultDeserailizers.put(Integer.class, "org.apache.kafka.common.serialization.IntegerDeserializer");
56+
defaultDeserailizers.put(Long.class, "org.apache.kafka.common.serialization.LongDeserializer");
57+
defaultDeserailizers.put(Short.class, "org.apache.kafka.common.serialization.ShortDeserializer");
58+
defaultDeserailizers.put(String.class, "org.apache.kafka.common.serialization.StringDeserializer");
59+
defaultDeserailizers.put(UUID.class, "org.apache.kafka.common.serialization.UUIDDeserializer");
60+
}
61+
62+
Properties getProducerProperties(Field producerField) {
63+
KafkaProducerConfig producerConfig = producerField.getAnnotation(KafkaProducerConfig.class);
64+
Properties properties = new Properties();
65+
String bootstrapServers = producerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : producerConfig.bootstrapServers();
66+
if (bootstrapServers.isEmpty())
67+
throw new ExtensionConfigurationException("To use @KafkaProducerConfig on a KafkaProducer a bootstrap server must be " +
68+
"defined in the @KafkaProducerConfig annotation or using the " +
69+
"'org.microshed.kafka.bootstrap.servers' system property");
70+
properties.put("bootstrap.servers", bootstrapServers);
71+
if (isClassPropertySet(producerConfig.keySerializer().getName()))
72+
properties.put("key.serializer", producerConfig.keySerializer().getName());
73+
if (isClassPropertySet(producerConfig.valueSerializer().getName()))
74+
properties.put("value.serializer", producerConfig.valueSerializer().getName());
75+
for (String prop : producerConfig.properties()) {
76+
int split = prop.indexOf("=");
77+
if (split < 2)
78+
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + producerField + " must be in the format 'key=value'");
79+
properties.put(prop.substring(0, split), prop.substring(split + 1));
80+
}
81+
82+
// Auto-detect key/value serializers if needed
83+
if (producerField.getGenericType() instanceof ParameterizedType) {
84+
if (!properties.containsKey("key.serializer")) {
85+
Type keyType = ((ParameterizedType) producerField.getGenericType()).getActualTypeArguments()[0];
86+
if (defaultSerailizers.containsKey(keyType))
87+
properties.put("key.serializer", defaultSerailizers.get(keyType));
88+
}
89+
if (!properties.containsKey("value.serializer")) {
90+
Type valueType = ((ParameterizedType) producerField.getGenericType()).getActualTypeArguments()[1];
91+
if (defaultSerailizers.containsKey(valueType))
92+
properties.put("value.serializer", defaultSerailizers.get(valueType));
93+
}
94+
}
95+
96+
return properties;
97+
}
98+
99+
Properties getConsumerProperties(Field consumerField) {
100+
KafkaConsumerConfig consumerConfig = consumerField.getAnnotation(KafkaConsumerConfig.class);
101+
Properties properties = new Properties();
102+
String bootstrapServers = consumerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : consumerConfig.bootstrapServers();
103+
if (bootstrapServers.isEmpty())
104+
throw new ExtensionConfigurationException("To use @KafkaConsumerConfig on a KafkaConsumer a bootstrap server must be " +
105+
"defined in the @KafkaConsumerConfig annotation or using the " +
106+
"'org.microshed.kafka.bootstrap.servers' system property");
107+
properties.put("bootstrap.servers", bootstrapServers);
108+
properties.put("group.id", consumerConfig.groupId());
109+
if (isClassPropertySet(consumerConfig.keyDeserializer().getName()))
110+
properties.put("key.deserializer", consumerConfig.keyDeserializer().getName());
111+
if (isClassPropertySet(consumerConfig.valueDeserializer().getName()))
112+
properties.put("value.deserializer", consumerConfig.valueDeserializer().getName());
113+
for (String prop : consumerConfig.properties()) {
114+
int split = prop.indexOf("=");
115+
if (split < 2)
116+
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + consumerField + " must be in the format 'key=value'");
117+
properties.put(prop.substring(0, split), prop.substring(split + 1));
118+
}
119+
120+
// Auto-detect key/value deserializer if needed
121+
if (consumerField.getGenericType() instanceof ParameterizedType) {
122+
if (!properties.containsKey("key.deserializer")) {
123+
Type keyType = ((ParameterizedType) consumerField.getGenericType()).getActualTypeArguments()[0];
124+
if (defaultDeserailizers.containsKey(keyType))
125+
properties.put("key.deserializer", defaultDeserailizers.get(keyType));
126+
}
127+
if (!properties.containsKey("value.deserializer")) {
128+
Type valueType = ((ParameterizedType) consumerField.getGenericType()).getActualTypeArguments()[1];
129+
if (defaultDeserailizers.containsKey(valueType))
130+
properties.put("value.deserializer", defaultDeserailizers.get(valueType));
131+
}
132+
}
133+
134+
return properties;
135+
}
136+
137+
private static boolean isClassPropertySet(String prop) {
138+
return !"java.lang.Object".equals(prop);
139+
}
140+
141+
}

core/src/main/java/org/microshed/testing/jupiter/MicroShedTestExtension.java

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ private void injectKafkaClients(Class<?> clazz) {
122122
Class<?> KafkaConsumer = tryLoad("org.apache.kafka.clients.consumer.KafkaConsumer");
123123
if (KafkaProducer == null || KafkaConsumer == null)
124124
return;
125-
String globalBootstrapServers = System.getProperty("org.microshed.kafka.bootstrap.servers");
125+
126+
KafkaConfigAnnotationProcessor kafkaProcessor = new KafkaConfigAnnotationProcessor();
126127

127128
List<Field> kafkaProducerFields = AnnotationSupport.findAnnotatedFields(clazz, KafkaProducerConfig.class);
128129
for (Field producerField : kafkaProducerFields) {
@@ -136,26 +137,11 @@ private void injectKafkaClients(Class<?> clazz) {
136137
"must be public, static, and non-final: " + producerField);
137138
}
138139

139-
KafkaProducerConfig producerConfig = producerField.getAnnotation(KafkaProducerConfig.class);
140-
Properties properties = new Properties();
141-
String bootstrapServers = producerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : producerConfig.bootstrapServers();
142-
if (bootstrapServers.isEmpty())
143-
throw new ExtensionConfigurationException("To use @KafkaProducerConfig on a KafkaProducer a bootstrap server must be " +
144-
"defined in the @KafkaProducerConfig annotation or using the " +
145-
"'org.microshed.kafka.bootstrap.servers' system property");
146-
properties.put("bootstrap.servers", bootstrapServers);
147-
properties.put("key.serializer", producerConfig.keySerializer().getName());
148-
properties.put("value.serializer", producerConfig.valueSerializer().getName());
149-
for (String prop : producerConfig.properties()) {
150-
int split = prop.indexOf("=");
151-
if (split < 2)
152-
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + producerField + " must be in the format 'key=value'");
153-
properties.put(prop.substring(0, split), prop.substring(split + 1));
154-
}
140+
Properties properties = kafkaProcessor.getProducerProperties(producerField);
155141
try {
156142
Object producer = KafkaProducer.getConstructor(Properties.class).newInstance(properties);
157143
producerField.set(null, producer);
158-
LOG.debug("Injected kafka producer for " + producerField + " with config " + producerConfig);
144+
LOG.debug("Injected kafka producer for " + producerField + " with config " + producerField.getAnnotation(KafkaProducerConfig.class));
159145
} catch (Exception e) {
160146
throw new ExtensionConfigurationException("Unable to inject field " + producerField, e);
161147
}
@@ -173,23 +159,8 @@ private void injectKafkaClients(Class<?> clazz) {
173159
"must be public, static, and non-final: " + consumerField);
174160
}
175161

162+
Properties properties = kafkaProcessor.getConsumerProperties(consumerField);
176163
KafkaConsumerConfig consumerConfig = consumerField.getAnnotation(KafkaConsumerConfig.class);
177-
Properties properties = new Properties();
178-
String bootstrapServers = consumerConfig.bootstrapServers().isEmpty() ? globalBootstrapServers : consumerConfig.bootstrapServers();
179-
if (bootstrapServers.isEmpty())
180-
throw new ExtensionConfigurationException("To use @KafkaConsumerConfig on a KafkaConsumer a bootstrap server must be " +
181-
"defined in the @KafkaConsumerConfig annotation or using the " +
182-
"'org.microshed.kafka.bootstrap.servers' system property");
183-
properties.put("bootstrap.servers", bootstrapServers);
184-
properties.put("group.id", consumerConfig.groupId());
185-
properties.put("key.deserializer", consumerConfig.keyDeserializer().getName());
186-
properties.put("value.deserializer", consumerConfig.valueDeserializer().getName());
187-
for (String prop : consumerConfig.properties()) {
188-
int split = prop.indexOf("=");
189-
if (split < 2)
190-
throw new ExtensionConfigurationException("The property '" + prop + "' for field " + consumerField + " must be in the format 'key=value'");
191-
properties.put(prop.substring(0, split), prop.substring(split + 1));
192-
}
193164
try {
194165
Object consumer = KafkaConsumer.getConstructor(Properties.class).newInstance(properties);
195166
consumerField.set(null, consumer);

core/src/main/java/org/microshed/testing/kafka/KafkaConsumerConfig.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,17 @@
4545

4646
/**
4747
* @return Sets the <code>key.deserializer</code> property for the injected <code>KafkaConsumer</code>.
48+
* If unset, an an attempt will be made to select an appropriate class from the built-in deserializers
49+
* in the <code>org.apache.kafka.common.serialization</code> package.
4850
*/
49-
Class<?> keyDeserializer();
51+
Class<?> keyDeserializer() default Object.class;
5052

5153
/**
5254
* @return Sets the <code>value.deserializer</code> property for the injected <code>KafkaConsumer</code>.
55+
* If unset, an an attempt will be made to select an appropriate class from the built-in deserializers
56+
* in the <code>org.apache.kafka.common.serialization</code> package.
5357
*/
54-
Class<?> valueDeserializer();
58+
Class<?> valueDeserializer() default Object.class;
5559

5660
/**
5761
* @return Sets the <code>group.id</code> property for the injected <code>KafkaConsumer</code>.

core/src/main/java/org/microshed/testing/kafka/KafkaProducerConfig.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,17 @@
4545

4646
/**
4747
* @return Sets the <code>key.serializer</code> property for the injected <code>KafkaProducer</code>.
48+
* If unset, an an attempt will be made to select an appropriate class from the built-in serializers
49+
* in the <code>org.apache.kafka.common.serialization</code> package.
4850
*/
49-
Class<?> keySerializer();
51+
Class<?> keySerializer() default Object.class;
5052

5153
/**
5254
* @return Sets the <code>value.serializer</code> property for the injected <code>KafkaProducer</code>.
55+
* If unset, an an attempt will be made to select an appropriate class from the built-in serializers
56+
* in the <code>org.apache.kafka.common.serialization</code> package.
5357
*/
54-
Class<?> valueSerializer();
58+
Class<?> valueSerializer() default Object.class;
5559

5660
/**
5761
* @return An optional array of <code>key=value</code> strings, which will be used as configuration options

docs/features/KafkaMessaging.md

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,11 @@ import org.microshed.testing.kafka.KafkaProducerConfig;
5757
@SharedContainerConfig(AppContainerConfig.class)
5858
public class KitchenEndpointIT {
5959

60-
@KafkaProducerConfig(keySerializer = StringSerializer.class, // (1)
61-
valueSerializer = StringSerializer.class)
60+
@KafkaProducerConfig // (1)
6261
public static KafkaProducer<String, String> producer;
6362

64-
@KafkaConsumerConfig(keyDeserializer = StringDeserializer.class,
65-
valueDeserializer = StringDeserializer.class,
66-
groupId = "update-status",
67-
topics = "statusTopic") // (2)
63+
@KafkaConsumerConfig(groupId = "update-status",
64+
topics = "statusTopic") // (2)
6865
public static KafkaConsumer<String, String> consumer;
6966

7067
@Test
@@ -79,8 +76,9 @@ public class KitchenEndpointIT {
7976
}
8077
```
8178

82-
1. Each `@KafkaProducerConfig` and `@KafkaConsumerConfig` must define a set of key/value [de]serializers
83-
that correspond to the key/value types defined in the `KafkaProducer` and `KafkaConsumer`.
79+
1. Each `@KafkaProducerConfig` and `@KafkaConsumerConfig` may optionally define a set of key/value [de]serializers
80+
that correspond to the key/value types defined in the `KafkaProducer` and `KafkaConsumer`. If none are specified,
81+
then an attempt will be made to auto-detect a fitting built-in [de]serializer.
8482
2. For `@KafkaConsumerConfig` zero or more `topics` may be specified to automatically subscribe the
8583
injected `consumer` to the specified `topics`.
8684

sample-apps/kafka-app/src/test/java/org/example/app/KitchenEndpointIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.apache.kafka.clients.consumer.KafkaConsumer;
2929
import org.apache.kafka.clients.producer.KafkaProducer;
3030
import org.apache.kafka.clients.producer.ProducerRecord;
31-
import org.apache.kafka.common.serialization.StringDeserializer;
32-
import org.apache.kafka.common.serialization.StringSerializer;
3331
import org.example.app.KitchenOrder.JsonbSerializer;
3432
import org.example.app.KitchenOrder.KitchenOrderDeserializer;
3533
import org.junit.jupiter.api.Test;
@@ -42,10 +40,10 @@
4240
@SharedContainerConfig(AppContainerConfig.class)
4341
public class KitchenEndpointIT {
4442

45-
@KafkaProducerConfig(keySerializer = StringSerializer.class, valueSerializer = JsonbSerializer.class)
43+
@KafkaProducerConfig(valueSerializer = JsonbSerializer.class)
4644
public static KafkaProducer<String, KitchenOrder> producer;
4745

48-
@KafkaConsumerConfig(keyDeserializer = StringDeserializer.class, valueDeserializer = KitchenOrderDeserializer.class,
46+
@KafkaConsumerConfig(valueDeserializer = KitchenOrderDeserializer.class,
4947
groupId = "update-status", topics = "statusTopic",
5048
properties = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "=earliest")
5149
public static KafkaConsumer<String, KitchenOrder> consumer;

0 commit comments

Comments
 (0)