From f6be2fb309695d919b5b938fd6d81366d8a9d5d7 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 15 May 2026 14:27:33 -0400 Subject: [PATCH 01/10] feat: per-session Python UDF inlining toggle + sender ctx + strict refusal Adds a per-session toggle that turns inline Python UDF encoding on or off, plus the supporting plumbing to make it usable through pickle.dumps. Codec layer: * PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining bool (default true) and a with_python_udf_inlining(enabled) builder. Each try_encode_udf{,af,wf} short-circuits to inner when the toggle is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic on a strict codec returns a clean Execution error instead of invoking cloudpickle.loads. The refusal message names the UDF and the wire family so an operator can see at a glance whether to re-encode the bytes or register the UDF on the receiver. Session layer: * PySessionContext::with_python_udf_inlining(enabled) returns a new session whose stacked logical + physical codecs both carry the toggle. The Arc is cloned (cheap), only the codec pair is rebuilt, so registrations and config stay attached. * SessionContext.with_python_udf_inlining(*, enabled) is the Python wrapper. enabled is keyword-only because positional booleans at the call site read as opaque. Sender-side context: * datafusion.ipc gains set_sender_ctx / get_sender_ctx / clear_sender_ctx thread-locals. Expr.__reduce__ now consults get_sender_ctx() to pick the codec for outbound pickles, which is the only path through which a strict session affects pickle.dumps (the protocol calls __reduce__ with no arguments). Without a sender context the default codec is used. Tests: * test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers both directions of the toggle plus the explicit-ctx fast path), TestWorkerCtxLifecycle (set/clear/threading), and TestSenderCtxLifecycle. * New test_pickle_multiprocessing.py + helpers exercise the full driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx installed in the worker initializer. * CI workflow gets a 30-minute timeout-minutes backstop so a hung pickle worker can't block the matrix indefinitely. User-guide docs and the runnable examples land in PR4 of this series. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/test.yml | 3 + crates/core/src/codec.rs | 122 +++++++-- crates/core/src/context.rs | 16 ++ python/datafusion/context.py | 43 +++ python/datafusion/expr.py | 22 +- python/datafusion/ipc.py | 59 ++++- .../tests/_pickle_multiprocessing_helpers.py | 89 +++++++ python/tests/test_pickle_expr.py | 246 +++++++++++++++++- python/tests/test_pickle_multiprocessing.py | 131 ++++++++++ 9 files changed, 704 insertions(+), 27 deletions(-) create mode 100644 python/tests/_pickle_multiprocessing_helpers.py create mode 100644 python/tests/test_pickle_multiprocessing.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c597ab308..2cd792ea9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -29,6 +29,9 @@ env: jobs: test-matrix: runs-on: ubuntu-latest + # Backstop: a hung multiprocessing worker (e.g. during a pickle regression) + # should not block CI longer than this. + timeout-minutes: 30 strategy: fail-fast: false matrix: diff --git a/crates/core/src/codec.rs b/crates/core/src/codec.rs index 363ee82b8..9299ce1be 100644 --- a/crates/core/src/codec.rs +++ b/crates/core/src/codec.rs @@ -232,16 +232,44 @@ fn strip_wire_header<'a>( #[derive(Debug)] pub struct PythonLogicalCodec { inner: Arc, + python_udf_inlining: bool, } impl PythonLogicalCodec { pub fn new(inner: Arc) -> Self { - Self { inner } + Self { + inner, + python_udf_inlining: true, + } } pub fn inner(&self) -> &Arc { &self.inner } + + /// Whether Python-defined UDFs are encoded inline (and decoded + /// from cloudpickle blobs). Defaults to `true`. Set to `false` + /// when the codec sits on a session that must produce + /// cross-language wire bytes, or reject `cloudpickle.loads` on + /// untrusted `from_bytes` input. + /// + /// Security scope: strict mode (`false`) narrows only the codec + /// layer — it stops `Expr::from_bytes` from invoking + /// `cloudpickle.loads` on the inline `DFPY*` payload. It does + /// **not** make `pickle.loads(untrusted_bytes)` safe; treat every + /// `pickle.loads` on untrusted input as unsafe regardless of this + /// setting. See Python's [pickle module security warning][1] for + /// why `pickle.loads` is unsafe in general. + /// + /// [1]: https://docs.python.org/3/library/pickle.html#module-pickle + pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { + self.python_udf_inlining = enabled; + self + } + + pub fn python_udf_inlining(&self) -> bool { + self.python_udf_inlining + } } impl Default for PythonLogicalCodec { @@ -301,48 +329,76 @@ impl LogicalExtensionCodec for PythonLogicalCodec { } fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_scalar_udf(node, buf)? { + if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? { return Ok(()); } self.inner.try_encode_udf(node, buf) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udf) = try_decode_python_scalar_udf(buf)? { - return Ok(udf); + if self.python_udf_inlining { + if let Some(udf) = try_decode_python_scalar_udf(buf)? { + return Ok(udf); + } + } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) { + return Err(refuse_inline_payload("scalar UDF", name)); } self.inner.try_decode_udf(name, buf) } fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udaf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udaf(node, buf)? { return Ok(()); } self.inner.try_encode_udaf(node, buf) } fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udaf) = try_decode_python_udaf(buf)? { - return Ok(udaf); + if self.python_udf_inlining { + if let Some(udaf) = try_decode_python_udaf(buf)? { + return Ok(udaf); + } + } else if buf.starts_with(PY_AGG_UDF_FAMILY) { + return Err(refuse_inline_payload("aggregate UDF", name)); } self.inner.try_decode_udaf(name, buf) } fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udwf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udwf(node, buf)? { return Ok(()); } self.inner.try_encode_udwf(node, buf) } fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udwf) = try_decode_python_udwf(buf)? { - return Ok(udwf); + if self.python_udf_inlining { + if let Some(udwf) = try_decode_python_udwf(buf)? { + return Ok(udwf); + } + } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) { + return Err(refuse_inline_payload("window UDF", name)); } self.inner.try_decode_udwf(name, buf) } } +/// Build the error returned by a strict codec when it receives an +/// inline Python-UDF payload it has been told not to deserialize. +fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError { + // `Execution`, not `Plan`: this is a wire-format decode refusal at + // codec time, not a planner-stage failure. Downstream error + // classification keys off the variant — surfacing this as a planner + // error would mis-route it into "fix your SQL" buckets. + datafusion::error::DataFusionError::Execution(format!( + "Refusing to deserialize inline Python {kind} '{name}': Python UDF \ + inlining is disabled on this session. Ask the sender to re-encode \ + with inlining disabled (so the UDF travels by name), or register \ + '{name}' on this receiver's session and enable inlining on both \ + sides — receivers cannot re-encode bytes they did not produce." + )) +} + /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked /// on the same `SessionContext`. Carries the Python-aware encoding /// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`) @@ -358,16 +414,30 @@ impl LogicalExtensionCodec for PythonLogicalCodec { #[derive(Debug)] pub struct PythonPhysicalCodec { inner: Arc, + python_udf_inlining: bool, } impl PythonPhysicalCodec { pub fn new(inner: Arc) -> Self { - Self { inner } + Self { + inner, + python_udf_inlining: true, + } } pub fn inner(&self) -> &Arc { &self.inner } + + /// See [`PythonLogicalCodec::with_python_udf_inlining`]. + pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { + self.python_udf_inlining = enabled; + self + } + + pub fn python_udf_inlining(&self) -> bool { + self.python_udf_inlining + } } impl Default for PythonPhysicalCodec { @@ -391,15 +461,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { } fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_scalar_udf(node, buf)? { + if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? { return Ok(()); } self.inner.try_encode_udf(node, buf) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udf) = try_decode_python_scalar_udf(buf)? { - return Ok(udf); + if self.python_udf_inlining { + if let Some(udf) = try_decode_python_scalar_udf(buf)? { + return Ok(udf); + } + } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) { + return Err(refuse_inline_payload("scalar UDF", name)); } self.inner.try_decode_udf(name, buf) } @@ -417,29 +491,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { } fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udaf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udaf(node, buf)? { return Ok(()); } self.inner.try_encode_udaf(node, buf) } fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udaf) = try_decode_python_udaf(buf)? { - return Ok(udaf); + if self.python_udf_inlining { + if let Some(udaf) = try_decode_python_udaf(buf)? { + return Ok(udaf); + } + } else if buf.starts_with(PY_AGG_UDF_FAMILY) { + return Err(refuse_inline_payload("aggregate UDF", name)); } self.inner.try_decode_udaf(name, buf) } fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec) -> Result<()> { - if try_encode_python_udwf(node, buf)? { + if self.python_udf_inlining && try_encode_python_udwf(node, buf)? { return Ok(()); } self.inner.try_encode_udwf(node, buf) } fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result> { - if let Some(udwf) = try_decode_python_udwf(buf)? { - return Ok(udwf); + if self.python_udf_inlining { + if let Some(udwf) = try_decode_python_udwf(buf)? { + return Ok(udwf); + } + } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) { + return Err(refuse_inline_payload("window UDF", name)); } self.inner.try_decode_udwf(name, buf) } diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 642afeef7..03221eb68 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -1404,6 +1404,22 @@ impl PySessionContext { physical_codec, }) } + + pub fn with_python_udf_inlining(&self, enabled: bool) -> Self { + let logical_codec = Arc::new( + PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner())) + .with_python_udf_inlining(enabled), + ); + let physical_codec = Arc::new( + PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner())) + .with_python_udf_inlining(enabled), + ); + Self { + ctx: Arc::clone(&self.ctx), + logical_codec, + physical_codec, + } + } } impl PySessionContext { diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 5c3501941..e3ceb3b05 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1769,3 +1769,46 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext: new = SessionContext.__new__(SessionContext) new.ctx = new_internal return new + + def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: + """Toggle inline encoding of Python-defined UDFs on this session. + + ``enabled`` is keyword-only: + ``with_python_udf_inlining(enabled=False)`` reads at the call + site as the inverse of + ``with_python_udf_inlining(enabled=True)``, where a positional + ``True`` / ``False`` would not. + + When ``True`` (the default), Python scalar, aggregate, and window + UDFs travel inside the serialized expression and are + reconstructed on the receiver without pre-registration. + + Set ``False`` to: + + * Produce serialized bytes that round-trip through a non-Python + decoder (cross-language portability). UDFs are stored by name + only; the receiver must have matching registrations. + * Refuse to reconstruct Python UDFs from + :meth:`Expr.from_bytes` input that may come from an untrusted + source — ``cloudpickle.loads`` will not be invoked. + + The toggle applies directly to :meth:`Expr.to_bytes` / + :meth:`Expr.from_bytes` calls that pass this session as their + ``ctx`` argument. To make the toggle apply through + :func:`pickle.dumps` (which calls :meth:`Expr.to_bytes` with no + context), install this session as the driver's sender context + via :func:`datafusion.ipc.set_sender_ctx` — and install it as + the worker's context via + :func:`datafusion.ipc.set_worker_ctx` for the corresponding + :func:`pickle.loads`. + + For the full security model, see + :doc:`/user-guide/io/distributing_work` (Security section). In + short: this toggle narrows only the :meth:`Expr.from_bytes` + surface; :func:`pickle.loads` on untrusted bytes remains + unsafe regardless of the toggle. + """ + new_internal = self.ctx.with_python_udf_inlining(enabled) + new = SessionContext.__new__(SessionContext) + new.ctx = new_internal + return new diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 7e95bc127..15a45c327 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -446,13 +446,16 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes: worker process for distributed evaluation. When ``ctx`` is supplied, encoding routes through that session's - installed :class:`LogicalExtensionCodec`. When ``ctx`` is - ``None``, the default codec is used. + installed :class:`LogicalExtensionCodec` (so settings like + :meth:`SessionContext.with_python_udf_inlining` take effect). + When ``ctx`` is ``None``, the default codec is used (Python UDF + inlining on, no user-installed extension codec). Built-in functions and Python UDFs (scalar, aggregate, window) travel inside the returned bytes; the worker does not need to pre-register them. UDFs imported via the FFI capsule protocol - travel by name only and must be registered on the worker. + travel by name only and must be registered on the worker. See + :doc:`/user-guide/io/distributing_work`. .. warning:: Security Bytes returned here may embed a cloudpickled Python @@ -522,7 +525,9 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr: Accepts output of :meth:`to_bytes` or :func:`pickle.dumps`. ``ctx`` is the :class:`SessionContext` used to resolve any - function references that travel by name (e.g. FFI UDFs). When + function references that travel by name (e.g. FFI UDFs, or + Python UDFs sent with inlining disabled via + :meth:`SessionContext.with_python_udf_inlining`). When ``ctx`` is ``None`` the worker context installed via :func:`datafusion.ipc.set_worker_ctx` is consulted; if no worker context is installed, the global :class:`SessionContext` is used @@ -586,8 +591,15 @@ def __reduce__(self) -> tuple[Callable[[bytes], Expr], tuple[bytes]]: >>> e = col("a") * lit(2) >>> pickle.loads(pickle.dumps(e)).canonical_name() 'a * Int64(2)' + + The encoding side honors a driver-side sender context installed + via :func:`datafusion.ipc.set_sender_ctx` — that is how + :meth:`SessionContext.with_python_udf_inlining` propagates + through ``pickle.dumps``. """ - return (Expr._reconstruct, (self.to_bytes(),)) + from datafusion.ipc import get_sender_ctx + + return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),)) @classmethod def _reconstruct(cls, proto_bytes: bytes) -> Expr: diff --git a/python/datafusion/ipc.py b/python/datafusion/ipc.py index 8dd7fc463..5188fae95 100644 --- a/python/datafusion/ipc.py +++ b/python/datafusion/ipc.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -"""Worker-side setup for distributing DataFusion expressions. +"""Driver- and worker-side setup for distributing DataFusion expressions. When a :class:`Expr` is shipped to a worker process (e.g. through :func:`multiprocessing.Pool` or a Ray actor), the worker reconstructs the @@ -53,6 +53,27 @@ def init_worker(): payloads are not portable across Python minor versions. See :meth:`datafusion.Expr.to_bytes` for examples of what travels by value vs. by reference. + +On the driver side, call :func:`set_sender_ctx` to control how +:func:`pickle.dumps` encodes expressions — for example, to apply +:meth:`SessionContext.with_python_udf_inlining` to every pickled +expression on this thread: + +.. code-block:: python + + from datafusion import SessionContext + from datafusion.ipc import set_sender_ctx + + driver_ctx = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(driver_ctx) + pickle.dumps(expr) # encoded with inlining disabled + +Without a sender context the default codec is used (Python UDF +inlining on). The sender context only affects pickle / ``to_bytes`` +encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied +``ctx``. + +See :doc:`/user-guide/io/distributing_work` for the full pattern. """ from __future__ import annotations @@ -65,8 +86,11 @@ def init_worker(): __all__ = [ + "clear_sender_ctx", "clear_worker_ctx", + "get_sender_ctx", "get_worker_ctx", + "set_sender_ctx", "set_worker_ctx", ] @@ -125,6 +149,39 @@ def get_worker_ctx() -> SessionContext | None: return getattr(_local, "ctx", None) +def set_sender_ctx(ctx: SessionContext) -> None: + """Install this driver's :class:`SessionContext` for outbound pickles. + + Controls how :func:`pickle.dumps` encodes :class:`Expr` instances on + this thread. The most useful application is propagating a session + configured with + :meth:`SessionContext.with_python_udf_inlining` so the toggle takes + effect through pickle (which otherwise calls + :meth:`Expr.to_bytes` with no context and uses the default codec). + + Idempotent: overwrites any previous value. Stored in a thread-local + slot, so worker threads on the driver may install their own contexts. + Does not affect :meth:`Expr.to_bytes` calls that pass an explicit + ``ctx`` — those continue to use the supplied context. + """ + _local.sender_ctx = ctx + + +def clear_sender_ctx() -> None: + """Remove this driver's installed sender :class:`SessionContext`. + + After clearing, pickled expressions fall back to the default codec + (Python UDF inlining on). + """ + if hasattr(_local, "sender_ctx"): + del _local.sender_ctx + + +def get_sender_ctx() -> SessionContext | None: + """Return this driver's installed sender :class:`SessionContext`, or ``None``.""" + return getattr(_local, "sender_ctx", None) + + def _resolve_ctx( explicit_ctx: SessionContext | None = None, ) -> SessionContext: diff --git a/python/tests/_pickle_multiprocessing_helpers.py b/python/tests/_pickle_multiprocessing_helpers.py new file mode 100644 index 000000000..4f04967f2 --- /dev/null +++ b/python/tests/_pickle_multiprocessing_helpers.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The leading underscore is load-bearing: pytest with --import-mode=importlib +# (used in CI) assigns synthetic module names to test modules, which breaks +# subprocess imports during multiprocessing. An underscore-prefixed module is +# not collected as a test module, so it imports under its normal __name__ +# inside worker processes. + +from __future__ import annotations + +import pyarrow as pa +from datafusion import SessionContext, udf +from datafusion.ipc import clear_worker_ctx, set_worker_ctx + + +def make_double_udf(): + """Build the canonical UDF used in the multiprocessing tests.""" + return udf( + lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]), + [pa.int64()], + pa.int64(), + volatility="immutable", + name="double", + ) + + +def make_times_seven_udf(): + """Closure-capturing UDF — verifies cloudpickle preserves closed-over state.""" + multiplier = 7 + + def fn(arr): + return pa.array([(v.as_py() or 0) * multiplier for v in arr]) + + return udf( + fn, + [pa.int64()], + pa.int64(), + volatility="immutable", + name="times_seven", + ) + + +def init_worker_empty(): + """Pool initializer: install an empty SessionContext (no UDFs).""" + set_worker_ctx(SessionContext()) + + +def init_worker_clear(): + """Pool initializer: explicitly clear any prior worker context.""" + clear_worker_ctx() + + +def unpickle_and_describe(blob: bytes) -> str: + """Unpickle a proto-bytes blob and return its canonical name.""" + import pickle + + expr = pickle.loads(blob) # noqa: S301 + return expr.canonical_name() + + +def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]: + """Unpickle an expression and evaluate it against an in-memory batch. + + Returns the result column as a Python list. Used to verify that + cloudpickled UDFs (including closure state) execute correctly in + a fresh worker process. + """ + import pickle + + expr = pickle.loads(blob) # noqa: S301 + ctx = SessionContext() + df = ctx.from_pydict({"a": batch}) + out = df.with_column("result", expr).select("result") + return out.to_pydict()["result"] diff --git a/python/tests/test_pickle_expr.py b/python/tests/test_pickle_expr.py index eb0441c49..ab3a55cbf 100644 --- a/python/tests/test_pickle_expr.py +++ b/python/tests/test_pickle_expr.py @@ -21,27 +21,36 @@ with the pickled expression and do not need worker-side pre-registration. The worker context (:mod:`datafusion.ipc`) is only consulted for UDFs imported via the FFI capsule protocol. + +Cross-process tests live in ``test_pickle_multiprocessing.py``. """ from __future__ import annotations import pickle +import threading import pyarrow as pa import pytest from datafusion import Expr, SessionContext, col, lit, udf from datafusion.ipc import ( + clear_sender_ctx, clear_worker_ctx, + get_sender_ctx, + get_worker_ctx, + set_sender_ctx, set_worker_ctx, ) @pytest.fixture(autouse=True) def _reset_worker_ctx(): - """Ensure every test starts with no worker context installed.""" + """Ensure every test starts with no worker or sender context installed.""" clear_worker_ctx() + clear_sender_ctx() yield clear_worker_ctx() + clear_sender_ctx() def _double_udf(): @@ -124,6 +133,8 @@ def fn(arr): e = u(col("a")) blob = pickle.dumps(e) decoded = pickle.loads(blob) # noqa: S301 + # Round-trip names match; functional verification of captured state + # happens in test_pickle_multiprocessing via an actual UDF call. assert decoded.canonical_name() == e.canonical_name() def test_multi_arg_udf_round_trip(self): @@ -311,3 +322,236 @@ def test_cross_version_error_message(self): Exception, match="not portable across Python minor versions" ): Expr.from_bytes(bytes(tampered)) + + +class TestPythonUdfInliningToggle: + """`SessionContext.with_python_udf_inlining(enabled=False)` opts out of + inline Python UDF encoding for both encode and decode paths.""" + + def _build_double_udf(self): + return udf( + lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]), + [pa.int64()], + pa.int64(), + volatility="immutable", + name="double", + ) + + def test_strict_encoder_emits_smaller_blob(self): + """Strict mode skips cloudpickle of the Python callable, so the + encoded bytes are dramatically smaller than the inline form.""" + ctx_inline = SessionContext() + ctx_strict = ctx_inline.with_python_udf_inlining(enabled=False) + u = self._build_double_udf() + e = u(col("a")) + + blob_inline = e.to_bytes(ctx_inline) + blob_strict = e.to_bytes(ctx_strict) + + assert len(blob_strict) < len(blob_inline) // 4 + + def test_toggle_off_then_on_restores_inline_encoding(self): + """`with_python_udf_inlining` is per-call clone semantics: + flipping off and then on must produce a context that emits the + same inline form as a fresh default context, byte-for-byte. + + Guards against a regression where the off→on transition leaves + the codec in a sticky strict state (e.g. by mutating shared + codec state instead of cloning). + """ + u = self._build_double_udf() + e = u(col("a")) + + baseline = SessionContext() + toggled = ( + SessionContext() + .with_python_udf_inlining(enabled=False) + .with_python_udf_inlining(enabled=True) + ) + + blob_baseline = e.to_bytes(baseline) + blob_toggled = e.to_bytes(toggled) + + assert blob_baseline == blob_toggled + + # Sanity check the decoded form against a fresh ctx — the + # toggled-back blob should be self-contained inline, not a + # strict by-name payload that needs registry resolution. + decoded = Expr.from_bytes(blob_toggled, ctx=SessionContext()) + assert "double" in decoded.canonical_name() + + def test_strict_roundtrip_via_registry(self): + """When both sender and receiver disable inlining, the UDF + travels by name only and the receiver resolves it from its + registered functions.""" + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + u = self._build_double_udf() + blob = u(col("a")).to_bytes(strict_sender) + + receiver = SessionContext().with_python_udf_inlining(enabled=False) + receiver.register_udf(u) + restored = Expr.from_bytes(blob, ctx=receiver) + assert "double" in restored.canonical_name() + + def test_strict_decoder_refuses_inline_payload(self): + """An inline-encoded blob fed to a strict receiver raises with a + clear error rather than silently invoking cloudpickle.loads. + + The receiver is intentionally *not* given a matching + registration: the codec refusal must trip before the registry + is ever consulted, so registering the UDF here would only mask + a regression that moved the check after registry lookup. + """ + sender = SessionContext() + u = self._build_double_udf() + blob = u(col("a")).to_bytes(sender) + + strict_receiver = SessionContext().with_python_udf_inlining(enabled=False) + # `RuntimeError` (not bare `Exception`): the codec refusal is + # surfaced through `parse_expr` → `PyRuntimeError`. Tightening + # the assertion catches a regression that swallows the refusal + # as a different error type. + with pytest.raises(RuntimeError, match="inlining is disabled"): + Expr.from_bytes(blob, ctx=strict_receiver) + + def test_sender_ctx_propagates_through_pickle(self): + """`set_sender_ctx` makes `pickle.dumps` use a strict codec. + + Without a sender context, pickle defaults to the inline codec and + the blob is large. With a strict sender context installed, the + blob shrinks because the Python callable is encoded by name + instead of cloudpickled. + """ + u = self._build_double_udf() + e = u(col("a")) + + blob_default = pickle.dumps(e) + + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(strict_sender) + try: + blob_strict = pickle.dumps(e) + finally: + clear_sender_ctx() + + assert len(blob_strict) < len(blob_default) // 4 + + def test_sender_ctx_strict_roundtrip_via_pickle(self): + """End-to-end pickle round-trip with strict mode on both sides. + + Driver installs a strict sender context. Worker installs a + matching strict context with the UDF registered. The UDF + travels by name through `pickle.dumps` / `pickle.loads`. + """ + u = self._build_double_udf() + e = u(col("a")) + + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(strict_sender) + try: + blob = pickle.dumps(e) + finally: + clear_sender_ctx() + + worker = SessionContext().with_python_udf_inlining(enabled=False) + worker.register_udf(u) + set_worker_ctx(worker) + try: + decoded = pickle.loads(blob) # noqa: S301 + finally: + clear_worker_ctx() + + assert "double" in decoded.canonical_name() + + def test_sender_ctx_strict_pickle_accepted_by_inline_worker_with_registry(self): + """A strict-encoded blob still decodes fine on an inline worker + because the wire format is the same default-codec by-name form. + Sanity check: cross-config works as long as the receiver can + resolve the name.""" + u = self._build_double_udf() + e = u(col("a")) + + strict_sender = SessionContext().with_python_udf_inlining(enabled=False) + set_sender_ctx(strict_sender) + try: + blob = pickle.dumps(e) + finally: + clear_sender_ctx() + + worker = SessionContext() + worker.register_udf(u) + set_worker_ctx(worker) + try: + decoded = pickle.loads(blob) # noqa: S301 + finally: + clear_worker_ctx() + + assert "double" in decoded.canonical_name() + + +class TestWorkerCtxLifecycle: + def test_set_and_clear(self): + assert get_worker_ctx() is None + ctx = SessionContext() + set_worker_ctx(ctx) + assert get_worker_ctx() is ctx + clear_worker_ctx() + assert get_worker_ctx() is None + + def test_clear_when_unset_is_noop(self): + clear_worker_ctx() # no error + assert get_worker_ctx() is None + + def test_thread_local_isolation(self): + main_ctx = SessionContext() + set_worker_ctx(main_ctx) + + seen_in_thread: list = [] + + def worker(): + seen_in_thread.append(get_worker_ctx()) + set_worker_ctx(SessionContext()) + seen_in_thread.append(get_worker_ctx()) + + t = threading.Thread(target=worker) + t.start() + t.join() + + # Thread saw no ctx initially (thread-local), then its own. + assert seen_in_thread[0] is None + assert seen_in_thread[1] is not main_ctx + # Main thread's ctx is unchanged by the thread's actions. + assert get_worker_ctx() is main_ctx + + +class TestSenderCtxLifecycle: + def test_set_and_clear(self): + assert get_sender_ctx() is None + ctx = SessionContext() + set_sender_ctx(ctx) + assert get_sender_ctx() is ctx + clear_sender_ctx() + assert get_sender_ctx() is None + + def test_clear_when_unset_is_noop(self): + clear_sender_ctx() # no error + assert get_sender_ctx() is None + + def test_thread_local_isolation(self): + main_ctx = SessionContext() + set_sender_ctx(main_ctx) + + seen_in_thread: list = [] + + def worker(): + seen_in_thread.append(get_sender_ctx()) + set_sender_ctx(SessionContext()) + seen_in_thread.append(get_sender_ctx()) + + t = threading.Thread(target=worker) + t.start() + t.join() + + assert seen_in_thread[0] is None + assert seen_in_thread[1] is not main_ctx + assert get_sender_ctx() is main_ctx diff --git a/python/tests/test_pickle_multiprocessing.py b/python/tests/test_pickle_multiprocessing.py new file mode 100644 index 000000000..6eabaff9e --- /dev/null +++ b/python/tests/test_pickle_multiprocessing.py @@ -0,0 +1,131 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Cross-process pickle tests for :class:`Expr`. + +Workers run with each :mod:`multiprocessing` start method (``fork``, +``forkserver``, ``spawn``). Python UDFs (scalar, aggregate, window) travel +with the pickled expression and need no worker-side pre-registration. +Worker-side helpers live in ``_pickle_multiprocessing_helpers`` — the +underscore prefix avoids pytest collection so the module imports under +its real name in worker subprocesses. +""" + +from __future__ import annotations + +import functools +import multiprocessing as mp +import pickle +import sys + +import pytest +from datafusion import col, lit + +from . import _pickle_multiprocessing_helpers as helpers + + +@functools.cache +def _multiprocessing_available() -> tuple[bool, str]: + """Return (available, reason). Some sandboxed environments deny semaphore + creation; without semaphores, ``multiprocessing.Pool`` cannot start. + + Cached so the probe Pool only spawns once per session, and only when a + test in this module is actually about to run — collection-only runs + (e.g. ``pytest --collect-only`` on the full suite) skip the probe. + """ + try: + ctx = mp.get_context("spawn") + with ctx.Pool(processes=1) as pool: + pool.map(int, [0]) + except (PermissionError, OSError) as exc: + return False, f"multiprocessing.Pool unavailable: {exc}" + return True, "" + + +@pytest.fixture(autouse=True) +def _skip_if_multiprocessing_unavailable(): + available, reason = _multiprocessing_available() + if not available: + pytest.skip(reason) + + +START_METHODS = [ + pytest.param( + "fork", + marks=pytest.mark.skipif( + sys.platform == "darwin", + reason="fork start method is unsafe with PyArrow/tokio on macOS", + ), + ), + "forkserver", + "spawn", +] + + +@pytest.mark.parametrize("start_method", START_METHODS) +@pytest.mark.timeout(120) +def test_builtin_pickle_via_pool(start_method): + """Built-in expressions round-trip in every start method.""" + expr = col("a") + lit(1) + blob = pickle.dumps(expr) + + ctx = mp.get_context(start_method) + with ctx.Pool(processes=2) as pool: + results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob]) + + assert all(r == expr.canonical_name() for r in results) + + +@pytest.mark.parametrize("start_method", START_METHODS) +@pytest.mark.timeout(120) +def test_udf_pickle_self_contained(start_method): + """Scalar UDF travels inside the proto blob — no worker pre-registration. + + Workers start with no UDF registered. The Rust-side ``PythonUDFCodec`` + reconstructs the UDF from bytes embedded in the pickle blob. + """ + udf_obj = helpers.make_double_udf() + expr = udf_obj(col("a")) + blob = pickle.dumps(expr) + + ctx = mp.get_context(start_method) + with ctx.Pool(processes=2) as pool: + results = pool.starmap( + helpers.unpickle_and_evaluate, + [(blob, [1, 2, 3]), (blob, [10, 20, 30])], + ) + + assert results[0] == [2, 4, 6] + assert results[1] == [20, 40, 60] + + +@pytest.mark.parametrize("start_method", START_METHODS) +@pytest.mark.timeout(120) +def test_closure_capturing_udf_via_pool(start_method): + """Cloudpickle preserves closure state across the codec boundary.""" + udf_obj = helpers.make_times_seven_udf() + expr = udf_obj(col("a")) + blob = pickle.dumps(expr) + + ctx = mp.get_context(start_method) + with ctx.Pool(processes=2) as pool: + results = pool.starmap( + helpers.unpickle_and_evaluate, + [(blob, [1, 2, 3])], + ) + + assert results[0] == [7, 14, 21] From 14178db09e837d439252b0d500531e07d7fdd3c5 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 07:49:09 -0400 Subject: [PATCH 02/10] update uv lock --- uv.lock | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/uv.lock b/uv.lock index 3fd3eec4b..cedd9b178 100644 --- a/uv.lock +++ b/uv.lock @@ -628,25 +628,26 @@ wheels = [ [[package]] name = "maturin" -version = "1.8.1" +version = "1.13.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9a/08/ccb0f917722a35ab0d758be9bb5edaf645c3a3d6170061f10d396ecd273f/maturin-1.8.1.tar.gz", hash = "sha256:49cd964aabf59f8b0a6969f9860d2cdf194ac331529caae14c884f5659568857", size = 197397, upload-time = "2024-12-30T14:03:48.109Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/4c/00/f34077315f34db8ad2ccf6bfe11b864ca27baab3a1320634da8e3cf89a48/maturin-1.8.1-py3-none-linux_armv6l.whl", hash = "sha256:7e590a23d9076b8a994f2e67bc63dc9a2d1c9a41b1e7b45ac354ba8275254e89", size = 7568415, upload-time = "2024-12-30T14:03:07.939Z" }, - { url = "https://files.pythonhosted.org/packages/5c/07/9219976135ce0cb32d2fa6ea5c6d0ad709013d9a17967312e149b98153a6/maturin-1.8.1-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:8d8251a95682c83ea60988c804b620c181911cd824aa107b4a49ac5333c92968", size = 14527816, upload-time = "2024-12-30T14:03:13.851Z" }, - { url = "https://files.pythonhosted.org/packages/e6/04/fa009a00903acdd1785d58322193140bfe358595347c39f315112dabdf9e/maturin-1.8.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b9fc1a4354cac5e32c190410208039812ea88c4a36bd2b6499268ec49ef5de00", size = 7580446, upload-time = "2024-12-30T14:03:17.64Z" }, - { url = "https://files.pythonhosted.org/packages/9b/d4/414b2aab9bbfe88182b734d3aa1b4fef7d7701e50f6be48500378b8c8721/maturin-1.8.1-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:621e171c6b39f95f1d0df69a118416034fbd59c0f89dcaea8c2ea62019deecba", size = 7650535, upload-time = "2024-12-30T14:03:21.115Z" }, - { url = "https://files.pythonhosted.org/packages/f0/64/879418a8a0196013ec1fb19eada0781c04a30e8d6d9227e80f91275a4f5b/maturin-1.8.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:98f638739a5132962347871b85c91f525c9246ef4d99796ae98a2031e3df029f", size = 8006702, upload-time = "2024-12-30T14:03:24.318Z" }, - { url = "https://files.pythonhosted.org/packages/39/c2/605829324f8371294f70303aca130682df75318958efed246873d3d604ab/maturin-1.8.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:f9f5c47521924b6e515cbc652a042fe5f17f8747445be9d931048e5d8ddb50a4", size = 7368164, upload-time = "2024-12-30T14:03:26.582Z" }, - { url = "https://files.pythonhosted.org/packages/be/6c/30e136d397bb146b94b628c0ef7f17708281611b97849e2cf37847025ac7/maturin-1.8.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:0f4407c7353c31bfbb8cdeb82bc2170e474cbfb97b5ba27568f440c9d6c1fdd4", size = 7450889, upload-time = "2024-12-30T14:03:28.893Z" }, - { url = "https://files.pythonhosted.org/packages/1b/50/e1f5023512696d4e56096f702e2f68d6d9a30afe0a4eec82b0e27b8eb4e4/maturin-1.8.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:ec49cd70cad3c389946c6e2bc0bd50772a7fcb463040dd800720345897eec9bf", size = 9585819, upload-time = "2024-12-30T14:03:31.125Z" }, - { url = "https://files.pythonhosted.org/packages/b7/80/b24b5248d89d2e5982553900237a337ea098ca9297b8369ca2aa95549e0f/maturin-1.8.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c08767d794de8f8a11c5c8b1b47a4ff9fb6ae2d2d97679e27030f2f509c8c2a0", size = 10920801, upload-time = "2024-12-30T14:03:35.127Z" }, - { url = "https://files.pythonhosted.org/packages/6e/f4/8ede7a662fabf93456b44390a5ad22630e25fb5ddaecf787251071b2e143/maturin-1.8.1-py3-none-win32.whl", hash = "sha256:d678407713f3e10df33c5b3d7a343ec0551eb7f14d8ad9ba6febeb96f4e4c75c", size = 6873556, upload-time = "2024-12-30T14:03:37.913Z" }, - { url = "https://files.pythonhosted.org/packages/9c/22/757f093ed0e319e9648155b8c9d716765442bea5bc98ebc58ad4ad5b0524/maturin-1.8.1-py3-none-win_amd64.whl", hash = "sha256:a526f90fe0e5cb59ffb81f4ff547ddc42e823bbdeae4a31012c0893ca6dcaf46", size = 7823153, upload-time = "2024-12-30T14:03:40.33Z" }, - { url = "https://files.pythonhosted.org/packages/a4/f5/051413e04f6da25069db5e76759ecdb8cd2a8ab4a94045b5a3bf548c66fa/maturin-1.8.1-py3-none-win_arm64.whl", hash = "sha256:e95f077fd2ddd2f048182880eed458c308571a534be3eb2add4d3dac55bf57f4", size = 6552131, upload-time = "2024-12-30T14:03:45.203Z" }, +sdist = { url = "https://files.pythonhosted.org/packages/9c/1c/612d23d33ec21b9ae7ece7b3f0dd5f9dfd57b4009e9d2938165869ebd6ae/maturin-1.13.3.tar.gz", hash = "sha256:771e1e9e71a278e56db01552e0d1acfd1464259f9575b6e72842f893cd299079", size = 357934, upload-time = "2026-05-11T07:43:39.027Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/71/66/18c2aaac0b2a5dea9f1db5984ce83b905ad205cfc7c02d0091e707c0c2e7/maturin-1.13.3-py3-none-linux_armv6l.whl", hash = "sha256:3cc13929ca82aefa4adbf0f2c35419369796213c6fb0eb24e914945f50ef5d8c", size = 10190971, upload-time = "2026-05-11T07:43:10.431Z" }, + { url = "https://files.pythonhosted.org/packages/bc/71/26a988d092e4fd6a9523d46d44400a46cad7cdf3fd206ce702240c748aee/maturin-1.13.3-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:53b08bd075649ce96513ad9abf241a43cb685ed6e9e7790f8dbc2d66e95d8323", size = 19716714, upload-time = "2026-05-11T07:43:36.911Z" }, + { url = "https://files.pythonhosted.org/packages/82/5c/f3fd0e184255d9fc7e272c62af3dfa84c617b2577ef83af9ce615f5279cc/maturin-1.13.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:4cd478e6e4c56251e48ed079b8efd55b30bc5c09cf695a1bdafaeb582ee735a0", size = 10194726, upload-time = "2026-05-11T07:43:07.05Z" }, + { url = "https://files.pythonhosted.org/packages/a9/e1/f4edb69fb647b77c4769a9bfd4d6fb62961e653d164bc277ecdffac3ab61/maturin-1.13.3-py3-none-manylinux_2_12_i686.manylinux2010_i686.musllinux_1_1_i686.whl", hash = "sha256:a2675e25f313034ae6f57388cf14818f87d8961c4a96795287f3e155f59beb11", size = 10172781, upload-time = "2026-05-11T07:43:40.796Z" }, + { url = "https://files.pythonhosted.org/packages/c7/7d/a1be934690cdcc3c6609769ceaad322ab7501c2ee5bafcac1b14d609e403/maturin-1.13.3-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.musllinux_1_1_x86_64.whl", hash = "sha256:4667ef609ab446c1b5e0bfe4f9fb99699ab6d8548433f8d1a684256e0b67217f", size = 10682670, upload-time = "2026-05-11T07:43:13.132Z" }, + { url = "https://files.pythonhosted.org/packages/18/f5/372ae19b72ce8f6e37e5864ae4dc5b252ee9fce0619ccc3aa366aa3a7f97/maturin-1.13.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:3db93337ed97e60ffc878aa8b493cd7ae44d3a5e1a37256db3a4491f57565018", size = 10060363, upload-time = "2026-05-11T07:43:21.107Z" }, + { url = "https://files.pythonhosted.org/packages/cb/5b/c68340cca09368af0df80965dfabed4234205a492a93da00793c7b9aae20/maturin-1.13.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.musllinux_1_1_armv7l.whl", hash = "sha256:1cc0a110b224ca90406b668a3e3c1f5a515062e59e26292f6dbaf5fd4909c6f3", size = 10017551, upload-time = "2026-05-11T07:43:33.916Z" }, + { url = "https://files.pythonhosted.org/packages/28/1e/f90fb2b000bad9e6d850cd5afb88b2f1e2a279cfb4de02ea40078484690e/maturin-1.13.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.musllinux_1_1_ppc64le.whl", hash = "sha256:c00ea6428dea17bf616fe93770837634454b28c2de1a876e42ef8036c616079a", size = 13301712, upload-time = "2026-05-11T07:43:26.492Z" }, + { url = "https://files.pythonhosted.org/packages/be/58/1670f68a8f04ccd7b90df11047bd9a046585310e84e1967cc9849cd1c5a3/maturin-1.13.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:49fd6ab08da28098ccf37afca24cdba72376ba9c1eedf9dd25ff82ed771961ff", size = 10946765, upload-time = "2026-05-11T07:43:16.135Z" }, + { url = "https://files.pythonhosted.org/packages/4b/ac/00c955c2ef134817b1a7bdaa76b0309e9c5291eb17d9ff88069eecd08bc2/maturin-1.13.3-py3-none-manylinux_2_31_riscv64.musllinux_1_1_riscv64.whl", hash = "sha256:b6741d7bf4af97da937528fd1e523c6ab54f53d9a21870fa735d6e67fd88e273", size = 10388661, upload-time = "2026-05-11T07:43:18.727Z" }, + { url = "https://files.pythonhosted.org/packages/97/c6/cbf8a51dde19c19aeba0d9b075095a2effb9b31fd312b1aae3ac79f8aea2/maturin-1.13.3-py3-none-win32.whl", hash = "sha256:0ef257e692cc756c87af5bea95ddfe7d3ac49d3376a7a87f728d63f06e7b6f8b", size = 8901838, upload-time = "2026-05-11T07:43:23.76Z" }, + { url = "https://files.pythonhosted.org/packages/a1/ff/c6a50a59dc8313097d43ac5f4d74df6a500c8cb62b0dc9e054f53e203a48/maturin-1.13.3-py3-none-win_amd64.whl", hash = "sha256:def4a435ea9d2ee93b18ba579dc8c9cf898889a66f312cd379b5e374ec3e3ad6", size = 10340801, upload-time = "2026-05-11T07:43:29.239Z" }, + { url = "https://files.pythonhosted.org/packages/6c/93/e32e79333f0902ba292b996f504f5f06be59587f7d02ab8d5ed1e3066445/maturin-1.13.3-py3-none-win_arm64.whl", hash = "sha256:2389fe92d017cea9d94e521fa0175314a4c52f79a1057b901fbc9f8686ef7d0b", size = 9706562, upload-time = "2026-05-11T07:43:31.743Z" }, ] [[package]] From eb3c1946487dbde0edbabfde6ed6b0f1dce194e4 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 08:19:22 -0400 Subject: [PATCH 03/10] docs: clarify Python UDF inlining docstring; drop unresolved :doc: refs Rewrite with_python_udf_inlining docstring for readability and remove references to /user-guide/io/distributing_work, which does not exist yet. Keep security warning inline as a .. warning:: Security block, matching the existing pattern in Expr.to_bytes / from_bytes / __reduce__. The central doc will land in a follow-on PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/context.py | 65 ++++++++++++++++-------------------- python/datafusion/expr.py | 3 +- python/datafusion/ipc.py | 2 -- 3 files changed, 30 insertions(+), 40 deletions(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index e3ceb3b05..d28de18d4 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1771,42 +1771,35 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext: return new def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: - """Toggle inline encoding of Python-defined UDFs on this session. - - ``enabled`` is keyword-only: - ``with_python_udf_inlining(enabled=False)`` reads at the call - site as the inverse of - ``with_python_udf_inlining(enabled=True)``, where a positional - ``True`` / ``False`` would not. - - When ``True`` (the default), Python scalar, aggregate, and window - UDFs travel inside the serialized expression and are - reconstructed on the receiver without pre-registration. - - Set ``False`` to: - - * Produce serialized bytes that round-trip through a non-Python - decoder (cross-language portability). UDFs are stored by name - only; the receiver must have matching registrations. - * Refuse to reconstruct Python UDFs from - :meth:`Expr.from_bytes` input that may come from an untrusted - source — ``cloudpickle.loads`` will not be invoked. - - The toggle applies directly to :meth:`Expr.to_bytes` / - :meth:`Expr.from_bytes` calls that pass this session as their - ``ctx`` argument. To make the toggle apply through - :func:`pickle.dumps` (which calls :meth:`Expr.to_bytes` with no - context), install this session as the driver's sender context - via :func:`datafusion.ipc.set_sender_ctx` — and install it as - the worker's context via - :func:`datafusion.ipc.set_worker_ctx` for the corresponding - :func:`pickle.loads`. - - For the full security model, see - :doc:`/user-guide/io/distributing_work` (Security section). In - short: this toggle narrows only the :meth:`Expr.from_bytes` - surface; :func:`pickle.loads` on untrusted bytes remains - unsafe regardless of the toggle. + """Control whether Python UDFs are embedded in serialized expressions. + + When ``enabled=True`` (the default), serialized expressions carry + the Python code for any scalar, aggregate, or window UDFs they + reference. The receiver rebuilds the UDFs from those bytes and + does not need to register them first. + + When ``enabled=False``, serialized expressions store only the + UDF names. This has two uses: + + * **Cross-language portability.** The bytes can be decoded by a + non-Python receiver, which must already have UDFs registered + under matching names. + * **Safer deserialization.** :meth:`Expr.from_bytes` will refuse + to rebuild Python UDFs rather than call ``cloudpickle.loads`` + on untrusted input. + + The setting affects :meth:`Expr.to_bytes` and + :meth:`Expr.from_bytes` whenever this session is passed as the + ``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads` + do not pass a context, so to apply the setting through pickle, + register this session with + :func:`datafusion.ipc.set_sender_ctx` on the sender and + :func:`datafusion.ipc.set_worker_ctx` on the receiver. + + .. warning:: Security + This setting narrows only :meth:`Expr.from_bytes`. Calling + :func:`pickle.loads` on untrusted bytes remains unsafe + regardless of the toggle. """ new_internal = self.ctx.with_python_udf_inlining(enabled) new = SessionContext.__new__(SessionContext) diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 15a45c327..c3e172ae1 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -454,8 +454,7 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes: Built-in functions and Python UDFs (scalar, aggregate, window) travel inside the returned bytes; the worker does not need to pre-register them. UDFs imported via the FFI capsule protocol - travel by name only and must be registered on the worker. See - :doc:`/user-guide/io/distributing_work`. + travel by name only and must be registered on the worker. .. warning:: Security Bytes returned here may embed a cloudpickled Python diff --git a/python/datafusion/ipc.py b/python/datafusion/ipc.py index 5188fae95..d99e23a0d 100644 --- a/python/datafusion/ipc.py +++ b/python/datafusion/ipc.py @@ -72,8 +72,6 @@ def init_worker(): inlining on). The sender context only affects pickle / ``to_bytes`` encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied ``ctx``. - -See :doc:`/user-guide/io/distributing_work` for the full pattern. """ from __future__ import annotations From 7dd425297919a092d26cd776ba0d79b8f354a956 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 08:26:01 -0400 Subject: [PATCH 04/10] docs: add doctest examples for sender ctx + UDF inlining toggle Per CLAUDE.md, every Python function needs a docstring example. Adds examples to with_python_udf_inlining, set_sender_ctx, clear_sender_ctx, and get_sender_ctx. Also clarifies that with_python_udf_inlining returns a new SessionContext and leaves the original unchanged, matching the with_logical_extension_codec pattern. Co-Authored-By: Claude Opus 4.7 (1M context) --- python/datafusion/context.py | 9 +++++++++ python/datafusion/ipc.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index d28de18d4..c5b1d7858 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1800,6 +1800,15 @@ def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: This setting narrows only :meth:`Expr.from_bytes`. Calling :func:`pickle.loads` on untrusted bytes remains unsafe regardless of the toggle. + + Returns a new :class:`SessionContext` with the toggle applied; + the original session is unchanged. + + Examples: + >>> from datafusion import SessionContext + >>> strict = SessionContext().with_python_udf_inlining(enabled=False) + >>> isinstance(strict, SessionContext) + True """ new_internal = self.ctx.with_python_udf_inlining(enabled) new = SessionContext.__new__(SessionContext) diff --git a/python/datafusion/ipc.py b/python/datafusion/ipc.py index d99e23a0d..d0431a131 100644 --- a/python/datafusion/ipc.py +++ b/python/datafusion/ipc.py @@ -161,6 +161,17 @@ def set_sender_ctx(ctx: SessionContext) -> None: slot, so worker threads on the driver may install their own contexts. Does not affect :meth:`Expr.to_bytes` calls that pass an explicit ``ctx`` — those continue to use the supplied context. + + Examples: + >>> from datafusion import SessionContext + >>> from datafusion.ipc import ( + ... set_sender_ctx, get_sender_ctx, clear_sender_ctx, + ... ) + >>> driver = SessionContext().with_python_udf_inlining(enabled=False) + >>> set_sender_ctx(driver) + >>> get_sender_ctx() is driver + True + >>> clear_sender_ctx() """ _local.sender_ctx = ctx @@ -170,13 +181,30 @@ def clear_sender_ctx() -> None: After clearing, pickled expressions fall back to the default codec (Python UDF inlining on). + + Examples: + >>> from datafusion import SessionContext + >>> from datafusion.ipc import ( + ... set_sender_ctx, clear_sender_ctx, get_sender_ctx, + ... ) + >>> set_sender_ctx(SessionContext()) + >>> clear_sender_ctx() + >>> get_sender_ctx() is None + True """ if hasattr(_local, "sender_ctx"): del _local.sender_ctx def get_sender_ctx() -> SessionContext | None: - """Return this driver's installed sender :class:`SessionContext`, or ``None``.""" + """Return this driver's installed sender :class:`SessionContext`, or ``None``. + + Examples: + >>> from datafusion.ipc import get_sender_ctx, clear_sender_ctx + >>> clear_sender_ctx() + >>> get_sender_ctx() is None + True + """ return getattr(_local, "sender_ctx", None) From a24daf6ede1a4b014e5cb1d5a6acdf5589d7f640 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 08:48:56 -0400 Subject: [PATCH 05/10] refactor: address review nits for UDF inlining toggle + sender ctx * codec: strict refusal routes through `read_framed_payload` so malformed inline bytes surface their own diagnostic; the "inlining is disabled" message now fires only when the payload would have decoded. * codec: add summary line above `PythonPhysicalCodec::with_python_udf_inlining` cross-link for rustdoc rendering. * expr: hoist `get_sender_ctx` import to module top; note that `__reduce__` also drives `copy.copy` / `copy.deepcopy`. * context: accept `with_python_udf_inlining` positionally or as kwarg (drop `*,`). * tests: replace size-ratio heuristic with semantic check for the `DFPYUDF` family prefix; switch single-batch closure test to `pool.apply`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/codec.rs | 46 +++++++++++++++------ python/datafusion/context.py | 2 +- python/datafusion/expr.py | 7 ++-- python/tests/test_pickle_expr.py | 26 ++++++++---- python/tests/test_pickle_multiprocessing.py | 7 +--- 5 files changed, 57 insertions(+), 31 deletions(-) diff --git a/crates/core/src/codec.rs b/crates/core/src/codec.rs index 9299ce1be..d43580a75 100644 --- a/crates/core/src/codec.rs +++ b/crates/core/src/codec.rs @@ -340,8 +340,8 @@ impl LogicalExtensionCodec for PythonLogicalCodec { if let Some(udf) = try_decode_python_scalar_udf(buf)? { return Ok(udf); } - } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) { - return Err(refuse_inline_payload("scalar UDF", name)); + } else { + refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?; } self.inner.try_decode_udf(name, buf) } @@ -358,8 +358,8 @@ impl LogicalExtensionCodec for PythonLogicalCodec { if let Some(udaf) = try_decode_python_udaf(buf)? { return Ok(udaf); } - } else if buf.starts_with(PY_AGG_UDF_FAMILY) { - return Err(refuse_inline_payload("aggregate UDF", name)); + } else { + refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?; } self.inner.try_decode_udaf(name, buf) } @@ -376,13 +376,30 @@ impl LogicalExtensionCodec for PythonLogicalCodec { if let Some(udwf) = try_decode_python_udwf(buf)? { return Ok(udwf); } - } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) { - return Err(refuse_inline_payload("window UDF", name)); + } else { + refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?; } self.inner.try_decode_udwf(name, buf) } } +/// Strict-mode gate: if `buf` is a well-framed inline payload for +/// `family`, return the strict-refusal error; otherwise return +/// `Ok(())` so the caller can delegate to its `inner` codec. +/// +/// Routing through [`read_framed_payload`] (rather than a bare +/// `starts_with` probe) means malformed inline bytes — wrong +/// wire-format version, mismatched Python version, truncated header — +/// surface *their* diagnostic instead of the strict-mode message. +/// The strict message implies sender intent ("inlining is disabled"), +/// so it should fire only when the bytes really would have decoded. +fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> { + Python::attach(|py| match read_framed_payload(py, buf, family, kind)? { + Some(_) => Err(refuse_inline_payload(kind, name)), + None => Ok(()), + }) +} + /// Build the error returned by a strict codec when it receives an /// inline Python-UDF payload it has been told not to deserialize. fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError { @@ -429,7 +446,10 @@ impl PythonPhysicalCodec { &self.inner } - /// See [`PythonLogicalCodec::with_python_udf_inlining`]. + /// Toggle inline encoding of Python UDFs on this physical codec. + /// + /// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see + /// that method for the full security and portability discussion. pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { self.python_udf_inlining = enabled; self @@ -472,8 +492,8 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { if let Some(udf) = try_decode_python_scalar_udf(buf)? { return Ok(udf); } - } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) { - return Err(refuse_inline_payload("scalar UDF", name)); + } else { + refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?; } self.inner.try_decode_udf(name, buf) } @@ -502,8 +522,8 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { if let Some(udaf) = try_decode_python_udaf(buf)? { return Ok(udaf); } - } else if buf.starts_with(PY_AGG_UDF_FAMILY) { - return Err(refuse_inline_payload("aggregate UDF", name)); + } else { + refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?; } self.inner.try_decode_udaf(name, buf) } @@ -520,8 +540,8 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { if let Some(udwf) = try_decode_python_udwf(buf)? { return Ok(udwf); } - } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) { - return Err(refuse_inline_payload("window UDF", name)); + } else { + refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?; } self.inner.try_decode_udwf(name, buf) } diff --git a/python/datafusion/context.py b/python/datafusion/context.py index c5b1d7858..5af347d33 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1770,7 +1770,7 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext: new.ctx = new_internal return new - def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: + def with_python_udf_inlining(self, enabled: bool) -> SessionContext: """Control whether Python UDFs are embedded in serialized expressions. When ``enabled=True`` (the default), serialized expressions carry diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index c3e172ae1..2607cb8a9 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -53,6 +53,7 @@ from ._internal import expr as expr_internal from ._internal import functions as functions_internal +from .ipc import get_sender_ctx if TYPE_CHECKING: from collections.abc import Sequence @@ -594,10 +595,10 @@ def __reduce__(self) -> tuple[Callable[[bytes], Expr], tuple[bytes]]: The encoding side honors a driver-side sender context installed via :func:`datafusion.ipc.set_sender_ctx` — that is how :meth:`SessionContext.with_python_udf_inlining` propagates - through ``pickle.dumps``. + through ``pickle.dumps``. The sender context is read by + ``__reduce__``, so :func:`copy.copy` and :func:`copy.deepcopy` + — which also go through ``__reduce__`` — pick it up too. """ - from datafusion.ipc import get_sender_ctx - return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),)) @classmethod diff --git a/python/tests/test_pickle_expr.py b/python/tests/test_pickle_expr.py index ab3a55cbf..588caa21a 100644 --- a/python/tests/test_pickle_expr.py +++ b/python/tests/test_pickle_expr.py @@ -337,9 +337,13 @@ def _build_double_udf(self): name="double", ) - def test_strict_encoder_emits_smaller_blob(self): - """Strict mode skips cloudpickle of the Python callable, so the - encoded bytes are dramatically smaller than the inline form.""" + def test_strict_encoder_omits_inline_payload(self): + """Strict mode emits the by-name wire form: no `DFPYUDF` magic + in the blob, no cloudpickled callable. Semantic check is + sharper than a size-ratio heuristic — a renamed UDF or a + smaller-than-expected closure would still flip the magic + bytes, but might not move the size by 4x. + """ ctx_inline = SessionContext() ctx_strict = ctx_inline.with_python_udf_inlining(enabled=False) u = self._build_double_udf() @@ -348,7 +352,10 @@ def test_strict_encoder_emits_smaller_blob(self): blob_inline = e.to_bytes(ctx_inline) blob_strict = e.to_bytes(ctx_strict) - assert len(blob_strict) < len(blob_inline) // 4 + # `DFPYUDF` is the scalar Python-UDF family prefix; see + # `PY_SCALAR_UDF_FAMILY` in crates/core/src/codec.rs. + assert b"DFPYUDF" in blob_inline + assert b"DFPYUDF" not in blob_strict def test_toggle_off_then_on_restores_inline_encoding(self): """`with_python_udf_inlining` is per-call clone semantics: @@ -417,10 +424,10 @@ def test_strict_decoder_refuses_inline_payload(self): def test_sender_ctx_propagates_through_pickle(self): """`set_sender_ctx` makes `pickle.dumps` use a strict codec. - Without a sender context, pickle defaults to the inline codec and - the blob is large. With a strict sender context installed, the - blob shrinks because the Python callable is encoded by name - instead of cloudpickled. + Without a sender context, pickle defaults to the inline codec + and the blob contains the `DFPYUDF` family prefix. With a + strict sender context installed, the callable encodes by name + and the prefix is absent. """ u = self._build_double_udf() e = u(col("a")) @@ -434,7 +441,8 @@ def test_sender_ctx_propagates_through_pickle(self): finally: clear_sender_ctx() - assert len(blob_strict) < len(blob_default) // 4 + assert b"DFPYUDF" in blob_default + assert b"DFPYUDF" not in blob_strict def test_sender_ctx_strict_roundtrip_via_pickle(self): """End-to-end pickle round-trip with strict mode on both sides. diff --git a/python/tests/test_pickle_multiprocessing.py b/python/tests/test_pickle_multiprocessing.py index 6eabaff9e..216542c01 100644 --- a/python/tests/test_pickle_multiprocessing.py +++ b/python/tests/test_pickle_multiprocessing.py @@ -123,9 +123,6 @@ def test_closure_capturing_udf_via_pool(start_method): ctx = mp.get_context(start_method) with ctx.Pool(processes=2) as pool: - results = pool.starmap( - helpers.unpickle_and_evaluate, - [(blob, [1, 2, 3])], - ) + result = pool.apply(helpers.unpickle_and_evaluate, (blob, [1, 2, 3])) - assert results[0] == [7, 14, 21] + assert result == [7, 14, 21] From 74405d872b84a383d117d50830dbf7a6978d3fa1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 09:14:40 -0400 Subject: [PATCH 06/10] refactor: keyword-only inlining flag, skip GIL on prefix mismatch - `SessionContext.with_python_udf_inlining` now keyword-only (`*, enabled`) to match the documented call style and the existing doctests/tests. - `refuse_if_inline` and the three `try_decode_python_*` decoders short- circuit on a `starts_with(family)` check before `Python::attach`, so plans whose UDFs are not Python-defined no longer pay a GIL acquisition per decode call. Semantics preserved: `strip_wire_header` already returns `Ok(None)` when the prefix does not match. - `datafusion.ipc` module docstring wraps the `set_sender_ctx` example in `try`/`finally` and notes that the thread-local holds a strong reference to the installed `SessionContext` until cleared. Co-Authored-By: Claude Opus 4.7 --- crates/core/src/codec.rs | 18 ++++++++++++++++++ python/datafusion/context.py | 2 +- python/datafusion/ipc.py | 15 +++++++++++++-- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/crates/core/src/codec.rs b/crates/core/src/codec.rs index d43580a75..9ecbdbebc 100644 --- a/crates/core/src/codec.rs +++ b/crates/core/src/codec.rs @@ -393,7 +393,16 @@ impl LogicalExtensionCodec for PythonLogicalCodec { /// surface *their* diagnostic instead of the strict-mode message. /// The strict message implies sender intent ("inlining is disabled"), /// so it should fire only when the bytes really would have decoded. +/// +/// Fast path: short-circuit on the family-magic prefix before +/// acquiring the GIL. Plans with many non-Python UDFs would otherwise +/// pay a GIL acquisition per decode call just to confirm "not a +/// Python UDF". `read_framed_payload` itself rejects buffers that +/// don't start with `family`, so this is purely an optimization. fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> { + if !buf.starts_with(family) { + return Ok(()); + } Python::attach(|py| match read_framed_payload(py, buf, family, kind)? { Some(_) => Err(refuse_inline_payload(kind, name)), None => Ok(()), @@ -578,6 +587,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec) /// the caller to delegate to its `inner` codec (and eventually the /// `FunctionRegistry`). pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result>> { + if !buf.starts_with(PY_SCALAR_UDF_FAMILY) { + return Ok(None); + } Python::attach(|py| -> Result>> { let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")? else { @@ -834,6 +846,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec) -> Res } pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result>> { + if !buf.starts_with(PY_WINDOW_UDF_FAMILY) { + return Ok(None); + } Python::attach(|py| -> Result>> { let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")? else { @@ -916,6 +931,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec) -> } pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result>> { + if !buf.starts_with(PY_AGG_UDF_FAMILY) { + return Ok(None); + } Python::attach(|py| -> Result>> { let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")? else { diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 5af347d33..c5b1d7858 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1770,7 +1770,7 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext: new.ctx = new_internal return new - def with_python_udf_inlining(self, enabled: bool) -> SessionContext: + def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: """Control whether Python UDFs are embedded in serialized expressions. When ``enabled=True`` (the default), serialized expressions carry diff --git a/python/datafusion/ipc.py b/python/datafusion/ipc.py index d0431a131..bb876bfb4 100644 --- a/python/datafusion/ipc.py +++ b/python/datafusion/ipc.py @@ -62,16 +62,27 @@ def init_worker(): .. code-block:: python from datafusion import SessionContext - from datafusion.ipc import set_sender_ctx + from datafusion.ipc import clear_sender_ctx, set_sender_ctx driver_ctx = SessionContext().with_python_udf_inlining(enabled=False) set_sender_ctx(driver_ctx) - pickle.dumps(expr) # encoded with inlining disabled + try: + pickle.dumps(expr) # encoded with inlining disabled + finally: + clear_sender_ctx() Without a sender context the default codec is used (Python UDF inlining on). The sender context only affects pickle / ``to_bytes`` encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied ``ctx``. + +The thread-local holds a strong reference to the installed +:class:`SessionContext` until :func:`clear_sender_ctx` is called or +the thread exits. Long-running driver threads that install a sender +context once and never clear it will retain that session for the +lifetime of the thread; pair :func:`set_sender_ctx` with +:func:`clear_sender_ctx` (e.g. in a ``try``/``finally``) when the +sender context is only needed for a bounded scope. """ from __future__ import annotations From e8413dd6f050a136dba6f6ffc43d2e6c3c1a33f1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 11:49:23 -0400 Subject: [PATCH 07/10] Add dev dependency --- pyproject.toml | 1 + uv.lock | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index a02f4608a..418640a49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -203,6 +203,7 @@ dev = [ "pyarrow>=19.0.0", "pygithub==2.5.0", "pytest-asyncio>=0.23.3", + "pytest-timeout>=2.3.1", "pytest>=7.4.4", "pyyaml>=6.0.3", "ruff>=0.15.1", diff --git a/uv.lock b/uv.lock index cedd9b178..26ab8b20e 100644 --- a/uv.lock +++ b/uv.lock @@ -343,6 +343,7 @@ dev = [ { name = "pygithub" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-timeout" }, { name = "pyyaml" }, { name = "ruff" }, { name = "toml" }, @@ -380,6 +381,7 @@ dev = [ { name = "pygithub", specifier = "==2.5.0" }, { name = "pytest", specifier = ">=7.4.4" }, { name = "pytest-asyncio", specifier = ">=0.23.3" }, + { name = "pytest-timeout", specifier = ">=2.3.1" }, { name = "pyyaml", specifier = ">=6.0.3" }, { name = "ruff", specifier = ">=0.15.1" }, { name = "toml", specifier = ">=0.10.2" }, @@ -1239,6 +1241,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/67/17/3493c5624e48fd97156ebaec380dcaafee9506d7e2c46218ceebbb57d7de/pytest_asyncio-0.25.3-py3-none-any.whl", hash = "sha256:9e89518e0f9bd08928f97a3482fdc4e244df17529460bc038291ccaf8f85c7c3", size = 19467, upload-time = "2025-01-28T18:37:56.798Z" }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973, upload-time = "2025-05-05T19:44:34.99Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" From d00c61999a5dba795038cc8fe8c39499d1339c5d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 12:13:58 -0400 Subject: [PATCH 08/10] Add testing for CI failure --- .github/workflows/test.yml | 14 +++ .../tests/_pickle_multiprocessing_helpers.py | 97 +++++++++++++++++-- python/tests/test_pickle_multiprocessing.py | 22 ++++- 3 files changed, 120 insertions(+), 13 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2cd792ea9..c0fc0747a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -102,6 +102,20 @@ jobs: git submodule update --init uv run --no-project pytest -v --import-mode=importlib + # Always dump the multiprocessing worker diagnostic log, even on + # job timeout, so we can see where forkserver/spawn workers stalled. + # See python/tests/_pickle_multiprocessing_helpers.py for what each + # line means. Safe to remove once forkserver/spawn hang is resolved. + - name: Dump multiprocessing diagnostic log + if: always() + run: | + echo "=== /tmp/df_mp_worker_diag.log ===" + if [ -f /tmp/df_mp_worker_diag.log ]; then + cat /tmp/df_mp_worker_diag.log + else + echo "(no diagnostic log produced)" + fi + - name: FFI unit tests run: | cd examples/datafusion-ffi-example diff --git a/python/tests/_pickle_multiprocessing_helpers.py b/python/tests/_pickle_multiprocessing_helpers.py index 4f04967f2..15b1ffbee 100644 --- a/python/tests/_pickle_multiprocessing_helpers.py +++ b/python/tests/_pickle_multiprocessing_helpers.py @@ -23,9 +23,56 @@ from __future__ import annotations -import pyarrow as pa -from datafusion import SessionContext, udf -from datafusion.ipc import clear_worker_ctx, set_worker_ctx +import os +import tempfile +import time +import traceback +from pathlib import Path + +# Diagnostic log path for multiprocessing worker timing. +# Workers write here so a CI-side `cat` after a job timeout can show +# where each worker stalled (e.g. inside `import datafusion`). Lives in +# the system temp dir so it persists across Pool worker exits and is +# readable by a follow-up workflow step. Override via env var when +# debugging locally. +_DIAG_LOG = Path( + os.environ.get( + "DF_MP_DIAG_LOG", + str(Path(tempfile.gettempdir()) / "df_mp_worker_diag.log"), + ) +) + + +def _diag(event: str) -> None: + """Append a diagnostic line: timestamp, pid, parent pid, event. + + Opens / flushes / closes per call so a hang mid-import still leaves + a partial trail on disk. Parent pid distinguishes forkserver-born + workers (parent = forkserver) from spawn-born workers (parent = + main pytest process). + """ + try: + with _DIAG_LOG.open("a", encoding="utf-8") as fh: + fh.write( + f"{time.time():.3f} pid={os.getpid()} ppid={os.getppid()} {event}\n" + ) + fh.flush() + os.fsync(fh.fileno()) + except OSError: + # Best-effort diagnostic; never let logging itself break a test. + pass + + +_diag("helpers module: starting imports") +import pyarrow as pa # noqa: E402 + +_diag("helpers module: pyarrow imported") +from datafusion import SessionContext, udf # noqa: E402 + +_diag("helpers module: datafusion imported") +from datafusion.ipc import clear_worker_ctx, set_worker_ctx # noqa: E402 + +_diag("helpers module: all imports complete") def make_double_udf(): @@ -65,12 +112,31 @@ def init_worker_clear(): clear_worker_ctx() +def diag_init(): + """Pool initializer used by the diagnostic-instrumented tests. + + Logs that a worker process is alive and has finished its module + imports. If this line never appears for a given pid, the hang is + inside import / Rust extension init (before any task runs). + """ + _diag("worker init: ready for tasks") + + def unpickle_and_describe(blob: bytes) -> str: """Unpickle a proto-bytes blob and return its canonical name.""" import pickle - expr = pickle.loads(blob) # noqa: S301 - return expr.canonical_name() + _diag("unpickle_and_describe: enter") + try: + expr = pickle.loads(blob) # noqa: S301 + _diag("unpickle_and_describe: pickle.loads done") + name = expr.canonical_name() + except BaseException as exc: + _diag(f"unpickle_and_describe: raised {type(exc).__name__}: {exc}") + _diag(traceback.format_exc()) + raise + _diag(f"unpickle_and_describe: returning name={name!r}") + return name def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]: @@ -82,8 +148,19 @@ def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]: """ import pickle - expr = pickle.loads(blob) # noqa: S301 - ctx = SessionContext() - df = ctx.from_pydict({"a": batch}) - out = df.with_column("result", expr).select("result") - return out.to_pydict()["result"] + _diag(f"unpickle_and_evaluate: enter batch_len={len(batch)}") + try: + expr = pickle.loads(blob) # noqa: S301 + _diag("unpickle_and_evaluate: pickle.loads done") + ctx = SessionContext() + _diag("unpickle_and_evaluate: SessionContext built") + df = ctx.from_pydict({"a": batch}) + out = df.with_column("result", expr).select("result") + _diag("unpickle_and_evaluate: plan built, collecting") + result = out.to_pydict()["result"] + except BaseException as exc: + _diag(f"unpickle_and_evaluate: raised {type(exc).__name__}: {exc}") + _diag(traceback.format_exc()) + raise + _diag(f"unpickle_and_evaluate: returning len={len(result)}") + return result diff --git a/python/tests/test_pickle_multiprocessing.py b/python/tests/test_pickle_multiprocessing.py index 216542c01..5e3e223df 100644 --- a/python/tests/test_pickle_multiprocessing.py +++ b/python/tests/test_pickle_multiprocessing.py @@ -80,12 +80,16 @@ def _skip_if_multiprocessing_unavailable(): @pytest.mark.timeout(120) def test_builtin_pickle_via_pool(start_method): """Built-in expressions round-trip in every start method.""" + helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: enter") expr = col("a") + lit(1) blob = pickle.dumps(expr) ctx = mp.get_context(start_method) - with ctx.Pool(processes=2) as pool: + helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: creating Pool") + with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool: + helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool ready, map") results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob]) + helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool closed") assert all(r == expr.canonical_name() for r in results) @@ -98,16 +102,22 @@ def test_udf_pickle_self_contained(start_method): Workers start with no UDF registered. The Rust-side ``PythonUDFCodec`` reconstructs the UDF from bytes embedded in the pickle blob. """ + helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: enter") udf_obj = helpers.make_double_udf() expr = udf_obj(col("a")) blob = pickle.dumps(expr) ctx = mp.get_context(start_method) - with ctx.Pool(processes=2) as pool: + helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: creating Pool") + with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool: + helpers._diag( + f"test_udf_pickle_self_contained[{start_method}]: pool ready, starmap" + ) results = pool.starmap( helpers.unpickle_and_evaluate, [(blob, [1, 2, 3]), (blob, [10, 20, 30])], ) + helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: pool closed") assert results[0] == [2, 4, 6] assert results[1] == [20, 40, 60] @@ -117,12 +127,18 @@ def test_udf_pickle_self_contained(start_method): @pytest.mark.timeout(120) def test_closure_capturing_udf_via_pool(start_method): """Cloudpickle preserves closure state across the codec boundary.""" + helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: enter") udf_obj = helpers.make_times_seven_udf() expr = udf_obj(col("a")) blob = pickle.dumps(expr) ctx = mp.get_context(start_method) - with ctx.Pool(processes=2) as pool: + helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: creating Pool") + with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool: + helpers._diag( + f"test_closure_capturing_udf_via_pool[{start_method}]: pool ready, apply" + ) result = pool.apply(helpers.unpickle_and_evaluate, (blob, [1, 2, 3])) + helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: pool closed") assert result == [7, 14, 21] From f7c8c6cd9fcab7152bc7063057051defa138ce5d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 21 May 2026 17:05:53 -0400 Subject: [PATCH 09/10] Additional debugging for mp tests in CI --- .../tests/_pickle_multiprocessing_helpers.py | 65 +++++++++++++++++++ python/tests/test_pickle_multiprocessing.py | 38 ++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/python/tests/_pickle_multiprocessing_helpers.py b/python/tests/_pickle_multiprocessing_helpers.py index 15b1ffbee..0ca669fcf 100644 --- a/python/tests/_pickle_multiprocessing_helpers.py +++ b/python/tests/_pickle_multiprocessing_helpers.py @@ -122,6 +122,71 @@ def diag_init(): _diag("worker init: ready for tasks") +def _read_text(path: str) -> str: + """Read a /proc file; return ``""`` if not accessible.""" + try: + return Path(path).read_text(encoding="utf-8", errors="replace").strip() + except OSError: + return "" + + +def _descendants(root_pid: int) -> list[int]: + """Return ``root_pid`` plus all descendant pids via /proc//task/.../children. + + Linux-only; returns ``[root_pid]`` on platforms without ``/proc``. + """ + out: list[int] = [root_pid] + if not Path("/proc").is_dir(): + return out + queue = [root_pid] + while queue: + pid = queue.pop() + try: + task_dir = Path(f"/proc/{pid}/task") + if not task_dir.is_dir(): + continue + for tdir in task_dir.iterdir(): + children_file = tdir / "children" + try: + children = children_file.read_text(encoding="utf-8").split() + except OSError: + continue + for child in children: + try: + cpid = int(child) + except ValueError: + continue + out.append(cpid) + queue.append(cpid) + except OSError: + continue + return out + + +def snapshot_processes(label: str, root_pid: int | None = None) -> None: + """Dump a process-state snapshot to the diagnostic log. + + For each descendant of ``root_pid`` (default: current process), record + cmdline, status (``R``/``S``/``D``), wchan (kernel function the task + is blocked in), and kernel stack. Use this to localize a worker hang: + a wchan of ``do_futex`` points at a lock; ``poll_schedule_timeout`` + points at a blocking I/O wait; ``do_select`` at multiprocessing's + pipe read. + """ + pid = root_pid if root_pid is not None else os.getpid() + _diag(f"snapshot[{label}] root_pid={pid}") + for cpid in _descendants(pid): + cmd = _read_text(f"/proc/{cpid}/cmdline").replace("\x00", " ").strip() + stat = _read_text(f"/proc/{cpid}/status").splitlines() + state_line = next((s for s in stat if s.startswith("State:")), "State: ?") + wchan = _read_text(f"/proc/{cpid}/wchan") + stack = _read_text(f"/proc/{cpid}/stack") + _diag(f"snapshot[{label}] pid={cpid} {state_line} wchan={wchan} cmd={cmd!r}") + if stack and stack != "": + for line in stack.splitlines()[:10]: + _diag(f"snapshot[{label}] pid={cpid} stack: {line}") + + def unpickle_and_describe(blob: bytes) -> str: """Unpickle a proto-bytes blob and return its canonical name.""" import pickle diff --git a/python/tests/test_pickle_multiprocessing.py b/python/tests/test_pickle_multiprocessing.py index 5e3e223df..02a3e261d 100644 --- a/python/tests/test_pickle_multiprocessing.py +++ b/python/tests/test_pickle_multiprocessing.py @@ -27,10 +27,12 @@ from __future__ import annotations +import contextlib import functools import multiprocessing as mp import pickle import sys +import threading import pytest from datafusion import col, lit @@ -38,6 +40,27 @@ from . import _pickle_multiprocessing_helpers as helpers +@contextlib.contextmanager +def _snapshot_on_hang(label: str, fire_after_seconds: float = 30.0): + """Schedule a process-state snapshot ``fire_after_seconds`` from now. + + Cancelled if the ``with`` block exits before then. Used to capture + worker state mid-hang — fork tests return in well under the delay, + so the timer only fires when something is actually stuck. + """ + timer = threading.Timer( + fire_after_seconds, + helpers.snapshot_processes, + args=(label,), + ) + timer.daemon = True + timer.start() + try: + yield + finally: + timer.cancel() + + @functools.cache def _multiprocessing_available() -> tuple[bool, str]: """Return (available, reason). Some sandboxed environments deny semaphore @@ -86,7 +109,10 @@ def test_builtin_pickle_via_pool(start_method): ctx = mp.get_context(start_method) helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: creating Pool") - with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool: + with ( + ctx.Pool(processes=2, initializer=helpers.diag_init) as pool, + _snapshot_on_hang(f"builtin[{start_method}]"), + ): helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool ready, map") results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob]) helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool closed") @@ -109,7 +135,10 @@ def test_udf_pickle_self_contained(start_method): ctx = mp.get_context(start_method) helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: creating Pool") - with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool: + with ( + ctx.Pool(processes=2, initializer=helpers.diag_init) as pool, + _snapshot_on_hang(f"udf[{start_method}]"), + ): helpers._diag( f"test_udf_pickle_self_contained[{start_method}]: pool ready, starmap" ) @@ -134,7 +163,10 @@ def test_closure_capturing_udf_via_pool(start_method): ctx = mp.get_context(start_method) helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: creating Pool") - with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool: + with ( + ctx.Pool(processes=2, initializer=helpers.diag_init) as pool, + _snapshot_on_hang(f"closure[{start_method}]"), + ): helpers._diag( f"test_closure_capturing_udf_via_pool[{start_method}]: pool ready, apply" ) From 949af914a64df2e7e6add1e31f9ff6f5a854fe4d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 22 May 2026 06:37:13 -0400 Subject: [PATCH 10/10] Set path for workers --- python/tests/test_pickle_multiprocessing.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/python/tests/test_pickle_multiprocessing.py b/python/tests/test_pickle_multiprocessing.py index 02a3e261d..e20360bc5 100644 --- a/python/tests/test_pickle_multiprocessing.py +++ b/python/tests/test_pickle_multiprocessing.py @@ -33,12 +33,25 @@ import pickle import sys import threading +from pathlib import Path import pytest from datafusion import col, lit from . import _pickle_multiprocessing_helpers as helpers +# `pytest --import-mode=importlib` (used in CI) does not prepend the test +# parent directory to `sys.path`; pytest loads `tests` via its own importlib +# hook. multiprocessing forkserver / spawn workers receive only the parent's +# `sys.path` snapshot, not pytest's hook, so they fail to import +# `tests._pickle_multiprocessing_helpers` with `ModuleNotFoundError: No module +# named 'tests'`. Ensure the parent directory of the `tests` package is on +# `sys.path` so workers can resolve it the standard way. Fork start method is +# unaffected (inherits the already-imported module object). +_TESTS_PARENT = str(Path(__file__).resolve().parent.parent) +if _TESTS_PARENT not in sys.path: + sys.path.insert(0, _TESTS_PARENT) + @contextlib.contextmanager def _snapshot_on_hang(label: str, fire_after_seconds: float = 30.0):