Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions git/refs/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
__all__ = ["RefLog", "RefLogEntry"]

from mmap import mmap
import os.path as osp
import re
import time as _time

Expand Down Expand Up @@ -212,8 +211,11 @@ def path(cls, ref: "SymbolicReference") -> str:

:param ref:
:class:`~git.refs.symbolic.SymbolicReference` instance
Comment thread
Byron marked this conversation as resolved.

:raise ValueError:
If `ref.path` is invalid or escapes the repository's reflog directory.
"""
return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path))
return to_native_path(ref._get_validated_reflog_path(ref.repo, ref.path))

@classmethod
def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
Expand Down
7 changes: 5 additions & 2 deletions git/refs/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,20 @@ def delete(cls, repo: "Repo", *refs: "RemoteReference", **kwargs: Any) -> None:
`kwargs` are given for comparability with the base class method as we
should not narrow the signature.
"""
for ref in refs:
cls._check_ref_name_valid(ref.path)

repo.git.branch("-d", "-r", *refs)
# The official deletion method will ignore remote symbolic refs - these are
# generally ignored in the refs/ folder. We don't though and delete remainders
# manually.
for ref in refs:
try:
os.remove(os.path.join(repo.common_dir, ref.path))
os.remove(cls._get_validated_path(repo.common_dir, ref.path))
except OSError:
pass
try:
os.remove(os.path.join(repo.git_dir, ref.path))
os.remove(cls._get_validated_path(repo.git_dir, ref.path))
except OSError:
pass
# END for each ref
Expand Down
37 changes: 31 additions & 6 deletions git/refs/symbolic.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,32 @@ def name(self) -> str:
def abspath(self) -> PathLike:
return join_path_native(_git_dir(self.repo, self.path), self.path)

@staticmethod
def _get_validated_path(base: PathLike, path: PathLike) -> str:
path = os.fspath(path)
base_path = os.path.realpath(os.fspath(base))
abs_path = os.path.realpath(os.path.join(base_path, path))
try:
common_path = os.path.commonpath([base_path, abs_path])
except ValueError as e:
raise ValueError("Reference path %r escapes the repository" % path) from e
if os.path.normcase(common_path) != os.path.normcase(base_path):
raise ValueError("Reference path %r escapes the repository" % path)
Comment thread
Byron marked this conversation as resolved.
return abs_path

@classmethod
def _get_validated_ref_path(cls, repo: "Repo", path: PathLike) -> str:
"""Return the absolute filesystem path for a ref after validating it."""
cls._check_ref_name_valid(path)
ref_path = os.fspath(path)
return cls._get_validated_path(_git_dir(repo, ref_path), ref_path)

@classmethod
def _get_validated_reflog_path(cls, repo: "Repo", path: PathLike) -> str:
"""Return the absolute filesystem path for a reflog after validating it."""
cls._check_ref_name_valid(path)
return cls._get_validated_path(os.path.join(repo.git_dir, "logs"), path)

@classmethod
def _get_packed_refs_path(cls, repo: "Repo") -> str:
return os.path.join(repo.common_dir, "packed-refs")
Expand Down Expand Up @@ -485,7 +511,7 @@ def set_reference(
# END handle non-existing
# END retrieve old hexsha

fpath = self.abspath
fpath = self._get_validated_ref_path(self.repo, self.path)
assure_directory_exists(fpath, is_file=True)

lfd = LockedFD(fpath)
Expand Down Expand Up @@ -632,7 +658,7 @@ def delete(cls, repo: "Repo", path: PathLike) -> None:
Alternatively the symbolic reference to be deleted.
"""
full_ref_path = cls.to_full_path(path)
abs_path = os.path.join(repo.common_dir, full_ref_path)
abs_path = cls._get_validated_ref_path(repo, full_ref_path)
if os.path.exists(abs_path):
os.remove(abs_path)
else:
Expand Down Expand Up @@ -695,9 +721,8 @@ def _create(
symbolic reference. Otherwise it will be resolved to the corresponding object
and a detached symbolic reference will be created instead.
"""
git_dir = _git_dir(repo, path)
full_ref_path = cls.to_full_path(path)
abs_ref_path = os.path.join(git_dir, full_ref_path)
abs_ref_path = cls._get_validated_ref_path(repo, full_ref_path)

# Figure out target data.
target = reference
Expand Down Expand Up @@ -789,8 +814,8 @@ def rename(self, new_path: PathLike, force: bool = False) -> "SymbolicReference"
if self.path == new_path:
return self

new_abs_path = os.path.join(_git_dir(self.repo, new_path), new_path)
cur_abs_path = os.path.join(_git_dir(self.repo, self.path), self.path)
new_abs_path = self._get_validated_ref_path(self.repo, new_path)
cur_abs_path = self._get_validated_ref_path(self.repo, self.path)
if os.path.isfile(new_abs_path):
if not force:
# If they point to the same file, it's not an error.
Expand Down
2 changes: 1 addition & 1 deletion git/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def join_path(a: PathLike, *p: PathLike) -> PathLike:

if sys.platform == "win32":

def to_native_path_windows(path: PathLike) -> PathLike:
def to_native_path_windows(path: PathLike) -> str:
path = os.fspath(path)
return path.replace("/", "\\")

Expand Down
123 changes: 123 additions & 0 deletions test/test_refs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/

import contextlib
from itertools import chain
import os.path as osp
from pathlib import Path
Expand All @@ -18,6 +19,7 @@
RefLog,
Reference,
RemoteReference,
Repo,
SymbolicReference,
TagReference,
)
Expand All @@ -29,6 +31,18 @@


class TestRefs(TestBase):
@contextlib.contextmanager
def _repo_with_initial_commit(self, base_dir):
repo_dir = base_dir / "repo"
repo = Repo.init(repo_dir)
(repo_dir / "file.txt").write_text("initial\n", encoding="utf-8")
repo.index.add(["file.txt"])
repo.index.commit("initial")
try:
yield repo
finally:
repo.git.clear_cache()

def test_from_path(self):
# Should be able to create any reference directly.
for ref_type in (Reference, Head, TagReference, RemoteReference):
Expand Down Expand Up @@ -648,6 +662,115 @@ def test_refs_outside_repo(self):
ref_file_name = Path(ref_file.name).name
self.assertRaises(BadName, self.rorepo.commit, f"../../{ref_file_name}")

def test_reference_create_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_path = base_dir / "outside_write.txt"

self.assertRaises(ValueError, Reference.create, repo, "../../../outside_write.txt", "HEAD")
assert not outside_path.exists()

Comment thread
Byron marked this conversation as resolved.
def test_symbolic_reference_create_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_path = base_dir / "outside_write.txt"

self.assertRaises(ValueError, SymbolicReference.create, repo, "../../outside_write.txt", "HEAD")
assert not outside_path.exists()

def test_symbolic_reference_set_reference_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_path = base_dir / "outside_write.txt"

self.assertRaises(ValueError, SymbolicReference(repo, "../../outside_write.txt").set_reference, "HEAD")
assert not outside_path.exists()

def test_symbolic_reference_rename_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_path = base_dir / "outside_move.txt"
ref = SymbolicReference.create(repo, "SAFE_RENAME_SOURCE", "HEAD")

self.assertRaises(ValueError, ref.rename, "../../outside_move.txt")
assert not outside_path.exists()
assert Path(ref.abspath).is_file()

def test_symbolic_reference_delete_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_path = base_dir / "outside_delete.txt"
outside_path.write_text("do not delete\n", encoding="utf-8")

self.assertRaises(ValueError, SymbolicReference.delete, repo, "../../outside_delete.txt")
assert outside_path.read_text(encoding="utf-8") == "do not delete\n"

def test_symbolic_reference_log_append_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_path = base_dir / "outside_reflog.txt"

ref = SymbolicReference(repo, "../../../outside_reflog.txt")
self.assertRaises(
ValueError, ref.log_append, Commit.NULL_BIN_SHA, "do not write", repo.head.commit.binsha
)
assert not outside_path.exists()

def test_symbolic_reference_set_reference_rejects_symlink_escape(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
with self._repo_with_initial_commit(base_dir) as repo:
outside_dir = base_dir / "outside_refs"
outside_dir.mkdir()
outside_path = outside_dir / "escaped"

refs_heads_dir = Path(repo.common_dir) / "refs" / "heads"
refs_heads_dir.mkdir(parents=True, exist_ok=True)
symlink_path = refs_heads_dir / "link_out"
try:
symlink_path.symlink_to(outside_dir, target_is_directory=True)
except (OSError, NotImplementedError) as ex:
self.skipTest("symlinks unavailable on this platform: %s" % ex)
if osp.realpath(symlink_path / "escaped") == osp.abspath(symlink_path / "escaped"):
self.skipTest("realpath does not resolve directory symlinks on this platform")
Comment thread
Byron marked this conversation as resolved.

ref = SymbolicReference(repo, "refs/heads/link_out/escaped")
self.assertRaises(ValueError, ref.set_reference, "HEAD")
assert not outside_path.exists()

def test_remote_reference_delete_cleanup_rejects_path_traversal(self):
with tempfile.TemporaryDirectory() as tmp_dir:
base_dir = Path(tmp_dir)
git_dir = base_dir / "repo" / ".git"
git_dir.mkdir(parents=True)
outside_path = base_dir / "outside_remote_delete.txt"
outside_path.write_text("do not delete\n", encoding="utf-8")

class GitStub:
branch_called = False

def branch(self, *args):
self.branch_called = True

class RepoStub:
pass

repo = RepoStub()
repo.git = GitStub()
repo.common_dir = str(git_dir)
repo.git_dir = str(git_dir)
ref = RemoteReference(repo, "../../outside_remote_delete.txt", check_path=False)

self.assertRaises(ValueError, RemoteReference.delete, repo, ref)
assert not repo.git.branch_called
assert outside_path.read_text(encoding="utf-8") == "do not delete\n"

def test_validity_ref_names(self):
"""Ensure ref names are checked for validity.

Expand Down
Loading