diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bcd14a4..b1f16fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,9 +13,29 @@ jobs: - uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip + cache-dependency-path: pyproject.toml - name: Install dev dependencies run: python -m pip install --upgrade pip && pip install -e ".[dev]" - name: Ruff run: ruff check . - name: Mypy run: mypy + + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + - name: Install dev dependencies + run: python -m pip install --upgrade pip && pip install -e ".[dev]" + - name: Pytest + run: pytest diff --git a/pyproject.toml b/pyproject.toml index 1a73fbb..af18763 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dev = [ "ruff>=0.6", "mypy>=1.10", "types-jsonschema", + "pytest>=7.0", ] [project.scripts] @@ -34,3 +35,7 @@ target-version = "py39" files = ["src"] disallow_untyped_defs = true disallow_incomplete_defs = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-ra" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1851b41 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,30 @@ +"""Shared pytest fixtures. + +OPENHACK_ROOT is pinned to the repo root so ``paths.root()`` resolves +deterministically regardless of where pytest is invoked from. Modules under +test reach for ``root() / "agents" / "experts"`` and ``root() / "config"``, +so the real on-disk workspace is the simplest fixture. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from openhack.paths import ALL_RUN_DIRS + +REPO_ROOT = Path(__file__).resolve().parent.parent + + +@pytest.fixture(autouse=True) +def _pin_openhack_root(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENHACK_ROOT", str(REPO_ROOT)) + + +@pytest.fixture() +def run_dir(tmp_path: Path) -> Path: + """A scratch run directory with the standard subdirs created.""" + for name in ALL_RUN_DIRS: + (tmp_path / name).mkdir(parents=True, exist_ok=True) + return tmp_path diff --git a/tests/test_backlog.py b/tests/test_backlog.py new file mode 100644 index 0000000..7c4f354 --- /dev/null +++ b/tests/test_backlog.py @@ -0,0 +1,431 @@ +"""Layer 2: scenario backlog validation and write-out. + +Tests prefer the public entry points (``coverage_errors``, ``record_backlog``) +over private predicates. The private helpers are still exercised — just +through the API a real caller uses. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest + +from openhack import backlog +from openhack.backlog import coverage_errors, record_backlog + + +# --------------------------------------------------------------------------- +# Fixture builders +# --------------------------------------------------------------------------- + + +def _scn(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = {"id": "S001", "expert": "injection", "target_path": "app/Foo.php"} + base.update(overrides) + return base + + +def _write_coverage(path: Path, payload: dict[str, Any]) -> None: + (path / "recon-output").mkdir(parents=True, exist_ok=True) + (path / "recon-output" / "coverage-gaps.json").write_text(json.dumps(payload)) + + +def _write_units(path: Path, units: list[dict[str, Any]]) -> None: + (path / "recon-output").mkdir(parents=True, exist_ok=True) + (path / "recon-output" / "routing-units.jsonl").write_text( + "".join(json.dumps(u) + 
"\n" for u in units) + ) + + +# --------------------------------------------------------------------------- +# coverage_errors — exercises the scenario/decision predicates as a side effect +# --------------------------------------------------------------------------- + + +def test_path_requirement_flagged_when_no_scenario_covers_it(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing path coverage for app/Untouched.php" in e for e in errors) + + +def test_path_requirement_satisfied_by_target_path(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Foo.php"}]}) + errors = coverage_errors(run_dir, scenarios=[_scn()], coverage_decisions=[]) + assert not any("missing path coverage" in e for e in errors) + + +def test_path_requirement_satisfied_by_related_paths(run_dir: Path) -> None: + """``_scenario_paths`` must consider ``related_paths`` as well as ``target_path``.""" + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Bar.php"}]}) + scn = _scn(related_paths=["app/Bar.php"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing path coverage" in e for e in errors) + + +def test_path_requirement_satisfied_by_covered_paths_list(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Inc.php"}]}) + scn = _scn(covered_paths=["app/Inc.php"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing path coverage" in e for e in errors) + + +def test_path_requirement_satisfied_by_path_level_decision(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) + decisions = [{ + "path": "app/Untouched.php", "expert": "*", + "decision": "not_applicable", "reason": 
"framework-owned, not invocable by users", + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("missing path coverage" in e for e in errors) + + +def test_pair_requirement_flagged_when_expert_mismatches(run_dir: Path) -> None: + _write_coverage(run_dir, { + "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], + }) + # Scenario covers the path but with a different expert. + scn = _scn(expert="cryptographic-failures") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert any("missing expert coverage for app/Foo.php -> injection" in e for e in errors) + + +def test_pair_requirement_satisfied_by_matching_scenario(run_dir: Path) -> None: + _write_coverage(run_dir, { + "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], + }) + errors = coverage_errors(run_dir, scenarios=[_scn()], coverage_decisions=[]) + assert not any("missing expert coverage" in e for e in errors) + + +def test_routing_unit_satisfied_by_scenario_with_unit_id(run_dir: Path) -> None: + _write_units(run_dir, [{ + "unit_id": "U001", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + scn = _scn(routing_unit_id="U001") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing routing-unit" in e for e in errors) + + +def test_routing_unit_satisfied_by_covered_routing_unit_ids(run_dir: Path) -> None: + """A scenario can claim coverage over a unit it isn't the primary owner of.""" + _write_units(run_dir, [{ + "unit_id": "U002", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + scn = _scn(routing_unit_id="U001", covered_routing_unit_ids=["U002"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing routing-unit" in e for e in errors) + + +def test_routing_unit_flagged_when_no_scenario_or_decision(run_dir: 
Path) -> None: + _write_units(run_dir, [{ + "unit_id": "U001", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing routing-unit expert coverage for U001" in e for e in errors) + + +# --------------------------------------------------------------------------- +# Boundary-requirement coverage (item 4 in review) +# --------------------------------------------------------------------------- + + +def _boundary_req(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "path": "app/Api.php", + "expert": "injection", + "boundary_id": "B1", + "endpoint": "/api/run", + } + base.update(overrides) + return base + + +def test_boundary_requirement_flagged_without_scenario_or_decision(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any( + "missing request-boundary coverage for app/Api.php -> injection -> /api/run" in e + for e in errors + ) + + +def test_boundary_requirement_satisfied_by_scenario_with_boundary_id(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + scn = _scn(boundary_id="B1") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing request-boundary" in e for e in errors) + + +def test_boundary_requirement_satisfied_by_covered_boundary_ids_list(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + scn = _scn(covered_boundary_ids=["B1", "B2"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing request-boundary" in e for e in errors) + + +def test_boundary_requirement_satisfied_by_recon_item_id_fallback(run_dir: Path) -> None: + """Boundary req without scenario boundary_id can be matched by 
recon_item_id.""" + _write_coverage(run_dir, { + "boundary_requirements": [_boundary_req(recon_item_id="R1")], + }) + scn = _scn(recon_item_id="R1") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing request-boundary" in e for e in errors) + + +def test_boundary_requirement_satisfied_by_boundary_id_decision(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + decisions = [{ + "path": "app/Api.php", "expert": "injection", "boundary_id": "B1", + "decision": "not_applicable", "reason": "internal admin endpoint behind VPN", + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("missing request-boundary" in e for e in errors) + + +def test_boundary_decision_without_boundary_id_does_not_satisfy(run_dir: Path) -> None: + """``_has_boundary_decision`` requires the boundary_id to match exactly.""" + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + decisions = [{ + "path": "app/Api.php", "expert": "injection", + "decision": "not_applicable", "reason": "internal admin endpoint behind VPN", + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("missing request-boundary" in e for e in errors) + + +# --------------------------------------------------------------------------- +# Decision validation (exercised through coverage_errors) +# --------------------------------------------------------------------------- + + +def test_decision_with_unknown_value_is_flagged(run_dir: Path) -> None: + decisions = [{"path": "a.php", "expert": "injection", "decision": "wat", "reason": "x" * 25}] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("invalid decision" in e for e in errors) + + +def test_decision_missing_path_is_flagged(run_dir: Path) -> None: + decisions = [{"decision": "not_applicable", "reason": "x" * 25}] + errors = 
coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("missing path" in e for e in errors) + + +@pytest.mark.parametrize("decision_value", ["covered_by_scenario", "merged", "scenario"]) +def test_coverage_claim_decision_requires_scenario_ids( + run_dir: Path, decision_value: str +) -> None: + decisions = [{"path": "a.php", "expert": "injection", "decision": decision_value}] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("must reference scenario_ids" in e for e in errors) + + +def test_decision_referencing_unknown_scenario_is_flagged(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "injection", + "decision": "covered_by_scenario", "scenario_ids": ["S999"], + }] + errors = coverage_errors(run_dir, scenarios=[_scn()], coverage_decisions=decisions) + assert any("references unknown" in e and "S999" in e for e in errors) + + +def test_dismissal_decision_requires_substantive_reason(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "injection", "decision": "not_applicable", "reason": "no" + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("needs a concrete reason" in e for e in errors) + + +def test_decision_with_wildcard_expert_is_accepted(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "*", + "decision": "not_applicable", "reason": "x" * 25, + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("invalid decision" in e or "unknown expert" in e for e in errors) + + +def test_decision_with_unknown_expert_is_flagged(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "made-up-expert", + "decision": "not_applicable", "reason": "x" * 25, + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("unknown expert" in e for e in errors) + + +# 
--------------------------------------------------------------------------- +# record_backlog — full pipeline including emit() log + scope check +# --------------------------------------------------------------------------- + + +def _valid_scenario(scn_id: str = "S001", **overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "id": scn_id, + "recon_item_id": "R001", + "expert": "injection", + "target_path": "app/Foo.php", + "proof_question": "Is user input concatenated into a raw SQL query?", + "evidence_required": ["sink call", "lack of binding"], + "security_invariant": "Database queries must use parameter binding.", + "proof_obligations": [ + {"id": "p1", "question": "Is the sink raw?", "evidence_required": "snippet"} + ], + } + base.update(overrides) + return base + + +@pytest.fixture() +def patched_run_dir(run_dir: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Redirect ``run_path`` so ``record_backlog`` writes into tmp.""" + monkeypatch.setattr(backlog, "run_path", lambda target, run_id: run_dir) + return run_dir + + +def _router_output(scenarios: list[dict[str, Any]], **extras: Any) -> dict[str, Any]: + payload = {"scenarios": scenarios, "coverage_decisions": [], "coverage_notes": []} + payload.update(extras) + return payload + + +def test_record_backlog_writes_scenario_files_on_happy_path( + patched_run_dir: Path, tmp_path: Path +) -> None: + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + + result = record_backlog("acme", "demo", router) + assert [s["id"] for s in result] == ["S001"] + + written = patched_run_dir / "scenarios" / "backlog" / "S001.json" + payload = json.loads(written.read_text()) + assert payload["priority"] == "normal" # DEFAULTS layered in + assert payload["result_location"] == "scenarios/finished/S001.json" + + index = patched_run_dir / "scenarios" / "index.jsonl" + assert index.read_text().strip().count("\n") == 0 # one line, no trailing extras + + +def 
test_record_backlog_emits_audit_event( + patched_run_dir: Path, tmp_path: Path +) -> None: + """The recorder must log a ``scenario-router/complete`` event for auditing.""" + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + record_backlog("acme", "demo", router) + + events = patched_run_dir / "logs" / "events.jsonl" + assert events.is_file() + lines = [json.loads(line) for line in events.read_text().splitlines() if line.strip()] + assert any( + e.get("actor") == "scenario-router" and e.get("status") == "complete" + for e in lines + ) + + +def test_record_backlog_rejects_scenario_using_unselected_expert( + patched_run_dir: Path, tmp_path: Path +) -> None: + """A run-config that restricts experts must block out-of-scope scenarios.""" + (patched_run_dir / "run-config.yaml").write_text( + 'expert_scope:\n mode: "selected"\n experts:\n - "injection"\n' + ) + scn = _valid_scenario(expert="broken-access-control") + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="uses unselected expert"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_accepts_scenario_with_selected_expert( + patched_run_dir: Path, tmp_path: Path +) -> None: + (patched_run_dir / "run-config.yaml").write_text( + 'expert_scope:\n mode: "selected"\n experts:\n - "injection"\n' + ) + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + result = record_backlog("acme", "demo", router) + assert [s["id"] for s in result] == ["S001"] + + +def test_record_backlog_rejects_unknown_expert( + patched_run_dir: Path, tmp_path: Path +) -> None: + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario(expert="made-up-expert")]))) + with pytest.raises(ValueError, match="Unknown expert"): + record_backlog("acme", "demo", router) + + +def 
test_record_backlog_rejects_duplicate_scenario_id( + patched_run_dir: Path, tmp_path: Path +) -> None: + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([ + _valid_scenario("S001"), + _valid_scenario("S001", target_path="app/Bar.php"), + ]))) + with pytest.raises(ValueError, match="Duplicate scenario id"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_rejects_duplicate_proof_obligation_id( + patched_run_dir: Path, tmp_path: Path +) -> None: + scn = _valid_scenario() + scn["proof_obligations"] = [ + {"id": "p1", "question": "Q1", "evidence_required": "e"}, + {"id": "p1", "question": "Q2", "evidence_required": "e"}, + ] + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="duplicate proof obligation"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_rejects_missing_required_field( + patched_run_dir: Path, tmp_path: Path +) -> None: + scn = _valid_scenario() + scn.pop("security_invariant") + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="missing: \\['security_invariant'\\]"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_surfaces_schema_failure( + patched_run_dir: Path, tmp_path: Path +) -> None: + scn = _valid_scenario(id="invalid-id-format") + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="scenario-schema.json"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_surfaces_coverage_gap( + patched_run_dir: Path, tmp_path: Path +) -> None: + _write_coverage(patched_run_dir, { + "routing_requirements": [{"path": "app/Unrelated.php", "expert": "injection"}], + }) + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + with pytest.raises(ValueError, match="does not 
cover recon evidence"): + record_backlog("acme", "demo", router) diff --git a/tests/test_coverage.py b/tests/test_coverage.py new file mode 100644 index 0000000..f18fbe6 --- /dev/null +++ b/tests/test_coverage.py @@ -0,0 +1,285 @@ +"""Layer 2: coverage scoring and routing requirement generation. + +``coverage.py`` is the biggest single module (608 LOC) and decides which +``(path, expert)`` pairs become mandatory scenarios. A miss here surfaces +as silently dropped attack surface, so these tests pin down the decision +table branch-by-branch. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest + +from openhack.coverage import ( + _path_class, + _score_pair, + _tokens, + coverage_opportunities, + coverage_suggestions, + routing_requirements, + write_coverage, +) + + +# --------------------------------------------------------------------------- +# Path classification +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "path,expected", + [ + (".ddev/config.yaml", "dev"), + (".devcontainer/Dockerfile", "dev"), + ("app/tests/FooTest.php", "test"), + ("src/__fixtures__/sample.json", "test"), + ("public/assets/libraries/jquery.js", "asset"), + (".github/workflows/ci.yml", "ci"), + ("path/to/.github/workflows/x.yml", "ci"), + ("package.json", "manifest"), + ("composer.lock", "manifest"), + ("requirements.txt", "manifest"), + ("docs/intro.md", "docs"), + ("README.md", "docs"), + ("notes.rst", "docs"), + ("src/translations/en.yml", "fixture"), + ("public/assets/js/app.js", "client"), + ("src/foo.js", "client"), + ("public/assets/logo.png", "asset"), + ("public/icon.svg", "asset"), + ("templates/home.twig", "template"), + ("config/services.yml", "config"), + ("settings.xml", "config"), + ("bin/run", "script"), + ("scripts/deploy.sh", "script"), + ("app/Http/Controller.php", "runtime"), + ("app/bundles/foo/Service.php", "runtime"), + 
("plugins/extra/handler.php", "runtime"), + ("README", "other"), + ], +) +def test_path_class(path: str, expected: str) -> None: + assert _path_class(path) == expected + + +# --------------------------------------------------------------------------- +# Tokenizer +# --------------------------------------------------------------------------- + + +def test_tokens_drops_short_and_stopwords() -> None: + out = _tokens("the AND a URL path data 12 abc_def query") + assert "the" not in out and "and" not in out + assert "url" not in out # in STOPWORDS + assert "path" not in out # in STOPWORDS + assert "abc_def" in out # underscores preserved + assert "query" in out + assert "12" not in out # below length-3 cutoff + + +def test_tokens_splits_on_non_alphanumeric() -> None: + assert _tokens("Foo-Bar.baz/Qux") == {"foo", "bar", "baz", "qux"} + + +# --------------------------------------------------------------------------- +# Pair scoring +# --------------------------------------------------------------------------- + + +def _pair(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "expert": "injection", + "path": "app/Foo.php", + "reason": "test", + "matched_terms": [], + "signals": [], + "kinds": [], + "evidence": [], + "interesting": False, + "path_class": "runtime", + } + base.update(overrides) + return base + + +def test_score_boundary_mandatory_always_high() -> None: + pair = _pair(boundary_mandatory=True, strong_terms=["endpoint"]) + confidence, strong, _ = _score_pair(pair) + assert confidence == "high" + assert strong == ["endpoint"] + + +def test_score_supply_chain_on_manifest_is_high() -> None: + pair = _pair(expert="software-supply-chain-failures", path="package.json") + confidence, _, reason = _score_pair(pair) + assert confidence == "high" + assert "supply-chain" in reason or "Dependency" in reason + + +def test_score_non_productive_path_class_is_low() -> None: + pair = _pair(path="public/assets/logo.png") + confidence, _, reason = _score_pair(pair) 
+ assert confidence == "low" + assert "not a runtime attack surface" in reason + + +def test_score_runtime_without_strong_terms_is_low() -> None: + pair = _pair(path="app/Generic.php") + confidence, _, reason = _score_pair(pair) + assert confidence == "low" + assert "generic" in reason.lower() + + +def test_score_runtime_with_strong_terms_but_no_sink_is_suggestion() -> None: + pair = _pair(path="app/query/Builder.php", interesting=False) + confidence, strong, _ = _score_pair(pair) + assert confidence == "suggestion" + assert "query" in strong + + +def test_score_runtime_with_strong_terms_and_sink_is_high() -> None: + pair = _pair(path="app/query/Builder.php", interesting=True) + confidence, strong, reason = _score_pair(pair) + assert confidence == "high" + assert "query" in strong + assert "source, sink" in reason or "boundary evidence" in reason + + +# --------------------------------------------------------------------------- +# End-to-end: candidate pair generation from inventory +# --------------------------------------------------------------------------- + + +def _inv_row(kind: str, path: str, **extra: Any) -> dict[str, Any]: + row: dict[str, Any] = { + "kind": kind, + "path": path, + "line": 1, + "match": [], + "text": "", + } + row.update(extra) + return row + + +SELECTED = ["injection", "software-supply-chain-failures"] + + +def test_routing_requirements_yields_high_confidence_pairs_only() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "app/QueryHandler.php", match=["query"])], + "sinks": [_inv_row("sinks", "app/QueryHandler.php", match=["raw"])], + } + reqs = routing_requirements(inventory, recon_items=None, selected_experts=SELECTED) + assert reqs, "expected at least one high-confidence requirement" + for req in reqs: + assert req["confidence"] == "high" + # Public pairs have the private 'interesting' flag stripped. 
+ assert "interesting" not in req + assert req["requirement"].startswith("Create a scenario") + + +def test_routing_requirements_skips_non_productive_paths() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "tests/QueryTest.php", match=["query"])], + "sinks": [_inv_row("sinks", "tests/QueryTest.php", match=["raw"])], + } + reqs = routing_requirements(inventory, recon_items=None, selected_experts=SELECTED) + assert reqs == [] + + +def test_routing_requirements_promotes_supply_chain_for_manifest() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "package.json", match=["dependency"])], + } + reqs = routing_requirements( + inventory, recon_items=None, selected_experts=["software-supply-chain-failures"] + ) + paths = {req["path"] for req in reqs} + assert "package.json" in paths + + +def test_coverage_opportunities_groups_by_expert() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [ + _inv_row("inputs", "app/QueryHandler.php", match=["query"]), + _inv_row("inputs", "app/ShellRunner.php", match=["shell", "exec"]), + ], + "sinks": [ + _inv_row("sinks", "app/QueryHandler.php", match=["raw"]), + _inv_row("sinks", "app/ShellRunner.php", match=["exec"]), + ], + } + opps = coverage_opportunities( + inventory, recon_items=None, selected_experts=["injection"] + ) + assert len(opps) == 1 + [opp] = opps + assert opp["expert"] == "injection" + assert opp["candidate_paths"] >= 2 + paths = {ex["path"] for ex in opp["examples"]} + assert {"app/QueryHandler.php", "app/ShellRunner.php"} <= paths + + +def test_coverage_suggestions_skip_required_pairs() -> None: + """Items already represented in ``required_keys`` must not double-count.""" + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "app/QueryHandler.php", match=["query"])], + "sinks": [_inv_row("sinks", "app/QueryHandler.php", match=["raw"])], + } + required = {("app/QueryHandler.php", "injection")} 
+ sugs = coverage_suggestions( + inventory, + recon_items=None, + required_keys=required, + selected_experts=["injection"], + ) + assert all(s["path"] != "app/QueryHandler.php" for s in sugs) + + +# --------------------------------------------------------------------------- +# write_coverage — disk-side entry point called from the CLI +# --------------------------------------------------------------------------- + + +def test_write_coverage_emits_coverage_gaps_json(run_dir: Path) -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "app/QueryHandler.php", match=["query"])], + "sinks": [_inv_row("sinks", "app/QueryHandler.php", match=["raw"])], + } + out = write_coverage(run_dir, inventory, recon_items=None) + + assert out == run_dir / "recon-output" / "coverage-gaps.json" + payload = json.loads(out.read_text()) + + # The top-level sections the rest of the pipeline consumes. + for key in ( + "input_with_sink_or_exposure", + "request_boundaries", + "boundary_requirements", + "expert_opportunities", + "routing_requirements", + "coverage_suggestions", + "triage_summary", + ): + assert key in payload, f"missing top-level key: {key}" + + summary = payload["triage_summary"] + assert summary["hard_routing_requirements"] == len(payload["routing_requirements"]) + assert summary["expert_scope"] == "unconfigured-all" + # No run-config.yaml → all 12 expert IDs end up in the scope. 
+ assert len(summary["selected_experts"]) == 12 + + +def test_write_coverage_honours_run_config_expert_scope(run_dir: Path) -> None: + (run_dir / "run-config.yaml").write_text( + 'expert_scope:\n mode: "selected"\n experts:\n - "injection"\n' + ) + out = write_coverage(run_dir, inventory={"inputs": []}, recon_items=None) + summary = json.loads(out.read_text())["triage_summary"] + assert summary["expert_scope"] == "selected" + assert summary["selected_experts"] == ["injection"] diff --git a/tests/test_paths.py b/tests/test_paths.py new file mode 100644 index 0000000..80f7452 --- /dev/null +++ b/tests/test_paths.py @@ -0,0 +1,54 @@ +"""Layer 2: path resolution and run-directory scaffolding.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from openhack.paths import ALL_RUN_DIRS, ensure_run_dirs, root, run_path + + +def test_root_resolves_via_openhack_root_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENHACK_ROOT", str(Path(__file__).resolve().parent.parent)) + assert (root() / "agents" / "experts").is_dir() + + +def test_root_raises_when_env_var_points_at_non_workspace( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("OPENHACK_ROOT", str(tmp_path)) + with pytest.raises(RuntimeError, match="OPENHACK_ROOT is not a valid workspace root"): + root() + + +def test_root_falls_back_to_walk_up_when_env_unset( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """With no env var set, root() walks up from CWD / module location. + + The package is installed editable from this repo, so the module-location + walk-up will land on the real workspace even when CWD is unrelated. 
+ """ + monkeypatch.delenv("OPENHACK_ROOT", raising=False) + monkeypatch.chdir(tmp_path) + found = root() + assert (found / "agents" / "experts").is_dir() + assert (found / "templates" / "scenario-prompt.md").is_file() + + +def test_run_path_is_under_root() -> None: + path = run_path("acme/widget", "2026-05-20-demo") + assert path == root() / "runs" / "acme/widget" / "2026-05-20-demo" + + +def test_ensure_run_dirs_creates_every_standard_subdir( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """``ensure_run_dirs`` materializes the full layout idempotently.""" + monkeypatch.setattr("openhack.paths.run_path", lambda target, run_id: tmp_path / target / run_id) + created = ensure_run_dirs("acme/widget", "demo") + for name in ALL_RUN_DIRS: + assert (created / name).is_dir() + # Idempotent: a second call must not raise. + ensure_run_dirs("acme/widget", "demo") diff --git a/tests/test_routing_units.py b/tests/test_routing_units.py new file mode 100644 index 0000000..e0e6064 --- /dev/null +++ b/tests/test_routing_units.py @@ -0,0 +1,268 @@ +"""Layer 2: routing-unit clustering.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest + +from openhack.routing_units import ( + KIND_TERMS, + MAX_EVIDENCE_ROWS, + _candidate_kinds, + _compact_row, + _dedupe_rows, + _kind_for_terms, + _row_kind, + build_routing_units, + write_routing_units, +) + + +@pytest.mark.parametrize( + "text,expected", + [ + ({"exec", "shell"}, "command_execution_sink"), + ({"query", "raw"}, "database_query_sink"), + ({"innerhtml"}, "html_template_dom_sink"), + ({"upload", "filename"}, "file_upload_download_storage"), + ({"webhook", "fetch"}, "outbound_fetch_boundary"), + ({"session", "role"}, "identity_state_access_control"), + ({"secret"}, "secret_debug_exposure"), + ({"yaml", "deserialize"}, "parser_deserialization_integrity"), + ({"jwt", "crypto"}, "cryptographic_secret_token"), + ({"queue", "limit"}, 
"resource_consumption"), + ({"manifest", "lockfile"}, "supply_chain_manifest"), + (set(), "configuration_or_static_surface"), + ({"unrelated"}, "configuration_or_static_surface"), + ], +) +def test_kind_for_terms(text: set[str], expected: str) -> None: + assert _kind_for_terms(text) == expected + + +def test_kind_for_terms_first_match_wins_on_overlap() -> None: + """``KIND_TERMS`` order is a deliberate priority list. + + ``template`` appears in both ``html_template_dom_sink`` (earlier) and + ``parser_deserialization_integrity`` (later); the earlier entry must win. + """ + by_name = dict(KIND_TERMS) + assert "template" in by_name["html_template_dom_sink"] + assert "template" in by_name["parser_deserialization_integrity"] + assert _kind_for_terms({"template"}) == "html_template_dom_sink" + # Same overlap pattern for ``token`` between identity and secret-exposure. + assert "token" in by_name["identity_state_access_control"] + assert "token" in by_name["secret_debug_exposure"] + assert _kind_for_terms({"token"}) == "identity_state_access_control" + + +def test_row_kind_classifies_request_boundary_evidence() -> None: + row = { + "kind": "inputs", + "path": "app/Api.php", + "line": 10, + "match": ["execute"], + "text": "shell exec", + } + assert _row_kind(row) == "command_execution_sink" + + +def test_compact_row_truncates_long_text() -> None: + row = {"kind": "inputs", "line": 1, "match": [], "text": "x" * 1000} + compact = _compact_row(row) + assert len(compact["text"]) == 240 + assert compact["kind"] == "inputs" + + +def test_compact_row_keeps_optional_keys_when_present() -> None: + row = { + "kind": "request_boundaries", + "line": 5, + "match": [], + "text": "", + "endpoint": "/api/foo", + "methods": ["POST"], + } + compact = _compact_row(row) + assert compact["endpoint"] == "/api/foo" + assert compact["methods"] == ["POST"] + + +def test_compact_row_drops_empty_optional_keys() -> None: + row = {"kind": "inputs", "line": 1, "match": [], "text": "", "endpoint": "", 
"methods": []} + compact = _compact_row(row) + assert "endpoint" not in compact + assert "methods" not in compact + + +def test_dedupe_rows_collapses_duplicates_and_caps_at_max() -> None: + rows = [{"kind": "inputs", "line": 1, "match": ["x"], "text": "same"}] * 5 + rows.extend( + {"kind": "inputs", "line": i, "match": ["y"], "text": f"row-{i}"} + for i in range(MAX_EVIDENCE_ROWS + 5) + ) + deduped = _dedupe_rows(rows) + assert len(deduped) <= MAX_EVIDENCE_ROWS + # The duplicate block collapses to one entry, then unique rows fill the rest. + assert sum(1 for r in deduped if r["text"] == "same") == 1 + + +def test_candidate_kinds_for_boundary_returns_request_boundary() -> None: + pair = { + "expert": "injection", + "path": "app/Api.php", + "boundary_mandatory": True, + "boundary_id": "B1", + } + assert _candidate_kinds(pair, {}) == ["request_boundary"] + + +def test_candidate_kinds_uses_expert_hints_from_rows() -> None: + pair: dict[str, Any] = { + "expert": "injection", + "path": "app/Api.php", + "matched_terms": [], + "signals": [], + "evidence": [], + } + rows_by_kind = { + "sinks": [ + {"kind": "sinks", "path": "app/Api.php", "line": 1, "match": ["exec"], "text": "shell"}, + ], + } + assert "command_execution_sink" in _candidate_kinds(pair, rows_by_kind) + + +# --------------------------------------------------------------------------- +# build_routing_units end-to-end +# --------------------------------------------------------------------------- + + +def _req(path: str, expert: str, **extra: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "expert": expert, + "path": path, + "reason": "test", + "matched_terms": [], + "signals": [], + "kinds": [], + "evidence": [], + "interesting": True, + "path_class": "runtime", + } + base.update(extra) + return base + + +def test_build_routing_units_assigns_ids_in_sort_order() -> None: + coverage_gaps = { + "routing_requirements": [ + _req("app/QueryHandler.php", "injection"), + _req("app/Auth.php", 
"authentication-failures"), + ], + } + inventory: dict[str, list[dict[str, Any]]] = { + "sinks": [ + {"kind": "sinks", "path": "app/QueryHandler.php", "line": 1, "match": ["raw"], "text": "query"}, + {"kind": "sinks", "path": "app/Auth.php", "line": 1, "match": ["session"], "text": "auth"}, + ], + } + units = build_routing_units(coverage_gaps, inventory) + assert [u["unit_id"] for u in units] == ["U001", "U002"] + # Mandatory coverage requirements always sort first; both here are mandatory. + assert all(u["coverage"] == "mandatory" for u in units) + + +def test_build_routing_units_separates_required_from_suggested() -> None: + coverage_gaps = { + "routing_requirements": [_req("app/QueryHandler.php", "injection")], + "coverage_suggestions": [_req("app/QueryHandler.php", "broken-access-control")], + } + inventory: dict[str, list[dict[str, Any]]] = { + "sinks": [ + {"kind": "sinks", "path": "app/QueryHandler.php", "line": 1, "match": ["raw", "role"], "text": "query"}, + ], + } + units = build_routing_units(coverage_gaps, inventory) + # Both pairs target the same path; whether they merge into one unit or split + # depends on the chosen kind. Verify the expert tagging is preserved. 
+ required = {expert for u in units for expert in u["required_experts"]} + suggested = {expert for u in units for expert in u["suggested_experts"]} + assert "injection" in required + assert "broken-access-control" in suggested + assert "injection" not in suggested + assert "broken-access-control" not in required + + +def test_build_routing_units_preserves_boundary_fields() -> None: + coverage_gaps = { + "routing_requirements": [ + _req( + "app/Api.php", + "injection", + boundary_mandatory=True, + boundary_id="B1", + endpoint="/api/run", + methods=["POST"], + boundary_type="route", + request_fields=["cmd"], + ), + ], + } + units = build_routing_units(coverage_gaps, inventory={}) + assert len(units) == 1 + unit = units[0] + assert unit["kind"] == "request_boundary" + assert unit["boundary_id"] == "B1" + assert unit["endpoint"] == "/api/run" + assert unit["methods"] == ["POST"] + + +def test_build_routing_units_emits_mandatory_path_unit_for_uncovered_gap() -> None: + coverage_gaps = { + "input_with_sink_or_exposure": [{"path": "app/Untriaged.php"}], + } + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [ + {"kind": "inputs", "path": "app/Untriaged.php", "line": 1, "match": [], "text": "raw"} + ], + } + units = build_routing_units(coverage_gaps, inventory) + assert len(units) == 1 + assert units[0]["coverage"] == "mandatory_path" + assert units[0]["required_experts"] == [] + + +# --------------------------------------------------------------------------- +# write_routing_units — disk-side entry point called from the CLI +# --------------------------------------------------------------------------- + + +def test_write_routing_units_emits_jsonl_one_unit_per_line(run_dir: Path) -> None: + (run_dir / "recon-output").mkdir(parents=True, exist_ok=True) + (run_dir / "recon-output" / "coverage-gaps.json").write_text(json.dumps({ + "routing_requirements": [_req("app/QueryHandler.php", "injection")], + })) + inventory: dict[str, list[dict[str, Any]]] = { + "sinks": [ 
+ {"kind": "sinks", "path": "app/QueryHandler.php", "line": 1, "match": ["raw"], "text": "query"}, + ], + } + out = write_routing_units(run_dir, inventory) + + assert out == run_dir / "recon-output" / "routing-units.jsonl" + lines = [line for line in out.read_text().splitlines() if line.strip()] + assert len(lines) == 1 + unit = json.loads(lines[0]) + assert unit["unit_id"] == "U001" + assert unit["path"] == "app/QueryHandler.php" + assert "injection" in unit["required_experts"] + + +def test_write_routing_units_with_no_coverage_file_writes_empty(run_dir: Path) -> None: + out = write_routing_units(run_dir, inventory={}) + assert out.exists() + assert out.read_text() == "" diff --git a/tests/test_schemas.py b/tests/test_schemas.py new file mode 100644 index 0000000..711af97 --- /dev/null +++ b/tests/test_schemas.py @@ -0,0 +1,321 @@ +"""Layer 1: JSON Schema golden tests. + +For each durable-artifact schema we keep a minimum-valid baseline and a +table of single-field mutations that should fail validation. The assertions +check both that the validator raises and that the error message points at +the right JSON path — that way a schema change that silently loosens a rule +still trips the test. 
+""" + +from __future__ import annotations + +from typing import Any, Callable + +import pytest + +from openhack.schemas import ( + validate_finding, + validate_finding_candidate, + validate_finding_triage, + validate_result, + validate_scenario, +) + +SHA256 = "a" * 64 + + +# --------------------------------------------------------------------------- +# Baselines +# --------------------------------------------------------------------------- + + +def _scenario() -> dict[str, Any]: + return { + "id": "S001", + "recon_item_id": "R001", + "expert": "injection", + "target_path": "app/Http/Foo.php", + "proof_question": "Is the user-supplied id concatenated into a raw SQL query?", + "evidence_required": ["sink call", "lack of binding"], + } + + +def _scenario_result() -> dict[str, Any]: + return { + "scenario_id": "S001", + "review_mode": "per-scenario-subagent", + "subagent_id": "agent-1", + "scenario_prompt_sha256": SHA256, + "reviewed_files": ["app/Http/Foo.php"], + "status": "verified", + "expert": "injection", + "summary": "Confirmed raw SQL concatenation.", + "evidence": [ + { + "path": "app/Http/Foo.php", + "line": 42, + "snippet": "$db->raw($_GET['id'])", + "note": "user input flows directly into raw()", + } + ], + } + + +def _finding() -> dict[str, Any]: + return { + "title": "SQL injection in Foo.php", + "severity": "high", + "target_path": "app/Http/Foo.php", + "attacker_role": "unauthenticated user", + "preconditions": "Endpoint reachable without auth.", + "non_technical_summary": "An attacker can read the database.", + "summary": "Raw SQL built from user input.", + "attack_chain": "GET /foo?id=' OR 1=1 -- → raw() executes attacker SQL", + "example_attack": "curl 'http://host/foo?id=1%20OR%201=1--'", + "evidence": "See app/Http/Foo.php:42", + "impact": "Full database read.", + "impact_analysis": "User table and secrets exposed.", + "attacker_use": "Exfiltrate PII.", + "recommended_fix": "Use parameter binding.", + "validation_notes": "Reproduced locally on 
commit abc123.", + } + + +def _finding_candidate() -> dict[str, Any]: + return { + "candidate_id": "S001-F001", + "scenario_id": "S001", + "source_result": "scenarios/finished/S001.json", + "expert": "injection", + "status": "pending_triage", + "finding": _finding(), + } + + +def _finding_triage() -> dict[str, Any]: + return { + "candidate_id": "S001-F001", + "review_mode": "per-finding-triage-agent", + "triage_agent_id": "triage-1", + "triage_prompt_sha256": SHA256, + "reviewed_files": ["app/Http/Foo.php"], + "decision": "accepted", + "summary": "Confirmed vulnerable.", + "final_severity": "high", + "severity_rationale": "Direct DB read by unauth user.", + "confidence": "high", + "evidence_assessment": "Evidence is sufficient.", + "evidence_gaps": [], + "required_changes": [], + } + + +# --------------------------------------------------------------------------- +# Mutation helpers +# --------------------------------------------------------------------------- + + +def _drop(key: str) -> Callable[[dict[str, Any]], None]: + def mutate(value: dict[str, Any]) -> None: + value.pop(key, None) + + return mutate + + +def _set(path: list[str | int], new_value: Any) -> Callable[[dict[str, Any]], None]: + def mutate(value: dict[str, Any]) -> None: + cursor: Any = value + for part in path[:-1]: + cursor = cursor[part] + cursor[path[-1]] = new_value + + return mutate + + +# --------------------------------------------------------------------------- +# Happy paths +# --------------------------------------------------------------------------- + + +def test_scenario_baseline_validates() -> None: + validate_scenario(_scenario()) + + +def test_scenario_result_baseline_validates() -> None: + validate_result(_scenario_result(), scenario_id="S001") + + +def test_finding_baseline_validates() -> None: + validate_finding(_finding()) + + +def test_finding_candidate_baseline_validates() -> None: + validate_finding_candidate(_finding_candidate()) + + +def 
test_finding_triage_baseline_validates() -> None: + validate_finding_triage(_finding_triage()) + + +def test_finding_evidence_accepts_all_three_shapes() -> None: + """Schema declares ``evidence`` as ``oneOf [string, array, object]``.""" + for shape in ( + "string evidence", + [{"path": "a.php", "line": 1, "snippet": "x", "note": "n"}], + {"path": "a.php", "details": "..."}, + ): + finding = _finding() + finding["evidence"] = shape + validate_finding(finding) + + +# --------------------------------------------------------------------------- +# Negative cases — each row mutates the baseline and asserts a failure path +# --------------------------------------------------------------------------- + + +SCENARIO_CASES = [ + pytest.param(_drop("id"), "$", id="missing-id"), + pytest.param(_drop("expert"), "$", id="missing-expert"), + pytest.param(_drop("proof_question"), "$", id="missing-proof-question"), + pytest.param(_set(["id"], "S99"), "$.id", id="id-too-short"), + pytest.param(_set(["id"], "scenario-1"), "$.id", id="id-bad-prefix"), + pytest.param(_set(["evidence_required"], 7), "$.evidence_required", id="evidence-bad-type"), + pytest.param(_set(["evidence_required"], [""]), "$.evidence_required", id="evidence-array-empty-string"), + pytest.param(_set(["target_path"], ""), "$.target_path", id="target-path-empty"), + pytest.param(_set(["priority"], "urgent"), "$.priority", id="priority-bad-enum"), + pytest.param(_set(["routing_unit_id"], "unit-1"), "$.routing_unit_id", id="routing-unit-bad-pattern"), + pytest.param( + _set(["proof_obligations"], [{"id": "BAD ID", "question": "?", "evidence_required": "e"}]), + "$.proof_obligations.0.id", + id="obligation-id-bad-pattern", + ), + pytest.param( + _set(["proof_obligations"], [{"id": "ok", "question": "?"}]), + "$.proof_obligations.0", + id="obligation-missing-evidence-required", + ), +] + + +@pytest.mark.parametrize("mutate,expected_path", SCENARIO_CASES) +def test_scenario_invalid_cases(mutate: Callable[[dict[str, Any]], 
None], expected_path: str) -> None: + scenario = _scenario() + mutate(scenario) + with pytest.raises(ValueError) as exc: + validate_scenario(scenario) + assert expected_path in str(exc.value) + assert "scenario-schema.json" in str(exc.value) + + +RESULT_CASES = [ + pytest.param(_drop("scenario_id"), "$", id="missing-scenario-id"), + pytest.param(_set(["scenario_id"], "X1"), "$.scenario_id", id="scenario-id-bad-pattern"), + pytest.param(_set(["review_mode"], "batch"), "$.review_mode", id="review-mode-not-allowed"), + pytest.param(_set(["status"], "maybe"), "$.status", id="status-bad-enum"), + pytest.param(_set(["scenario_prompt_sha256"], "deadbeef"), "$.scenario_prompt_sha256", id="sha-too-short"), + pytest.param(_set(["reviewed_files"], []), "$.reviewed_files", id="reviewed-files-empty"), + pytest.param(_set(["evidence"], []), "$.evidence", id="evidence-empty"), + pytest.param( + _set(["evidence"], [{"path": "a.php", "line": 1, "snippet": "x"}]), + "$.evidence.0", + id="evidence-missing-note", + ), + pytest.param( + _set(["proof_obligations"], [{"id": "ok", "status": "weird", "summary": "s"}]), + "$.proof_obligations.0.status", + id="obligation-status-bad-enum", + ), +] + + +@pytest.mark.parametrize("mutate,expected_path", RESULT_CASES) +def test_scenario_result_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + result = _scenario_result() + mutate(result) + with pytest.raises(ValueError) as exc: + validate_result(result, scenario_id="S001") + assert expected_path in str(exc.value) + assert "scenario-result-schema.json" in str(exc.value) + + +FINDING_CASES = [ + pytest.param(_drop("title"), "$", id="missing-title"), + pytest.param(_drop("recommended_fix"), "$", id="missing-recommended-fix"), + pytest.param(_set(["severity"], "catastrophic"), "$.severity", id="severity-bad-enum"), + pytest.param(_set(["summary"], ""), "$.summary", id="summary-empty"), + pytest.param(_set(["evidence"], 7), "$.evidence", 
id="evidence-bad-type"), +] + + +@pytest.mark.parametrize("mutate,expected_path", FINDING_CASES) +def test_finding_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + finding = _finding() + mutate(finding) + with pytest.raises(ValueError) as exc: + validate_finding(finding) + assert expected_path in str(exc.value) + assert "finding-schema.json" in str(exc.value) + + +CANDIDATE_CASES = [ + pytest.param(_drop("candidate_id"), "$", id="missing-candidate-id"), + pytest.param(_set(["candidate_id"], "S001-001"), "$.candidate_id", id="candidate-id-bad-pattern"), + pytest.param(_set(["candidate_id"], "S1-F1"), "$.candidate_id", id="candidate-id-too-short"), + pytest.param(_set(["status"], "accepted"), "$.status", id="status-not-pending-triage"), + pytest.param(_set(["scenario_id"], "scn-1"), "$.scenario_id", id="scenario-id-bad-pattern"), +] + + +@pytest.mark.parametrize("mutate,expected_path", CANDIDATE_CASES) +def test_finding_candidate_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + candidate = _finding_candidate() + mutate(candidate) + with pytest.raises(ValueError) as exc: + validate_finding_candidate(candidate) + assert expected_path in str(exc.value) + assert "finding-candidate-schema.json" in str(exc.value) + + +TRIAGE_CASES = [ + pytest.param(_drop("decision"), "$", id="missing-decision"), + pytest.param(_drop("evidence_gaps"), "$", id="missing-evidence-gaps"), + pytest.param(_set(["decision"], "approved"), "$.decision", id="decision-bad-enum"), + pytest.param(_set(["review_mode"], "per-scenario-subagent"), "$.review_mode", id="review-mode-wrong"), + pytest.param(_set(["final_severity"], "huge"), "$.final_severity", id="severity-bad-enum"), + pytest.param(_set(["confidence"], "very-high"), "$.confidence", id="confidence-bad-enum"), + pytest.param(_set(["triage_prompt_sha256"], "ZZZ"), "$.triage_prompt_sha256", id="sha-bad-pattern"), + pytest.param(_set(["reviewed_files"], []), 
"$.reviewed_files", id="reviewed-files-empty"), +] + + +@pytest.mark.parametrize("mutate,expected_path", TRIAGE_CASES) +def test_finding_triage_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + triage = _finding_triage() + mutate(triage) + with pytest.raises(ValueError) as exc: + validate_finding_triage(triage) + assert expected_path in str(exc.value) + assert "finding-triage-schema.json" in str(exc.value) + + +def test_validator_reports_each_violation() -> None: + """The error message must name each failing field so authors fix in one pass.""" + scenario = _scenario() + scenario.pop("id") + scenario.pop("expert") + scenario["target_path"] = "" + with pytest.raises(ValueError) as exc: + validate_scenario(scenario) + message = str(exc.value) + assert "'id' is a required property" in message + assert "'expert' is a required property" in message + assert "target_path" in message