Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions marimo/_ast/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,11 @@ def generate_filecontents(
) -> str:
"""Translates a sequences of codes (cells) to a Python file"""

# Update old internal cell names to the new ones
# Normalize internal cell names. Empty names would emit ``def ():``
# (invalid Python) and fall back to the unparsable-cell path;
# ``"__"`` is a legacy internal marker.
for idx, name in enumerate(names):
if name == "__":
if name == "__" or not name:
names[idx] = DEFAULT_CELL_NAME

setup_cell = pop_setup_cell(codes, names, cell_configs)
Expand Down
79 changes: 77 additions & 2 deletions marimo/_session/extensions/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@
from __future__ import annotations

import asyncio
import copy
import html
from enum import Enum
from functools import partial
from typing import TYPE_CHECKING

import msgspec

from marimo import _loggers
from marimo._cli.print import red
from marimo._messaging.notebook.document import NotebookCell
from marimo._messaging.notification import (
AlertNotification,
NotebookDocumentTransactionNotification,
NotificationMessage,
)
Expand All @@ -26,6 +31,7 @@
EventAwareExtension,
SessionExtension,
)
from marimo._session.model import SessionMode
from marimo._session.state.serialize import (
SessionCacheKey,
SessionCacheManager,
Expand All @@ -41,6 +47,7 @@
QueueDistributor,
)
from marimo._utils.print import print_, print_tabbed
from marimo._utils.serial_task_runner import SerialTaskRunner

if TYPE_CHECKING:
from logging import Logger
Expand Down Expand Up @@ -218,6 +225,12 @@ def __init__(
self.kernel_manager = kernel_manager
self.queue_manager = queue_manager
self.distributor: Distributor[KernelMessage] | None = None
# Log the unnamed-notebook skip once per session, not per mutation.
self._unnamed_autosave_logged = False
# FIFO so a slow older save never clobbers a newer one.
self._autosave_runner = SerialTaskRunner(
thread_name_prefix="marimo-autosave"
)

def _create_distributor(
self,
Expand All @@ -232,19 +245,22 @@ def _create_distributor(
# Edit mode with original kernel manager uses connection
return ConnectionDistributor(kernel_manager.kernel_connection)

@staticmethod
def _on_kernel_message(session: Session, msg: KernelMessage) -> None:
def _on_kernel_message(self, session: Session, msg: KernelMessage) -> None:
"""Route a raw kernel message to the appropriate session method.

Document transactions are intercepted and applied to the
``session.document``, then ``session.notify()`` is invoked with the (versioned) result.

Kernel-sourced transactions also trigger an auto-save so agent-driven
mutations via ``code_mode`` land on disk the same way frontend edits do.

Everything else is forwarded verbatim via ``session.notify()``.

TODO: if more notification types need server-side interception,
consider a middleware chain instead of inline dispatch.
"""
notif: KernelMessage | NotificationMessage = msg
kernel_transaction_applied = False

name = try_deserialize_kernel_notification_name(msg)
if name == NotebookDocumentTransactionNotification.name:
Expand All @@ -257,13 +273,70 @@ def _on_kernel_message(session: Session, msg: KernelMessage) -> None:
notif = NotebookDocumentTransactionNotification(
transaction=applied
)
kernel_transaction_applied = applied.source == "kernel"
except Exception:
LOGGER.warning(
"Failed to decode/apply kernel document transaction"
)

session.notify(notif, from_consumer_id=None)

if kernel_transaction_applied:
self._maybe_autosave(session)

def _maybe_autosave(self, session: Session) -> None:
"""Best-effort persistence of kernel-driven mutations to disk.

Skipped in run mode and for unnamed notebooks. Failures surface as
an ``AlertNotification`` toast; they never raise out of the
interceptor.
"""
if self.kernel_manager.mode != SessionMode.EDIT:
return

if session.app_file_manager.path is None:
if not self._unnamed_autosave_logged:
LOGGER.debug(
"Skipping code_mode auto-save for unnamed notebook"
)
self._unnamed_autosave_logged = True
return

# Deep-copy on the caller thread. ``NotebookCell`` and
# ``CellConfig`` are mutable and owned by the document, so a
# shallow copy would let the event-loop thread mutate fields
# under the worker thread's feet (torn snapshot).
cells_snapshot: list[NotebookCell] = copy.deepcopy(
session.document.cells
)

self._autosave_runner.submit(
partial(session.app_file_manager.save_from_cells, cells_snapshot),
on_error=partial(self._post_autosave_failure, session),
)

@staticmethod
def _post_autosave_failure(session: Session, err: Exception) -> None:
# Runs on the event loop thread — the runner routes on_error there
# so session.notify can safely touch the per-consumer asyncio.Queue.
LOGGER.warning(
"Failed to auto-save notebook after kernel mutation: %s", err
)
try:
session.notify(
AlertNotification(
title="Auto-save failed",
description=html.escape(
f"Could not persist kernel changes to "
f"{session.app_file_manager.path}: {err}"
),
variant="danger",
),
from_consumer_id=None,
)
except Exception:
LOGGER.exception("Failed to broadcast auto-save failure alert")

def on_attach(self, session: Session, event_bus: SessionEventBus) -> None:
del event_bus
self.distributor = self._create_distributor(
Expand All @@ -279,6 +352,8 @@ def on_detach(self) -> None:
if self.distributor is not None:
self.distributor.stop()
self.distributor = None
# Don't block session close on disk I/O; kernel still holds state.
self._autosave_runner.shutdown(wait=False)

def flush(self) -> None:
"""Flush any pending messages from the distributor."""
Expand Down
Loading
Loading