Add metrics to FFI_ExecutionPlan #22136
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
Pull request overview
This PR extends datafusion-ffi so ExecutionPlan::metrics() can be accessed across the FFI boundary, preserving execution metrics for foreign plans (e.g., for EXPLAIN ANALYZE and DisplayableExecutionPlan::with_metrics(...)) by marshalling metrics as a snapshot.
Changes:
- Add FFI-stable mirrors of MetricsSet, Metric, and all MetricValue variants (plus related metric types) with bidirectional conversions.
- Add a metrics function pointer to FFI_ExecutionPlan and implement ForeignExecutionPlan::metrics() via that callback.
- Expose RatioMetrics internals needed for marshalling via new accessors; add chrono as a direct datafusion-ffi dependency for timestamp conversion.
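The function-pointer callback named in the list above can be sketched with standard-library types only. Every name here (`FfiPlan`, `FfiOptionU64`, `PlanState`, `output_rows`) is a hypothetical stand-in, not a type from datafusion-ffi; the sketch only shows the general shape of a C-layout struct that carries opaque producer state plus a callback the consumer calls to read a value out of it.

```rust
use std::ffi::c_void;
use std::ptr;

/// Hypothetical C-layout optional value, standing in for an FFI-safe `Option`.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FfiOptionU64 {
    pub is_some: bool,
    pub value: u64,
}

/// Hypothetical FFI-stable plan handle: function pointers plus opaque state.
#[repr(C)]
pub struct FfiPlan {
    pub metrics: unsafe extern "C" fn(plan: &FfiPlan) -> FfiOptionU64,
    pub release: unsafe extern "C" fn(plan: &mut FfiPlan),
    pub private_data: *mut c_void,
}

/// Producer-side state hidden behind `private_data`.
struct PlanState {
    output_rows: Option<u64>,
}

/// Callback installed by the producer: reads the hidden state and returns
/// a plain-data copy of it.
unsafe extern "C" fn metrics_fn(plan: &FfiPlan) -> FfiOptionU64 {
    let state = unsafe { &*(plan.private_data as *const PlanState) };
    match state.output_rows {
        Some(value) => FfiOptionU64 { is_some: true, value },
        None => FfiOptionU64 { is_some: false, value: 0 },
    }
}

/// Callback that frees the producer-side state exactly once.
unsafe extern "C" fn release_fn(plan: &mut FfiPlan) {
    if !plan.private_data.is_null() {
        drop(unsafe { Box::from_raw(plan.private_data as *mut PlanState) });
        plan.private_data = ptr::null_mut();
    }
}

impl FfiPlan {
    /// Producer side: box the state and install the callbacks.
    pub fn new(output_rows: Option<u64>) -> Self {
        let state = Box::new(PlanState { output_rows });
        FfiPlan {
            metrics: metrics_fn,
            release: release_fn,
            private_data: Box::into_raw(state) as *mut c_void,
        }
    }

    /// Consumer side: call through the function pointer and convert back.
    pub fn output_rows(&self) -> Option<u64> {
        let raw = unsafe { (self.metrics)(self) };
        if raw.is_some {
            Some(raw.value)
        } else {
            None
        }
    }
}

impl Drop for FfiPlan {
    fn drop(&mut self) {
        let release = self.release;
        unsafe { release(self) }
    }
}
```

Because the struct layout includes the callback, adding a field of this kind changes the binary layout, which is why both sides of the boundary have to be rebuilt together.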
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| datafusion/physical-expr-common/src/metrics/value.rs | Adds RatioMetrics accessors needed to snapshot/serialize merge strategy and display options. |
| datafusion/ffi/src/metrics.rs | Introduces FFI-stable metric types and snapshot conversions, plus round-trip tests. |
| datafusion/ffi/src/lib.rs | Exposes the new metrics module publicly. |
| datafusion/ffi/src/execution_plan.rs | Adds FFI_ExecutionPlan.metrics callback and wires it into ForeignExecutionPlan::metrics(); adds an integration-style test. |
| datafusion/ffi/Cargo.toml | Adds direct chrono dependency for timestamp marshalling. |
| Cargo.lock | Records the new chrono dependency for datafusion-ffi. |
    /// Snapshot the plan's execution metrics. Returns `None` when the
    /// underlying [`ExecutionPlan::metrics`] returned `None`.
    pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option<FFI_MetricsSet>,
This is expected.
    fn timestamp_from_ffi(nanos: FFI_Option<i64>) -> Timestamp {
        let ts = Timestamp::new();
        if let Some(n) = nanos.into_option() {
            ts.set(DateTime::<Utc>::from_timestamp_nanos(n));
        }
        ts
    }
Will we lose something if we round trip and the original is not UTC?
That shouldn't be possible. Timestamp is composed of Arc<Mutex<Option<DateTime<Utc>>>>, and is documented as only storing UTC-nanoseconds.
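To illustrate why the round trip discussed above loses nothing, here is a std-only sketch. `UtcNanosTimestamp` is a hypothetical stand-in for the metrics `Timestamp`: it stores nanoseconds since the Unix epoch directly rather than a `DateTime<Utc>`, so no chrono dependency is needed. An unset timestamp maps to `None` and back.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a metrics timestamp: optional UTC nanoseconds
/// since the Unix epoch behind shared, interior-mutable storage.
#[derive(Clone, Default)]
pub struct UtcNanosTimestamp {
    nanos: Arc<Mutex<Option<i64>>>,
}

impl UtcNanosTimestamp {
    /// Record a point in time, expressed as UTC nanoseconds.
    pub fn set(&self, nanos: i64) {
        *self.nanos.lock().unwrap() = Some(nanos);
    }

    /// Current value, or `None` if the timestamp was never set.
    pub fn value(&self) -> Option<i64> {
        *self.nanos.lock().unwrap()
    }
}

/// Producer side: flatten the timestamp to an optional integer.
pub fn timestamp_to_ffi(ts: &UtcNanosTimestamp) -> Option<i64> {
    ts.value()
}

/// Consumer side: rebuild a fresh timestamp from the optional integer.
pub fn timestamp_from_ffi(nanos: Option<i64>) -> UtcNanosTimestamp {
    let ts = UtcNanosTimestamp::default();
    if let Some(n) = nanos {
        ts.set(n);
    }
    ts
}
```

Since the only stored quantity is an absolute instant, there is no time zone to drop on the way through.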
Not sure why it shows up on the FFI enum but not the original
timsaucer left a comment:
Thank you for the contribution!
### Related

* Probably closes RR-4596
* Part of RR-4594

### What

Surfaces the per-query DataFusion metrics we already record on every dataset query to Python users via a `rerun.experimental.query_metrics()` context manager. This covers plan-time decisions (`filters_pushed_down`, `filters_applied_client_side`, `entity_path_narrowing_applied`, `query_chunks`, `query_segments`, etc.) and execution-time fetch counters (`fetch_grpc_bytes`, `fetch_direct_bytes`, `fetch_direct_retries_total`, etc.). These are the same numbers that already should surface in Rust-side `EXPLAIN ANALYZE` and PostHog analytics spans, not new instrumentation.

#### Why not just `df.explain(analyze=True)`?

DataFusion's `FFI_ExecutionPlan` is missing methods from `trait ExecutionPlan`, including the metrics hook. I've opened a PR apache/datafusion#22136 to fix this, but it's a breaking change that will require a major version bump; this is a prag******** short-term solution. We can decide if we want to remove `query_metrics()` if/when the DataFusion PR lands (which is why it lives in experimental).

#### Example

```python
import tempfile
from pathlib import Path

import numpy as np
from datafusion import col, lit

import rerun as rr
from rerun.experimental import QueryMetrics, query_metrics


def throughput_mb_s(q: QueryMetrics) -> float:
    """Effective fetch throughput (plan + fetch + decode), MB/s."""
    fetched = q.fetch_grpc_bytes + q.fetch_direct_bytes
    if q.total_duration_us <= 0 or fetched <= 0:
        return 0.0
    return (fetched / 1e6) / (q.total_duration_us / 1e6)


rng = np.random.default_rng(0)
with tempfile.TemporaryDirectory() as tmp:
    # 3 segments × 50 rows, one Points3D entity.
    paths = []
    for i in range(3):
        rec = rr.RecordingStream("demo", recording_id=f"seg{i}")
        for j in range(50):
            rec.set_time("t", sequence=j)
            rec.log("/points", rr.Points3D(rng.random((10, 3))))
        p = Path(tmp) / f"seg{i}.rrd"
        rec.save(str(p))
        paths.append(p)

    with rr.server.Server(datasets={"demo": paths}) as server:
        ds = server.client().get_dataset(name="demo")
        seg0 = ds.segment_ids()[0]

        with query_metrics() as m:
            # Q1: scan everything
            ds.reader(index="t").limit(100).collect()
            # Q2: filter on rerun_segment_id pushes into query_dataset
            ds.reader(index="t").filter(col("rerun_segment_id") == lit(seg0)).limit(100).collect()

        for label, q in zip(["all segments", "one segment"], m.queries):
            fetched = q.fetch_grpc_bytes + q.fetch_direct_bytes
            print(
                f"{label:>13}: chunks={q.query_chunks:>3} "
                f"bytes={fetched:>6} pushdown={q.filters_pushed_down} "
                f"{q.total_duration_us / 1000:>5.1f} ms "
                f"{throughput_mb_s(q):>5.2f} MB/s"
            )
```

Output:

```
 all segments: chunks=  3 bytes= 32096 pushdown=0   0.6 ms 51.85 MB/s
  one segment: chunks=  1 bytes= 11208 pushdown=1   0.6 ms 20.12 MB/s
```

### Testing

* All tests pass
* Added e2e test to check the context manager
* Changed some existing tests to validate pushdown filtering (previously they couldn't assert that)

### Compatibility

Full compatibility (client-side changes only). I put the context manager in `rerun.experimental` so we can choose to remove it if the DataFusion PR lands soon.

---------

Source-Ref: e2b958694d7bad386083108a03fcce6794727f77
Co-authored-by: Emil Ernerfeldt <emil.ernerfeldt@gmail.com>
## Which issue does this PR close?

- Closes apache#22152

## Rationale for this change

`ExecutionPlan::partition_statistics` and `TableProvider::statistics` are not currently transported across the DataFusion FFI boundary, so foreign plans and providers always report `Statistics::new_unknown` / `None`. This blocks optimizer rules that depend on statistics (e.g. join reordering, partition pruning) from working with out-of-process plugins, which defeats the point of exposing those hooks to plugin authors.

`Statistics` contains `Precision<ScalarValue>` for column min/max/sum. `ScalarValue` is a large enum that's impractical to mirror in `#[repr(C)]`, so I reuse the existing `datafusion_proto_common::Statistics` prost encoding, the same pattern this crate already uses for filter expressions.

## What changes are included in this PR?

- New `datafusion_ffi::statistics` module with `[de]serialize_statistics` helpers wrapping the `datafusion_proto_common::Statistics` round-trip.
- New `partition_statistics` field on `FFI_ExecutionPlan` and corresponding `ExecutionPlan::partition_statistics` impl on `ForeignExecutionPlan`.
- New `statistics` field on `FFI_TableProvider` and corresponding `TableProvider::statistics` impl on `ForeignTableProvider`. Since the trait returns `Option<Statistics>`, the implementation cannot propagate decode errors; instead it logs via `log::warn!` and triggers a `debug_assert!`.

This PR is expected to be merged after apache#22136, so it includes those changes.

## Are these changes tested?

Yes:

- Unit tests in `statistics.rs` cover three round-trip cases: `Statistics::new_unknown`, fully-exact statistics with `ScalarValue::Int32`/`Int64`/`Utf8` min/max/sum, and mixed `Precision::Exact`/`Inexact`/`Absent` values.
- A new round-trip integration test in `execution_plan.rs` exercises `ForeignExecutionPlan::partition_statistics` with both `None` and `Some(idx)` partitions, against a plan with no statistics (returns `Statistics::new_unknown`) and a plan with concrete statistics.
- A new round-trip integration test in `table_provider.rs` uses a thin `TableWithStats` wrapper over `MemTable` to verify both the `None` path and the concrete `Statistics` path through `ForeignTableProvider::statistics`.

## Are there any user-facing changes?

This is a breaking ABI change for the `datafusion-ffi` crate:

- `FFI_ExecutionPlan` gains a `partition_statistics` field.
- `FFI_TableProvider` gains a `statistics` field.

Plugins compiled against earlier versions of `datafusion-ffi` will need to be recompiled. There are no breaking changes to the Rust trait surface or to `Statistics` itself; downstream `ExecutionPlan` / `TableProvider` implementations require no changes.
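The `Option`-returning fallback described above for `ForeignTableProvider::statistics` can be sketched as follows. This is a minimal illustration, not the PR's code: `decode_or_none` and `decode_row_count` are hypothetical names, the byte format is made up, `eprintln!` stands in for `log::warn!`, and the `debug_assert!` is deliberately left out so the error path can be exercised in debug builds.

```rust
use std::fmt::Display;

/// Collapse a fallible decode into an `Option`, reporting the error
/// instead of propagating it (the caller's signature has no error channel).
pub fn decode_or_none<T, E: Display>(decoded: Result<T, E>) -> Option<T> {
    match decoded {
        Ok(value) => Some(value),
        Err(err) => {
            // Stand-in for `log::warn!`; assumed, not the PR's exact message.
            eprintln!("failed to decode statistics across FFI: {}", err);
            None
        }
    }
}

/// Hypothetical decoder: expects exactly 8 little-endian bytes holding a
/// row count. The real code decodes a protobuf-encoded `Statistics`.
pub fn decode_row_count(bytes: &[u8]) -> Result<u64, String> {
    if bytes.len() != 8 {
        return Err(format!("expected 8 bytes, got {}", bytes.len()));
    }
    let mut array = [0u8; 8];
    array.copy_from_slice(bytes);
    Ok(u64::from_le_bytes(array))
}
```

A caller would write `decode_or_none(decode_row_count(bytes))` and treat `None` the same way as "no statistics available".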
Which issue does this PR close?
- ExecutionPlan::metrics() across the FFI boundary #22135

Rationale for this change
FFI_ExecutionPlan exposes most of the ExecutionPlan trait but does not expose metrics(). As a result, ForeignExecutionPlan::metrics() falls through to the trait default (None), so anything downstream of an FFI boundary loses metrics. The most visible breakage is EXPLAIN ANALYZE, which renders empty metric blocks for foreign plans; anything calling DisplayableExecutionPlan::with_metrics(...) on a plan tree containing foreign nodes is similarly affected.

This PR makes foreign plans behave the same as local plans for metric reporting. Metrics are passed as a snapshot, and all atomic-backed counters/gauges/timers are read into plain integer fields at marshal time. This is correct because none of the in-tree consumers (AnalyzeExec, DisplayableExecutionPlan) poll metrics during streaming.
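The snapshot semantics described above can be sketched with plain standard-library types. `LiveCounter`, `CounterSnapshot`, and `snapshot` are hypothetical stand-ins, not the types added by this PR; the sketch only illustrates that a snapshot is read once at marshal time and does not track later updates to the live metric.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for an atomic-backed metric such as a counter.
#[derive(Clone, Default)]
pub struct LiveCounter {
    value: Arc<AtomicUsize>,
}

impl LiveCounter {
    /// Increment the shared counter.
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Read the current value.
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

/// Hypothetical FFI-friendly mirror: a plain integer with a C layout.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CounterSnapshot {
    pub value: u64,
}

/// Read the live counter once; the result no longer observes later updates.
pub fn snapshot(counter: &LiveCounter) -> CounterSnapshot {
    CounterSnapshot {
        value: counter.value() as u64,
    }
}
```

A consumer that wants fresh numbers has to request a new snapshot, which matches a reader that only looks at metrics after execution has finished.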
What changes are included in this PR?
- New module datafusion/ffi/src/metrics.rs with FFI-stable mirrors of MetricsSet, Metric, MetricValue (all 16 variants), Label, MetricType, MetricCategory, PruningMetrics, RatioMetrics, and RatioMergeStrategy, plus bidirectional From conversions.
- MetricValue::Custom { value: Arc<dyn CustomMetricValue> } is marshalled as (name, Display output, as_usize()). On the consumer side it is reconstructed as a small FfiCustomMetricValue shim that preserves Display and as_usize(). aggregate becomes a no-op (snapshots are not mergeable) and as_any only downcasts to the shim; this is the documented compromise.
- FFI_ExecutionPlan gains a new metrics function pointer (appended after repartitioned). ForeignExecutionPlan::metrics() is implemented to call through it.
- New accessors on RatioMetrics: merge_strategy() and display_raw_values(), needed to marshal these otherwise-private fields.
- chrono added as a direct dependency of datafusion-ffi (used for Timestamp ↔ unix-nanos conversion).

Are these changes tested?
Yes. New tests, all passing:
- Tests in datafusion/ffi/src/metrics.rs round-trip every MetricValue variant individually, plus a full Metric (value + labels + partition + type + category) and a MetricsSet.
- test_ffi_execution_plan_metrics_round_trip in datafusion/ffi/src/execution_plan.rs exercises the full FFI path: it builds an ExecutionPlan with a MetricsSet, wraps it in FFI_ExecutionPlan, retrieves metrics via ForeignExecutionPlan::metrics() through mock_foreign_marker_id, and asserts the aggregated value matches.
- EmptyExec test helper extended with with_metrics(MetricsSet).

Existing test suites still pass: cargo test -p datafusion-ffi --all-features and cargo test -p datafusion-ffi --features integration-tests.

Are there any user-facing changes?
Yes. This PR adds public API and makes a binary-incompatible change to FFI_ExecutionPlan. Please add the api change label.
- New public module datafusion_ffi::metrics: FFI_MetricsSet, FFI_Metric, FFI_MetricValue, FFI_Label, FFI_MetricType, FFI_MetricCategory, FFI_PruningMetrics, FFI_RatioMetrics, FFI_RatioMergeStrategy, and FfiCustomMetricValue.
- FFI_ExecutionPlan: a new metrics function pointer field is appended. Producers and consumers must be rebuilt together, as is already enforced by the major-version check via datafusion_ffi::version().
- New accessors on RatioMetrics: merge_strategy() and display_raw_values(). Non-breaking additions.
- MetricValue::Custom across FFI is lossy by design: the underlying dyn CustomMetricValue is not preserved; only its Display output and as_usize() snapshot survive. Documented on FfiCustomMetricValue.
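The lossy handling of custom metrics described above can be illustrated with a small self-contained sketch. The trait `CustomMetric`, the `CustomSnapshot` shim, and the `BytesSkipped` example metric are all hypothetical and much simpler than DataFusion's `CustomMetricValue` and the PR's `FfiCustomMetricValue`; the sketch only shows the idea of keeping the name, the `Display` output, and the `as_usize()` value while turning `aggregate` into a no-op.

```rust
use std::fmt;

/// Hypothetical stand-in for a user-defined metric value trait.
pub trait CustomMetric: fmt::Display {
    fn as_usize(&self) -> usize;
    fn aggregate(&mut self, other: &dyn CustomMetric);
}

/// What crosses the boundary in this sketch: owned, plain data only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CustomSnapshot {
    pub name: String,
    pub display: String,
    pub usize_value: usize,
}

/// Capture the three surviving pieces of a custom metric.
pub fn snapshot_custom(name: &str, value: &dyn CustomMetric) -> CustomSnapshot {
    CustomSnapshot {
        name: name.to_string(),
        display: value.to_string(),
        usize_value: value.as_usize(),
    }
}

impl fmt::Display for CustomSnapshot {
    /// Replays the text captured on the producer side.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.display)
    }
}

impl CustomMetric for CustomSnapshot {
    fn as_usize(&self) -> usize {
        self.usize_value
    }

    /// Snapshots are not mergeable, so aggregation does nothing.
    fn aggregate(&mut self, _other: &dyn CustomMetric) {}
}

/// Example producer-side metric, also hypothetical.
pub struct BytesSkipped(pub usize);

impl fmt::Display for BytesSkipped {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} B skipped", self.0)
    }
}

impl CustomMetric for BytesSkipped {
    fn as_usize(&self) -> usize {
        self.0
    }

    fn aggregate(&mut self, other: &dyn CustomMetric) {
        self.0 += other.as_usize();
    }
}
```

The trade-off is that the consumer can still print and sum the value, but any behavior specific to the original type is gone once it has crossed the boundary.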