
Parquet reader refactor: move to a Streamly-based streaming pipeline (bounded memory, clearer structure, optional concurrency) #133

@mchav


Summary

The current Parquet reader in dataframe works, but its control flow and data flow are largely batch-oriented: it tends to read and accumulate buffers and intermediate structures (page bytes, decoded values, etc.) in lists or vectors before producing the final Column/DataFrame. This makes it harder to:

  • guarantee bounded memory for large row groups / wide tables
  • structure the decode pipeline as composable stages
  • add concurrency safely (e.g. per-column-chunk, per-page, per-row-group)
  • instrument progress, apply backpressure, or terminate early

This issue proposes refactoring the Parquet reader to use Streamly as the backbone for:
file → row groups → column chunks → pages → decoded values → column builders.

The goal is not to “rewrite everything”, but to introduce a streaming API and progressively migrate the internals, so that memory use stays stable and future features (predicate pushdown, projection pushdown, row-group skipping, async IO, etc.) become easier to add.


Motivation / Why Streamly

Streamly gives us:

  • Streaming IO with backpressure (avoid holding entire pages/columns in memory)
  • Compositional pipelines (clear “stages” with well-defined responsibilities)
  • Optional concurrency (map/parMap style over row groups / column chunks)
  • A path to incremental decoding + building without inventing our own streaming abstractions

Even if we keep a “read whole column” convenience API at the top level, the internals can be streaming.


Current implementation (as-is, high level)

This is a conceptual description of what the current reader effectively does (names may differ by module/function):

  1. Open file and parse footer/metadata to get:

    • schema, logical types
    • row groups, column chunks (offsets, sizes, compression codecs)
    • encodings, statistics, dictionary page presence, etc.
  2. For each row group:

    • For each column chunk:

      • Seek to the chunk offset

      • Read the chunk bytes (or read page by page, but often collecting all pages into a list first)

      • Parse PageHeader → read page payload

      • Decode:

        • decompress if needed (snappy/gzip/zstd/etc.)
        • repetition levels / definition levels
        • dictionary pages + indices
        • data pages (plain / dict / RLE / bit-pack)
      • Convert decoded values into a DI.Column / Column representation:

        • handle nullability via def levels
        • handle types (int32/int64/byte array/fixed len byte array, etc.)
        • produce final vector(s), sometimes via intermediate lists/vectors
  3. Combine decoded columns into a DataFrame.
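Whatever shape the reader takes, the level and index decoding in step 2 rests on Parquet's RLE/bit-packing hybrid encoding: in v1 data pages the definition/repetition levels use it behind a 4-byte little-endian length prefix, and dictionary indices use it behind a one-byte bit width. The sketch below is a base-only reference for that run loop, assuming the prefix has already been stripped; [Word8] stands in for ByteString, and decodeHybrid and its helpers are hypothetical names, not the dataframe API. It builds lists for brevity; in the streaming design the same loop would emit values run by run.

```haskell
import Data.Bits (shiftL, shiftR, testBit, (.&.), (.|.))
import Data.Word (Word8)

-- | Decode an unsigned LEB128 varint (a run header); returns value and remaining bytes.
readVarint :: [Word8] -> Maybe (Int, [Word8])
readVarint = go 0 0
  where
    go shift acc (b : rest)
      | b .&. 0x80 == 0 = Just (acc .|. (fromIntegral b `shiftL` shift), rest)
      | otherwise = go (shift + 7) (acc .|. (fromIntegral (b .&. 0x7f) `shiftL` shift)) rest
    go _ _ [] = Nothing

-- | Little-endian integer from a list of bytes.
fromLE :: [Word8] -> Int
fromLE = foldr (\b acc -> acc `shiftL` 8 .|. fromIntegral b) 0

-- | Unpack @n@ values of width @w@ bits, packed least-significant bit first.
-- Uses (!!) for brevity; a real decoder would index a ByteString.
unpackBits :: Int -> Int -> [Word8] -> [Int]
unpackBits w n bytes = [valueAt i | i <- [0 .. n - 1]]
  where
    valueAt i = foldr (\j acc -> acc `shiftL` 1 .|. bitAt (i * w + j)) 0 [0 .. w - 1]
    bitAt k = if testBit (bytes !! (k `div` 8)) (k `mod` 8) then 1 else 0

-- | Decode @count@ values from a sequence of hybrid runs.
decodeHybrid :: Int -> Int -> [Word8] -> [Int]
decodeHybrid bitWidth count input = take count (go input)
  where
    byteWidth = (bitWidth + 7) `div` 8
    go bytes = case readVarint bytes of
      Nothing -> []
      Just (header, rest)
        | even header ->
            -- RLE run: (header >> 1) copies of one value stored in byteWidth LE bytes.
            let runLen = header `shiftR` 1
                (valueBytes, rest') = splitAt byteWidth rest
             in replicate runLen (fromLE valueBytes) ++ go rest'
        | otherwise ->
            -- Bit-packed run: (header >> 1) groups of 8 values, bitWidth bytes per group.
            -- The final group may be padded, which is why the result is cut to `count`.
            let groups = header `shiftR` 1
                (packed, rest') = splitAt (groups * bitWidth) rest
             in unpackBits bitWidth (groups * 8) packed ++ go rest'
```

For example, decodeHybrid 1 11 [0x06, 0x01, 0x03, 0x55] returns [1,1,1,1,0,1,0,1,0,1,0]: an RLE run of three 1s, then one bit-packed group holding the eight LSB-first bits of 0x55.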

Pain points in the current shape

  • Intermediate accumulation: lists of pages, indices, values, def levels, etc.

  • IO patterns: ad-hoc seeking/reading, unclear buffering strategy, some repeated allocation

  • Concurrency is hard to add without risking memory spikes or complicating the code

  • Harder to implement:

    • “stop early” (e.g. row limit)
    • projection pushdown (read only selected columns)
    • row-group skipping based on stats (without preloading more than necessary)
    • robust progress instrumentation
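“Stop early” falls out of pull-based streaming almost for free: if the consumer stops pulling, no further pages are read. The base-only sketch below uses a hypothetical minimal Stream newtype and a fake pageSource that counts page reads; in Streamly the corresponding pieces would be Stream.take and a concatMap-style flattening of pages into values.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Minimal pull-based effectful stream (a stand-in for Streamly's Stream).
newtype Stream a = Stream {next :: IO (Maybe (a, Stream a))}

-- | Build a stream by repeatedly running an effectful step function.
unfoldrM :: (s -> IO (Maybe (a, s))) -> s -> Stream a
unfoldrM step s = Stream $ do
  r <- step s
  pure $ fmap (\(a, s') -> (a, unfoldrM step s')) r

-- | Flatten a stream of pages (lists of values) into a stream of values.
-- The next page is only pulled once the current one is exhausted.
concatLists :: Stream [a] -> Stream a
concatLists outer = Stream $ do
  r <- next outer
  case r of
    Nothing -> pure Nothing
    Just (xs, rest) -> next (emit xs rest)
  where
    emit [] rest = concatLists rest
    emit (x : xs) rest = Stream (pure (Just (x, emit xs rest)))

-- | Stop after n elements; upstream is never pulled past that point.
takeS :: Int -> Stream a -> Stream a
takeS n s
  | n <= 0 = Stream (pure Nothing)
  | otherwise = Stream $ do
      r <- next s
      pure $ fmap (\(a, rest) -> (a, takeS (n - 1) rest)) r

-- | Drain a stream into a list.
toListS :: Stream a -> IO [a]
toListS s = do
  r <- next s
  case r of
    Nothing -> pure []
    Just (a, rest) -> (a :) <$> toListS rest

-- | Hypothetical page source: each pull "reads" one page of 100 values
-- and bumps a counter so we can observe how many pages were touched.
pageSource :: IORef Int -> Int -> Stream [Int]
pageSource counter totalPages = unfoldrM step 0
  where
    step p
      | p >= totalPages = pure Nothing
      | otherwise = do
          modifyIORef' counter (+ 1)
          pure (Just ([p * 100 .. p * 100 + 99], p + 1))
```

With 10 pages of 100 values, toListS (takeS 250 (concatLists (pageSource counter 10))) returns 250 values and leaves the counter at 3: the remaining seven pages are never read.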

Proposal: Streamly-based Parquet decode pipeline

Guiding principles

  • Keep a stable public API (or add a streaming variant alongside it).
  • Define a small set of streaming primitives that become the “spine” of Parquet reading.
  • Decode page-by-page, producing incremental column chunks.
  • Avoid “decode everything into lists then convert”; instead “decode stream → builder”.

New internal pipeline (conceptual)

Stage A — Metadata

  • Parse footer/metadata as today (it is small, so there is no need to stream it).
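Per the Parquet format spec, a file ends with the Thrift-encoded FileMetaData, a 4-byte little-endian metadata length, and the magic bytes PAR1, so locating the footer needs only the last 8 bytes. A base-only sketch; footerLocation is a hypothetical helper, and decoding the Thrift metadata itself is out of scope here.

```haskell
import Data.Bits (shiftL, (.|.))
import Data.Char (ord)
import Data.Word (Word8)

-- | The 4-byte magic that starts and ends every Parquet file.
parquetMagic :: [Word8]
parquetMagic = map (fromIntegral . ord) "PAR1"

-- | Given the file size and its last 8 bytes, return the (offset, length)
-- of the Thrift-encoded FileMetaData, or an error message.
footerLocation :: Int -> [Word8] -> Either String (Int, Int)
footerLocation fileSize trailer
  | length trailer /= 8 = Left "expected exactly 8 trailing bytes"
  | drop 4 trailer /= parquetMagic = Left "missing PAR1 magic at end of file"
  | footerStart < 4 = Left "footer length exceeds file size" -- file starts with PAR1 too
  | otherwise = Right (footerStart, footerLen)
  where
    -- Little-endian 4-byte metadata length.
    footerLen = foldr (\b acc -> acc `shiftL` 8 .|. fromIntegral b) 0 (take 4 trailer)
    footerStart = fileSize - 8 - footerLen
```

For a 1000-byte file whose trailer is [0x2C, 0x01, 0x00, 0x00] followed by PAR1, this returns Right (692, 300). In practice that is two small positioned reads: the 8-byte trailer, then footerLen bytes at footerStart.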

Stage B — Row group streaming

  • Stream RowGroupInfo
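Once row groups are a stream, row-group skipping is a filter on that stream (Stream.filter in Streamly) driven by column statistics. A base-only sketch with a hypothetical RowGroupInfo record holding one column's min/max; since Parquet statistics are optional, missing stats must conservatively mean “may match”.

```haskell
-- | Hypothetical per-row-group summary for a single column.
data RowGroupInfo = RowGroupInfo
  { rgIndex :: Int
  , rgNumRows :: Int
  , rgMin :: Maybe Int
  , rgMax :: Maybe Int
  }
  deriving (Show, Eq)

-- | A tiny predicate language: only comparisons checkable against min/max.
data Predicate = GreaterThan Int | LessThan Int | Equals Int
  deriving (Show)

-- | Could any row in this row group satisfy the predicate?
mayMatch :: Predicate -> RowGroupInfo -> Bool
mayMatch p rg = case p of
  GreaterThan t -> maybe True (> t) (rgMax rg)
  LessThan t -> maybe True (< t) (rgMin rg)
  Equals t -> maybe True (<= t) (rgMin rg) && maybe True (>= t) (rgMax rg)

-- | Row-group pruning: skipped groups are never seeked to or read.
pruneRowGroups :: Predicate -> [RowGroupInfo] -> [RowGroupInfo]
pruneRowGroups p = filter (mayMatch p)
```

With groups covering [0, 99], [100, 199] and one without stats, GreaterThan 150 keeps the second and third groups only.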

Stage C — Column chunk streaming

  • For each row group:

    • Stream ColumnChunkInfo
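Projection pushdown fits at this stage: filter the column-chunk stream down to the selected columns before any page is read. A base-only sketch with a hypothetical ColumnChunkInfo record whose fields loosely mirror path_in_schema, the chunk's first page offset, and total_compressed_size from the Parquet column metadata; selecting a top-level name keeps all of its nested leaf columns.

```haskell
-- | Hypothetical column-chunk descriptor taken from the footer metadata.
data ColumnChunkInfo = ColumnChunkInfo
  { ccPath :: [String] -- schema path, e.g. ["address", "city"]
  , ccOffset :: Int -- file offset of the first page (dictionary page if present)
  , ccCompressedSize :: Int -- total compressed bytes of the chunk
  }
  deriving (Show, Eq)

-- | Keep only chunks belonging to the selected top-level columns.
projectChunks :: [String] -> [ColumnChunkInfo] -> [ColumnChunkInfo]
projectChunks selected = filter keep
  where
    keep c = case ccPath c of
      (top : _) -> top `elem` selected
      [] -> False

-- | Bytes that actually have to be read for a given projection.
bytesToRead :: [String] -> [ColumnChunkInfo] -> Int
bytesToRead selected = sum . map ccCompressedSize . projectChunks selected
```

Selecting ["id", "address"] from chunks id (1000 bytes), name (5000), address.city (2000) and address.zip (500) reads 3500 bytes and skips the name chunk entirely.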

Stage D — Page streaming

  • For a given column chunk:

    • Seek to chunk start

    • Stream:

      • PageHeader
      • PagePayload (decompressed bytes or a decompression stream)
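Page streaming is an unfold over file offsets: parse a header, note where its payload starts, and advance by the header length plus compressed_page_size until the chunk's total compressed size (which includes the page headers) is consumed. The sketch is pure for testability: parseHeaderAt is a hypothetical stand-in for an IO Thrift decoder, and the effectful version would be an unfoldrM over the same step (Streamly provides Stream.unfoldrM).

```haskell
import Data.List (unfoldr)

data PageType = DictionaryPage | DataPage
  deriving (Show, Eq)

-- | The few PageHeader fields this sketch needs (hypothetical record;
-- the real header is Thrift-encoded).
data PageHeader = PageHeader
  { phType :: PageType
  , phCompressedSize :: Int -- bytes of payload following the header
  , phUncompressedSize :: Int
  }
  deriving (Show, Eq)

-- | A page's header plus where its payload starts in the file.
data PageLocation = PageLocation
  { plHeader :: PageHeader
  , plPayloadOffset :: Int
  }
  deriving (Show, Eq)

-- | Walk one column chunk page by page, without reading any payloads.
pageLocations ::
  (Int -> (PageHeader, Int)) -> -- hypothetical parser: header and its encoded length
  Int -> -- chunk start offset
  Int -> -- chunk total compressed size (headers included)
  [PageLocation]
pageLocations parseHeaderAt start total = unfoldr step start
  where
    end = start + total
    step off
      | off >= end = Nothing
      | otherwise =
          let (hdr, hdrLen) = parseHeaderAt off
              payloadOff = off + hdrLen
           in Just (PageLocation hdr payloadOff, payloadOff + phCompressedSize hdr)
```

Each PageLocation is enough to read and decompress one payload at a time, so at most one page needs to be buffered per column chunk.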

Stage E — Decode streaming

  • Convert page payload stream into typed value stream:

    • handle dictionary pages by updating per-chunk decode state
    • handle data pages by emitting decoded values incrementally
    • handle def/rep levels by emitting either Maybe a values, or plain a values plus a null bitmap
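Stage E is a stateful scan over a chunk's pages: a dictionary page only updates state, while data pages emit values, resolving dictionary indices and re-inserting nulls from definition levels. The base-only sketch below uses mapAccumL in place of a streaming scan and a hypothetical Page type whose payloads are already decompressed and level-decoded. It handles only flat optional columns (max definition level 1). Mixing page kinds is realistic, because writers fall back from dictionary to plain encoding when the dictionary grows too large.

```haskell
import Data.List (mapAccumL)

-- | Hypothetical decoded page contents for one column chunk.
data Page a
  = DictPage [a] -- dictionary values for this chunk
  | PlainDataPage [Int] [a] -- definition levels, non-null values
  | DictDataPage [Int] [Int] -- definition levels, dictionary indices of non-null values

-- | Re-insert nulls: a level equal to maxDef means "value present";
-- the values list holds only the non-null values.
withDefLevels :: Int -> [Int] -> [a] -> [Maybe a]
withDefLevels maxDef = go
  where
    go [] _ = []
    go (d : ds) vs
      | d == maxDef = case vs of
          (v : vs') -> Just v : go ds vs'
          [] -> error "fewer values than present definition levels"
      | otherwise = Nothing : go ds vs

-- | Decode a chunk: the dictionary is per-chunk state threaded through pages.
-- (!!) keeps the sketch short; a real decoder would use an array lookup.
decodeChunk :: [Page a] -> [Maybe a]
decodeChunk = concat . snd . mapAccumL step Nothing
  where
    step _ (DictPage dict) = (Just dict, [])
    step st (PlainDataPage defs vals) = (st, withDefLevels 1 defs vals)
    step st@(Just dict) (DictDataPage defs idxs) = (st, withDefLevels 1 defs (map (dict !!) idxs))
    step Nothing (DictDataPage _ _) = error "dictionary-encoded page before dictionary page"
```

For a dictionary ["a", "b", "c"], a dictionary-encoded page with levels [1, 0, 1, 1] and indices [2, 0, 0], then a plain page with levels [0, 1] and values ["z"], the result is [Just "c", Nothing, Just "a", Just "a", Nothing, Just "z"].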

Stage F — Column building

  • Feed the decoded stream into a builder:

    • consume a Stream (Maybe a), or a Stream a plus a null bitmap
    • produce final DI.Column / Column at end of stream
    • optionally produce chunked output and concatenate at end
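Column building is a strict left fold from the decoded stream into a column, which is what running a Fold over a Stream (Stream.fold in Streamly) expresses. The base-only sketch below uses a hypothetical Fold record, loosely modelled on that idea but not Streamly's actual type, and a builder that closes fixed-size chunks and tracks the null count. In real code each chunk would be a vector or DI.Column rather than a list.

```haskell
import Data.Foldable (foldl')

-- | A strict left fold with explicit state (hypothetical, not Streamly's Fold).
data Fold s a b = Fold
  { foldStep :: s -> a -> s
  , foldInitial :: s
  , foldExtract :: s -> b
  }

-- | Run a fold over any Foldable source (a list here; a stream in practice).
runFold :: Foldable t => Fold s a b -> t a -> b
runFold (Fold step initial extract) = extract . foldl' step initial

-- | Hypothetical built column: values in fixed-size chunks plus a null count.
data BuiltColumn a = BuiltColumn
  { bcChunks :: [[Maybe a]]
  , bcNullCount :: Int
  }
  deriving (Show, Eq)

-- | Builder state: open chunk (reversed), its size, closed chunks (reversed), nulls.
data BuildState a = BuildState [Maybe a] !Int [[Maybe a]] !Int

-- | Fold decoded values into chunks of at most chunkSize elements.
columnBuilder :: Int -> Fold (BuildState a) (Maybe a) (BuiltColumn a)
columnBuilder chunkSize = Fold step (BuildState [] 0 [] 0) extract
  where
    step (BuildState cur n done nulls) x =
      let cur' = x : cur
          nulls' = maybe (nulls + 1) (const nulls) x
       in if n + 1 == chunkSize
            then BuildState [] 0 (reverse cur' : done) nulls' -- close the chunk
            else BuildState cur' (n + 1) done nulls'
    extract (BuildState cur _ done nulls) =
      let chunks = reverse (if null cur then done else reverse cur : done)
       in BuiltColumn chunks nulls

-- | "Concatenate at end" for callers that want one contiguous column.
flatten :: BuiltColumn a -> [Maybe a]
flatten = concat . bcChunks
```

With chunkSize 2, the input [Just 1, Nothing, Just 3, Just 4, Nothing] yields chunks [[Just 1, Nothing], [Just 3, Just 4], [Nothing]] and a null count of 2. Keeping chunks separate bounds the size of each allocation; flatten is only needed if a consumer requires a single contiguous column.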
