[core][flink] Supporting Per-Partition Bucket Counts#7865
JingsongLi
left a comment
Review: Per-Partition Bucket Counts
Nice feature — per-partition rescaling without touching unaffected partitions addresses a real operational pain point for skewed workloads. The overall design (partition-bucket mapping loaded from manifests, SchemaBucketFileStoreTable wrapper for overwrite paths) is sound. A few observations below:
1. Silent exception swallowing in PartitionBucketMapping.loadFromScan
} catch (Exception e) {
    return new PartitionBucketMapping(defaultBuckets, Collections.emptyMap());
}
If the manifest scan fails (e.g., corrupted manifest, transient I/O error), the code silently falls back to an empty mapping. This means ALL writes would route using the table default bucket count, potentially placing rows in the wrong buckets for already-rescaled partitions and causing silent data corruption (duplicate keys across buckets). At minimum this should log the exception at WARN level so operators have a chance to detect it. Consider whether failing fast would be safer here.
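To show the two options side by side, here is a minimal sketch. It is not the PR's code: MappingLoadSketch, loadOrWarn, and loadOrFail are hypothetical names, partitions are simplified to String keys, and the manifest scan is modelled as a plain Supplier.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the mapping loader; not the PR's actual code. */
final class MappingLoadSketch {
    private static final Logger LOG = Logger.getLogger(MappingLoadSketch.class.getName());

    /** Option A: keep the fallback to an empty mapping, but make it visible to operators. */
    static Map<String, Integer> loadOrWarn(Supplier<Map<String, Integer>> manifestScan) {
        try {
            return manifestScan.get();
        } catch (RuntimeException e) {
            LOG.log(
                    Level.WARNING,
                    "Failed to load per-partition bucket counts; falling back to table default",
                    e);
            return Collections.emptyMap();
        }
    }

    /** Option B: fail fast so no row is routed with a possibly wrong bucket count. */
    static Map<String, Integer> loadOrFail(Supplier<Map<String, Integer>> manifestScan) {
        try {
            return manifestScan.get();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Cannot load per-partition bucket counts", e);
        }
    }
}
```

Option B trades availability for safety: a transient I/O error stops the job instead of letting it write into the wrong buckets.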
2. PartitionBucketMapping staleness in long-running streaming jobs
The mapping is loaded once at job start (in AbstractFileStoreTable.newWriteSelector() / createRowKeyExtractor()). If a partition is rescaled while a streaming job is running (e.g., via the rescale procedure), the running job will continue routing rows using the old mapping. For a partition that was rescaled from 4 to 8 buckets, the streaming job will keep writing to buckets [0,3], potentially missing the new buckets or conflicting with the new layout.
Is there a plan for streaming jobs to detect/reload the mapping (e.g., on checkpoint/restore)? The TableWriteCoordinator does refresh on snapshot changes, which covers the coordinator-based path, but the normal streaming sink path (FlinkSinkBuilder.buildForFixedBucket) loads the mapping once. A documentation note about requiring a restart after rescale would help.
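To make the risk concrete, here is a minimal sketch of how a stale count changes routing. It assumes a simplified rule, bucket = hash mod numBuckets; the real bucket function may hash the bucket key differently, and StaleRoutingSketch and bucket are hypothetical names.

```java
/** Hypothetical illustration of routing with a stale bucket count. */
final class StaleRoutingSketch {

    /** Simplified fixed-bucket routing (an assumption, not Paimon's actual hash). */
    static int bucket(int keyHash, int numBuckets) {
        return Math.floorMod(keyHash, numBuckets);
    }

    public static void main(String[] args) {
        int staleCount = 4;   // loaded once at job start
        int currentCount = 8; // after the partition was rescaled
        for (int keyHash : new int[] {3, 5, 13}) {
            System.out.printf(
                    "hash=%d staleBucket=%d currentBucket=%d%n",
                    keyHash, bucket(keyHash, staleCount), bucket(keyHash, currentCount));
        }
        // hash=3 lands in bucket 3 either way, but hash=5 and hash=13
        // go to bucket 1 with the stale count and bucket 5 with the current one.
    }
}
```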
3. PartitionEntry.merge() tie-breaking semantics
When two entries have equal lastFileCreationTime, the receiver's (this) totalBuckets wins:
int newTotalBuckets =
        lastFileCreationTime >= entry.lastFileCreationTime
                ? totalBuckets
                : entry.totalBuckets;
This means a.merge(b) and b.merge(a) can produce different totalBuckets values when timestamps match but bucket counts differ. If the reduce/aggregation pipeline in PartitionEntry processing doesn't guarantee stable merge order, this could lead to nondeterministic partition bucket counts. The test testMergeWithEqualCreationTimeTakesFirstTotalBuckets documents the behavior, but it would be good to confirm that the scan pipeline guarantees a deterministic accumulation order for entries with identical creation timestamps.
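The order dependence can be reproduced with a two-field stand-in. This is only a sketch: Entry is a hypothetical reduction of PartitionEntry, taking the max of the two timestamps is an assumption about the real merge, and the "larger count wins on a tie" rule is just one arbitrary order-independent choice, not a claim about which count is correct.

```java
/** Hypothetical reduction of PartitionEntry to the two fields that matter here. */
final class MergeTieBreakSketch {

    static final class Entry {
        final long lastFileCreationTime;
        final int totalBuckets;

        Entry(long lastFileCreationTime, int totalBuckets) {
            this.lastFileCreationTime = lastFileCreationTime;
            this.totalBuckets = totalBuckets;
        }

        /** Mirrors the quoted logic: on equal timestamps the receiver's count wins. */
        Entry mergeReceiverWins(Entry other) {
            int buckets =
                    lastFileCreationTime >= other.lastFileCreationTime
                            ? totalBuckets
                            : other.totalBuckets;
            return new Entry(
                    Math.max(lastFileCreationTime, other.lastFileCreationTime), buckets);
        }

        /** Order-independent variant: ties on the timestamp go to the larger count. */
        Entry mergeCommutative(Entry other) {
            int buckets;
            if (lastFileCreationTime != other.lastFileCreationTime) {
                buckets =
                        lastFileCreationTime > other.lastFileCreationTime
                                ? totalBuckets
                                : other.totalBuckets;
            } else {
                buckets = Math.max(totalBuckets, other.totalBuckets);
            }
            return new Entry(
                    Math.max(lastFileCreationTime, other.lastFileCreationTime), buckets);
        }
    }

    public static void main(String[] args) {
        Entry a = new Entry(100L, 4);
        Entry b = new Entry(100L, 8);
        System.out.println(a.mergeReceiverWins(b).totalBuckets); // 4
        System.out.println(b.mergeReceiverWins(a).totalBuckets); // 8
        System.out.println(a.mergeCommutative(b).totalBuckets);  // 8
        System.out.println(b.mergeCommutative(a).totalBuckets);  // 8
    }
}
```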
4. TableWriteCoordinator scan reuse concern
In loadPartitionBucketMapping():
this.partitionBucketMapping = PartitionBucketMapping.loadFromScan(scan, defaultNumBuckets);
The scan field is shared between loadPartitionBucketMapping (called in refresh()) and the scan(ScanCoordinationRequest) method (which calls scan.withPartitionBucket(...)). If readPartitionEntries() mutates internal scan state (e.g., filters), subsequent withPartitionBucket calls in scan() could produce incorrect results. This appears safe given that withSnapshot is called before both uses, but it is worth a defensive comment or a separate scan instance for the mapping load.
5. Serialization size for large partition maps
PartitionBucketMapping is Serializable and is embedded in FixedBucketWriteSelector / FixedBucketRowKeyExtractor, which get serialized across Flink task managers. For tables with many rescaled partitions, the Map<BinaryRow, Integer> could become non-trivial. The optimization to only store partitions that differ from the default is good. Just noting that for extreme cases (tens of thousands of individually rescaled partitions), this could impact checkpoint/serialization size.
6. Minor: SchemaBucketFileStoreTable missing newWrite(commitUser, writeId, rowKeyExtractor) override
SchemaBucketFileStoreTable overrides newWrite(String, Integer) to inject its own extractor, but inherits newWrite(String, Integer, RowKeyExtractor) from DelegatedFileStoreTable which just passes through to the wrapped table. If someone calls the 3-arg overload on a SchemaBucketFileStoreTable, the custom extractor logic is bypassed. This is unlikely in practice (callers use the 2-arg version), but could be a footgun for future changes. Consider adding:
@Override
public TableWriteImpl<?> newWrite(
        String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
    // Ignore the passed extractor; always use schema-bucket-based routing
    return wrapped().newWrite(commitUser, writeId, createRowKeyExtractor());
}
7. Test coverage
The test suite is thorough — particularly the RescaleBucketITCase.testWriteToEmptyBucketAfterRescaleKeepsPartitionBucketCount which carefully sets up the preconditions for the empty-bucket edge case. The PartitionEntryTest covering merge order independence is also valuable. One gap: there's no test for the scenario where loadFromScan hits an exception — a test verifying the fallback (or better, the logged warning) would strengthen confidence in that path.
Overall this is a well-structured contribution with clear separation of concerns. The main risk area is the silent fallback in loadFromScan (point 1) which could lead to hard-to-diagnose data issues in production. The rest are design/robustness suggestions.
Problem
In partitioned Paimon tables, all partitions share the same bucket count defined at the table level. This becomes a bottleneck when data is highly skewed: a "hot" partition (e.g., a large tenant) may receive orders of magnitude more data than other partitions, yet it is forced to use the same number of buckets. The only workaround was to increase the number of buckets for the entire table, but that in turn creates too many buckets for smaller partitions, leading to a small-file problem.
Solution
This PR introduces per-partition bucket counts, allowing individual partitions to be independently rescaled. Skewed partitions can be split into more buckets without affecting the rest of the table.
The core idea is a new PartitionBucketMapping that maintains an explicit partition → bucket count map alongside a table-level default. Every component that needs to assign a bucket to a row (write selectors, key extractors) now consults this mapping rather than blindly using schema().numBuckets(). Each partition's bucket count is derived from the totalBuckets field already stamped on its data files in the manifest, so no schema migration is required.
Changes
Core (paimon-core)
• PartitionBucketMapping (new): Serializable mapping of BinaryRow partition → int bucketCount, with a loadFromTable factory that scans the manifest to reconstruct the current per-partition layout and falls back to the schema default gracefully.
• SchemaBucketFileStoreTable (new): A lightweight DelegatedFileStoreTable wrapper used during rescale/overwrite operations. It forces all writes to use the new target bucket count (ignoring the per-partition map), ensuring the overwrite lands in the right buckets.
• FixedBucketRowKeyExtractor / FixedBucketWriteSelector: Updated to accept a PartitionBucketMapping and call resolveNumBuckets(partition) per row instead of using a fixed global count.
• WriteRestore / FileSystemWriteRestore: Extended with extractTotalBuckets logic that correctly handles three cases: non-empty buckets (use the value from existing data files), empty buckets on partitioned tables (look up the per-partition override), and empty buckets on unpartitioned tables (fall back to schema default so the committer-side mismatch check still fires).
• PartitionEntry: Minor fix for correct behaviour in non-partitioned table corner cases.
Flink (paimon-flink)
• FlinkSinkBuilder: Wires PartitionBucketMapping into the streaming sink pipeline so that per-partition bucket routing is applied at ingest time.
• RescaleAction / CompactAction: Use RescaleFileStoreTable when performing rescale/overwrite so the new bucket count is applied only to the target partitions.
• RowDataChannelComputer: Updated to route rows to the correct sub-task using the per-partition bucket count.
• TableWriteCoordinator / PostponeFixedBucketChannelComputer: Fixed to handle the "empty bucket" scenario that can arise in write-restore flows when a partition exists in the mapping but has no files yet.
• RowDataKeyAndBucketExtractor (deleted): Test helper class removed; tests now use the superclass types directly.
Behaviour
RuntimeException is thrown if this is violated.
rescale procedure or a manual INSERT OVERWRITE in batch mode:
After the job completes, the rescaled partition uses 32 buckets while all other partitions are untouched.
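As an illustration of that end state, here is a minimal sketch of a per-partition lookup with a table-level default. It is not the PR's implementation: PartitionBucketMappingSketch is a hypothetical class, partitions are simplified to String keys instead of BinaryRow, and the table default of 8 and the tenant names are assumptions. Only resolveNumBuckets and the count of 32 are taken from the description above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a partition -> bucket count mapping. */
final class PartitionBucketMappingSketch {
    private final int defaultBuckets;
    // Only partitions whose count differs from the default are stored.
    private final Map<String, Integer> overrides = new HashMap<>();

    PartitionBucketMappingSketch(int defaultBuckets, Map<String, Integer> perPartition) {
        this.defaultBuckets = defaultBuckets;
        perPartition.forEach(
                (partition, buckets) -> {
                    if (buckets != defaultBuckets) {
                        overrides.put(partition, buckets);
                    }
                });
    }

    /** Returns the partition's own count if it was rescaled, else the table default. */
    int resolveNumBuckets(String partition) {
        return overrides.getOrDefault(partition, defaultBuckets);
    }

    int overrideCount() {
        return overrides.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> fromManifest = new HashMap<>();
        fromManifest.put("tenant=big", 32);  // rescaled partition
        fromManifest.put("tenant=small", 8); // still on the assumed default
        PartitionBucketMappingSketch mapping =
                new PartitionBucketMappingSketch(8, fromManifest);

        System.out.println(mapping.resolveNumBuckets("tenant=big"));   // 32
        System.out.println(mapping.resolveNumBuckets("tenant=small")); // 8
        System.out.println(mapping.resolveNumBuckets("tenant=new"));   // 8
    }
}
```

Storing only the overrides keeps the serialized mapping small when most partitions stay on the table default.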
Testing
We have been soaking this change in our test environments and are seeing good results. We have also added a number of new tests to guard against regressions:
• PartitionBucketMappingTest: unit tests for mapping resolution and loadFromTable.
• FixedBucketRowKeyExtractorTest: verifies correct bucket assignment with heterogeneous per-partition counts.
• FileStoreCommitTest: integration tests covering rescale commits with mixed bucket counts.
• FileSystemWriteRestoreTest: covers the empty-bucket write-restore scenario end-to-end, including the non-partitioned corner case.
• RescaleBucketITCase: end-to-end Flink integration tests for INSERT OVERWRITE-based rescale and streaming restore after rescale.
• RescaleActionITCase: end-to-end tests for the rescale procedure action with per-partition targeting.
• TableWriteCoordinatorTest: unit tests for coordinator behaviour under the new mapping.