Skip to content

Commit 30cea6f

Browse files
authored
KAFKA-20456: Use lighter options for RocksDB offsets column family (#22085)
This change gives the offsets column family its own lightweight `ColumnFamilyOptions` instead of sharing the data CF's options. The offsets CF stores only a handful of key-value pairs (one per changelog partition) — it doesn't need the large write buffers, bloom filters, or aggressive compaction configured for the data CF. Reviewers: Nick Telford <nick.telford@gmail.com>, Matthias J. Sax <matthias@confluent.io>
1 parent 0a367aa commit 30cea6f

5 files changed

Lines changed: 22 additions & 6 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void openRocksDB(final DBOptions dbOptions,
6161
dbOptions,
6262
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
6363
new ColumnFamilyDescriptor(SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions),
64-
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
64+
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, createOffsetsCFOptions())
6565
);
6666
final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
6767
final ColumnFamilyHandle withHeadersColumnFamily = columnFamilies.get(1);

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingWindowStoreWithHeaders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ void openRocksDB(final DBOptions dbOptions,
6868
dbOptions,
6969
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
7070
new ColumnFamilyDescriptor(WINDOW_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions),
71-
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
71+
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, createOffsetsCFOptions())
7272
);
7373
final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
7474
final ColumnFamilyHandle withHeadersColumnFamily = columnFamilies.get(1);

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,28 @@ private void addValueProvidersToMetricsRecorder() {
310310
}
311311
}
312312

313+
/**
314+
* Creates lightweight {@link ColumnFamilyOptions} for the offsets column family. The offsets CF
315+
* stores only a small number of key-value pairs (one per changelog partition), so it does not
316+
* need the heavyweight options used for the data CF (large write buffers, bloom filters,
317+
* aggressive compaction). Sharing the data CF's options causes unnecessary write amplification
318+
* and compaction pressure that can contribute to RocksDB write stalls under heavy restore I/O.
319+
*/
320+
protected static ColumnFamilyOptions createOffsetsCFOptions() {
321+
final ColumnFamilyOptions offsetsCFOptions = new ColumnFamilyOptions();
322+
offsetsCFOptions.setCompressionType(CompressionType.NO_COMPRESSION);
323+
offsetsCFOptions.setCompactionStyle(CompactionStyle.LEVEL);
324+
offsetsCFOptions.setWriteBufferSize(1024 * 1024L); // 1MB — sufficient for offset metadata
325+
offsetsCFOptions.setMaxWriteBufferNumber(2);
326+
return offsetsCFOptions;
327+
}
328+
313329
void openRocksDB(final DBOptions dbOptions,
314330
final ColumnFamilyOptions columnFamilyOptions) {
315331
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
316332
dbOptions,
317333
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
318-
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
334+
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, createOffsetsCFOptions())
319335
);
320336

321337
cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(1), columnFamilies.get(0));

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ void openRocksDB(final DBOptions dbOptions,
5757
dbOptions,
5858
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
5959
new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions),
60-
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
60+
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, createOffsetsCFOptions())
6161
);
6262
final ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
6363
final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private void openFromDefaultStore(final DBOptions dbOptions,
9898
dbOptions,
9999
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
100100
new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions),
101-
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
101+
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, createOffsetsCFOptions())
102102
);
103103

104104
final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
@@ -139,7 +139,7 @@ private void openFromTimestampedStore(final DBOptions dbOptions,
139139
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
140140
new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, columnFamilyOptions),
141141
new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions),
142-
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, columnFamilyOptions)
142+
new ColumnFamilyDescriptor(OFFSETS_COLUMN_FAMILY_NAME, createOffsetsCFOptions())
143143
);
144144

145145
try {

0 commit comments

Comments
 (0)