[parquet] Add map shredding for hot keys #7877
Benchmark command:
mvn -s ~/.m2/apache-community.xml -pl paimon-format -am -Pfast-build \
    -DfailIfNoTests=false -Dtest=MapShreddingStorageBenchmark test
Benchmark file: [MapShreddingStorageBenchmark.java]
Common Setup
Results
Scenario Details
Conclusion: in this synthetic storage benchmark, map shredding reduces file size in both cases. The biggest gain appears when hot map keys are long and repeated across many rows, saving about |
This looks like a good fit for Variant. Why not use it?
Hi @JingsongLi, here are two reasons we are considering introducing shredding for the map type
Can you share some benchmarks, in particular for the storage difference between map and variant that you mentioned?
JingsongLi left a comment
Review: [parquet] Add map shredding for hot keys
Overall this is a well-structured feature that extends the Parquet format layer to promote frequently-occurring map keys into independent columnar sidecar columns. The physical layout is clean, the metadata round-trip through footer key-value pairs is self-contained, and the tests cover multiple value types (STRING, INT, VARIANT) plus nested maps. A few observations follow.
Correctness Concerns
1. Comma in hot key names breaks metadata parsing
Hot keys are serialized into footer metadata as a comma-separated string (String.join(",", keys) in toFooterMetadata), and deserialized using the same parseColumnPaths that splits on commas. If a map key legitimately contains a comma (e.g. "Content-Type, charset"), it will be incorrectly split during reads. Consider using a delimiter that is less likely to appear in map keys or adding proper escaping (e.g. backslash-escape or JSON array encoding).
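To illustrate the escaping option, here is a minimal sketch of a backslash-escaped, comma-separated codec. HotKeyCodec and its methods are hypothetical names for illustration only; they are not part of the PR, and the actual toFooterMetadata and parseColumnPaths code may need a different shape.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not from the PR): comma-joined keys with backslash escaping. */
final class HotKeyCodec {
    private HotKeyCodec() {}

    /** Joins keys with ',' after escaping every '\' and ',' inside a key. */
    static String encode(List<String> keys) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            if (i > 0) {
                out.append(',');
            }
            for (char c : keys.get(i).toCharArray()) {
                if (c == '\\' || c == ',') {
                    out.append('\\');
                }
                out.append(c);
            }
        }
        return out.toString();
    }

    /**
     * Splits on unescaped commas and removes the escape characters.
     * An empty input decodes to an empty list, so a single empty key
     * cannot be represented in this sketch.
     */
    static List<String> decode(String encoded) {
        List<String> keys = new ArrayList<>();
        if (encoded.isEmpty()) {
            return keys;
        }
        StringBuilder current = new StringBuilder();
        boolean escaped = false;
        for (char c : encoded.toCharArray()) {
            if (escaped) {
                current.append(c); // take the escaped character literally
                escaped = false;
            } else if (c == '\\') {
                escaped = true;
            } else if (c == ',') {
                keys.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        keys.add(current.toString());
        return keys;
    }
}
```

A JSON array would avoid hand-written escaping entirely, at the cost of a JSON dependency in the metadata path.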
2. MapSidecarWriter.valueIndex mutable state coupling
In ParquetRowDataWriter, MapSidecarWriter.findValueIndex() stores the found index in a mutable field valueIndex, which is later consumed by write(). The correctness depends on RowFieldWriter.shouldWrite() always being called immediately before write() for the same row. If this invariant is ever violated (e.g. a future refactor), the wrong value or a stale index would be written silently. A safer pattern would be to pass the found index explicitly, or recompute it in write().
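A rough sketch of the "pass the index explicitly" pattern, using plain lists as stand-ins for the map's key and value arrays. SidecarLookup and writeIfPresent are hypothetical names; the real RowFieldWriter and MapSidecarWriter interfaces are not reproduced here.

```java
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical stand-in (not from the PR) for a stateless sidecar lookup. */
final class SidecarLookup {
    private SidecarLookup() {}

    /** Returns the position of hotKey in keys, or empty when the key is absent. */
    static OptionalInt findValueIndex(List<String> keys, String hotKey) {
        for (int i = 0; i < keys.size(); i++) {
            if (hotKey.equals(keys.get(i))) {
                return OptionalInt.of(i);
            }
        }
        return OptionalInt.empty();
    }

    /**
     * Looks up the index and uses it in the same call, so no field carries
     * state from a "should write" check to a later "write" call.
     * Returns true when a value was appended to the sink.
     */
    static boolean writeIfPresent(
            List<String> keys, List<String> values, String hotKey, StringBuilder sink) {
        OptionalInt index = findValueIndex(keys, hotKey);
        if (!index.isPresent()) {
            return false;
        }
        sink.append(values.get(index.getAsInt()));
        return true;
    }
}
```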
3. InternalRowToSizeVisitor allocated per-row in the extractor
In MapShreddingKeyExtractor.add():
BiFunction<DataGetters, Integer, Integer> valueSizer =
        column.valueType().accept(new InternalRowToSizeVisitor());
This creates a new visitor object on every call to add() for every column. Since the visitor is stateless and depends only on column.valueType() (which is fixed), it should be pre-computed once per column during construction and cached in ResolvedMapShreddingColumn or in the extractor itself. For 10,000 buffered rows x N columns, this is significant unnecessary allocation.
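The caching idea can be sketched as follows. ColumnSizeAccumulator is a hypothetical stand-in, and a ToIntFunction over strings replaces the real BiFunction<DataGetters, Integer, Integer>; the point is only that the sizer is built once in the constructor and reused by every add() call.

```java
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

/** Hypothetical stand-in (not from the PR) for a per-column size accumulator. */
final class ColumnSizeAccumulator {
    // Built once per column; the value type does not change between rows.
    private final ToIntFunction<String> valueSizer;
    private long totalSize;

    ColumnSizeAccumulator(Supplier<ToIntFunction<String>> sizerFactory) {
        this.valueSizer = sizerFactory.get();
    }

    /** Called once per row; reuses the cached sizer instead of creating a new one. */
    void add(String value) {
        totalSize += valueSizer.applyAsInt(value);
    }

    long totalSize() {
        return totalSize;
    }
}
```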
Design Observations
4. Hot key selection metric: total byte size vs. frequency
The extractor ranks keys by total accumulated value size (sizes.merge(key.toString(), valueSize, Long::sum)). This means a rare key with a single 10 KB value can outrank a key that appears in 99% of rows but has small values. The storage benefit of shredding comes primarily from eliminating repeated key strings and improving columnar encoding. A frequency-based or hybrid metric (frequency * average_value_size) might give better compression in practice. Worth documenting the trade-off in the config description at minimum.
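To make the trade-off concrete, the sketch below ranks the same buffered rows by total value size and by occurrence count. HotKeyRanking is a hypothetical class, string length stands in for the value size that the extractor computes, and values are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical comparison (not from the PR) of two hot key ranking metrics. */
final class HotKeyRanking {
    private HotKeyRanking() {}

    static final class KeyStats {
        final String key;
        long occurrences;
        long totalValueSize;

        KeyStats(String key) {
            this.key = key;
        }
    }

    /** Accumulates occurrence count and total value size per key. */
    static Map<String, KeyStats> collect(List<Map<String, String>> rows) {
        Map<String, KeyStats> stats = new HashMap<>();
        for (Map<String, String> row : rows) {
            for (Map.Entry<String, String> entry : row.entrySet()) {
                KeyStats s = stats.computeIfAbsent(entry.getKey(), KeyStats::new);
                s.occurrences++;
                s.totalValueSize += entry.getValue().length(); // assumes non-null values
            }
        }
        return stats;
    }

    /** Returns up to n keys with the highest score; ties are broken by key name. */
    static List<String> top(
            List<Map<String, String>> rows, int n, ToLongFunction<KeyStats> score) {
        List<KeyStats> sorted = new ArrayList<>(collect(rows).values());
        sorted.sort((a, b) -> {
            int byScore = Long.compare(score.applyAsLong(b), score.applyAsLong(a));
            return byScore != 0 ? byScore : a.key.compareTo(b.key);
        });
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < Math.min(n, sorted.size()); i++) {
            keys.add(sorted.get(i).key);
        }
        return keys;
    }

    static List<String> topByTotalSize(List<Map<String, String>> rows, int n) {
        return top(rows, n, s -> s.totalValueSize);
    }

    static List<String> topByFrequency(List<Map<String, String>> rows, int n) {
        return top(rows, n, s -> s.occurrences);
    }
}
```

With 99 rows that carry a short host value and one row that carries a 10,000-character blob, topByTotalSize picks blob while topByFrequency picks host, which is the divergence described above.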
5. Sidecar field ID collision risk
The sidecar column field IDs are computed as:
SpecialFields.getMapValueFieldId(field.id(), 1) + 1024 + i
The magic 1024 offset is not validated against the existing field ID space. For schemas with many fields or deeply nested types that already use high IDs, this could silently collide. Consider deriving IDs from a dedicated namespace or adding a collision check during schema conversion.
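A collision check could look roughly like the sketch below. SidecarFieldIds is a hypothetical helper; it takes the result of SpecialFields.getMapValueFieldId(...) as a plain int and assumes the caller can supply the set of field IDs already present in the schema.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper (not from the PR) that fails fast on field ID collisions. */
final class SidecarFieldIds {
    /** Offset taken from the expression quoted in the review. */
    static final int SIDECAR_OFFSET = 1024;

    private SidecarFieldIds() {}

    /**
     * Assigns mapValueFieldId + 1024 + i to the i-th hot key and throws if any
     * assigned ID is already used by the schema or by an earlier sidecar.
     */
    static int[] assign(int mapValueFieldId, int hotKeyCount, Set<Integer> usedIds) {
        Set<Integer> seen = new HashSet<>(usedIds);
        int[] ids = new int[hotKeyCount];
        for (int i = 0; i < hotKeyCount; i++) {
            // addExact also guards against silent int overflow
            int candidate = Math.addExact(Math.addExact(mapValueFieldId, SIDECAR_OFFSET), i);
            if (!seen.add(candidate)) {
                throw new IllegalStateException(
                        "Sidecar field id " + candidate + " collides with an existing field id");
            }
            ids[i] = candidate;
        }
        return ids;
    }
}
```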
6. reachTargetSize during the buffering phase
In MapShreddingFormatWriter.reachTargetSize(), when the delegate is null (still buffering), it returns stream.getPos() >= targetSize. But during buffering no data has been written to the stream yet (position is 0), so this always returns false. This is likely fine in practice since the buffer phase is bounded by maxInferBufferRow/maxInferBufferMemory, but the semantics are misleading. A comment explaining this would help future readers.
7. Schema cache bypass for shredded files
When dynamicMapKeys is non-empty, getOrCreateRequestedSchema skips the requestedSchemaCache and recomputes the schema every time. Since each file can have different hot keys, this is correct, but it could become a performance concern if many small files are read. Consider keying the cache on (fileSchema, dynamicMapKeys) if profiling shows this matters.
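If profiling does show a cost, a composite cache key could be sketched as below. RequestedSchemaCache and its type parameters are hypothetical, dynamicMapKeys is assumed to be representable as a list of strings, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

/** Hypothetical cache (not from the PR) keyed on (fileSchema, hotKeys). */
final class RequestedSchemaCache<S, R> {

    private static final class Key {
        final Object fileSchema;
        final List<String> hotKeys;

        Key(Object fileSchema, List<String> hotKeys) {
            this.fileSchema = fileSchema;
            // Defensive copy so later changes to the caller's list cannot corrupt the key.
            this.hotKeys = Collections.unmodifiableList(new ArrayList<>(hotKeys));
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return fileSchema.equals(other.fileSchema) && hotKeys.equals(other.hotKeys);
        }

        @Override
        public int hashCode() {
            return Objects.hash(fileSchema, hotKeys);
        }
    }

    private final Map<Key, R> cache = new HashMap<>();

    /** Computes the requested schema once per distinct (fileSchema, hotKeys) pair. */
    R getOrCreate(S fileSchema, List<String> hotKeys, BiFunction<S, List<String>, R> compute) {
        Key key = new Key(fileSchema, hotKeys);
        R cached = cache.get(key);
        if (cached == null) {
            cached = compute.apply(fileSchema, key.hotKeys);
            cache.put(key, cached);
        }
        return cached;
    }
}
```

Files that share both the schema and the hot key set then reuse one entry, while files with different hot keys still get their own schema.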
Minor / Style
- MapShreddingStorageBenchmark duplicates the option key strings as constants (MAP_SHREDDING_COLUMNS, etc.) instead of referencing CoreOptions.MAP_SHREDDING_COLUMNS.key(). Using the canonical keys avoids silent drift.
- The hasResidualEntry method in MapWriter iterates all entries to check whether any should not be skipped. In the common case where the map is mostly composed of hot keys (which are skipped), this results in O(n) per row to determine if the repeated group should be opened at all. This is functionally correct but could be noted as a potential optimization point for large maps.
- The option naming uses camelCase (maxInferBufferRow, maxInferBufferMemory) while the existing variant shredding options and most other Paimon options use lowercase names separated by dots and hyphens (max-infer-buffer-row). Aligning with the project convention would be more consistent.
Good work overall. The feature is well-isolated, the test coverage is thorough (especially testing Variant values inside shredded maps), and the benchmark demonstrates clear storage savings.
Purpose
Add Parquet map shredding support for MAP<STRING, T> columns.
This allows selected map columns to extract hot keys into independent physical Parquet columns while preserving the original logical map schema for readers. The feature is controlled by map.shredding.* options, aligned with the existing variant.shredding.* naming style. It also adds a focused round-trip test and a storage benchmark to validate the storage benefit.
Tests
mvn -pl paimon-api,paimon-format -Pfast-build -DskipTests compile
mvn -pl paimon-format -am -Pfast-build -DfailIfNoTests=false -Dtest=ParquetFormatReadWriteTest#testMapShreddingRoundTrip,MapShreddingStorageBenchmark test
git diff --check
Physical Layout
This change does not introduce a new Parquet logical type and does not modify the standard Parquet MAP encoding. A shredded map is still written with the regular Parquet map group as the residual map. Hot keys are promoted into additional sibling sidecar columns in the parent Parquet group.
For example, a logical field:
is normally written as:
With map shredding enabled, if user-agent and host are selected as hot keys, the physical Parquet schema becomes:
The footer metadata records the mapping from sidecar columns to map keys:
During writing, entries for promoted hot keys are omitted from the residual map when their values are non-null, and their values are written into the corresponding sidecar columns. During reading, Paimon reads both the residual map and the sidecar columns, then reconstructs the original logical MAP<STRING, T> value.
For nested maps, the same rule applies within the Parquet group of the containing row. For example, for payload.headers, sidecar columns are added as siblings of the headers map inside the payload group, and the footer metadata uses the full logical path:
#7876
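The write and read rule described under Physical Layout (non-null hot key values move to sidecar columns, everything else stays in the residual map, and the reader merges both) can be sketched in memory as follows. MapShreddingSketch, Shredded, shred and reconstruct are hypothetical names; this is not the PR's writer or reader code, only an illustration under the assumption that a null sidecar slot means "not promoted for this row".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory illustration (not from the PR) of shredding one map value. */
final class MapShreddingSketch {
    private MapShreddingSketch() {}

    static final class Shredded {
        final Map<String, String> residual; // entries that stay in the regular map group
        final String[] sidecars;            // one slot per hot key, null when not promoted

        Shredded(Map<String, String> residual, String[] sidecars) {
            this.residual = residual;
            this.sidecars = sidecars;
        }
    }

    /** Write side: move non-null hot key values out of the map into sidecar slots. */
    static Shredded shred(Map<String, String> logical, List<String> hotKeys) {
        Map<String, String> residual = new LinkedHashMap<>(logical);
        String[] sidecars = new String[hotKeys.size()];
        for (int i = 0; i < hotKeys.size(); i++) {
            String value = logical.get(hotKeys.get(i));
            if (value != null) {
                sidecars[i] = value;
                residual.remove(hotKeys.get(i));
            }
            // A hot key with a null value stays in the residual map, so the
            // reader can tell "present with null" apart from "absent".
        }
        return new Shredded(residual, sidecars);
    }

    /** Read side: merge the residual map and the sidecar slots back together. */
    static Map<String, String> reconstruct(Shredded shredded, List<String> hotKeys) {
        Map<String, String> logical = new LinkedHashMap<>(shredded.residual);
        for (int i = 0; i < hotKeys.size(); i++) {
            if (shredded.sidecars[i] != null) {
                logical.put(hotKeys.get(i), shredded.sidecars[i]);
            }
        }
        return logical;
    }
}
```

The reconstructed map has the same entries as the original, though this sketch does not preserve the original entry order.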