Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions rampart/payloads/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import json
import logging
import re
import shutil
import tempfile
from pathlib import Path
Expand All @@ -34,6 +35,7 @@
from rampart.core.types import Payload, PayloadFormat

logger = logging.getLogger(__name__)
_COLLECTION_NAME_PATTERN = re.compile(r"^[A-Za-z0-9._-]{1,128}$")


class PayloadStore:
Expand Down Expand Up @@ -162,6 +164,7 @@ def load(

def exists(self, name: str) -> bool:
"""Check whether a collection exists on disk."""
self._validate_collection_name(name)
return self._collection_path(name).exists()

def list_collections(self) -> list[str]:
Expand All @@ -176,6 +179,7 @@ def list_collections(self) -> list[str]:

def delete(self, name: str) -> None:
"""Remove a collection from disk."""
self._validate_collection_name(name)
collection_dir = self._root / name
if collection_dir.exists():
shutil.rmtree(collection_dir)
Expand All @@ -193,6 +197,7 @@ def manifest(self, name: str) -> dict[str, Any]:
Raises:
FileNotFoundError: If the collection does not exist.
"""
self._validate_collection_name(name)
path = self._root / name / "manifest.json"
if not path.exists():
msg = f"No manifest for collection '{name}'"
Expand All @@ -205,8 +210,11 @@ def manifest(self, name: str) -> dict[str, Any]:
@staticmethod
def _validate_collection_name(name: str) -> None:
"""Reject names that would escape the store root."""
if not name or "/" in name or "\\" in name or name in (".", ".."):
msg = f"Invalid collection name: {name!r}. Must be a simple directory name."
if not _COLLECTION_NAME_PATTERN.fullmatch(name) or name in (".", ".."):
msg = (
f"Invalid collection name: {name!r}. Must be a filename-safe "
"identifier using letters, numbers, dots, underscores, or hyphens."
)
raise ValueError(
msg,
)
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/payloads/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,36 @@ def test_manifest_missing_raises(self, store: PayloadStore) -> None:
with pytest.raises(FileNotFoundError, match="No manifest"):
store.manifest("ghost")

def test_exists_rejects_collection_path_traversal(
self,
store: PayloadStore,
) -> None:
with pytest.raises(ValueError, match="Invalid collection name"):
store.exists("../outside")

def test_delete_rejects_collection_path_traversal(self, tmp_path: Path) -> None:
root = tmp_path / "store"
root.mkdir()
sentinel = tmp_path / "sentinel.txt"
sentinel.write_text("do not delete")
store = PayloadStore(root=root)

with pytest.raises(ValueError, match="Invalid collection name"):
store.delete("..")

assert sentinel.exists()
assert root.exists()

def test_manifest_rejects_collection_path_traversal(self, tmp_path: Path) -> None:
root = tmp_path / "store"
root.mkdir()
outside_manifest = tmp_path / "manifest.json"
outside_manifest.write_text('{"collection": "outside"}')
store = PayloadStore(root=root)

with pytest.raises(ValueError, match="Invalid collection name"):
store.manifest("..")


class TestPayloadStorePathPayload:
def test_path_based_payload_roundtrip(
Expand Down