From 0e909af81e527d3c72681425688d17b5aa2931d1 Mon Sep 17 00:00:00 2001 From: Rutger van Waveren Date: Wed, 20 May 2026 14:54:47 +0200 Subject: [PATCH 1/2] test: add schema and pure-logic test suites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the first two test layers for OpenHack: JSON Schema golden tests for all five durable artifact shapes, and pure-logic unit tests for the modules whose correctness gates the scenario→finding pipeline (paths, coverage scoring, routing-unit clustering, backlog validation). - 150 tests, runs in <0.2s, no LLM or network dependencies - pytest added to dev extras and wired into CI alongside ruff/mypy - conftest pins OPENHACK_ROOT so on-disk schema/expert lookups resolve deterministically regardless of where pytest is invoked from Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 2 + pyproject.toml | 5 + tests/__init__.py | 0 tests/conftest.py | 30 +++ tests/test_backlog.py | 380 ++++++++++++++++++++++++++++++++++++ tests/test_coverage.py | 261 +++++++++++++++++++++++++ tests/test_paths.py | 54 +++++ tests/test_routing_units.py | 227 +++++++++++++++++++++ tests/test_schemas.py | 334 +++++++++++++++++++++++++++++++ 9 files changed, 1293 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_backlog.py create mode 100644 tests/test_coverage.py create mode 100644 tests/test_paths.py create mode 100644 tests/test_routing_units.py create mode 100644 tests/test_schemas.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bcd14a4..9148c85 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,3 +19,5 @@ jobs: run: ruff check . 
- name: Mypy run: mypy + - name: Pytest + run: pytest diff --git a/pyproject.toml b/pyproject.toml index 1a73fbb..af18763 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dev = [ "ruff>=0.6", "mypy>=1.10", "types-jsonschema", + "pytest>=7.0", ] [project.scripts] @@ -34,3 +35,7 @@ target-version = "py39" files = ["src"] disallow_untyped_defs = true disallow_incomplete_defs = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-ra" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c9bf42e --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,30 @@ +"""Shared pytest fixtures. + +OPENHACK_ROOT is pinned to the repo root so ``paths.root()`` resolves +deterministically regardless of where pytest is invoked from. Modules under +test reach for ``root() / "agents" / "experts"`` and ``root() / "config"``, +so the real on-disk workspace is the simplest fixture. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent + + +@pytest.fixture(autouse=True) +def _pin_openhack_root(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENHACK_ROOT", str(REPO_ROOT)) + + +@pytest.fixture() +def run_dir(tmp_path: Path) -> Path: + """A scratch run directory with the standard subdirs created.""" + from openhack.paths import ALL_RUN_DIRS + + for name in ALL_RUN_DIRS: + (tmp_path / name).mkdir(parents=True, exist_ok=True) + return tmp_path diff --git a/tests/test_backlog.py b/tests/test_backlog.py new file mode 100644 index 0000000..54c3ba4 --- /dev/null +++ b/tests/test_backlog.py @@ -0,0 +1,380 @@ +"""Layer 2: scenario backlog validation and write-out.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest + +from openhack import backlog +from openhack.backlog import ( + DECISIONS, + _scenario_covers_boundary, + _scenario_covers_pair, + _scenario_covers_path, + _scenario_covers_unit, + _scenario_paths, + _validate_decisions, + coverage_errors, + record_backlog, +) + +EXPERTS = { + "injection", + "broken-access-control", + "authentication-failures", + "cryptographic-failures", +} + + +# --------------------------------------------------------------------------- +# _scenario_paths +# --------------------------------------------------------------------------- + + +def test_scenario_paths_collects_from_all_fields() -> None: + scenario: dict[str, Any] = { + "target_path": "app/Foo.php", + "target_paths": ["app/Bar.php", "app/Baz.php"], + "related_paths": "app/Util.php", + "covered_paths": ["app/Inc.php"], + } + assert _scenario_paths(scenario) == { + "app/Foo.php", + "app/Bar.php", + "app/Baz.php", + "app/Util.php", + "app/Inc.php", + } + + +def test_scenario_paths_handles_missing_fields_and_filters_empty() -> None: + scenario: dict[str, Any] = {"target_path": "app/A.php", 
"related_paths": []} + assert _scenario_paths(scenario) == {"app/A.php"} + + +# --------------------------------------------------------------------------- +# Scenario coverage predicates +# --------------------------------------------------------------------------- + + +def _scn(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = {"id": "S001", "expert": "injection", "target_path": "app/Foo.php"} + base.update(overrides) + return base + + +def test_scenario_covers_path_matches_target_path() -> None: + assert _scenario_covers_path([_scn()], "app/Foo.php") + assert not _scenario_covers_path([_scn()], "app/Bar.php") + + +def test_scenario_covers_path_also_matches_related_paths() -> None: + scn = _scn(related_paths=["app/Bar.php"]) + assert _scenario_covers_path([scn], "app/Bar.php") + + +def test_scenario_covers_pair_requires_expert_and_path() -> None: + scn = _scn() + assert _scenario_covers_pair([scn], "app/Foo.php", "injection") + assert not _scenario_covers_pair([scn], "app/Foo.php", "cryptographic-failures") + assert not _scenario_covers_pair([scn], "app/Bar.php", "injection") + + +def test_scenario_covers_boundary_by_boundary_id() -> None: + scn = _scn(boundary_id="B1") + req = {"boundary_id": "B1", "expert": "injection"} + assert _scenario_covers_boundary([scn], req) + req2 = {"boundary_id": "B2", "expert": "injection"} + assert not _scenario_covers_boundary([scn], req2) + + +def test_scenario_covers_boundary_by_covered_boundary_ids_list() -> None: + scn = _scn(covered_boundary_ids=["B1", "B2"]) + req = {"boundary_id": "B2", "expert": "injection"} + assert _scenario_covers_boundary([scn], req) + + +def test_scenario_covers_boundary_falls_back_to_recon_item_id() -> None: + scn = _scn(recon_item_id="R1") + req = {"recon_item_id": "R1", "expert": "injection"} + assert _scenario_covers_boundary([scn], req) + + +def test_scenario_covers_unit_by_routing_unit_id_or_covered_list() -> None: + direct = _scn(routing_unit_id="U001") + covered = _scn(id="S002", 
covered_routing_unit_ids=["U002"]) + assert _scenario_covers_unit([direct], "U001", "injection") + assert _scenario_covers_unit([covered], "U002", "injection") + assert not _scenario_covers_unit([direct], "U001", "cryptographic-failures") + + +# --------------------------------------------------------------------------- +# _validate_decisions +# --------------------------------------------------------------------------- + + +def test_validate_decisions_rejects_unknown_decision_value() -> None: + decisions = [{"path": "a.php", "expert": "injection", "decision": "wat", "reason": "x" * 25}] + errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) + assert any("invalid decision" in e for e in errors) + + +def test_validate_decisions_requires_path() -> None: + decisions = [{"decision": "not_applicable", "reason": "x" * 25}] + errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) + assert any("missing path" in e for e in errors) + + +def test_validate_decisions_requires_scenario_ids_for_coverage_claims() -> None: + for decision_value in ("covered_by_scenario", "merged", "scenario"): + decisions = [{"path": "a.php", "expert": "injection", "decision": decision_value}] + errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) + assert any("must reference scenario_ids" in e for e in errors), decision_value + + +def test_validate_decisions_flags_unknown_scenario_id() -> None: + decisions = [{ + "path": "a.php", + "expert": "injection", + "decision": "covered_by_scenario", + "scenario_ids": ["S999"], + }] + scenarios = [_scn()] + errors = _validate_decisions(decisions, scenarios=scenarios, experts=EXPERTS) + assert any("references unknown" in e and "S999" in e for e in errors) + + +def test_validate_decisions_requires_substantive_reason_for_dismissals() -> None: + # 'not_applicable' is a dismissal — short reason is rejected. 
+ decisions = [{ + "path": "a.php", "expert": "injection", "decision": "not_applicable", "reason": "no" + }] + errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) + assert any("needs a concrete reason" in e for e in errors) + + +def test_validate_decisions_accepts_wildcard_expert() -> None: + decisions = [{"path": "a.php", "expert": "*", "decision": "not_applicable", "reason": "x" * 25}] + errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) + assert errors == [] + + +def test_validate_decisions_rejects_unknown_expert() -> None: + decisions = [{ + "path": "a.php", "expert": "made-up-expert", + "decision": "not_applicable", "reason": "x" * 25, + }] + errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) + assert any("unknown expert" in e for e in errors) + + +def test_decisions_constant_lists_every_decision_kind() -> None: + """The set is consulted by router prompts; lock it in.""" + assert DECISIONS == { + "scenario", "covered_by_scenario", "merged", + "not_applicable", "needs_context", "out_of_scope", + } + + +# --------------------------------------------------------------------------- +# coverage_errors +# --------------------------------------------------------------------------- + + +def _write_coverage(path: Path, payload: dict[str, Any]) -> None: + (path / "recon-output").mkdir(parents=True, exist_ok=True) + (path / "recon-output" / "coverage-gaps.json").write_text(json.dumps(payload)) + + +def _write_units(path: Path, units: list[dict[str, Any]]) -> None: + (path / "recon-output").mkdir(parents=True, exist_ok=True) + (path / "recon-output" / "routing-units.jsonl").write_text( + "".join(json.dumps(u) + "\n" for u in units) + ) + + +def test_coverage_errors_flags_uncovered_path(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing path coverage for 
app/Untouched.php" in e for e in errors) + + +def test_coverage_errors_path_decision_satisfies_uncovered_path(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) + decisions = [{ + "path": "app/Untouched.php", "expert": "*", + "decision": "not_applicable", "reason": "framework-owned, not invocable by users", + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("missing path coverage" in e for e in errors) + + +def test_coverage_errors_flags_unrouted_required_pair(run_dir: Path) -> None: + _write_coverage(run_dir, { + "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], + }) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing expert coverage for app/Foo.php -> injection" in e for e in errors) + + +def test_coverage_errors_satisfied_by_scenario_targeting_the_pair(run_dir: Path) -> None: + _write_coverage(run_dir, { + "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], + }) + scn = _scn() # target_path=app/Foo.php, expert=injection + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing expert coverage" in e for e in errors) + + +def test_coverage_errors_flags_missing_routing_unit_coverage(run_dir: Path) -> None: + _write_units(run_dir, [{ + "unit_id": "U001", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing routing-unit expert coverage for U001" in e for e in errors) + + +def test_coverage_errors_routing_unit_satisfied_by_scenario_with_unit_id(run_dir: Path) -> None: + _write_units(run_dir, [{ + "unit_id": "U001", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + scn = _scn(routing_unit_id="U001") + errors = coverage_errors(run_dir, 
scenarios=[scn], coverage_decisions=[]) + assert not any("missing routing-unit" in e for e in errors) + + +# --------------------------------------------------------------------------- +# record_backlog — happy path + key error gates +# --------------------------------------------------------------------------- + + +def _valid_scenario(scn_id: str = "S001", **overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "id": scn_id, + "recon_item_id": "R001", + "expert": "injection", + "target_path": "app/Foo.php", + "proof_question": "Is user input concatenated into a raw SQL query?", + "evidence_required": ["sink call", "lack of binding"], + "security_invariant": "Database queries must use parameter binding.", + "proof_obligations": [ + {"id": "p1", "question": "Is the sink raw?", "evidence_required": "snippet"} + ], + } + base.update(overrides) + return base + + +@pytest.fixture() +def patched_run_dir( + run_dir: Path, monkeypatch: pytest.MonkeyPatch +) -> Path: + """Redirect ``run_path`` so ``record_backlog`` writes into tmp.""" + monkeypatch.setattr(backlog, "run_path", lambda target, run_id: run_dir) + return run_dir + + +def _router_output(scenarios: list[dict[str, Any]], **extras: Any) -> dict[str, Any]: + payload = {"scenarios": scenarios, "coverage_decisions": [], "coverage_notes": []} + payload.update(extras) + return payload + + +def test_record_backlog_writes_scenario_files_on_happy_path( + patched_run_dir: Path, tmp_path: Path +) -> None: + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + + result = record_backlog("acme", "demo", router) + assert [s["id"] for s in result] == ["S001"] + + written = patched_run_dir / "scenarios" / "backlog" / "S001.json" + assert written.is_file() + payload = json.loads(written.read_text()) + # DEFAULTS are layered in by record_backlog. 
+ assert payload["priority"] == "normal" + assert payload["result_location"] == "scenarios/finished/S001.json" + + index = patched_run_dir / "scenarios" / "index.jsonl" + assert index.read_text().strip().count("\n") == 0 # one line, no trailing extras + + decisions = patched_run_dir / "scenarios" / "coverage-decisions.json" + assert json.loads(decisions.read_text())["coverage_decisions"] == [] + + +def test_record_backlog_rejects_unknown_expert( + patched_run_dir: Path, tmp_path: Path +) -> None: + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario(expert="made-up-expert")]))) + with pytest.raises(ValueError, match="Unknown expert"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_rejects_duplicate_scenario_id( + patched_run_dir: Path, tmp_path: Path +) -> None: + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([ + _valid_scenario("S001"), + _valid_scenario("S001", target_path="app/Bar.php"), + ]))) + with pytest.raises(ValueError, match="Duplicate scenario id"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_rejects_duplicate_proof_obligation_id( + patched_run_dir: Path, tmp_path: Path +) -> None: + scn = _valid_scenario() + scn["proof_obligations"] = [ + {"id": "p1", "question": "Q1", "evidence_required": "e"}, + {"id": "p1", "question": "Q2", "evidence_required": "e"}, + ] + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="duplicate proof obligation"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_rejects_missing_required_field( + patched_run_dir: Path, tmp_path: Path +) -> None: + scn = _valid_scenario() + scn.pop("security_invariant") + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="missing: \\['security_invariant'\\]"): + record_backlog("acme", "demo", 
router) + + +def test_record_backlog_surfaces_schema_failure( + patched_run_dir: Path, tmp_path: Path +) -> None: + scn = _valid_scenario(id="invalid-id-format") + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="scenario-schema.json"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_surfaces_coverage_gap( + patched_run_dir: Path, tmp_path: Path +) -> None: + _write_coverage(patched_run_dir, { + "routing_requirements": [{"path": "app/Unrelated.php", "expert": "injection"}], + }) + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + with pytest.raises(ValueError, match="does not cover recon evidence"): + record_backlog("acme", "demo", router) diff --git a/tests/test_coverage.py b/tests/test_coverage.py new file mode 100644 index 0000000..64a6e1f --- /dev/null +++ b/tests/test_coverage.py @@ -0,0 +1,261 @@ +"""Layer 2: coverage scoring and routing requirement generation. + +``coverage.py`` is the biggest single module (608 LOC) and decides which +``(path, expert)`` pairs become mandatory scenarios. A miss here surfaces +as silently dropped attack surface, so these tests pin down the decision +table branch-by-branch. 
+""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from openhack.coverage import ( + MAX_REQUIREMENTS_PER_PATH, + PRODUCTIVE_CLASSES, + SUGGESTION_LIMIT, + _path_class, + _score_pair, + _source_or_sink, + _tokens, + coverage_opportunities, + coverage_suggestions, + routing_requirements, +) + + +# --------------------------------------------------------------------------- +# Path classification +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "path,expected", + [ + (".ddev/config.yaml", "dev"), + (".devcontainer/Dockerfile", "dev"), + ("app/tests/FooTest.php", "test"), + ("src/__fixtures__/sample.json", "test"), + ("public/assets/libraries/jquery.js", "asset"), + (".github/workflows/ci.yml", "ci"), + ("path/to/.github/workflows/x.yml", "ci"), + ("package.json", "manifest"), + ("composer.lock", "manifest"), + ("requirements.txt", "manifest"), + ("docs/intro.md", "docs"), + ("README.md", "docs"), + ("notes.rst", "docs"), + ("src/translations/en.yml", "fixture"), + ("public/assets/js/app.js", "client"), + ("src/foo.js", "client"), + ("public/assets/logo.png", "asset"), + ("public/icon.svg", "asset"), + ("templates/home.twig", "template"), + ("config/services.yml", "config"), + ("settings.xml", "config"), + ("bin/run", "script"), + ("scripts/deploy.sh", "script"), + ("app/Http/Controller.php", "runtime"), + ("app/bundles/foo/Service.php", "runtime"), + ("plugins/extra/handler.php", "runtime"), + ("README", "other"), + ], +) +def test_path_class(path: str, expected: str) -> None: + assert _path_class(path) == expected + + +def test_productive_classes_match_expectations() -> None: + """Sanity-check the productive set — scoring depends on this membership.""" + assert PRODUCTIVE_CLASSES == { + "client", "config", "manifest", "runtime", "script", "template" + } + + +# --------------------------------------------------------------------------- +# Tokenizer +# 
--------------------------------------------------------------------------- + + +def test_tokens_drops_short_and_stopwords() -> None: + out = _tokens("the AND a URL path data 12 abc_def query") + assert "the" not in out and "and" not in out + assert "url" not in out # in STOPWORDS + assert "path" not in out # in STOPWORDS + assert "abc_def" in out # underscores preserved + assert "query" in out + assert "12" not in out # below length-3 cutoff + + +def test_tokens_splits_on_non_alphanumeric() -> None: + assert _tokens("Foo-Bar.baz/Qux") == {"foo", "bar", "baz", "qux"} + + +# --------------------------------------------------------------------------- +# Pair scoring +# --------------------------------------------------------------------------- + + +def _pair(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "expert": "injection", + "path": "app/Foo.php", + "reason": "test", + "matched_terms": [], + "signals": [], + "kinds": [], + "evidence": [], + "interesting": False, + "path_class": "runtime", + } + base.update(overrides) + return base + + +def test_score_boundary_mandatory_always_high() -> None: + pair = _pair(boundary_mandatory=True, strong_terms=["endpoint"]) + confidence, strong, _ = _score_pair(pair) + assert confidence == "high" + assert strong == ["endpoint"] + + +def test_score_supply_chain_on_manifest_is_high() -> None: + pair = _pair(expert="software-supply-chain-failures", path="package.json") + confidence, _, reason = _score_pair(pair) + assert confidence == "high" + assert "supply-chain" in reason or "Dependency" in reason + + +def test_score_non_productive_path_class_is_low() -> None: + pair = _pair(path="public/assets/logo.png") + confidence, _, reason = _score_pair(pair) + assert confidence == "low" + assert "not a runtime attack surface" in reason + + +def test_score_runtime_without_strong_terms_is_low() -> None: + pair = _pair(path="app/Generic.php") + confidence, _, reason = _score_pair(pair) + assert confidence == "low" + assert 
"generic" in reason.lower() + + +def test_score_runtime_with_strong_terms_but_no_sink_is_suggestion() -> None: + pair = _pair(path="app/query/Builder.php", interesting=False) + confidence, strong, _ = _score_pair(pair) + assert confidence == "suggestion" + assert "query" in strong + + +def test_score_runtime_with_strong_terms_and_sink_is_high() -> None: + pair = _pair(path="app/query/Builder.php", interesting=True) + confidence, strong, reason = _score_pair(pair) + assert confidence == "high" + assert "query" in strong + assert "source, sink" in reason or "boundary evidence" in reason + + +def test_source_or_sink_truthy_for_boundary() -> None: + assert _source_or_sink(_pair(boundary_mandatory=True)) is True + assert _source_or_sink(_pair(interesting=True)) is True + assert _source_or_sink(_pair()) is False + + +# --------------------------------------------------------------------------- +# End-to-end: candidate pair generation from inventory +# --------------------------------------------------------------------------- + + +def _inv_row(kind: str, path: str, **extra: Any) -> dict[str, Any]: + row: dict[str, Any] = { + "kind": kind, + "path": path, + "line": 1, + "match": [], + "text": "", + } + row.update(extra) + return row + + +SELECTED = ["injection", "software-supply-chain-failures"] + + +def test_routing_requirements_yields_high_confidence_pairs_only() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "app/QueryHandler.php", match=["query"])], + "sinks": [_inv_row("sinks", "app/QueryHandler.php", match=["raw"])], + } + reqs = routing_requirements(inventory, recon_items=None, selected_experts=SELECTED) + assert reqs, "expected at least one high-confidence requirement" + for req in reqs: + assert req["confidence"] == "high" + # Public pairs have the private 'interesting' flag stripped. 
+ assert "interesting" not in req + assert req["requirement"].startswith("Create a scenario") + + +def test_routing_requirements_skips_non_productive_paths() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "tests/QueryTest.php", match=["query"])], + "sinks": [_inv_row("sinks", "tests/QueryTest.php", match=["raw"])], + } + reqs = routing_requirements(inventory, recon_items=None, selected_experts=SELECTED) + assert reqs == [] + + +def test_routing_requirements_promotes_supply_chain_for_manifest() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "package.json", match=["dependency"])], + } + reqs = routing_requirements( + inventory, recon_items=None, selected_experts=["software-supply-chain-failures"] + ) + paths = {req["path"] for req in reqs} + assert "package.json" in paths + + +def test_coverage_opportunities_groups_by_expert() -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [ + _inv_row("inputs", "app/QueryHandler.php", match=["query"]), + _inv_row("inputs", "app/ShellRunner.php", match=["shell", "exec"]), + ], + "sinks": [ + _inv_row("sinks", "app/QueryHandler.php", match=["raw"]), + _inv_row("sinks", "app/ShellRunner.php", match=["exec"]), + ], + } + opps = coverage_opportunities( + inventory, recon_items=None, selected_experts=["injection"] + ) + assert len(opps) == 1 + [opp] = opps + assert opp["expert"] == "injection" + assert opp["candidate_paths"] >= 2 + paths = {ex["path"] for ex in opp["examples"]} + assert {"app/QueryHandler.php", "app/ShellRunner.php"} <= paths + + +def test_coverage_suggestions_skip_required_pairs() -> None: + """Items already represented in ``required_keys`` must not double-count.""" + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "app/QueryHandler.php", match=["query"])], + "sinks": [_inv_row("sinks", "app/QueryHandler.php", match=["raw"])], + } + required = {("app/QueryHandler.php", "injection")} 
+ sugs = coverage_suggestions( + inventory, + recon_items=None, + required_keys=required, + selected_experts=["injection"], + ) + assert all(s["path"] != "app/QueryHandler.php" for s in sugs) + + +def test_constants_have_expected_values() -> None: + """Lock in the public limits referenced from docs/AGENTS.md.""" + assert MAX_REQUIREMENTS_PER_PATH == 4 + assert SUGGESTION_LIMIT == 500 diff --git a/tests/test_paths.py b/tests/test_paths.py new file mode 100644 index 0000000..80f7452 --- /dev/null +++ b/tests/test_paths.py @@ -0,0 +1,54 @@ +"""Layer 2: path resolution and run-directory scaffolding.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from openhack.paths import ALL_RUN_DIRS, ensure_run_dirs, root, run_path + + +def test_root_resolves_via_openhack_root_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENHACK_ROOT", str(Path(__file__).resolve().parent.parent)) + assert (root() / "agents" / "experts").is_dir() + + +def test_root_raises_when_env_var_points_at_non_workspace( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("OPENHACK_ROOT", str(tmp_path)) + with pytest.raises(RuntimeError, match="OPENHACK_ROOT is not a valid workspace root"): + root() + + +def test_root_falls_back_to_walk_up_when_env_unset( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """With no env var set, root() walks up from CWD / module location. + + The package is installed editable from this repo, so the module-location + walk-up will land on the real workspace even when CWD is unrelated. 
+ """ + monkeypatch.delenv("OPENHACK_ROOT", raising=False) + monkeypatch.chdir(tmp_path) + found = root() + assert (found / "agents" / "experts").is_dir() + assert (found / "templates" / "scenario-prompt.md").is_file() + + +def test_run_path_is_under_root() -> None: + path = run_path("acme/widget", "2026-05-20-demo") + assert path == root() / "runs" / "acme/widget" / "2026-05-20-demo" + + +def test_ensure_run_dirs_creates_every_standard_subdir( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """``ensure_run_dirs`` materializes the full layout idempotently.""" + monkeypatch.setattr("openhack.paths.run_path", lambda target, run_id: tmp_path / target / run_id) + created = ensure_run_dirs("acme/widget", "demo") + for name in ALL_RUN_DIRS: + assert (created / name).is_dir() + # Idempotent: a second call must not raise. + ensure_run_dirs("acme/widget", "demo") diff --git a/tests/test_routing_units.py b/tests/test_routing_units.py new file mode 100644 index 0000000..142425a --- /dev/null +++ b/tests/test_routing_units.py @@ -0,0 +1,227 @@ +"""Layer 2: routing-unit clustering.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from openhack.routing_units import ( + KIND_TERMS, + MAX_EVIDENCE_ROWS, + _candidate_kinds, + _compact_row, + _dedupe_rows, + _kind_for_terms, + _row_kind, + build_routing_units, +) + + +@pytest.mark.parametrize( + "text,expected", + [ + ({"exec", "shell"}, "command_execution_sink"), + ({"query", "raw"}, "database_query_sink"), + ({"innerhtml"}, "html_template_dom_sink"), + ({"upload", "filename"}, "file_upload_download_storage"), + ({"webhook", "fetch"}, "outbound_fetch_boundary"), + ({"session", "role"}, "identity_state_access_control"), + ({"secret"}, "secret_debug_exposure"), + ({"yaml", "deserialize"}, "parser_deserialization_integrity"), + ({"jwt", "crypto"}, "cryptographic_secret_token"), + ({"queue", "limit"}, "resource_consumption"), + ({"manifest", "lockfile"}, "supply_chain_manifest"), + 
(set(), "configuration_or_static_surface"), + ({"unrelated"}, "configuration_or_static_surface"), + ], +) +def test_kind_for_terms(text: set[str], expected: str) -> None: + assert _kind_for_terms(text) == expected + + +def test_kind_for_terms_first_match_wins() -> None: + """Order in ``KIND_TERMS`` is a deliberate priority list.""" + # 'queue' appears in both resource_consumption and parser_deserialization_integrity + # — KIND_TERMS lists parser earlier, so it should win for ambiguous terms in its set. + [parser_terms] = [terms for name, terms in KIND_TERMS if name == "parser_deserialization_integrity"] + # Pick an unambiguous parser-only term to confirm priority logic. + assert _kind_for_terms({"xxe"}) == "parser_deserialization_integrity" + assert "xxe" in parser_terms + + +def test_row_kind_classifies_request_boundary_evidence() -> None: + row = { + "kind": "inputs", + "path": "app/Api.php", + "line": 10, + "match": ["execute"], + "text": "shell exec", + } + assert _row_kind(row) == "command_execution_sink" + + +def test_compact_row_truncates_long_text() -> None: + row = {"kind": "inputs", "line": 1, "match": [], "text": "x" * 1000} + compact = _compact_row(row) + assert len(compact["text"]) == 240 + assert compact["kind"] == "inputs" + + +def test_compact_row_keeps_optional_keys_when_present() -> None: + row = { + "kind": "request_boundaries", + "line": 5, + "match": [], + "text": "", + "endpoint": "/api/foo", + "methods": ["POST"], + } + compact = _compact_row(row) + assert compact["endpoint"] == "/api/foo" + assert compact["methods"] == ["POST"] + + +def test_compact_row_drops_empty_optional_keys() -> None: + row = {"kind": "inputs", "line": 1, "match": [], "text": "", "endpoint": "", "methods": []} + compact = _compact_row(row) + assert "endpoint" not in compact + assert "methods" not in compact + + +def test_dedupe_rows_collapses_duplicates_and_caps_at_max() -> None: + rows = [{"kind": "inputs", "line": 1, "match": ["x"], "text": "same"}] * 5 + rows.extend( 
+ {"kind": "inputs", "line": i, "match": ["y"], "text": f"row-{i}"} + for i in range(MAX_EVIDENCE_ROWS + 5) + ) + deduped = _dedupe_rows(rows) + assert len(deduped) <= MAX_EVIDENCE_ROWS + # The duplicate block collapses to one entry, then unique rows fill the rest. + assert sum(1 for r in deduped if r["text"] == "same") == 1 + + +def test_candidate_kinds_for_boundary_returns_request_boundary() -> None: + pair = { + "expert": "injection", + "path": "app/Api.php", + "boundary_mandatory": True, + "boundary_id": "B1", + } + assert _candidate_kinds(pair, {}) == ["request_boundary"] + + +def test_candidate_kinds_uses_expert_hints_from_rows() -> None: + pair: dict[str, Any] = { + "expert": "injection", + "path": "app/Api.php", + "matched_terms": [], + "signals": [], + "evidence": [], + } + rows_by_kind = { + "sinks": [ + {"kind": "sinks", "path": "app/Api.php", "line": 1, "match": ["exec"], "text": "shell"}, + ], + } + assert "command_execution_sink" in _candidate_kinds(pair, rows_by_kind) + + +# --------------------------------------------------------------------------- +# build_routing_units end-to-end +# --------------------------------------------------------------------------- + + +def _req(path: str, expert: str, **extra: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "expert": expert, + "path": path, + "reason": "test", + "matched_terms": [], + "signals": [], + "kinds": [], + "evidence": [], + "interesting": True, + "path_class": "runtime", + } + base.update(extra) + return base + + +def test_build_routing_units_assigns_ids_in_sort_order() -> None: + coverage_gaps = { + "routing_requirements": [ + _req("app/QueryHandler.php", "injection"), + _req("app/Auth.php", "authentication-failures"), + ], + } + inventory: dict[str, list[dict[str, Any]]] = { + "sinks": [ + {"kind": "sinks", "path": "app/QueryHandler.php", "line": 1, "match": ["raw"], "text": "query"}, + {"kind": "sinks", "path": "app/Auth.php", "line": 1, "match": ["session"], "text": "auth"}, + ], + } + 
units = build_routing_units(coverage_gaps, inventory) + assert [u["unit_id"] for u in units] == ["U001", "U002"] + # Mandatory coverage requirements always sort first; both here are mandatory. + assert all(u["coverage"] == "mandatory" for u in units) + + +def test_build_routing_units_separates_required_from_suggested() -> None: + coverage_gaps = { + "routing_requirements": [_req("app/QueryHandler.php", "injection")], + "coverage_suggestions": [_req("app/QueryHandler.php", "broken-access-control")], + } + inventory: dict[str, list[dict[str, Any]]] = { + "sinks": [ + {"kind": "sinks", "path": "app/QueryHandler.php", "line": 1, "match": ["raw", "role"], "text": "query"}, + ], + } + units = build_routing_units(coverage_gaps, inventory) + # Both pairs target the same path; whether they merge into one unit or split + # depends on the chosen kind. Verify the expert tagging is preserved. + required = {expert for u in units for expert in u["required_experts"]} + suggested = {expert for u in units for expert in u["suggested_experts"]} + assert "injection" in required + assert "broken-access-control" in suggested + assert "injection" not in suggested + assert "broken-access-control" not in required + + +def test_build_routing_units_preserves_boundary_fields() -> None: + coverage_gaps = { + "routing_requirements": [ + _req( + "app/Api.php", + "injection", + boundary_mandatory=True, + boundary_id="B1", + endpoint="/api/run", + methods=["POST"], + boundary_type="route", + request_fields=["cmd"], + ), + ], + } + units = build_routing_units(coverage_gaps, inventory={}) + assert len(units) == 1 + unit = units[0] + assert unit["kind"] == "request_boundary" + assert unit["boundary_id"] == "B1" + assert unit["endpoint"] == "/api/run" + assert unit["methods"] == ["POST"] + + +def test_build_routing_units_emits_mandatory_path_unit_for_uncovered_gap() -> None: + coverage_gaps = { + "input_with_sink_or_exposure": [{"path": "app/Untriaged.php"}], + } + inventory: dict[str, list[dict[str, 
Any]]] = { + "inputs": [ + {"kind": "inputs", "path": "app/Untriaged.php", "line": 1, "match": [], "text": "raw"} + ], + } + units = build_routing_units(coverage_gaps, inventory) + assert len(units) == 1 + assert units[0]["coverage"] == "mandatory_path" + assert units[0]["required_experts"] == [] diff --git a/tests/test_schemas.py b/tests/test_schemas.py new file mode 100644 index 0000000..de9f9c5 --- /dev/null +++ b/tests/test_schemas.py @@ -0,0 +1,334 @@ +"""Layer 1: JSON Schema golden tests. + +For each durable-artifact schema we keep a minimum-valid baseline and a +table of single-field mutations that should fail validation. The assertions +check both that the validator raises and that the error message points at +the right JSON path — that way a schema change that silently loosens a rule +still trips the test. +""" + +from __future__ import annotations + +import copy +from typing import Any, Callable + +import pytest + +from openhack.schemas import ( + validate_finding, + validate_finding_candidate, + validate_finding_triage, + validate_result, + validate_scenario, +) + +SHA256 = "a" * 64 + + +# --------------------------------------------------------------------------- +# Baselines +# --------------------------------------------------------------------------- + + +def _scenario() -> dict[str, Any]: + return { + "id": "S001", + "recon_item_id": "R001", + "expert": "injection", + "target_path": "app/Http/Foo.php", + "proof_question": "Is the user-supplied id concatenated into a raw SQL query?", + "evidence_required": ["sink call", "lack of binding"], + } + + +def _scenario_result() -> dict[str, Any]: + return { + "scenario_id": "S001", + "review_mode": "per-scenario-subagent", + "subagent_id": "agent-1", + "scenario_prompt_sha256": SHA256, + "reviewed_files": ["app/Http/Foo.php"], + "status": "verified", + "expert": "injection", + "summary": "Confirmed raw SQL concatenation.", + "evidence": [ + { + "path": "app/Http/Foo.php", + "line": 42, + "snippet": 
"$db->raw($_GET['id'])", + "note": "user input flows directly into raw()", + } + ], + } + + +def _finding() -> dict[str, Any]: + return { + "title": "SQL injection in Foo.php", + "severity": "high", + "target_path": "app/Http/Foo.php", + "attacker_role": "unauthenticated user", + "preconditions": "Endpoint reachable without auth.", + "non_technical_summary": "An attacker can read the database.", + "summary": "Raw SQL built from user input.", + "attack_chain": "GET /foo?id=' OR 1=1 -- → raw() executes attacker SQL", + "example_attack": "curl 'http://host/foo?id=1%20OR%201=1--'", + "evidence": "See app/Http/Foo.php:42", + "impact": "Full database read.", + "impact_analysis": "User table and secrets exposed.", + "attacker_use": "Exfiltrate PII.", + "recommended_fix": "Use parameter binding.", + "validation_notes": "Reproduced locally on commit abc123.", + } + + +def _finding_candidate() -> dict[str, Any]: + return { + "candidate_id": "S001-F001", + "scenario_id": "S001", + "source_result": "scenarios/finished/S001.json", + "expert": "injection", + "status": "pending_triage", + "finding": _finding(), + } + + +def _finding_triage() -> dict[str, Any]: + return { + "candidate_id": "S001-F001", + "review_mode": "per-finding-triage-agent", + "triage_agent_id": "triage-1", + "triage_prompt_sha256": SHA256, + "reviewed_files": ["app/Http/Foo.php"], + "decision": "accepted", + "summary": "Confirmed vulnerable.", + "final_severity": "high", + "severity_rationale": "Direct DB read by unauth user.", + "confidence": "high", + "evidence_assessment": "Evidence is sufficient.", + "evidence_gaps": [], + "required_changes": [], + } + + +# --------------------------------------------------------------------------- +# Mutation helpers +# --------------------------------------------------------------------------- + + +def _drop(key: str) -> Callable[[dict[str, Any]], None]: + def mutate(value: dict[str, Any]) -> None: + value.pop(key, None) + + return mutate + + +def _set(path: list[str | 
int], new_value: Any) -> Callable[[dict[str, Any]], None]: + def mutate(value: dict[str, Any]) -> None: + cursor: Any = value + for part in path[:-1]: + cursor = cursor[part] + cursor[path[-1]] = new_value + + return mutate + + +# --------------------------------------------------------------------------- +# Happy paths +# --------------------------------------------------------------------------- + + +def test_scenario_baseline_validates() -> None: + validate_scenario(_scenario()) + + +def test_scenario_result_baseline_validates() -> None: + validate_result(_scenario_result(), scenario_id="S001") + + +def test_finding_baseline_validates() -> None: + validate_finding(_finding()) + + +def test_finding_candidate_baseline_validates() -> None: + validate_finding_candidate(_finding_candidate()) + + +def test_finding_triage_baseline_validates() -> None: + validate_finding_triage(_finding_triage()) + + +def test_finding_evidence_accepts_all_three_shapes() -> None: + """Schema declares ``evidence`` as ``oneOf [string, array, object]``.""" + for shape in ( + "string evidence", + [{"path": "a.php", "line": 1, "snippet": "x", "note": "n"}], + {"path": "a.php", "details": "..."}, + ): + finding = _finding() + finding["evidence"] = shape + validate_finding(finding) + + +# --------------------------------------------------------------------------- +# Negative cases — each row mutates the baseline and asserts a failure path +# --------------------------------------------------------------------------- + + +SCENARIO_CASES = [ + pytest.param(_drop("id"), "$", id="missing-id"), + pytest.param(_drop("expert"), "$", id="missing-expert"), + pytest.param(_drop("proof_question"), "$", id="missing-proof-question"), + pytest.param(_set(["id"], "S99"), "$.id", id="id-too-short"), + pytest.param(_set(["id"], "scenario-1"), "$.id", id="id-bad-prefix"), + pytest.param(_set(["evidence_required"], 7), "$.evidence_required", id="evidence-bad-type"), + pytest.param(_set(["evidence_required"], 
[""]), "$.evidence_required", id="evidence-array-empty-string"), + pytest.param(_set(["target_path"], ""), "$.target_path", id="target-path-empty"), + pytest.param(_set(["priority"], "urgent"), "$.priority", id="priority-bad-enum"), + pytest.param(_set(["routing_unit_id"], "unit-1"), "$.routing_unit_id", id="routing-unit-bad-pattern"), + pytest.param( + _set(["proof_obligations"], [{"id": "BAD ID", "question": "?", "evidence_required": "e"}]), + "$.proof_obligations.0.id", + id="obligation-id-bad-pattern", + ), + pytest.param( + _set(["proof_obligations"], [{"id": "ok", "question": "?"}]), + "$.proof_obligations.0", + id="obligation-missing-evidence-required", + ), +] + + +@pytest.mark.parametrize("mutate,expected_path", SCENARIO_CASES) +def test_scenario_invalid_cases(mutate: Callable[[dict[str, Any]], None], expected_path: str) -> None: + scenario = _scenario() + mutate(scenario) + with pytest.raises(ValueError) as exc: + validate_scenario(scenario) + assert expected_path in str(exc.value) + assert "scenario-schema.json" in str(exc.value) + + +RESULT_CASES = [ + pytest.param(_drop("scenario_id"), "$", id="missing-scenario-id"), + pytest.param(_set(["scenario_id"], "X1"), "$.scenario_id", id="scenario-id-bad-pattern"), + pytest.param(_set(["review_mode"], "batch"), "$.review_mode", id="review-mode-not-allowed"), + pytest.param(_set(["status"], "maybe"), "$.status", id="status-bad-enum"), + pytest.param(_set(["scenario_prompt_sha256"], "deadbeef"), "$.scenario_prompt_sha256", id="sha-too-short"), + pytest.param(_set(["reviewed_files"], []), "$.reviewed_files", id="reviewed-files-empty"), + pytest.param(_set(["evidence"], []), "$.evidence", id="evidence-empty"), + pytest.param( + _set(["evidence"], [{"path": "a.php", "line": 1, "snippet": "x"}]), + "$.evidence.0", + id="evidence-missing-note", + ), + pytest.param( + _set(["proof_obligations"], [{"id": "ok", "status": "weird", "summary": "s"}]), + "$.proof_obligations.0.status", + id="obligation-status-bad-enum", + 
), +] + + +@pytest.mark.parametrize("mutate,expected_path", RESULT_CASES) +def test_scenario_result_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + result = _scenario_result() + mutate(result) + with pytest.raises(ValueError) as exc: + validate_result(result, scenario_id="S001") + assert expected_path in str(exc.value) + assert "scenario-result-schema.json" in str(exc.value) + + +FINDING_CASES = [ + pytest.param(_drop("title"), "$", id="missing-title"), + pytest.param(_drop("recommended_fix"), "$", id="missing-recommended-fix"), + pytest.param(_set(["severity"], "catastrophic"), "$.severity", id="severity-bad-enum"), + pytest.param(_set(["summary"], ""), "$.summary", id="summary-empty"), + pytest.param(_set(["evidence"], 7), "$.evidence", id="evidence-bad-type"), +] + + +@pytest.mark.parametrize("mutate,expected_path", FINDING_CASES) +def test_finding_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + finding = _finding() + mutate(finding) + with pytest.raises(ValueError) as exc: + validate_finding(finding) + assert expected_path in str(exc.value) + assert "finding-schema.json" in str(exc.value) + + +CANDIDATE_CASES = [ + pytest.param(_drop("candidate_id"), "$", id="missing-candidate-id"), + pytest.param(_set(["candidate_id"], "S001-001"), "$.candidate_id", id="candidate-id-bad-pattern"), + pytest.param(_set(["candidate_id"], "S1-F1"), "$.candidate_id", id="candidate-id-too-short"), + pytest.param(_set(["status"], "accepted"), "$.status", id="status-not-pending-triage"), + pytest.param(_set(["scenario_id"], "scn-1"), "$.scenario_id", id="scenario-id-bad-pattern"), +] + + +@pytest.mark.parametrize("mutate,expected_path", CANDIDATE_CASES) +def test_finding_candidate_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + candidate = _finding_candidate() + mutate(candidate) + with pytest.raises(ValueError) as exc: + 
validate_finding_candidate(candidate) + assert expected_path in str(exc.value) + assert "finding-candidate-schema.json" in str(exc.value) + + +TRIAGE_CASES = [ + pytest.param(_drop("decision"), "$", id="missing-decision"), + pytest.param(_drop("evidence_gaps"), "$", id="missing-evidence-gaps"), + pytest.param(_set(["decision"], "approved"), "$.decision", id="decision-bad-enum"), + pytest.param(_set(["review_mode"], "per-scenario-subagent"), "$.review_mode", id="review-mode-wrong"), + pytest.param(_set(["final_severity"], "huge"), "$.final_severity", id="severity-bad-enum"), + pytest.param(_set(["confidence"], "very-high"), "$.confidence", id="confidence-bad-enum"), + pytest.param(_set(["triage_prompt_sha256"], "ZZZ"), "$.triage_prompt_sha256", id="sha-bad-pattern"), + pytest.param(_set(["reviewed_files"], []), "$.reviewed_files", id="reviewed-files-empty"), +] + + +@pytest.mark.parametrize("mutate,expected_path", TRIAGE_CASES) +def test_finding_triage_invalid_cases( + mutate: Callable[[dict[str, Any]], None], expected_path: str +) -> None: + triage = _finding_triage() + mutate(triage) + with pytest.raises(ValueError) as exc: + validate_finding_triage(triage) + assert expected_path in str(exc.value) + assert "finding-triage-schema.json" in str(exc.value) + + +def test_validator_reports_multiple_errors() -> None: + """The error message bullets each violation so authors can fix in one pass.""" + scenario = _scenario() + scenario.pop("id") + scenario.pop("expert") + scenario["target_path"] = "" + with pytest.raises(ValueError) as exc: + validate_scenario(scenario) + message = str(exc.value) + # Each missing required field surfaces as a separate bullet. + assert message.count("\n- ") >= 3 + + +def test_baselines_are_independent() -> None: + """Mutating one fixture instance must not leak into another.""" + a = _scenario() + b = _scenario() + a["id"] = "S999" + assert b["id"] == "S001" + # And the deep-copy assumption holds for nested structures. 
+ a2 = copy.deepcopy(_scenario_result()) + a2["evidence"][0]["note"] = "mutated" + fresh = _scenario_result() + assert fresh["evidence"][0]["note"] != "mutated" From caf61b26d5a3b7100263a726e0bd4ef2fdd29d29 Mon Sep 17 00:00:00 2001 From: Rutger van Waveren Date: Wed, 20 May 2026 15:30:58 +0200 Subject: [PATCH 2/2] test: address review feedback on PR #6 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gaps: - Add coverage for write_coverage and write_routing_units (the disk-side entry points called from the CLI). They were the largest untested seam. - Add record_backlog test for the expert_scope restriction path via run-config.yaml, plus an audit-event assertion on events.jsonl. - Add full coverage of boundary_requirements in coverage_errors: flagged-when-missing, satisfied-by-scenario-boundary-id, satisfied-by-covered_boundary_ids list, satisfied-by-recon_item_id fallback, satisfied-by-boundary-id decision, and the negative case where a decision without a boundary_id does not satisfy. - Rewrite scenario/decision predicate tests to go through coverage_errors rather than calling the private _scenario_covers_* / _validate_decisions helpers. Same coverage, resilient to internal refactors. Cleanup: - Fix test_kind_for_terms_first_match_wins to actually exercise priority (it now uses the 'template' overlap between html_template_dom_sink and parser_deserialization_integrity, plus the 'token' overlap). - Drop test_baselines_are_independent — the helpers return fresh dicts by construction so the assertion was tautological. - Replace the brittle bullet-count assertion in test_validator_reports_multiple_errors with field-name substring checks. - Drop change-detector constants tests (DECISIONS, PRODUCTIVE_CLASSES, MAX_REQUIREMENTS_PER_PATH) — they only fired when someone updated the constant. Nice-to-haves: - CI now caches pip via setup-python's cache: pip. 
- Split CI into a lint job (3.11) and a test job that matrixes pytest across 3.9, 3.11, 3.12 — pyproject declares requires-python>=3.9. - Remove empty tests/__init__.py (pytest discovers without it). - Hoist the ALL_RUN_DIRS import in conftest to module-level. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 18 ++ tests/__init__.py | 0 tests/conftest.py | 4 +- tests/test_backlog.py | 415 ++++++++++++++++++++---------------- tests/test_coverage.py | 66 ++++-- tests/test_routing_units.py | 57 ++++- tests/test_schemas.py | 23 +- 7 files changed, 352 insertions(+), 231 deletions(-) delete mode 100644 tests/__init__.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9148c85..b1f16fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,11 +13,29 @@ jobs: - uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip + cache-dependency-path: pyproject.toml - name: Install dev dependencies run: python -m pip install --upgrade pip && pip install -e ".[dev]" - name: Ruff run: ruff check . 
- name: Mypy run: mypy + + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + - name: Install dev dependencies + run: python -m pip install --upgrade pip && pip install -e ".[dev]" - name: Pytest run: pytest diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/conftest.py b/tests/conftest.py index c9bf42e..1851b41 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,8 @@ import pytest +from openhack.paths import ALL_RUN_DIRS + REPO_ROOT = Path(__file__).resolve().parent.parent @@ -23,8 +25,6 @@ def _pin_openhack_root(monkeypatch: pytest.MonkeyPatch) -> None: @pytest.fixture() def run_dir(tmp_path: Path) -> Path: """A scratch run directory with the standard subdirs created.""" - from openhack.paths import ALL_RUN_DIRS - for name in ALL_RUN_DIRS: (tmp_path / name).mkdir(parents=True, exist_ok=True) return tmp_path diff --git a/tests/test_backlog.py b/tests/test_backlog.py index 54c3ba4..7c4f354 100644 --- a/tests/test_backlog.py +++ b/tests/test_backlog.py @@ -1,4 +1,9 @@ -"""Layer 2: scenario backlog validation and write-out.""" +"""Layer 2: scenario backlog validation and write-out. + +Tests prefer the public entry points (``coverage_errors``, ``record_backlog``) +over private predicates. The private helpers are still exercised — just +through the API a real caller uses. 
+""" from __future__ import annotations @@ -9,251 +14,261 @@ import pytest from openhack import backlog -from openhack.backlog import ( - DECISIONS, - _scenario_covers_boundary, - _scenario_covers_pair, - _scenario_covers_path, - _scenario_covers_unit, - _scenario_paths, - _validate_decisions, - coverage_errors, - record_backlog, -) - -EXPERTS = { - "injection", - "broken-access-control", - "authentication-failures", - "cryptographic-failures", -} +from openhack.backlog import coverage_errors, record_backlog # --------------------------------------------------------------------------- -# _scenario_paths +# Fixture builders # --------------------------------------------------------------------------- -def test_scenario_paths_collects_from_all_fields() -> None: - scenario: dict[str, Any] = { - "target_path": "app/Foo.php", - "target_paths": ["app/Bar.php", "app/Baz.php"], - "related_paths": "app/Util.php", - "covered_paths": ["app/Inc.php"], - } - assert _scenario_paths(scenario) == { - "app/Foo.php", - "app/Bar.php", - "app/Baz.php", - "app/Util.php", - "app/Inc.php", - } +def _scn(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = {"id": "S001", "expert": "injection", "target_path": "app/Foo.php"} + base.update(overrides) + return base + + +def _write_coverage(path: Path, payload: dict[str, Any]) -> None: + (path / "recon-output").mkdir(parents=True, exist_ok=True) + (path / "recon-output" / "coverage-gaps.json").write_text(json.dumps(payload)) -def test_scenario_paths_handles_missing_fields_and_filters_empty() -> None: - scenario: dict[str, Any] = {"target_path": "app/A.php", "related_paths": []} - assert _scenario_paths(scenario) == {"app/A.php"} +def _write_units(path: Path, units: list[dict[str, Any]]) -> None: + (path / "recon-output").mkdir(parents=True, exist_ok=True) + (path / "recon-output" / "routing-units.jsonl").write_text( + "".join(json.dumps(u) + "\n" for u in units) + ) # 
--------------------------------------------------------------------------- -# Scenario coverage predicates +# coverage_errors — exercises the scenario/decision predicates as a side effect # --------------------------------------------------------------------------- -def _scn(**overrides: Any) -> dict[str, Any]: - base: dict[str, Any] = {"id": "S001", "expert": "injection", "target_path": "app/Foo.php"} - base.update(overrides) - return base +def test_path_requirement_flagged_when_no_scenario_covers_it(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing path coverage for app/Untouched.php" in e for e in errors) -def test_scenario_covers_path_matches_target_path() -> None: - assert _scenario_covers_path([_scn()], "app/Foo.php") - assert not _scenario_covers_path([_scn()], "app/Bar.php") +def test_path_requirement_satisfied_by_target_path(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Foo.php"}]}) + errors = coverage_errors(run_dir, scenarios=[_scn()], coverage_decisions=[]) + assert not any("missing path coverage" in e for e in errors) -def test_scenario_covers_path_also_matches_related_paths() -> None: +def test_path_requirement_satisfied_by_related_paths(run_dir: Path) -> None: + """``_scenario_paths`` must consider ``related_paths`` as well as ``target_path``.""" + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Bar.php"}]}) scn = _scn(related_paths=["app/Bar.php"]) - assert _scenario_covers_path([scn], "app/Bar.php") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing path coverage" in e for e in errors) -def test_scenario_covers_pair_requires_expert_and_path() -> None: - scn = _scn() - assert _scenario_covers_pair([scn], "app/Foo.php", "injection") - assert not 
_scenario_covers_pair([scn], "app/Foo.php", "cryptographic-failures") - assert not _scenario_covers_pair([scn], "app/Bar.php", "injection") +def test_path_requirement_satisfied_by_covered_paths_list(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Inc.php"}]}) + scn = _scn(covered_paths=["app/Inc.php"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing path coverage" in e for e in errors) -def test_scenario_covers_boundary_by_boundary_id() -> None: - scn = _scn(boundary_id="B1") - req = {"boundary_id": "B1", "expert": "injection"} - assert _scenario_covers_boundary([scn], req) - req2 = {"boundary_id": "B2", "expert": "injection"} - assert not _scenario_covers_boundary([scn], req2) +def test_path_requirement_satisfied_by_path_level_decision(run_dir: Path) -> None: + _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) + decisions = [{ + "path": "app/Untouched.php", "expert": "*", + "decision": "not_applicable", "reason": "framework-owned, not invocable by users", + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("missing path coverage" in e for e in errors) -def test_scenario_covers_boundary_by_covered_boundary_ids_list() -> None: - scn = _scn(covered_boundary_ids=["B1", "B2"]) - req = {"boundary_id": "B2", "expert": "injection"} - assert _scenario_covers_boundary([scn], req) +def test_pair_requirement_flagged_when_expert_mismatches(run_dir: Path) -> None: + _write_coverage(run_dir, { + "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], + }) + # Scenario covers the path but with a different expert. 
+ scn = _scn(expert="cryptographic-failures") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert any("missing expert coverage for app/Foo.php -> injection" in e for e in errors) -def test_scenario_covers_boundary_falls_back_to_recon_item_id() -> None: - scn = _scn(recon_item_id="R1") - req = {"recon_item_id": "R1", "expert": "injection"} - assert _scenario_covers_boundary([scn], req) +def test_pair_requirement_satisfied_by_matching_scenario(run_dir: Path) -> None: + _write_coverage(run_dir, { + "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], + }) + errors = coverage_errors(run_dir, scenarios=[_scn()], coverage_decisions=[]) + assert not any("missing expert coverage" in e for e in errors) -def test_scenario_covers_unit_by_routing_unit_id_or_covered_list() -> None: - direct = _scn(routing_unit_id="U001") - covered = _scn(id="S002", covered_routing_unit_ids=["U002"]) - assert _scenario_covers_unit([direct], "U001", "injection") - assert _scenario_covers_unit([covered], "U002", "injection") - assert not _scenario_covers_unit([direct], "U001", "cryptographic-failures") +def test_routing_unit_satisfied_by_scenario_with_unit_id(run_dir: Path) -> None: + _write_units(run_dir, [{ + "unit_id": "U001", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + scn = _scn(routing_unit_id="U001") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing routing-unit" in e for e in errors) -# --------------------------------------------------------------------------- -# _validate_decisions -# --------------------------------------------------------------------------- +def test_routing_unit_satisfied_by_covered_routing_unit_ids(run_dir: Path) -> None: + """A scenario can claim coverage over a unit it isn't the primary owner of.""" + _write_units(run_dir, [{ + "unit_id": "U002", + "path": "app/Foo.php", + "coverage": "mandatory", + 
"required_experts": ["injection"], + }]) + scn = _scn(routing_unit_id="U001", covered_routing_unit_ids=["U002"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing routing-unit" in e for e in errors) -def test_validate_decisions_rejects_unknown_decision_value() -> None: - decisions = [{"path": "a.php", "expert": "injection", "decision": "wat", "reason": "x" * 25}] - errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) - assert any("invalid decision" in e for e in errors) +def test_routing_unit_flagged_when_no_scenario_or_decision(run_dir: Path) -> None: + _write_units(run_dir, [{ + "unit_id": "U001", + "path": "app/Foo.php", + "coverage": "mandatory", + "required_experts": ["injection"], + }]) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any("missing routing-unit expert coverage for U001" in e for e in errors) -def test_validate_decisions_requires_path() -> None: - decisions = [{"decision": "not_applicable", "reason": "x" * 25}] - errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) - assert any("missing path" in e for e in errors) +# --------------------------------------------------------------------------- +# Boundary-requirement coverage (item 4 in review) +# --------------------------------------------------------------------------- -def test_validate_decisions_requires_scenario_ids_for_coverage_claims() -> None: - for decision_value in ("covered_by_scenario", "merged", "scenario"): - decisions = [{"path": "a.php", "expert": "injection", "decision": decision_value}] - errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) - assert any("must reference scenario_ids" in e for e in errors), decision_value +def _boundary_req(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "path": "app/Api.php", + "expert": "injection", + "boundary_id": "B1", + "endpoint": "/api/run", + } + base.update(overrides) + return base 
-def test_validate_decisions_flags_unknown_scenario_id() -> None: - decisions = [{ - "path": "a.php", - "expert": "injection", - "decision": "covered_by_scenario", - "scenario_ids": ["S999"], - }] - scenarios = [_scn()] - errors = _validate_decisions(decisions, scenarios=scenarios, experts=EXPERTS) - assert any("references unknown" in e and "S999" in e for e in errors) +def test_boundary_requirement_flagged_without_scenario_or_decision(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) + assert any( + "missing request-boundary coverage for app/Api.php -> injection -> /api/run" in e + for e in errors + ) -def test_validate_decisions_requires_substantive_reason_for_dismissals() -> None: - # 'not_applicable' is a dismissal — short reason is rejected. - decisions = [{ - "path": "a.php", "expert": "injection", "decision": "not_applicable", "reason": "no" - }] - errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) - assert any("needs a concrete reason" in e for e in errors) +def test_boundary_requirement_satisfied_by_scenario_with_boundary_id(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + scn = _scn(boundary_id="B1") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing request-boundary" in e for e in errors) + + +def test_boundary_requirement_satisfied_by_covered_boundary_ids_list(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + scn = _scn(covered_boundary_ids=["B1", "B2"]) + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing request-boundary" in e for e in errors) -def test_validate_decisions_accepts_wildcard_expert() -> None: - decisions = [{"path": "a.php", "expert": "*", "decision": "not_applicable", "reason": "x" * 25}] - errors = 
_validate_decisions(decisions, scenarios=[], experts=EXPERTS) - assert errors == [] +def test_boundary_requirement_satisfied_by_recon_item_id_fallback(run_dir: Path) -> None: + """Boundary req without scenario boundary_id can be matched by recon_item_id.""" + _write_coverage(run_dir, { + "boundary_requirements": [_boundary_req(recon_item_id="R1")], + }) + scn = _scn(recon_item_id="R1") + errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) + assert not any("missing request-boundary" in e for e in errors) -def test_validate_decisions_rejects_unknown_expert() -> None: +def test_boundary_requirement_satisfied_by_boundary_id_decision(run_dir: Path) -> None: + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) decisions = [{ - "path": "a.php", "expert": "made-up-expert", - "decision": "not_applicable", "reason": "x" * 25, + "path": "app/Api.php", "expert": "injection", "boundary_id": "B1", + "decision": "not_applicable", "reason": "internal admin endpoint behind VPN", }] - errors = _validate_decisions(decisions, scenarios=[], experts=EXPERTS) - assert any("unknown expert" in e for e in errors) + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("missing request-boundary" in e for e in errors) -def test_decisions_constant_lists_every_decision_kind() -> None: - """The set is consulted by router prompts; lock it in.""" - assert DECISIONS == { - "scenario", "covered_by_scenario", "merged", - "not_applicable", "needs_context", "out_of_scope", - } +def test_boundary_decision_without_boundary_id_does_not_satisfy(run_dir: Path) -> None: + """``_has_boundary_decision`` requires the boundary_id to match exactly.""" + _write_coverage(run_dir, {"boundary_requirements": [_boundary_req()]}) + decisions = [{ + "path": "app/Api.php", "expert": "injection", + "decision": "not_applicable", "reason": "internal admin endpoint behind VPN", + }] + errors = coverage_errors(run_dir, scenarios=[], 
coverage_decisions=decisions) + assert any("missing request-boundary" in e for e in errors) # --------------------------------------------------------------------------- -# coverage_errors +# Decision validation (exercised through coverage_errors) # --------------------------------------------------------------------------- -def _write_coverage(path: Path, payload: dict[str, Any]) -> None: - (path / "recon-output").mkdir(parents=True, exist_ok=True) - (path / "recon-output" / "coverage-gaps.json").write_text(json.dumps(payload)) +def test_decision_with_unknown_value_is_flagged(run_dir: Path) -> None: + decisions = [{"path": "a.php", "expert": "injection", "decision": "wat", "reason": "x" * 25}] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("invalid decision" in e for e in errors) -def _write_units(path: Path, units: list[dict[str, Any]]) -> None: - (path / "recon-output").mkdir(parents=True, exist_ok=True) - (path / "recon-output" / "routing-units.jsonl").write_text( - "".join(json.dumps(u) + "\n" for u in units) - ) +def test_decision_missing_path_is_flagged(run_dir: Path) -> None: + decisions = [{"decision": "not_applicable", "reason": "x" * 25}] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("missing path" in e for e in errors) -def test_coverage_errors_flags_uncovered_path(run_dir: Path) -> None: - _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) - errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) - assert any("missing path coverage for app/Untouched.php" in e for e in errors) +@pytest.mark.parametrize("decision_value", ["covered_by_scenario", "merged", "scenario"]) +def test_coverage_claim_decision_requires_scenario_ids( + run_dir: Path, decision_value: str +) -> None: + decisions = [{"path": "a.php", "expert": "injection", "decision": decision_value}] + errors = coverage_errors(run_dir, scenarios=[], 
coverage_decisions=decisions) + assert any("must reference scenario_ids" in e for e in errors) -def test_coverage_errors_path_decision_satisfies_uncovered_path(run_dir: Path) -> None: - _write_coverage(run_dir, {"input_with_sink_or_exposure": [{"path": "app/Untouched.php"}]}) +def test_decision_referencing_unknown_scenario_is_flagged(run_dir: Path) -> None: decisions = [{ - "path": "app/Untouched.php", "expert": "*", - "decision": "not_applicable", "reason": "framework-owned, not invocable by users", + "path": "a.php", "expert": "injection", + "decision": "covered_by_scenario", "scenario_ids": ["S999"], }] - errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) - assert not any("missing path coverage" in e for e in errors) - - -def test_coverage_errors_flags_unrouted_required_pair(run_dir: Path) -> None: - _write_coverage(run_dir, { - "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], - }) - errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) - assert any("missing expert coverage for app/Foo.php -> injection" in e for e in errors) + errors = coverage_errors(run_dir, scenarios=[_scn()], coverage_decisions=decisions) + assert any("references unknown" in e and "S999" in e for e in errors) -def test_coverage_errors_satisfied_by_scenario_targeting_the_pair(run_dir: Path) -> None: - _write_coverage(run_dir, { - "routing_requirements": [{"path": "app/Foo.php", "expert": "injection"}], - }) - scn = _scn() # target_path=app/Foo.php, expert=injection - errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) - assert not any("missing expert coverage" in e for e in errors) +def test_dismissal_decision_requires_substantive_reason(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "injection", "decision": "not_applicable", "reason": "no" + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("needs a concrete reason" in e for e in 
errors) -def test_coverage_errors_flags_missing_routing_unit_coverage(run_dir: Path) -> None: - _write_units(run_dir, [{ - "unit_id": "U001", - "path": "app/Foo.php", - "coverage": "mandatory", - "required_experts": ["injection"], - }]) - errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=[]) - assert any("missing routing-unit expert coverage for U001" in e for e in errors) +def test_decision_with_wildcard_expert_is_accepted(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "*", + "decision": "not_applicable", "reason": "x" * 25, + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert not any("invalid decision" in e or "unknown expert" in e for e in errors) -def test_coverage_errors_routing_unit_satisfied_by_scenario_with_unit_id(run_dir: Path) -> None: - _write_units(run_dir, [{ - "unit_id": "U001", - "path": "app/Foo.php", - "coverage": "mandatory", - "required_experts": ["injection"], - }]) - scn = _scn(routing_unit_id="U001") - errors = coverage_errors(run_dir, scenarios=[scn], coverage_decisions=[]) - assert not any("missing routing-unit" in e for e in errors) +def test_decision_with_unknown_expert_is_flagged(run_dir: Path) -> None: + decisions = [{ + "path": "a.php", "expert": "made-up-expert", + "decision": "not_applicable", "reason": "x" * 25, + }] + errors = coverage_errors(run_dir, scenarios=[], coverage_decisions=decisions) + assert any("unknown expert" in e for e in errors) # --------------------------------------------------------------------------- -# record_backlog — happy path + key error gates +# record_backlog — full pipeline including emit() log + scope check # --------------------------------------------------------------------------- @@ -275,9 +290,7 @@ def _valid_scenario(scn_id: str = "S001", **overrides: Any) -> dict[str, Any]: @pytest.fixture() -def patched_run_dir( - run_dir: Path, monkeypatch: pytest.MonkeyPatch -) -> Path: +def patched_run_dir(run_dir: Path, 
monkeypatch: pytest.MonkeyPatch) -> Path: """Redirect ``run_path`` so ``record_backlog`` writes into tmp.""" monkeypatch.setattr(backlog, "run_path", lambda target, run_id: run_dir) return run_dir @@ -299,17 +312,55 @@ def test_record_backlog_writes_scenario_files_on_happy_path( assert [s["id"] for s in result] == ["S001"] written = patched_run_dir / "scenarios" / "backlog" / "S001.json" - assert written.is_file() payload = json.loads(written.read_text()) - # DEFAULTS are layered in by record_backlog. - assert payload["priority"] == "normal" + assert payload["priority"] == "normal" # DEFAULTS layered in assert payload["result_location"] == "scenarios/finished/S001.json" index = patched_run_dir / "scenarios" / "index.jsonl" assert index.read_text().strip().count("\n") == 0 # one line, no trailing extras - decisions = patched_run_dir / "scenarios" / "coverage-decisions.json" - assert json.loads(decisions.read_text())["coverage_decisions"] == [] + +def test_record_backlog_emits_audit_event( + patched_run_dir: Path, tmp_path: Path +) -> None: + """The recorder must log a ``scenario-router/complete`` event for auditing.""" + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + record_backlog("acme", "demo", router) + + events = patched_run_dir / "logs" / "events.jsonl" + assert events.is_file() + lines = [json.loads(line) for line in events.read_text().splitlines() if line.strip()] + assert any( + e.get("actor") == "scenario-router" and e.get("status") == "complete" + for e in lines + ) + + +def test_record_backlog_rejects_scenario_using_unselected_expert( + patched_run_dir: Path, tmp_path: Path +) -> None: + """A run-config that restricts experts must block out-of-scope scenarios.""" + (patched_run_dir / "run-config.yaml").write_text( + 'expert_scope:\n mode: "selected"\n experts:\n - "injection"\n' + ) + scn = _valid_scenario(expert="broken-access-control") + router = tmp_path / "router.json" + 
router.write_text(json.dumps(_router_output([scn]))) + with pytest.raises(ValueError, match="uses unselected expert"): + record_backlog("acme", "demo", router) + + +def test_record_backlog_accepts_scenario_with_selected_expert( + patched_run_dir: Path, tmp_path: Path +) -> None: + (patched_run_dir / "run-config.yaml").write_text( + 'expert_scope:\n mode: "selected"\n experts:\n - "injection"\n' + ) + router = tmp_path / "router.json" + router.write_text(json.dumps(_router_output([_valid_scenario()]))) + result = record_backlog("acme", "demo", router) + assert [s["id"] for s in result] == ["S001"] def test_record_backlog_rejects_unknown_expert( diff --git a/tests/test_coverage.py b/tests/test_coverage.py index 64a6e1f..f18fbe6 100644 --- a/tests/test_coverage.py +++ b/tests/test_coverage.py @@ -8,21 +8,20 @@ from __future__ import annotations +import json +from pathlib import Path from typing import Any import pytest from openhack.coverage import ( - MAX_REQUIREMENTS_PER_PATH, - PRODUCTIVE_CLASSES, - SUGGESTION_LIMIT, _path_class, _score_pair, - _source_or_sink, _tokens, coverage_opportunities, coverage_suggestions, routing_requirements, + write_coverage, ) @@ -67,13 +66,6 @@ def test_path_class(path: str, expected: str) -> None: assert _path_class(path) == expected -def test_productive_classes_match_expectations() -> None: - """Sanity-check the productive set — scoring depends on this membership.""" - assert PRODUCTIVE_CLASSES == { - "client", "config", "manifest", "runtime", "script", "template" - } - - # --------------------------------------------------------------------------- # Tokenizer # --------------------------------------------------------------------------- @@ -157,12 +149,6 @@ def test_score_runtime_with_strong_terms_and_sink_is_high() -> None: assert "source, sink" in reason or "boundary evidence" in reason -def test_source_or_sink_truthy_for_boundary() -> None: - assert _source_or_sink(_pair(boundary_mandatory=True)) is True - assert 
_source_or_sink(_pair(interesting=True)) is True - assert _source_or_sink(_pair()) is False - - # --------------------------------------------------------------------------- # End-to-end: candidate pair generation from inventory # --------------------------------------------------------------------------- @@ -255,7 +241,45 @@ def test_coverage_suggestions_skip_required_pairs() -> None: assert all(s["path"] != "app/QueryHandler.php" for s in sugs) -def test_constants_have_expected_values() -> None: - """Lock in the public limits referenced from docs/AGENTS.md.""" - assert MAX_REQUIREMENTS_PER_PATH == 4 - assert SUGGESTION_LIMIT == 500 +# --------------------------------------------------------------------------- +# write_coverage — disk-side entry point called from the CLI +# --------------------------------------------------------------------------- + + +def test_write_coverage_emits_coverage_gaps_json(run_dir: Path) -> None: + inventory: dict[str, list[dict[str, Any]]] = { + "inputs": [_inv_row("inputs", "app/QueryHandler.php", match=["query"])], + "sinks": [_inv_row("sinks", "app/QueryHandler.php", match=["raw"])], + } + out = write_coverage(run_dir, inventory, recon_items=None) + + assert out == run_dir / "recon-output" / "coverage-gaps.json" + payload = json.loads(out.read_text()) + + # The top-level sections the rest of the pipeline consumes. + for key in ( + "input_with_sink_or_exposure", + "request_boundaries", + "boundary_requirements", + "expert_opportunities", + "routing_requirements", + "coverage_suggestions", + "triage_summary", + ): + assert key in payload, f"missing top-level key: {key}" + + summary = payload["triage_summary"] + assert summary["hard_routing_requirements"] == len(payload["routing_requirements"]) + assert summary["expert_scope"] == "unconfigured-all" + # No run-config.yaml → all 12 expert IDs end up in the scope.
+ assert len(summary["selected_experts"]) == 12 + + +def test_write_coverage_honours_run_config_expert_scope(run_dir: Path) -> None: + (run_dir / "run-config.yaml").write_text( + 'expert_scope:\n mode: "selected"\n experts:\n - "injection"\n' + ) + out = write_coverage(run_dir, inventory={"inputs": []}, recon_items=None) + summary = json.loads(out.read_text())["triage_summary"] + assert summary["expert_scope"] == "selected" + assert summary["selected_experts"] == ["injection"] diff --git a/tests/test_routing_units.py b/tests/test_routing_units.py index 142425a..e0e6064 100644 --- a/tests/test_routing_units.py +++ b/tests/test_routing_units.py @@ -2,6 +2,8 @@ from __future__ import annotations +import json +from pathlib import Path from typing import Any import pytest @@ -15,6 +17,7 @@ _kind_for_terms, _row_kind, build_routing_units, + write_routing_units, ) @@ -40,14 +43,20 @@ def test_kind_for_terms(text: set[str], expected: str) -> None: assert _kind_for_terms(text) == expected -def test_kind_for_terms_first_match_wins() -> None: - """Order in ``KIND_TERMS`` is a deliberate priority list.""" - # 'queue' appears in both resource_consumption and parser_deserialization_integrity - # — KIND_TERMS lists parser earlier, so it should win for ambiguous terms in its set. - [parser_terms] = [terms for name, terms in KIND_TERMS if name == "parser_deserialization_integrity"] - # Pick an unambiguous parser-only term to confirm priority logic. - assert _kind_for_terms({"xxe"}) == "parser_deserialization_integrity" - assert "xxe" in parser_terms +def test_kind_for_terms_first_match_wins_on_overlap() -> None: + """``KIND_TERMS`` order is a deliberate priority list. + + ``template`` appears in both ``html_template_dom_sink`` (earlier) and + ``parser_deserialization_integrity`` (later); the earlier entry must win. 
+ """ + by_name = dict(KIND_TERMS) + assert "template" in by_name["html_template_dom_sink"] + assert "template" in by_name["parser_deserialization_integrity"] + assert _kind_for_terms({"template"}) == "html_template_dom_sink" + # Same overlap pattern for ``token`` between identity and secret-exposure. + assert "token" in by_name["identity_state_access_control"] + assert "token" in by_name["secret_debug_exposure"] + assert _kind_for_terms({"token"}) == "identity_state_access_control" def test_row_kind_classifies_request_boundary_evidence() -> None: @@ -225,3 +234,35 @@ def test_build_routing_units_emits_mandatory_path_unit_for_uncovered_gap() -> No assert len(units) == 1 assert units[0]["coverage"] == "mandatory_path" assert units[0]["required_experts"] == [] + + +# --------------------------------------------------------------------------- +# write_routing_units — disk-side entry point called from the CLI +# --------------------------------------------------------------------------- + + +def test_write_routing_units_emits_jsonl_one_unit_per_line(run_dir: Path) -> None: + (run_dir / "recon-output").mkdir(parents=True, exist_ok=True) + (run_dir / "recon-output" / "coverage-gaps.json").write_text(json.dumps({ + "routing_requirements": [_req("app/QueryHandler.php", "injection")], + })) + inventory: dict[str, list[dict[str, Any]]] = { + "sinks": [ + {"kind": "sinks", "path": "app/QueryHandler.php", "line": 1, "match": ["raw"], "text": "query"}, + ], + } + out = write_routing_units(run_dir, inventory) + + assert out == run_dir / "recon-output" / "routing-units.jsonl" + lines = [line for line in out.read_text().splitlines() if line.strip()] + assert len(lines) == 1 + unit = json.loads(lines[0]) + assert unit["unit_id"] == "U001" + assert unit["path"] == "app/QueryHandler.php" + assert "injection" in unit["required_experts"] + + +def test_write_routing_units_with_no_coverage_file_writes_empty(run_dir: Path) -> None: + out = write_routing_units(run_dir, inventory={}) + 
assert out.exists() + assert out.read_text() == "" diff --git a/tests/test_schemas.py b/tests/test_schemas.py index de9f9c5..711af97 100644 --- a/tests/test_schemas.py +++ b/tests/test_schemas.py @@ -9,7 +9,6 @@ from __future__ import annotations -import copy from typing import Any, Callable import pytest @@ -308,8 +307,8 @@ def test_finding_triage_invalid_cases( assert "finding-triage-schema.json" in str(exc.value) -def test_validator_reports_multiple_errors() -> None: - """The error message bullets each violation so authors can fix in one pass.""" +def test_validator_reports_each_violation() -> None: + """The error message must name each failing field so authors fix in one pass.""" scenario = _scenario() scenario.pop("id") scenario.pop("expert") @@ -317,18 +316,6 @@ def test_validator_reports_multiple_errors() -> None: with pytest.raises(ValueError) as exc: validate_scenario(scenario) message = str(exc.value) - # Each missing required field surfaces as a separate bullet. - assert message.count("\n- ") >= 3 - - -def test_baselines_are_independent() -> None: - """Mutating one fixture instance must not leak into another.""" - a = _scenario() - b = _scenario() - a["id"] = "S999" - assert b["id"] == "S001" - # And the deep-copy assumption holds for nested structures. - a2 = copy.deepcopy(_scenario_result()) - a2["evidence"][0]["note"] = "mutated" - fresh = _scenario_result() - assert fresh["evidence"][0]["note"] != "mutated" + assert "'id' is a required property" in message + assert "'expert' is a required property" in message + assert "target_path" in message