diff --git a/CHANGELOG.md b/CHANGELOG.md index 075fa7562..dc9d4516f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `apm install` now deploys `.instructions.md` files to `.claude/rules/*.md` for Claude Code, converting `applyTo:` frontmatter to Claude's `paths:` format (#516) +### Changed + +- Artifactory virtual file downloads now use the Archive Entry Download API to fetch individual files without downloading the full archive; falls back to full-archive extraction when the entry API is unavailable (#525) + ## [0.8.9] - 2026-03-31 ### Fixed diff --git a/src/apm_cli/deps/artifactory_entry.py b/src/apm_cli/deps/artifactory_entry.py new file mode 100644 index 000000000..76c468e6e --- /dev/null +++ b/src/apm_cli/deps/artifactory_entry.py @@ -0,0 +1,183 @@ +"""Artifactory registry backend -- Archive Entry Download. + +JFrog Artifactory supports downloading individual entries from an archived +artifact without fetching the entire archive. The URL pattern appends +``!/{path}`` to the archive URL:: + + GET {archive_url}!/{root_prefix}/{file_path} + +Both GitHub and GitLab archives use a root directory prefix of +``{repo}-{ref}/``, though hosting platforms may normalize the ref +(e.g. ``feature/foo`` becomes ``feature-foo`` in the directory name). +This module tries both the raw and normalized forms. + +:class:`ArtifactoryRegistryClient` implements the :class:`RegistryClient` +protocol defined in :mod:`~apm_cli.deps.registry_proxy` so the download +pipeline can fetch files without knowing which registry type is in use. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Callable, List, Optional +from urllib.parse import quote + +import requests as _requests + +if TYPE_CHECKING: + from .registry_proxy import RegistryConfig + +logger = logging.getLogger(__name__) + + +class ArtifactoryRegistryClient: + """Artifactory backend for the :class:`RegistryClient` protocol. + + Constructed via :meth:`RegistryConfig.get_client`; callers interact + with the :class:`RegistryClient` protocol, not this class directly. + """ + + def __init__(self, config: "RegistryConfig") -> None: + self._config = config + + # -- RegistryClient protocol --------------------------------------------- + + def fetch_file( + self, + owner: str, + repo: str, + file_path: str, + ref: str = "main", + resilient_get: Optional[Callable] = None, + ) -> Optional[bytes]: + """Fetch a single file via the Archive Entry Download API. + + Tries each candidate archive URL (GitHub heads, GitLab, GitHub + tags) with both raw and normalized root prefixes. Returns raw + file bytes on success, or ``None`` when the entry API is not + supported or the file is not found -- the caller should fall + back to downloading the full archive. + """ + return _fetch_entry( + host=self._config.host, + prefix=self._config.prefix, + owner=owner, + repo=repo, + file_path=file_path, + ref=ref, + scheme=self._config.scheme, + headers=self._config.get_headers(), + resilient_get=resilient_get, + ) + + +# --------------------------------------------------------------------------- +# Standalone helper (for callers without a RegistryConfig) +# --------------------------------------------------------------------------- + + +def fetch_entry_from_archive( + host: str, + prefix: str, + owner: str, + repo: str, + file_path: str, + ref: str = "main", + scheme: str = "https", + headers: Optional[dict] = None, + resilient_get: Optional[Callable] = None, +) -> Optional[bytes]: + """Fetch a single file from an Artifactory-proxied archive. + + Convenience wrapper around the core entry-download logic for callers + that do not have a :class:`RegistryConfig` instance (e.g. the + marketplace client in #506). + + Returns raw file bytes on success, or ``None`` on failure. + """ + return _fetch_entry( + host=host, + prefix=prefix, + owner=owner, + repo=repo, + file_path=file_path, + ref=ref, + scheme=scheme, + headers=headers, + resilient_get=resilient_get, + ) + + +# --------------------------------------------------------------------------- +# Core implementation (shared by class and standalone function) +# --------------------------------------------------------------------------- + + +def _fetch_entry( + host: str, + prefix: str, + owner: str, + repo: str, + file_path: str, + ref: str, + scheme: str, + headers: Optional[dict], + resilient_get: Optional[Callable], +) -> Optional[bytes]: + """Core entry-download logic shared by the class and standalone helper.""" + from ..utils.github_host import build_artifactory_archive_url + from ..utils.path_security import PathTraversalError, validate_path_segments + + # Guard: reject traversal sequences via the centralized path validator + try: + validate_path_segments( + file_path, + context="artifactory archive entry path", + reject_empty=True, + ) + except PathTraversalError: + logger.debug("Refusing invalid file_path: %s", file_path) + return None + + archive_urls = build_artifactory_archive_url( + host, prefix, owner, repo, ref, scheme=scheme, + ) + + # Root directory inside the archive is typically "{repo}-{ref}", but + # hosting platforms may normalize refs (e.g. "feature/foo" -> "feature-foo"). + root_prefixes: List[str] = [f"{repo}-{ref}"] + normalized_ref = ref.replace("/", "-") + if normalized_ref != ref: + normalized_root = f"{repo}-{normalized_ref}" + if normalized_root not in root_prefixes: + root_prefixes.append(normalized_root) + + req_headers = headers or {} + + for archive_url in archive_urls: + for root_prefix in root_prefixes: + # URL-encode the entry path (spaces, special chars) but keep '/' as-is + encoded_path = quote(f"{root_prefix}/{file_path}", safe="/") + entry_url = f"{archive_url}!/{encoded_path}" + try: + if resilient_get is not None: + resp = resilient_get(entry_url, headers=req_headers, timeout=30) + else: + resp = _requests.get( + entry_url, headers=req_headers, timeout=30, + ) + if resp.status_code == 200: + logger.debug("Archive entry download OK: %s", entry_url) + return resp.content + logger.debug( + "Archive entry download HTTP %d: %s", + resp.status_code, + entry_url, + ) + except _requests.RequestException: + logger.debug( + "Archive entry download failed: %s", entry_url, exc_info=True, + ) + continue + + return None diff --git a/src/apm_cli/deps/github_downloader.py b/src/apm_cli/deps/github_downloader.py index d75dd4e87..9776f7efb 100644 --- a/src/apm_cli/deps/github_downloader.py +++ b/src/apm_cli/deps/github_downloader.py @@ -315,7 +315,36 @@ def _download_artifactory_archive(self, host: str, prefix: str, owner: str, repo def _download_file_from_artifactory(self, host: str, prefix: str, owner: str, repo: str, file_path: str, ref: str, scheme: str = "https") -> bytes: - """Download a single file from Artifactory by fetching the full archive and extracting it.""" + """Download a single file from Artifactory. + + Tries the Archive Entry Download API first (fetches one file + without downloading the full archive). Falls back to the full + archive approach when the entry API is unavailable or returns an + error. + """ + # Fast path: use the RegistryClient interface for entry download + cfg = self.registry_config + if cfg is not None and cfg.host == host: + client = cfg.get_client() + content = client.fetch_file( + owner, repo, file_path, ref, + resilient_get=self._resilient_get, + ) + else: + # No RegistryConfig or host mismatch (explicit FQDN mode) -- + # fall back to the standalone helper. + from .artifactory_entry import fetch_entry_from_archive + + content = fetch_entry_from_archive( + host, prefix, owner, repo, file_path, ref, + scheme=scheme, + headers=self._get_artifactory_headers(), + resilient_get=self._resilient_get, + ) + if content is not None: + return content + + # Fallback: download full archive and extract the file import io import zipfile diff --git a/src/apm_cli/deps/registry_proxy.py b/src/apm_cli/deps/registry_proxy.py index 92411094f..f680df133 100644 --- a/src/apm_cli/deps/registry_proxy.py +++ b/src/apm_cli/deps/registry_proxy.py @@ -24,7 +24,7 @@ import os import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Callable, List, Optional, Protocol, runtime_checkable from urllib.parse import urlparse if TYPE_CHECKING: @@ -36,6 +36,32 @@ # --------------------------------------------------------------------------- +@runtime_checkable +class RegistryClient(Protocol): + """Interface for registry proxy backends. + + Each backend (Artifactory, Nexus, etc.) implements this protocol so + the download pipeline can fetch files without knowing which registry + type is in use. + """ + + def fetch_file( + self, + owner: str, + repo: str, + file_path: str, + ref: str = "main", + resilient_get: Optional[Callable] = None, + ) -> Optional[bytes]: + """Fetch a single file from the registry. + + Returns raw file bytes on success, or ``None`` when the file + cannot be fetched (caller should fall back to full-archive + download). + """ + ... + + @dataclass(frozen=True) class RegistryConfig: """Immutable registry proxy configuration parsed from environment variables. @@ -129,6 +155,17 @@ def get_headers(self) -> dict: return {"Authorization": f"Bearer {self.token}"} return {} + def get_client(self) -> "RegistryClient": + """Return a :class:`RegistryClient` for this configuration. + + Currently returns an Artifactory backend. When additional + registry types are needed, this method can inspect the URL or + a configuration hint to select the right backend. + """ + from .artifactory_entry import ArtifactoryRegistryClient + + return ArtifactoryRegistryClient(config=self) + def validate_lockfile_deps( self, locked_deps: "List[LockedDependency]" ) -> "List[LockedDependency]": diff --git a/tests/unit/test_artifactory_support.py b/tests/unit/test_artifactory_support.py index da9ac5b50..84de822c3 100644 --- a/tests/unit/test_artifactory_support.py +++ b/tests/unit/test_artifactory_support.py @@ -538,21 +538,22 @@ def _make_zip_bytes(self, root_prefix="repo-main/", files=None): return buf.getvalue() def test_extract_single_file(self): - """Extract a specific file from the archive.""" + """Extract a specific file from the archive (full-archive fallback).""" zip_bytes = self._make_zip_bytes() mock_resp = Mock() mock_resp.status_code = 200 mock_resp.content = zip_bytes - with patch.object(self.downloader, "_resilient_get", return_value=mock_resp): - content = self.downloader._download_file_from_artifactory( - "art.example.com", - "artifactory/github", - "owner", - "repo", - "apm.yml", - "main", - ) + with patch("apm_cli.deps.artifactory_entry.fetch_entry_from_archive", return_value=None): + with patch.object(self.downloader, "_resilient_get", return_value=mock_resp): + content = self.downloader._download_file_from_artifactory( + "art.example.com", + "artifactory/github", + "owner", + "repo", + "apm.yml", + "main", + ) assert b"name: test" in content @@ -563,17 +564,49 @@ def test_file_not_found(self): mock_resp.status_code = 200 mock_resp.content = zip_bytes - with patch.object(self.downloader, "_resilient_get", return_value=mock_resp): - with pytest.raises(RuntimeError, match="Failed to download file"): - self.downloader._download_file_from_artifactory( - "art.example.com", - "artifactory/github", - "owner", - "repo", - "nonexistent.txt", - "main", + with patch("apm_cli.deps.artifactory_entry.fetch_entry_from_archive", return_value=None): + with patch.object(self.downloader, "_resilient_get", return_value=mock_resp): + with pytest.raises(RuntimeError, match="Failed to download file"): + self.downloader._download_file_from_artifactory( + "art.example.com", + "artifactory/github", + "owner", + "repo", + "nonexistent.txt", + "main", + ) + + def test_entry_download_used_before_full_archive(self): + """Archive entry download is tried before the full archive.""" + expected = b"# My Prompt\nDo something useful." + + with patch("apm_cli.deps.artifactory_entry.fetch_entry_from_archive", return_value=expected) as mock_entry: + content = self.downloader._download_file_from_artifactory( + "art.example.com", "artifactory/github", + "owner", "repo", "prompts/deploy.prompt.md", "main", + ) + + assert content == expected + mock_entry.assert_called_once() + + def test_entry_download_failure_falls_back_to_full_archive(self): + """When entry download returns None, full archive is used.""" + zip_bytes = self._make_zip_bytes( + files={"prompts/deploy.prompt.md": b"# Prompt content"} + ) + mock_resp = Mock() + mock_resp.status_code = 200 + mock_resp.content = zip_bytes + + with patch("apm_cli.deps.artifactory_entry.fetch_entry_from_archive", return_value=None): + with patch.object(self.downloader, "_resilient_get", return_value=mock_resp): + content = self.downloader._download_file_from_artifactory( + "art.example.com", "artifactory/github", + "owner", "repo", "prompts/deploy.prompt.md", "main", ) + assert b"# Prompt content" in content + class TestArtifactoryResolveReference: """Test resolve_git_reference for Artifactory deps.""" @@ -1191,3 +1224,302 @@ def test_local_dep_never_flagged(self): content_hash=None, ) assert cfg.find_missing_hashes([locked]) == [] + + +# -- RegistryClient protocol and ArtifactoryRegistryClient -- + + +class TestRegistryClientProtocol: + """Test that ArtifactoryRegistryClient satisfies RegistryClient.""" + + def test_implements_protocol(self): + """ArtifactoryRegistryClient is a valid RegistryClient.""" + from apm_cli.deps.artifactory_entry import ArtifactoryRegistryClient + from apm_cli.deps.registry_proxy import RegistryClient + + assert isinstance(ArtifactoryRegistryClient, type) + assert issubclass(ArtifactoryRegistryClient, RegistryClient) + + def test_get_client_returns_registry_client(self): + """RegistryConfig.get_client() returns a RegistryClient instance.""" + from apm_cli.deps.registry_proxy import RegistryClient, RegistryConfig + + with patch.dict( + os.environ, + {"PROXY_REGISTRY_URL": "https://art.example.com/artifactory/github"}, + clear=True, + ): + cfg = RegistryConfig.from_env() + + client = cfg.get_client() + assert isinstance(client, RegistryClient) + + def test_client_fetch_file_delegates_to_entry_download(self): + """ArtifactoryRegistryClient.fetch_file uses the entry download logic.""" + from apm_cli.deps.artifactory_entry import ArtifactoryRegistryClient + from apm_cli.deps.registry_proxy import RegistryConfig + + with patch.dict( + os.environ, + {"PROXY_REGISTRY_URL": "https://art.example.com/artifactory/github"}, + clear=True, + ): + cfg = RegistryConfig.from_env() + + client = ArtifactoryRegistryClient(config=cfg) + mock_resp = Mock(status_code=200, content=b"file bytes") + mock_get = Mock(return_value=mock_resp) + + result = client.fetch_file("owner", "repo", "file.md", "main", resilient_get=mock_get) + + assert result == b"file bytes" + url = mock_get.call_args[0][0] + assert url.startswith("https://art.example.com/artifactory/github/") + assert "repo-main/file.md" in url + + +# -- Archive Entry Download: fetch individual files from zip -- + + +class TestArchiveEntryDownload: + """Test fetch_entry_from_archive() shared utility.""" + + def _mock_get(self, status_code=200, content=b"file content"): + resp = Mock() + resp.status_code = status_code + resp.content = content + return Mock(return_value=resp) + + def test_entry_download_success(self): + """Returns file content on HTTP 200.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + expected = b"# My prompt" + mock_get = self._mock_get(content=expected) + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "prompts/deploy.prompt.md", "main", + headers={"Authorization": "Bearer tok"}, + resilient_get=mock_get, + ) + + assert result == expected + call_args = mock_get.call_args + url = call_args[0][0] + assert "!/" in url + assert "repo-main/prompts/deploy.prompt.md" in url + + def test_entry_download_returns_none_on_404(self): + """Returns None when all URLs return 404.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get(status_code=404) + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "missing.md", "main", + resilient_get=mock_get, + ) + + assert result is None + # Should have tried all 3 URL patterns + assert mock_get.call_count == 3 + + def test_entry_download_returns_none_on_connection_error(self): + """Returns None when the HTTP call raises an exception.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + import requests as _requests + + mock_get = Mock(side_effect=_requests.ConnectionError("refused")) + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "file.md", "main", + resilient_get=mock_get, + ) + + assert result is None + + def test_entry_download_tries_all_url_patterns(self): + """Tries GitHub heads, GitLab, and GitHub tags URLs in order.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + resp_404 = Mock(status_code=404, content=b"") + resp_200 = Mock(status_code=200, content=b"found it") + mock_get = Mock(side_effect=[resp_404, resp_404, resp_200]) + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "SKILL.md", "v1.0", + resilient_get=mock_get, + ) + + assert result == b"found it" + assert mock_get.call_count == 3 + urls = [call[0][0] for call in mock_get.call_args_list] + assert "refs/heads/v1.0.zip!/repo-v1.0/SKILL.md" in urls[0] + assert "archive/v1.0/repo-v1.0.zip!/repo-v1.0/SKILL.md" in urls[1] + assert "refs/tags/v1.0.zip!/repo-v1.0/SKILL.md" in urls[2] + + def test_entry_url_encodes_special_chars(self): + """Spaces and special characters in file paths are URL-encoded.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get() + + fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "path with spaces/file.md", "main", + resilient_get=mock_get, + ) + + url = mock_get.call_args[0][0] + assert "path%20with%20spaces/file.md" in url + assert " " not in url.split("!/")[1] + + def test_entry_download_passes_headers(self): + """Auth headers are forwarded to the HTTP call.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get() + headers = {"Authorization": "Bearer my-token"} + + fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "file.md", "main", + headers=headers, + resilient_get=mock_get, + ) + + call_kwargs = mock_get.call_args + assert call_kwargs[1]["headers"] == headers + + def test_entry_download_stops_on_first_success(self): + """Stops trying URL patterns after the first 200 response.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get(content=b"first hit") + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "file.md", "main", + resilient_get=mock_get, + ) + + assert result == b"first hit" + assert mock_get.call_count == 1 + + def test_entry_download_rejects_path_traversal(self): + """file_path with ../ components is rejected (CWE-22).""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get() + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "../../etc/passwd", "main", + resilient_get=mock_get, + ) + + assert result is None + mock_get.assert_not_called() + + def test_entry_download_rejects_mid_path_traversal(self): + """Traversal hidden in the middle of the path is also rejected.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get() + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "subdir/../../../secret", "main", + resilient_get=mock_get, + ) + + assert result is None + mock_get.assert_not_called() + + def test_entry_download_rejects_dot_segment(self): + """Single-dot path segment is also rejected by validate_path_segments.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get() + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "subdir/./file.md", "main", + resilient_get=mock_get, + ) + + assert result is None + mock_get.assert_not_called() + + def test_entry_download_rejects_empty_segment(self): + """Empty path segments (double slash) are rejected.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get() + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "subdir//file.md", "main", + resilient_get=mock_get, + ) + + assert result is None + mock_get.assert_not_called() + + def test_entry_download_with_tag_ref(self): + """Tag refs produce correct root prefix ({repo}-{tag}).""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get(content=b"tagged content") + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "my-repo", "README.md", "v2.1.0", + resilient_get=mock_get, + ) + + assert result == b"tagged content" + url = mock_get.call_args[0][0] + assert "my-repo-v2.1.0/README.md" in url + + def test_entry_download_with_slash_ref(self): + """Branch refs with slashes try both raw and normalized root prefixes.""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + # First call (raw ref "feature/foo") returns 404, + # second call (normalized "feature-foo") returns 200 + resp_404 = Mock(status_code=404, content=b"") + resp_200 = Mock(status_code=200, content=b"branch content") + mock_get = Mock(side_effect=[resp_404, resp_200]) + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "file.md", "feature/foo", + resilient_get=mock_get, + ) + + assert result == b"branch content" + urls = [call[0][0] for call in mock_get.call_args_list] + # First try: raw ref in root prefix + assert "repo-feature/foo/file.md" in urls[0] + # Second try: normalized ref in root prefix + assert "repo-feature-foo/file.md" in urls[1] + + def test_entry_download_with_no_headers(self): + """Works without auth headers (public repos).""" + from apm_cli.deps.artifactory_entry import fetch_entry_from_archive + + mock_get = self._mock_get(content=b"public") + + result = fetch_entry_from_archive( + "art.example.com", "artifactory/github", + "owner", "repo", "file.md", "main", + resilient_get=mock_get, + ) + + assert result == b"public" + assert mock_get.call_args[1]["headers"] == {}