diff --git a/git/refs/log.py b/git/refs/log.py index 4751cff99..fbbe66b22 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -4,7 +4,6 @@ __all__ = ["RefLog", "RefLogEntry"] from mmap import mmap -import os.path as osp import re import time as _time @@ -212,8 +211,11 @@ def path(cls, ref: "SymbolicReference") -> str: :param ref: :class:`~git.refs.symbolic.SymbolicReference` instance + + :raise ValueError: + If `ref.path` is invalid or escapes the repository's reflog directory. """ - return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path)) + return to_native_path(ref._get_validated_reflog_path(ref.repo, ref.path)) @classmethod def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]: diff --git a/git/refs/remote.py b/git/refs/remote.py index b4f4f7b36..e16ae70f8 100644 --- a/git/refs/remote.py +++ b/git/refs/remote.py @@ -58,17 +58,20 @@ def delete(cls, repo: "Repo", *refs: "RemoteReference", **kwargs: Any) -> None: `kwargs` are given for comparability with the base class method as we should not narrow the signature. """ + for ref in refs: + cls._check_ref_name_valid(ref.path) + repo.git.branch("-d", "-r", *refs) # The official deletion method will ignore remote symbolic refs - these are # generally ignored in the refs/ folder. We don't though and delete remainders # manually. for ref in refs: try: - os.remove(os.path.join(repo.common_dir, ref.path)) + os.remove(cls._get_validated_path(repo.common_dir, ref.path)) except OSError: pass try: - os.remove(os.path.join(repo.git_dir, ref.path)) + os.remove(cls._get_validated_path(repo.git_dir, ref.path)) except OSError: pass # END for each ref diff --git a/git/refs/symbolic.py b/git/refs/symbolic.py index 99af4f57c..020de5e13 100644 --- a/git/refs/symbolic.py +++ b/git/refs/symbolic.py @@ -110,6 +110,32 @@ def name(self) -> str: def abspath(self) -> PathLike: return join_path_native(_git_dir(self.repo, self.path), self.path) + @staticmethod + def _get_validated_path(base: PathLike, path: PathLike) -> str: + path = os.fspath(path) + base_path = os.path.realpath(os.fspath(base)) + abs_path = os.path.realpath(os.path.join(base_path, path)) + try: + common_path = os.path.commonpath([base_path, abs_path]) + except ValueError as e: + raise ValueError("Reference path %r escapes the repository" % path) from e + if os.path.normcase(common_path) != os.path.normcase(base_path): + raise ValueError("Reference path %r escapes the repository" % path) + return abs_path + + @classmethod + def _get_validated_ref_path(cls, repo: "Repo", path: PathLike) -> str: + """Return the absolute filesystem path for a ref after validating it.""" + cls._check_ref_name_valid(path) + ref_path = os.fspath(path) + return cls._get_validated_path(_git_dir(repo, ref_path), ref_path) + + @classmethod + def _get_validated_reflog_path(cls, repo: "Repo", path: PathLike) -> str: + """Return the absolute filesystem path for a reflog after validating it.""" + cls._check_ref_name_valid(path) + return cls._get_validated_path(os.path.join(repo.git_dir, "logs"), path) + @classmethod def _get_packed_refs_path(cls, repo: "Repo") -> str: return os.path.join(repo.common_dir, "packed-refs") @@ -485,7 +511,7 @@ def set_reference( # END handle non-existing # END retrieve old hexsha - fpath = self.abspath + fpath = self._get_validated_ref_path(self.repo, self.path) assure_directory_exists(fpath, is_file=True) lfd = LockedFD(fpath) @@ -632,7 +658,7 @@ def delete(cls, repo: "Repo", path: PathLike) -> None: Alternatively the symbolic reference to be deleted. """ full_ref_path = cls.to_full_path(path) - abs_path = os.path.join(repo.common_dir, full_ref_path) + abs_path = cls._get_validated_ref_path(repo, full_ref_path) if os.path.exists(abs_path): os.remove(abs_path) else: @@ -695,9 +721,8 @@ def _create( symbolic reference. Otherwise it will be resolved to the corresponding object and a detached symbolic reference will be created instead. """ - git_dir = _git_dir(repo, path) full_ref_path = cls.to_full_path(path) - abs_ref_path = os.path.join(git_dir, full_ref_path) + abs_ref_path = cls._get_validated_ref_path(repo, full_ref_path) # Figure out target data. target = reference @@ -789,8 +814,8 @@ def rename(self, new_path: PathLike, force: bool = False) -> "SymbolicReference" if self.path == new_path: return self - new_abs_path = os.path.join(_git_dir(self.repo, new_path), new_path) - cur_abs_path = os.path.join(_git_dir(self.repo, self.path), self.path) + new_abs_path = self._get_validated_ref_path(self.repo, new_path) + cur_abs_path = self._get_validated_ref_path(self.repo, self.path) if os.path.isfile(new_abs_path): if not force: # If they point to the same file, it's not an error. diff --git a/git/util.py b/git/util.py index c3ffdd62b..712fabe85 100644 --- a/git/util.py +++ b/git/util.py @@ -289,7 +289,7 @@ def join_path(a: PathLike, *p: PathLike) -> PathLike: if sys.platform == "win32": - def to_native_path_windows(path: PathLike) -> PathLike: + def to_native_path_windows(path: PathLike) -> str: path = os.fspath(path) return path.replace("/", "\\") diff --git a/test/test_refs.py b/test/test_refs.py index 329515807..d77b34eba 100644 --- a/test/test_refs.py +++ b/test/test_refs.py @@ -3,6 +3,7 @@ # This module is part of GitPython and is released under the # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ +import contextlib from itertools import chain import os.path as osp from pathlib import Path @@ -18,6 +19,7 @@ RefLog, Reference, RemoteReference, + Repo, SymbolicReference, TagReference, ) @@ -29,6 +31,18 @@ class TestRefs(TestBase): + @contextlib.contextmanager + def _repo_with_initial_commit(self, base_dir): + repo_dir = base_dir / "repo" + repo = Repo.init(repo_dir) + (repo_dir / "file.txt").write_text("initial\n", encoding="utf-8") + repo.index.add(["file.txt"]) + repo.index.commit("initial") + try: + yield repo + finally: + repo.git.clear_cache() + def test_from_path(self): # Should be able to create any reference directly. for ref_type in (Reference, Head, TagReference, RemoteReference): @@ -648,6 +662,115 @@ def test_refs_outside_repo(self): ref_file_name = Path(ref_file.name).name self.assertRaises(BadName, self.rorepo.commit, f"../../{ref_file_name}") + def test_reference_create_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_path = base_dir / "outside_write.txt" + + self.assertRaises(ValueError, Reference.create, repo, "../../../outside_write.txt", "HEAD") + assert not outside_path.exists() + + def test_symbolic_reference_create_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_path = base_dir / "outside_write.txt" + + self.assertRaises(ValueError, SymbolicReference.create, repo, "../../outside_write.txt", "HEAD") + assert not outside_path.exists() + + def test_symbolic_reference_set_reference_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_path = base_dir / "outside_write.txt" + + self.assertRaises(ValueError, SymbolicReference(repo, "../../outside_write.txt").set_reference, "HEAD") + assert not outside_path.exists() + + def test_symbolic_reference_rename_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_path = base_dir / "outside_move.txt" + ref = SymbolicReference.create(repo, "SAFE_RENAME_SOURCE", "HEAD") + + self.assertRaises(ValueError, ref.rename, "../../outside_move.txt") + assert not outside_path.exists() + assert Path(ref.abspath).is_file() + + def test_symbolic_reference_delete_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_path = base_dir / "outside_delete.txt" + outside_path.write_text("do not delete\n", encoding="utf-8") + + self.assertRaises(ValueError, SymbolicReference.delete, repo, "../../outside_delete.txt") + assert outside_path.read_text(encoding="utf-8") == "do not delete\n" + + def test_symbolic_reference_log_append_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_path = base_dir / "outside_reflog.txt" + + ref = SymbolicReference(repo, "../../../outside_reflog.txt") + self.assertRaises( + ValueError, ref.log_append, Commit.NULL_BIN_SHA, "do not write", repo.head.commit.binsha + ) + assert not outside_path.exists() + + def test_symbolic_reference_set_reference_rejects_symlink_escape(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + with self._repo_with_initial_commit(base_dir) as repo: + outside_dir = base_dir / "outside_refs" + outside_dir.mkdir() + outside_path = outside_dir / "escaped" + + refs_heads_dir = Path(repo.common_dir) / "refs" / "heads" + refs_heads_dir.mkdir(parents=True, exist_ok=True) + symlink_path = refs_heads_dir / "link_out" + try: + symlink_path.symlink_to(outside_dir, target_is_directory=True) + except (OSError, NotImplementedError) as ex: + self.skipTest("symlinks unavailable on this platform: %s" % ex) + if osp.realpath(symlink_path / "escaped") == osp.abspath(symlink_path / "escaped"): + self.skipTest("realpath does not resolve directory symlinks on this platform") + + ref = SymbolicReference(repo, "refs/heads/link_out/escaped") + self.assertRaises(ValueError, ref.set_reference, "HEAD") + assert not outside_path.exists() + + def test_remote_reference_delete_cleanup_rejects_path_traversal(self): + with tempfile.TemporaryDirectory() as tmp_dir: + base_dir = Path(tmp_dir) + git_dir = base_dir / "repo" / ".git" + git_dir.mkdir(parents=True) + outside_path = base_dir / "outside_remote_delete.txt" + outside_path.write_text("do not delete\n", encoding="utf-8") + + class GitStub: + branch_called = False + + def branch(self, *args): + self.branch_called = True + + class RepoStub: + pass + + repo = RepoStub() + repo.git = GitStub() + repo.common_dir = str(git_dir) + repo.git_dir = str(git_dir) + ref = RemoteReference(repo, "../../outside_remote_delete.txt", check_path=False) + + self.assertRaises(ValueError, RemoteReference.delete, repo, ref) + assert not repo.git.branch_called + assert outside_path.read_text(encoding="utf-8") == "do not delete\n" + def test_validity_ref_names(self): """Ensure ref names are checked for validity.