Skip to content

Commit b6d2503

Browse files
authored
KAFKA-20410: Add DLQ configuration parameters for Share Groups (KIP-1191) (#21979)
## Summary This PR adds the configuration foundation for Share Groups Dead-Letter Queue (DLQ) functionality as specified in [KIP-1191: Dead-letter queues for share groups](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1191%3A+Dead-letter+queues+for+share+groups). ## Changes ### New Configurations **Cluster-level configs (GroupCoordinatorConfig.java):** 1. `errors.deadletterqueue.auto.create.topics.enable` (default: `false`) - Enables automatic creation of DLQ topics 2. `errors.deadletterqueue.topic.name.prefix` (default: `"dlq."`) - Required prefix for DLQ topic names when auto-create is enabled **Group-level configs (GroupConfig.java):** 1. `errors.deadletterqueue.topic.name` (default: `""`) - Specifies the DLQ topic name for a share group - Empty string means DLQ disabled for that group 2. `errors.deadletterqueue.copy.record.enable` (default: `false`) - When `true`: Copy full original record to DLQ - When `false`: Copy only context metadata (offset, delivery count, reason) **Topic-level config (TopicConfig.java):** 1. `errors.deadletterqueue.group.enable` (default: not set) - Marks a topic as eligible for use as a DLQ ### Validation Logic - DLQ topic names **must not** start with `__` (reserved for internal topics) Reviewers: David Jacot <david.jacot@gmail.com>, Andrew Schofield <aschofield@confluent.io>
1 parent 75052bb commit b6d2503

7 files changed

Lines changed: 196 additions & 6 deletions

File tree

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,4 +235,10 @@ public class TopicConfig {
235235
@Deprecated
236236
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
237237
"hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
238+
239+
// Dead Letter Queue Configuration (KIP-1191)
240+
public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG = "errors.deadletterqueue.group.enable";
241+
public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC = "Enable this topic to be used as a dead-letter queue for share groups. " +
242+
"When set to <code>true</code>, share groups can write undeliverable records to this topic. When set to <code>false</code> (the default), " +
243+
"attempts to use this topic as a DLQ will be rejected.";
238244
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
7777
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
7878
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
7979
import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
80-
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG}
80+
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG}
8181
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
8282
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, GroupCoordinator, GroupCoordinatorConfig}
8383
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -379,6 +379,8 @@ class KafkaApisTest extends Logging {
379379
cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
380380
cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
381381
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
382+
cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
383+
cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
382384

383385
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
384386

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ import org.apache.kafka.common.record.internal.{CompressionType, Records}
3030
import org.apache.kafka.common.security.auth.SecurityProtocol
3131
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
3232
import org.apache.kafka.common.utils.LogCaptureAppender
33-
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
33+
import org.apache.kafka.coordinator.group.{ConsumerGroupMigrationPolicy, GroupConfig, GroupCoordinatorConfig}
3434
import org.apache.kafka.coordinator.group.Group.GroupType
35-
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
3635
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
3736
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
3837
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
@@ -1071,6 +1070,10 @@ class KafkaConfigTest {
10711070
case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
10721071
case GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
10731072
case GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
1073+
case GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
1074+
case GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => // ignore string
1075+
case GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore string
1076+
case GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
10741077

10751078
/** Streams groups configs */
10761079
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ public final class GroupConfig extends AbstractConfig {
105105

106106
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = "streams.task.offset.interval.ms";
107107

108+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
109+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
110+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The name of the topic to be used as the dead-letter queue (DLQ) topic for this share group. If blank (the default), the group does not have a DLQ topic.";
111+
112+
public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG = "errors.deadletterqueue.copy.record.enable";
113+
public static final boolean ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT = false;
114+
public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC = "When writing onto the dead-letter queue topic, whether to copy the original record onto the DLQ topic, or just write a record containing the context information headers.";
115+
108116
private final Optional<Integer> consumerSessionTimeoutMs;
109117

110118
private final Optional<Integer> consumerHeartbeatIntervalMs;
@@ -147,6 +155,10 @@ public final class GroupConfig extends AbstractConfig {
147155

148156
private final Optional<Boolean> shareRenewAcknowledgeEnable;
149157

158+
public final String errorsDLQTopicName;
159+
160+
public final boolean errorsDLQCopyRecordEnable;
161+
150162
public static final ConfigDef CONFIG_DEF = new ConfigDef()
151163
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
152164
INT,
@@ -269,7 +281,19 @@ public final class GroupConfig extends AbstractConfig {
269281
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
270282
atLeast(1),
271283
MEDIUM,
272-
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
284+
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
285+
286+
// DLQ configurations (KIP-1191)
287+
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
288+
STRING,
289+
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT,
290+
MEDIUM,
291+
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)
292+
.define(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG,
293+
BOOLEAN,
294+
ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT,
295+
MEDIUM,
296+
ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC);
273297

274298
/**
275299
* Mapping from GroupConfig name to its broker-level synonym config name.
@@ -301,7 +325,11 @@ public final class GroupConfig extends AbstractConfig {
301325
Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
302326
Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
303327
Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
304-
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
328+
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
329+
330+
// DLQ configs
331+
Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
332+
Map.entry(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, Optional.empty())
305333
);
306334

307335
/**
@@ -337,6 +365,8 @@ public GroupConfig(Map<?, ?> props) {
337365
this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
338366
.map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
339367
this.shareRenewAcknowledgeEnable = optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
368+
this.errorsDLQTopicName = getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
369+
this.errorsDLQCopyRecordEnable = getBoolean(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG);
340370
}
341371

342372
private Optional<Integer> optionalInt(String key) {
@@ -535,6 +565,14 @@ private static void validateValues(
535565
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
536566
groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
537567
);
568+
569+
// DLQ validation (KIP-1191)
570+
// DLQ topic name must not start with "__" (reserved for internal topics)
571+
String dlqTopicName = (String) parsed.get(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
572+
if (dlqTopicName != null && !dlqTopicName.isEmpty() && dlqTopicName.startsWith("__")) {
573+
throw new InvalidConfigurationException(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG +
574+
": DLQ topic name must not start with '__'");
575+
}
538576
}
539577

540578
/**
@@ -1060,6 +1098,20 @@ public Optional<Boolean> shareRenewAcknowledgeEnable() {
10601098
return shareRenewAcknowledgeEnable;
10611099
}
10621100

1101+
/**
1102+
* The DLQ topic name for this group.
1103+
*/
1104+
public String errorsDLQTopicName() {
1105+
return errorsDLQTopicName;
1106+
}
1107+
1108+
/**
1109+
* Whether to copy the original record to the DLQ topic.
1110+
*/
1111+
public boolean errorsDLQCopyRecordEnable() {
1112+
return errorsDLQCopyRecordEnable;
1113+
}
1114+
10631115
public static void main(String[] args) {
10641116
System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + config));
10651117
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,17 @@ public class GroupCoordinatorConfig {
314314
public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC = "Time elapsed before retrying initialize share group state request. " +
315315
"If below offsets.commit.timeout.ms, then value of offsets.commit.timeout.ms is used.";
316316

317+
///
318+
/// DLQ configs (KIP-1191)
319+
///
320+
public static final String ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG = "errors.deadletterqueue.auto.create.topics.enable";
321+
public static final boolean ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT = false;
322+
public static final String ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC = "Whether automatic creation of DLQ topics is enabled (KIP-1191). When a share group has a DLQ topic configured, this setting controls whether the broker will automatically create the topic if it does not exist.";
323+
324+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG = "errors.deadletterqueue.topic.name.prefix";
325+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT = "dlq.";
326+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC = "The required prefix of topic names used by dead-letter queue topics for share groups. When set to \"\", there is no restriction on the names used for dead-letter queue topics.";
327+
317328
///
318329
/// Streams group configs
319330
///
@@ -451,6 +462,10 @@ public class GroupCoordinatorConfig {
451462
.define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
452463
.defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW, SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
453464

465+
// DLQ configs (KIP-1191)
466+
.define(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC)
467+
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, STRING, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT, MEDIUM, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC)
468+
454469
// Streams group configs
455470
.define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
456471
.define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
@@ -515,6 +530,9 @@ public class GroupCoordinatorConfig {
515530
private final int shareGroupMinAssignmentIntervalMs;
516531
private final int shareGroupMaxAssignmentIntervalMs;
517532
private final int shareGroupInitializeRetryIntervalMs;
533+
// DLQ configurations
534+
private final boolean errorsDLQAutoCreateTopicsEnable;
535+
private final String errorsDLQTopicNamePrefix;
518536
// Streams group configurations
519537
private final int streamsGroupSessionTimeoutMs;
520538
private final int streamsGroupMinSessionTimeoutMs;
@@ -579,6 +597,9 @@ public GroupCoordinatorConfig(AbstractConfig config) {
579597
this.shareGroupMinAssignmentIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
580598
this.shareGroupMaxAssignmentIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
581599
this.shareGroupInitializeRetryIntervalMs = Math.max(initializeRetryMs, this.offsetCommitTimeoutMs);
600+
// DLQ configurations
601+
this.errorsDLQAutoCreateTopicsEnable = config.getBoolean(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG);
602+
this.errorsDLQTopicNamePrefix = config.getString(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG);
582603
// Streams group configurations
583604
this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
584605
this.streamsGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@@ -1188,6 +1209,20 @@ public int shareGroupInitializeRetryIntervalMs() {
11881209
return shareGroupInitializeRetryIntervalMs;
11891210
}
11901211

1212+
/**
1213+
* Whether automatic creation of DLQ topics is enabled.
1214+
*/
1215+
public boolean errorsDLQAutoCreateTopicsEnable() {
1216+
return errorsDLQAutoCreateTopicsEnable;
1217+
}
1218+
1219+
/**
1220+
* The required prefix for DLQ topic names.
1221+
*/
1222+
public String errorsDLQTopicNamePrefix() {
1223+
return errorsDLQTopicNamePrefix;
1224+
}
1225+
11911226
/**
11921227
* The streams group session timeout in milliseconds.
11931228
*/

0 commit comments

Comments
 (0)