Skip to content

Commit 1a47d94

Browse files
committed
KAFKA-19193 Support rack-aware partitioning for Kafka producer
According to KIP-1123, this commit adds support for rack-aware partitioning to `BuiltInPartitioner`. It comes with two new producer configs, `partitioner.rack.aware` and `client.rack`, which allow enabling the new behavior. Apart from the added unit tests, the desired behavior was validated with `kafka-producer-perf-test.sh`, using both an existing and a non-existing rack, against a 4-node cluster with two racks and a 12-partition topic:
```shell
./kafka_2.13-4.1.0-SNAPSHOT/bin/kafka-producer-perf-test.sh \
  --topic test-topic --num-records 100000 --throughput -1 --record-size 1 \
  --producer-props bootstrap.servers=127.0.0.1:9092 \
  client.rack=rack0 partitioner.rack.aware=true
```
1 parent 76b9dd2 commit 1a47d94

7 files changed

Lines changed: 382 additions & 57 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,9 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
433433
config.getBoolean(ProducerConfig.PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG);
434434
RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
435435
enableAdaptivePartitioning,
436-
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
436+
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG),
437+
config.getBoolean(ProducerConfig.PARTITIONER_RACK_AWARE_CONFIG),
438+
config.getString(ProducerConfig.CLIENT_RACK_CONFIG)
437439
);
438440
// As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
439441
// batching which in practice actually means using a batch size of 1.

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ public class ProducerConfig extends AbstractConfig {
122122
+ "If 'false', producer would choose a partition based on a hash of the key when a key is present. "
123123
+ "Note: this setting has no effect if a custom partitioner is used.";
124124

125+
/** <code>partitioner.rack.aware</code> */
126+
public static final String PARTITIONER_RACK_AWARE_CONFIG = "partitioner.rack.aware";
127+
private static final String PARTITIONER_RACK_AWARE_DOC = "Controls whether the default partitioner is rack-aware. This has no effect when a custom partitioner is used.";
128+
125129
/** <code>acks</code> */
126130
public static final String ACKS_CONFIG = "acks";
127131
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
@@ -178,6 +182,12 @@ public class ProducerConfig extends AbstractConfig {
178182
/** <code>client.id</code> */
179183
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
180184

185+
/**
186+
* <code>client.rack</code>
187+
*/
188+
public static final String CLIENT_RACK_CONFIG = CommonClientConfigs.CLIENT_RACK_CONFIG;
189+
public static final String DEFAULT_CLIENT_RACK = CommonClientConfigs.DEFAULT_CLIENT_RACK;
190+
181191
/** <code>send.buffer.bytes</code> */
182192
public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
183193

@@ -317,7 +327,9 @@ public class ProducerConfig extends AbstractConfig {
317327
"This strategy send records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
318328
"<ol>" +
319329
"<li>If no partition is specified but a key is present, choose a partition based on a hash of the key.</li>" +
320-
"<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
330+
"<li>If no partition or key is present, choose the sticky partition that changes when at least <code>" + BATCH_SIZE_CONFIG + "</code> bytes are produced to the partition.</li>" +
331+
"<li>If <code>" + CLIENT_RACK_CONFIG + "</code> is specified and <code>" + PARTITIONER_RACK_AWARE_CONFIG + "=true</code>, the sticky partition is chosen from partitions " +
332+
"with the leader broker in the same rack, if at least one is available. If none are available, it falls back on selecting from all available partitions.</li>" +
321333
"</ol>" +
322334
"</li>" +
323335
"<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: A partitioning strategy where " +
@@ -402,9 +414,11 @@ public class ProducerConfig extends AbstractConfig {
402414
.define(PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_DOC)
403415
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
404416
.define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
417+
.define(PARTITIONER_RACK_AWARE_CONFIG, Type.BOOLEAN, false, Importance.LOW, PARTITIONER_RACK_AWARE_DOC)
405418
.define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
406419
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
407420
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
421+
.define(CLIENT_RACK_CONFIG, Type.STRING, DEFAULT_CLIENT_RACK, Importance.LOW, CommonClientConfigs.CLIENT_RACK_DOC)
408422
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
409423
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
410424
.define(MAX_REQUEST_SIZE_CONFIG,

clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java

Lines changed: 103 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.ThreadLocalRandom;
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.stream.Collectors;
3132

3233
/**
3334
* Built-in default partitioner. Note, that this is just a utility class that is used directly from
@@ -40,8 +41,10 @@ public class BuiltInPartitioner {
4041
private final Logger log;
4142
private final String topic;
4243
private final int stickyBatchSize;
44+
private final boolean rackAware;
45+
private final String rack;
4346

44-
private volatile PartitionLoadStats partitionLoadStats = null;
47+
private volatile PartitionLoadStatsHolder partitionLoadStatsHolder = null;
4548
private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
4649

4750

@@ -51,13 +54,15 @@ public class BuiltInPartitioner {
5154
* @param topic The topic
5255
* @param stickyBatchSize How much to produce to partition before switch
5356
*/
54-
public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
57+
public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize, boolean rackAware, String rack) {
5558
this.log = logContext.logger(BuiltInPartitioner.class);
5659
this.topic = topic;
5760
if (stickyBatchSize < 1) {
5861
throw new IllegalArgumentException("stickyBatchSize must be >= 1 but got " + stickyBatchSize);
5962
}
6063
this.stickyBatchSize = stickyBatchSize;
64+
this.rackAware = rackAware;
65+
this.rack = rack;
6166
}
6267

6368
/**
@@ -67,14 +72,25 @@ private int nextPartition(Cluster cluster) {
6772
int random = randomPartition();
6873

6974
// Cache volatile variable in local variable.
70-
PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
75+
PartitionLoadStatsHolder partitionLoadStats = this.partitionLoadStatsHolder;
76+
7177
int partition;
7278

7379
if (partitionLoadStats == null) {
7480
// We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
7581
// partition based on uniform distribution.
7682
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
7783
if (!availablePartitions.isEmpty()) {
84+
// Select only partitions with leaders in this rack if configured so, falling back if none are available.
85+
if (rackAware) {
86+
List<PartitionInfo> availablePartitionsInRack = availablePartitions.stream()
87+
.filter(p -> p.leader().hasRack() && p.leader().rack().equals(rack))
88+
.collect(Collectors.toList());
89+
if (!availablePartitionsInRack.isEmpty()) {
90+
availablePartitions = availablePartitionsInRack;
91+
}
92+
}
93+
7894
partition = availablePartitions.get(random % availablePartitions.size()).partition();
7995
} else {
8096
// We don't have available partitions, just pick one among all partitions.
@@ -84,14 +100,20 @@ private int nextPartition(Cluster cluster) {
84100
} else {
85101
// Calculate next partition based on load distribution.
86102
// Note that partitions without leader are excluded from the partitionLoadStats.
87-
assert partitionLoadStats.length > 0;
88103

89-
int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
90-
int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
104+
PartitionLoadStats partitionLoadStatsToUse = partitionLoadStats.total;
105+
if (rackAware && partitionLoadStats.inThisRack != null && partitionLoadStats.inThisRack.length > 0) {
106+
partitionLoadStatsToUse = partitionLoadStats.inThisRack;
107+
}
108+
109+
assert partitionLoadStatsToUse.length > 0;
110+
111+
int[] cumulativeFrequencyTable = partitionLoadStatsToUse.cumulativeFrequencyTable;
112+
int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStatsToUse.length - 1];
91113

92114
// By construction, the cumulative frequency table is sorted, so we can use binary
93115
// search to find the desired index.
94-
int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
116+
int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStatsToUse.length, weightedRandom);
95117

96118
// binarySearch results the index of the found element, or -(insertion_point) - 1
97119
// (where insertion_point is the index of the first element greater than the key).
@@ -103,8 +125,8 @@ private int nextPartition(Cluster cluster) {
103125
// would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd
104126
// get 0, and we need the next one, so adding 1 works here as well.
105127
int partitionIndex = Math.abs(searchResult + 1);
106-
assert partitionIndex < partitionLoadStats.length;
107-
partition = partitionLoadStats.partitionIds[partitionIndex];
128+
assert partitionIndex < partitionLoadStatsToUse.length;
129+
partition = partitionLoadStatsToUse.partitionIds[partitionIndex];
108130
}
109131

110132
log.trace("Switching to partition {} in topic {}", partition, topic);
@@ -120,9 +142,15 @@ int randomPartition() {
120142
* random number.
121143
*/
122144
public int loadStatsRangeEnd() {
123-
assert partitionLoadStats != null;
124-
assert partitionLoadStats.length > 0;
125-
return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
145+
assert partitionLoadStatsHolder != null;
146+
assert partitionLoadStatsHolder.total.length > 0;
147+
return partitionLoadStatsHolder.total.cumulativeFrequencyTable[partitionLoadStatsHolder.total.length - 1];
148+
}
149+
150+
public int loadStatsInThisRackRangeEnd() {
151+
assert partitionLoadStatsHolder.inThisRack != null;
152+
assert partitionLoadStatsHolder.inThisRack.length > 0;
153+
return partitionLoadStatsHolder.inThisRack.cumulativeFrequencyTable[partitionLoadStatsHolder.inThisRack.length - 1];
126154
}
127155

128156
/**
@@ -233,18 +261,20 @@ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, C
233261
*
234262
* @param queueSizes The queue sizes, partitions without leaders are excluded
235263
* @param partitionIds The partition ids for the queues, partitions without leaders are excluded
264+
* @param partitionLeaderRacks The racks of partition leaders for the queues, partitions without leaders are excluded
236265
* @param length The logical length of the arrays (could be less): we may eliminate some partitions
237266
* based on latency, but to avoid reallocation of the arrays, we just decrement
238267
* logical length
239268
* Visible for testing
240269
*/
241-
public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) {
270+
public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, String[] partitionLeaderRacks, int length) {
242271
if (queueSizes == null) {
243272
log.trace("No load stats for topic {}, not using adaptive", topic);
244-
partitionLoadStats = null;
273+
partitionLoadStatsHolder = null;
245274
return;
246275
}
247276
assert queueSizes.length == partitionIds.length;
277+
assert queueSizes.length == partitionLeaderRacks.length;
248278
assert length <= queueSizes.length;
249279

250280
// The queueSizes.length represents the number of all partitions in the topic and if we have
@@ -257,7 +287,7 @@ public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int l
257287
if (length < 1 || queueSizes.length < 2) {
258288
log.trace("The number of partitions is too small: available={}, all={}, not using adaptive for topic {}",
259289
length, queueSizes.length, topic);
260-
partitionLoadStats = null;
290+
partitionLoadStatsHolder = null;
261291
return;
262292
}
263293

@@ -276,6 +306,7 @@ public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int l
276306
// the value is the index of the partition we're looking for. In this example
277307
// random numbers 0, 1, 2, 3 would map to partition[0], 4 would map to partition[1]
278308
// and 5, 6, 7 would map to partition[2].
309+
// Do the same with this-rack-only partitions if rack awareness is enabled.
279310

280311
// Calculate max queue size + 1 and check if all sizes are the same.
281312
int maxSizePlus1 = queueSizes[0];
@@ -293,18 +324,58 @@ public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int l
293324
// and we didn't exclude partitions that experience high latencies (greater than
294325
// partitioner.availability.timeout.ms).
295326
log.trace("All queue lengths are the same, not using adaptive for topic {}", topic);
296-
partitionLoadStats = null;
327+
partitionLoadStatsHolder = null;
297328
return;
298329
}
299330

331+
// Before inverting and folding, build fully the load stats for this rack, because this depends on the raw queue sizes.
332+
PartitionLoadStats partitionLoadStatsInThisRack = createPartitionLoadStatsForThisRackIfNeeded(queueSizes, partitionIds, partitionLeaderRacks, length);
333+
300334
// Invert and fold the queue size, so that they become separator values in the CFT.
301-
queueSizes[0] = maxSizePlus1 - queueSizes[0];
302-
for (int i = 1; i < length; i++) {
303-
queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];
304-
}
335+
invertAndFoldQueueSizeArray(queueSizes, maxSizePlus1, length);
336+
305337
log.trace("Partition load stats for topic {}: CFT={}, IDs={}, length={}",
306338
topic, queueSizes, partitionIds, length);
307-
partitionLoadStats = new PartitionLoadStats(queueSizes, partitionIds, length);
339+
partitionLoadStatsHolder = new PartitionLoadStatsHolder(
340+
new PartitionLoadStats(queueSizes, partitionIds, length),
341+
partitionLoadStatsInThisRack
342+
);
343+
}
344+
345+
private PartitionLoadStats createPartitionLoadStatsForThisRackIfNeeded(int[] queueSizes, int[] partitionIds, String[] partitionLeaderRacks, int length) {
346+
if (!rackAware) {
347+
return null;
348+
}
349+
int[] queueSizesInThisRack = new int[length];
350+
int[] partitionIdsInThisRack = new int[length];
351+
int lengthInThisRack = 0;
352+
int maxSizePlus1InThisRack = -1;
353+
354+
for (int i = 0; i < length; i++) {
355+
if (rack.equals(partitionLeaderRacks[i])) {
356+
queueSizesInThisRack[lengthInThisRack] = queueSizes[i];
357+
partitionIdsInThisRack[lengthInThisRack] = partitionIds[i];
358+
359+
if (queueSizes[i] > maxSizePlus1InThisRack)
360+
maxSizePlus1InThisRack = queueSizes[i];
361+
362+
lengthInThisRack += 1;
363+
}
364+
}
365+
++maxSizePlus1InThisRack;
366+
367+
invertAndFoldQueueSizeArray(queueSizesInThisRack, maxSizePlus1InThisRack, lengthInThisRack);
368+
return new PartitionLoadStats(queueSizesInThisRack, partitionIdsInThisRack, lengthInThisRack);
369+
}
370+
371+
private void invertAndFoldQueueSizeArray(int[] queueSizes, int maxSizePlus1, int length) {
372+
// queueSizes is expected to be non-empty, but checking just in case.
373+
if (queueSizes.length > 0) {
374+
queueSizes[0] = maxSizePlus1 - queueSizes[0];
375+
for (int i = 1; i < length; i++) {
376+
queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];
377+
}
378+
}
308379
}
309380

310381
/**
@@ -346,4 +417,15 @@ public PartitionLoadStats(int[] cumulativeFrequencyTable, int[] partitionIds, in
346417
this.length = length;
347418
}
348419
}
420+
421+
private static final class PartitionLoadStatsHolder {
422+
final PartitionLoadStats total;
423+
final PartitionLoadStats inThisRack;
424+
425+
private PartitionLoadStatsHolder(PartitionLoadStats total,
426+
PartitionLoadStats inThisRack) {
427+
this.total = total;
428+
this.inThisRack = inThisRack;
429+
}
430+
}
349431
}

0 commit comments

Comments
 (0)