Skip to content

Commit d9cdc57

Browse files
authored
fix: with_dynamic_directory path traversal (#9162)
1 parent 40614fb commit d9cdc57

2 files changed

Lines changed: 79 additions & 1 deletion

File tree

marimo/_server/asgi.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ def __init__(
108108
"Using path='/' or path='' is not supported."
109109
)
110110
self.directory = Path(directory)
111+
# Precompute the resolved directory so we don't hit the filesystem
112+
# on every request. Fall back to an absolute path if resolve()
113+
# fails (e.g., broken symlink), so the middleware still starts
114+
# and per-request checks can handle resolution errors.
115+
try:
116+
self._resolved_directory = self.directory.resolve()
117+
except (RuntimeError, OSError):
118+
self._resolved_directory = self.directory.absolute()
111119
self.app_builder = app_builder
112120
self._app_cache: dict[str, ASGIApp] = {}
113121
self.validate_callback = validate_callback
@@ -135,16 +143,35 @@ def _redirect_response(self, scope: Scope) -> Response:
135143
LOGGER.debug(f"Redirecting to: {redirect_url}")
136144
return RedirectResponse(url=redirect_url, status_code=307)
137145

146+
def _is_within_directory(self, path: Path) -> bool:
147+
"""Check that path resolves to a location within self.directory."""
148+
try:
149+
path.resolve().relative_to(self._resolved_directory)
150+
return True
151+
except (ValueError, RuntimeError, OSError):
152+
return False
153+
138154
def _find_matching_file(
139155
self, relative_path: str
140156
) -> tuple[Path, str] | None:
141157
"""Find a matching Python file in the directory structure.
142158
Returns tuple of (matching file, remaining path) if found, None otherwise.
143159
"""
160+
# Reject path traversal segments. Normalize "\" to "/" so the check
161+
# also catches backslash segments, which Windows treats as path
162+
# separators (e.g. "..\\secret" via %5C in the URL).
163+
segments = relative_path.replace("\\", "/").split("/")
164+
if ".." in segments:
165+
return None
166+
144167
# Try direct match first, skip if relative path has an extension
145168
if not Path(relative_path).suffix:
146169
direct_match = self.directory / f"{relative_path}.py"
147-
if not direct_match.name.startswith("_") and direct_match.exists():
170+
if (
171+
not direct_match.name.startswith("_")
172+
and self._is_within_directory(direct_match)
173+
and direct_match.exists()
174+
):
148175
return (direct_match, "")
149176

150177
# Try nested path by progressively checking each part
@@ -159,6 +186,7 @@ def _find_matching_file(
159186
if (
160187
cache_key in self._app_cache
161188
and not potential_path.name.startswith("_")
189+
and self._is_within_directory(potential_path)
162190
):
163191
return (potential_path.with_suffix(".py"), "/".join(remaining))
164192

tests/_server/test_asgi.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import os
4+
import shutil
45
import sys
56
import tempfile
67
import unittest
@@ -512,6 +513,55 @@ def test_hidden_file_passes_through(self):
512513
assert response.status_code == 404
513514
assert response.text == "Not Found"
514515

516+
def test_path_traversal_blocked(self):
517+
# Create a dedicated sibling directory with a unique name to hold
518+
# the "outside" file, so we don't clobber files in the shared
519+
# parent directory and are safe under parallel test runs.
520+
parent_dir = Path(self.temp_dir).parent
521+
outside_dir = Path(tempfile.mkdtemp(dir=parent_dir))
522+
outside_name = outside_dir.name
523+
(outside_dir / "secret.py").write_text(contents)
524+
try:
525+
# Raw traversal
526+
response = self.client.get(f"/apps/../{outside_name}/secret/")
527+
assert response.status_code == 404
528+
assert response.text == "Not Found"
529+
530+
# URL-encoded traversal (%2e%2e = ..)
531+
response = self.client.get(f"/apps/%2e%2e/{outside_name}/secret/")
532+
assert response.status_code == 404
533+
assert response.text == "Not Found"
534+
535+
# Double-encoded
536+
response = self.client.get(
537+
f"/apps/%252e%252e/{outside_name}/secret/"
538+
)
539+
assert response.status_code == 404
540+
assert response.text == "Not Found"
541+
542+
# Nested traversal
543+
response = self.client.get(
544+
f"/apps/nested/../../{outside_name}/secret/"
545+
)
546+
assert response.status_code == 404
547+
assert response.text == "Not Found"
548+
finally:
549+
shutil.rmtree(outside_dir, ignore_errors=True)
550+
551+
def test_path_traversal_blocked_backslash(self):
552+
# Windows treats "\" as a path separator, so URLs like
553+
# "/apps/..%5Csecret/" (%5C = "\") could bypass a naive "/"-only
554+
# check. Call _find_matching_file directly since httpx/starlette
555+
# would otherwise reject or normalize the raw URL before it
556+
# reaches the middleware.
557+
middleware = self.app_with_middleware
558+
assert middleware._find_matching_file("..\\secret") is None
559+
assert middleware._find_matching_file("foo\\..\\secret") is None
560+
assert middleware._find_matching_file("nested\\..\\..\\secret") is None
561+
# Mixed separators
562+
assert middleware._find_matching_file("..\\..\\secret") is None
563+
assert middleware._find_matching_file("foo/..\\secret") is None
564+
515565
def test_valid_app_path(self):
516566
response = self.client.get("/apps/test_app/")
517567
assert response.status_code == 200

0 commit comments

Comments
 (0)