[Proposal] Iceberg subsystem for datalake_fdw — design proposal #1683
If I am not mistaken, there is no way to … There was a discussion about Multi-Catalog support for Cloudberry (maybe there are some useful thoughts there).
Hi! Thank you very much for sharing your thoughts and ideas! We here in Moscow have struggled with the same issue. I must say that everyone is crazy about lakehouse architecture, especially since no one fully understands what it actually means. Anyway, I have formulated these wishes for myself as "using various databases to work with well-structured transactional data". I really appreciate the effort and am willing to participate in the development process. That's why it is very important for me to understand why we are doing this, which parts are important, what should be done first, and what can be postponed to later stages. I'd like to focus on:
The best SELECT performance. Where is Cloudberry's place in the lakehouse world? Everyone knows about Trino and Apache Doris / StarRocks, and most probably the kernel of a future lakehouse system will be one of them, not Cloudberry. We could try to catch up with them and achieve feature parity, doing the same as these products, not worse for a start, and preferably better. That is realistic, but it takes a lot of effort and time, and may still not succeed. If we accept that other databases do this job better and are more likely to be used for it, we can set priorities and start by doing something better than everyone else. In MPP we are used to everything being properly distributed, and to one of the best cost-based optimizers producing the execution plan. Let's:
Native Polaris integration. Why place the metadata catalog outside the Cloudberry cluster? Let's make it a first-class citizen. One could configure the Apache Cloudberry cluster with the Polaris catalog. Cloudberry can store data, and it can also store the Polaris catalog data itself. And so Cloudberry is once again the central element of the lakehouse.
Proposers
@MisterRaindrop
Proposal Status
Under Discussion
Abstract
1. Abstract
Cloudberry does not have a complete set of plug-in tools for accessing various data sources.
I plan to design a data lake approach to access these data sources, and evolve Cloudberry toward a data lake–enabled architecture.
datalake_fdw extends Cloudberry with two complementary ways of accessing data-lake storage; the second of them is CREATE ICEBERG TABLE inside CB, which creates and manages Apache Iceberg tables with full SELECT / INSERT / UPDATE / DELETE / VACUUM, Schema Evolution, and snapshot-based Read Committed isolation. This document focuses on that second part — the design, the key decisions, and the open questions — and is meant for community review.
Motivation
2. Motivation & Goals
2.1 Why we need this
As an MPP data warehouse, Cloudberry has long lacked a transactional read / write entry point for data-lake formats, Iceberg in particular:
The Iceberg subsystem aims to introduce Iceberg tables as first-class "lake tables" in CB without breaking PostgreSQL / Cloudberry transactional semantics:
full SQL entry points (CREATE ICEBERG TABLE ..., INSERT, UPDATE, DELETE, VACUUM); SAVEPOINT is supported.
2.2 Goals
The first release of this design aims to deliver:
2.3 Non-goals (outside the first release)
Implementation
3. Overall Architecture
The proposed design has four layers, split into a metadata path and a data path:
datalake_agent is a Java jar; it is launched and supervised by the PG bgworker datalake_proxy at postmaster startup (see §5.4).
4. The Core Abstraction: Catalog × Volume × Table
The design splits an Iceberg table into three independently configurable, freely composable pieces:
The Catalog holds the metadata.json location and the schema-evolution history.
A Volume can be shared by multiple tables (different paths under the same bucket); a Catalog can reference multiple Volumes (different tables on different storage). Polaris is a special case — the storage configuration is dispatched by the Polaris service, so a user-side Volume is optional.
Why Catalog and Volume are separated
In real deployments they are orthogonal:
Making Catalog and Volume two separate FDWs, each with its own Server / UserMapping, lets us cover every combination without inventing a new FDW for each.
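A hypothetical configuration sketch of that composition (server names, option names, and the CREATE ICEBERG TABLE syntax are illustrative only — none of them are fixed by this proposal):

```sql
-- Catalog FDW server: where the table metadata lives (options are assumptions).
CREATE SERVER polaris_cat FOREIGN DATA WRAPPER iceberg_catalog_fdw
    OPTIONS (type 'polaris', uri 'https://polaris.example.com/api/catalog');

-- Volume FDW server: where the data files live (options are assumptions).
CREATE SERVER s3_vol FOREIGN DATA WRAPPER iceberg_volume_fdw
    OPTIONS (type 's3', endpoint 'https://s3.example.com', bucket 'lakehouse');

-- Credentials attach to each Server independently via USER MAPPING.
CREATE USER MAPPING FOR analyst SERVER polaris_cat
    OPTIONS (client_id '...', client_secret '...');
CREATE USER MAPPING FOR analyst SERVER s3_vol
    OPTIONS (accesskey '...', secretkey '...');

-- One Iceberg table composes one Catalog with one Volume (syntax sketch only).
CREATE ICEBERG TABLE sales (id bigint, amount numeric, ts timestamp)
    WITH (catalog 'polaris_cat', volume 's3_vol');
```

Swapping the Catalog (Polaris → Hive → Builtin) or the Volume (S3 → HDFS) then only means pointing the table at a different Server, not a different FDW.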
Builtin Catalog
For users with no external Catalog (Polaris / Hive) available, the design offers a Builtin option: the
metadata.json location is stored directly in a CB system table. Data files still live on the Volume, and other engines can open the table through Iceberg's HadoopCatalog / FileIO using that path.
Why we need it: it removes the hard dependency on a Catalog service and lowers the barrier to entry. It also gives a zero-dependency option for the "CB is the only writer" single-writer scenario.
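Under the same illustrative sketch as above, the Builtin option would simply drop the external Catalog service (names and options are again assumptions, not proposal-defined syntax):

```sql
-- Builtin catalog: metadata.json location kept in a CB system table; only a Volume is needed.
CREATE SERVER builtin_cat FOREIGN DATA WRAPPER iceberg_catalog_fdw OPTIONS (type 'builtin');

CREATE ICEBERG TABLE events (id bigint, payload jsonb)
    WITH (catalog 'builtin_cat', volume 's3_vol');
```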
5. Components & Design Decisions
The following lists the design choice — and the reason behind it — for each key component.
5.1 Iceberg Table AM: why not a pure FDW
The most direct approach would be to keep using FDW, but two hard limitations get in the way:
Table AM (TableAmRoutine), introduced in PG 12, is a first-class storage abstraction: from the SQL side an Iceberg table looks like an ordinary table, and UPDATE / DELETE / ctid semantics, transactional callbacks, and ANALYZE all come for free from the kernel.
The proposed approach is therefore: register Iceberg tables as a dedicated Table AM, and have the AM delegate data I/O to the Volume FDW internally (reusing the existing S3 / HDFS read / write code). We get the SQL consistency of tableam and avoid reimplementing the storage layer.
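As a minimal sketch of what the Table AM surface buys us (the handler name and table definition below are assumptions for illustration; the proposal's actual user syntax is CREATE ICEBERG TABLE):

```sql
-- CREATE ACCESS METHOD ... TYPE TABLE is the stock PostgreSQL mechanism for a table AM.
-- 'iceberg_am_handler' is a hypothetical handler function name.
CREATE ACCESS METHOD iceberg TYPE TABLE HANDLER iceberg_am_handler;

-- A table using the AM behaves like an ordinary heap table at the SQL level:
CREATE TABLE lake_demo (id bigint, payload text) USING iceberg;
UPDATE lake_demo SET payload = 'x' WHERE id = 1;  -- ctid and trigger semantics come from the kernel
```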
The core code will live in
src/am_iceberg/. The AM handler itself is very thin; the main logic is planned to be organized as follows:
- pg_iceberg_ddl.c — OAT_POST_CREATE / OAT_DROP hook; creates/drops Iceberg tables via the Catalog on DDL;
- pg_iceberg_catalog.c — unified wrapper for all Catalog calls;
- pg_iceberg_metadata.c — manages the iceberg.pg_iceberg_metadata system table;
- pg_iceberg_metadata_tracker.c — transaction-scoped metadata tracker (see §5.6);
- pg_iceberg_rewrite_plan.c — QD ↔ QE JSON contract for VACUUM compaction.
5.2 Catalog FDW: abstracting three backends
iceberg_catalog_fdwabstracts metadata operations into a set ofIcebergCatalogOperations (create_table / load_table / drop_table / append / update / delete / get_fragment / get_statistics / plan_file_groups / commit_* and so on).The Server's
typeoption decides the backend:typepolarishiveUpwards, the AM only sees
pg_iceberg_*_with_catalog()functions. Downwards,agent_clitalks to the agent over RPC (Builtin is the exception — it short-circuits to the CB system table).Why FDW instead of a plain C function: it lets us reuse PG's
CREATE SERVER / USER MAPPINGfor credentials and permissions, and unifies the configuration entry point across multiple Catalog types.5.3 Volume FDW: the data-file I/O abstraction
iceberg_volume_fdwis planned to handle the actual read / write of data files and delete files (manifest / metadata json are managed by the Catalog side). It implements the full FDW interface:GetForeignRelSize / GetForeignPaths / BeginForeignScan / BeginForeignModify / ....Its responsibilities:
fdw_privateat plan time);segindex;The Server's
typeoption decides storage:s3/s3b(OSS / MinIO / OBS / …) /hdfs.5.4 datalake_agent: why a separate Java service
This is the single most important design trade-off.
Iceberg's metadata semantics are complex: manifest lists, snapshot logs, partition-spec evolution, schema field-id mapping, optimistic CAS commit, and so on. The community's most invested, most mature implementation is
iceberg-java. Reimplementing all of this on the C / C++ side would cost us:
Therefore the design delegates all metadata operations to a dedicated
datalake_agent (Java Spring Boot, wrapping iceberg-java + hive-jdbc + hadoop-client). The interface is planned to cover:
- /iceberg/tables — create / load / drop;
- /fragments — plan files (with predicate pushdown);
- /modify — incremental snapshot generation;
- /commit — CAS commit;
- /plan-rewrite + /commit-rewrite — VACUUM.
Upside:
Cost: one extra network hop — but only on the metadata path; data I/O still goes straight from C++ to storage, so throughput is unaffected.
Process lifecycle: managed by the
datalake_proxy bgworker
To tie the agent's lifecycle to the CB cluster and spare users from babysitting a Java process, the design introduces
datalake_proxy (contrib/datalake_proxy/), a PG background worker (bgworker):
- datalake_proxy is registered in shared_preload_libraries and starts with the postmaster;
- in _PG_init, a bgworker is registered that forks a child process to run the agent jar on startup;
- if the agent process exits unexpectedly, datalake_proxy restarts it;
- the GUC datalake_proxy.register_datalake_proxy toggles the feature;
- datalake_proxy.dlagent_memory_limit (default 2 GB) caps the agent's JVM heap;
- on shutdown, a stop signal is sent from datalake_proxy to the agent for a clean shutdown.
From the user's perspective this means "CB is up → Iceberg is available" — no extra deployment, no extra supervisor.
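In practice that amounts to a handful of settings. The GUC names below come from this proposal; the value syntax is illustrative, the preload change requires a postmaster restart, and the datalake_proxy.* parameters are only recognized once the library is loaded:

```sql
-- Illustrative configuration sketch; not a definitive procedure.
ALTER SYSTEM SET shared_preload_libraries = 'datalake_proxy';  -- restart required
ALTER SYSTEM SET datalake_proxy.register_datalake_proxy = on;
ALTER SYSTEM SET datalake_proxy.dlagent_memory_limit = '2GB';  -- caps the agent's JVM heap
SELECT pg_reload_conf();
```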
RPC protocol: gRPC
JSON / REST has two pain points at scale — large fragment lists cost CPU to encode / decode, and plan-file results are slow to deserialize when they get big. The plan is to expose the same interface over protobuf + gRPC:
streaming responses (get_fragments can be server-streamed) reduce QD memory pressure.
5.5 Provider layer: the data plane
src/provider/iceberg/ is planned to be a C++ implementation covering Iceberg's data plane: reading and writing data files and position-delete files (keyed by file_path string, pos long), and turning a FileScanTask into a row reader.
Why Provider does not go through the agent: data I/O is the system's throughput bottleneck. Only by having each segment read / write storage independently and in parallel can we sustain MPP-scale writes. Meanwhile, mature C++ libraries already exist for Parquet (arrow-cpp / orc) — reusing them is far more efficient than routing through an agent.
5.6 Metadata Tracker: the heart of transactional semantics
The problem: Iceberg uses optimistic CAS (via the metadata.json version chain) for concurrency, while PG uses MVCC. How do we fit Iceberg's snapshot semantics inside a PG transaction?
The design: a transaction-scoped
Metadata Tracker. Its shape is inspired by Rust iceberg-rs'sMetadataLocationTrackerand pg_lake'sIcebergSnapshotBuilder.Under this design, modifications to an Iceberg table within a transaction flow as follows:
Three rebase trigger points:
The resulting semantics:
SAVEPOINT support relies on a level_history stack, recording the metadata and file counts before each nested-transaction modification.
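For example, the intended user-visible behaviour looks like the following (the table and its schema are assumptions; the comments restate the §5.6 semantics, not a finished implementation):

```sql
-- Illustrative transaction against an assumed Iceberg table lake.orders(id, status).
BEGIN;
INSERT INTO lake.orders VALUES (1, 'pending');
SAVEPOINT s1;
UPDATE lake.orders SET status = 'shipped' WHERE id = 1;
ROLLBACK TO SAVEPOINT s1;  -- discards only the files/metadata recorded after s1 (level_history)
COMMIT;                    -- a single CAS commit publishes the surviving snapshot
```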
5.7 Deletion Queue: why asynchronous cleanup
DROPping an Iceberg table, replacing old files during VACUUM, orphans left behind by a rolled-back transaction — all of these need deletions against object storage.
Why not delete synchronously: a single Iceberg table can reference tens of thousands to millions of files. Synchronous deletion inside the transaction would make DDL block for a long time, and a mid-way failure would leave the system in a "metadata gone, files stranded" inconsistent state.
The design: an iceberg.pg_iceberg_deletion_queue system table plus a background task.
- Dropped tables enqueue their obsolete metadata locations (DELETION_TYPE_METADATA);
- replaced or orphaned data / delete files are enqueued as DELETION_TYPE_FILE;
- failed deletions get retry_count++ and are retried later, giving idempotency.
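An illustrative shape for the queue (column names taken from §12.2; the types and default are assumptions, not the planned definition):

```sql
-- Sketch only; the actual definition is up to the implementation.
CREATE TABLE iceberg.pg_iceberg_deletion_queue (
    path          text NOT NULL,      -- object-storage path to delete
    table_name    text,
    orphaned_at   timestamptz,
    retry_count   int DEFAULT 0,
    deletion_type smallint            -- 0 = FILE, 1 = METADATA
);

-- e.g. a quick check for entries that keep failing:
SELECT path, deletion_type, retry_count
FROM   iceberg.pg_iceberg_deletion_queue
WHERE  retry_count > 3;
```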
6. End-to-End Flows
Execution paths for each key SQL under this design.
CREATE ICEBERG TABLE
1. The DDL creates the usual catalog entries in pg_class / pg_attribute / pg_lake_table;
2. the OAT_POST_CREATE hook on the QD calls the agent's /iceberg/tables to produce the initial metadata.json;
3. the resulting metadata location is recorded in iceberg.pg_iceberg_metadata.
SELECT
1. At scan start, scan_get_am_private obtains the metadata_location "that this scan should see" (an already-modified table triggers one rebase);
2. the QD calls the agent's /fragments (with pushdown predicates) and receives a List<FileScanTask>;
3. the fragments are dispatched to the QEs, and each QE reads the ones assigned to it by segindex.
INSERT / UPDATE / DELETE
1. Each statement's changes are registered through tracker.apply_updates_with_rebase: the newly written data_files / delete_files are collected;
2. the agent's /modify is called to generate a new intermediate metadata.json;
3. at commit, tracker_commit_all performs the CAS for every modified table.
VACUUM
1. The QD calls the agent's /plan-rewrite and receives a rewrite plan (groups built from min-input-files + target-file-size);
2. the plan is dispatched to the QEs, which rewrite the grouped files (the pg_iceberg_rewrite_plan.c contract from §5.1);
3. the QD calls /commit-rewrite to commit a RewriteFiles snapshot.
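For instance, a compaction pass could be tuned per session before running VACUUM. The GUC names come from §12.1; the table name and the idea of per-session tuning are illustrative assumptions:

```sql
-- Illustrative: tighten compaction thresholds, then vacuum one Iceberg table.
SET datalake.iceberg_vacuum_compact_min_input_files = 10;
SET datalake.iceberg_vacuum_rewrite_target_file_size_mb = 512;
VACUUM lake.orders;  -- QD plans the rewrite, QEs rewrite files, QD commits a RewriteFiles snapshot
```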
DROP
1. The OAT_DROP hook enqueues the metadata_location into the deletion queue;
2. the row in pg_iceberg_metadata is removed;
3. data and metadata files are cleaned up asynchronously by the deletion-queue task (§5.7).
7. MPP Execution Model
The responsibilities are divided as follows under MPP.
7.1 QD vs QE responsibilities
Principle: only the QD talks to the agent. Letting N QEs hit the agent in parallel would both make the agent a bottleneck and introduce concurrent writes to Iceberg snapshot state, which brings its own complexity. The parallel part is the data I/O.
7.2 Fragment dispatch
The QD places
List<FileScanTask> into the plan tree; it is serialized and dispatched to QEs. Each QE picks its fragments round-robin by segindex % segcount. The GUC
datalake.external_table_limit_segment_num can cap the number of segments that participate in a scan — useful when joining with small tables to reduce dispatch overhead.
7.3 Global file-id consistency
UPDATE / DELETEplans may include a Redistribute Motion that ships a row from QE-i to QE-j. QE-j, when it later dereferences the ctid, must still be able to resolve it back to its original file.Under this design, ctids are encoded as
<file_id, row_pos>. To let any QE resolve a ctid from any origin,BeginForeignModifypre-populates a global file-id map using the full fragment list (not just the subset assigned to the current QE).8. Pushdown & Optimization
WHERE clauses are translated through
deparse.cinto the agent's FilterNode tree; the agent then converts that into an IcebergExpression, applying partition pruning + manifest min/max filtering atplanFilestime. Operators planned for pushdown:=, !=, >, <, >=, <=, IS [NOT] NULL, LIKE, IN, AND, OR.The Provider C++ layer then applies row-group filtering + residual predicates + column projection.
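A query like the following would benefit from that pushdown (the table, columns, and predicates are illustrative; the operators used are from the list above):

```sql
-- Both predicates use pushable operators, so they reach the agent's planFiles call;
-- partition pruning and manifest min/max filtering can then skip irrelevant data files.
SELECT order_id, amount
FROM   lake.orders
WHERE  order_date >= DATE '2024-01-01'
  AND  region IN ('EU', 'APAC');
```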
A fragment cache (GUC
datalake.enable_iceberg_fragment_cache, default on) caches metadata_location + filter → plan result within a single backend, avoiding repeated trips to the agent.
9. Concurrency with External Engines
Community Iceberg engines (Spark / Trino / …) may write the same table concurrently. Under this design:
when global != last_base is detected, CB rebases and replans (accumulated files are reapplied on top of the new global).
10. Extensibility
New Catalog type (Nessie / Glue / in-house):
- add the new Catalog construction on the agent side;
- add a type branch on the PG side.
Because all Iceberg semantics live in the agent, the PG-side change is minimal.
New storage backend:
- accept a new type and handle its connection parameters.
New DML shapes (MERGE / UPSERT): mostly planner work; the underlying "write data file + write position-delete" primitives can be reused.
11. Outside the First Release (follow-up work)
Items the first release will not cover and that will be discussed in later iterations:
12. Appendix
12.1 Key GUCs (planned)
| GUC | Default |
| --- | --- |
| iceberg_default_catalog | '' |
| iceberg_default_volume | '' |
| datalake_agent_server_url | |
| datalake.enable_iceberg_fragment_cache | on |
| datalake.iceberg_vacuum_compact_min_input_files | 10 |
| datalake.iceberg_vacuum_rewrite_target_file_size_mb | 512 |
| datalake.iceberg_postion_deletes_threshold | 100000 |
| datalake.external_table_limit_segment_num | 0 |
| datalake.disable_filter_pushdown | off |
| datalake.iceberg_autovacuum | off |
| datalake.iceberg_autovacuum_naptime | 600 |

12.2 New system tables (planned)
- iceberg.pg_iceberg_metadata — current metadata location for each Iceberg table. Columns: relid, metadata_location, previous_metadata_location, is_internal, default_spec_id.
- iceberg.pg_iceberg_deletion_queue — queue of files to be cleaned up. Columns: path, table_name, orphaned_at, retry_count, deletion_type (0 = FILE / 1 = METADATA).
12.3 Planned code layout
Suggested review focus:
- whether the datalake_proxy bgworker process model is the right way to host the Java agent.
Rollout/Adoption Plan
No response
Are you willing to submit a PR?