Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .claude/commands/recover-failed-ingest.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ python3 utils/validate_reusable_sweep_artifacts.py \
--artifacts-dir /tmp/source-artifacts
```

The validator first collapses duplicates left by rerun (flaky) evals in place —
keeping the latest result per eval identity when a retried eval left duplicate
raw dirs / aggregate rows — so a legitimate rerun does not fail validation. It
only collapses identities with a clear latest result; genuinely ambiguous
duplicates are still rejected.

The validator does not compare source coverage with
`/tmp/recovery-full-config.json`. It rejects duplicate fixed rows, missing run
stats, inconsistent agentic artifacts, malformed eval metadata, raw/aggregate
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/run-sweep.yml
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ jobs:
find source-artifacts -maxdepth 1 -mindepth 1 -type d -printf ' %f\n' | sort

- name: Validate reusable artifacts
# Collapses duplicates from rerun (flaky) evals to the latest result
# per identity, then validates consistency for ingest.
run: |
python3 utils/validate_reusable_sweep_artifacts.py \
--artifacts-dir source-artifacts
Expand Down
151 changes: 151 additions & 0 deletions utils/test_validate_reusable_sweep_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from validate_reusable_sweep_artifacts import (
agentic_key,
dedupe_reran_evals,
main,
validate_agentic_artifacts,
validate_eval_artifacts,
Expand Down Expand Up @@ -473,3 +474,153 @@ def test_eval_only_main_does_not_require_benchmark_artifacts(
)

assert main() == 0


# ── dedupe_reran_evals ────────────────────────────────────────────────────────


def _dd_meta(conc: int) -> dict:
return {
"is_multinode": True,
"hw": "b300",
"infmax_model_prefix": "minimaxm3",
"framework": "dynamo-vllm",
"precision": "fp4",
"spec_decoding": "none",
"isl": 8192,
"osl": 1024,
"prefill_tp": 2,
"prefill_ep": 2,
"prefill_dp_attention": True,
"prefill_num_workers": 4,
"decode_tp": 8,
"decode_ep": 8,
"decode_dp_attention": True,
"decode_num_workers": 2,
"conc": conc,
}


def _dd_agg_row(conc: int, source: str, em_strict: float) -> dict:
    """Build one aggregate eval row for concurrency *conc*.

    The row starts from ``_dd_meta(conc)``, stores the model prefix under
    ``"model_prefix"`` instead of ``"infmax_model_prefix"``, and adds the
    fixed task ``"gsm8k"`` plus the given ``em_strict`` score and ``source``
    path.
    """
    row = _dd_meta(conc)
    # Aggregate rows name this field differently from the raw metadata.
    row["model_prefix"] = row.pop("infmax_model_prefix")
    row["task"] = "gsm8k"
    row["em_strict"] = em_strict
    row["source"] = source
    return row


def _dd_write_aggregate(root: Path, rows: list[dict]) -> Path:
eval_dir = root / "eval_results_all"
eval_dir.mkdir(exist_ok=True)
path = eval_dir / "agg_eval_all.json"
path.write_text(json.dumps(rows, indent=2))
return path


def _dd_write_legacy_raw(
    root: Path, name: str, conc: int, timestamp: str | None
) -> None:
    """Create a raw eval artifact directory ``<root>/<name>``.

    The directory always receives a ``meta_env.json`` holding
    ``_dd_meta(conc)``. When *timestamp* is not ``None`` an empty-object
    ``results_<timestamp>.json`` is written as well; passing ``None``
    produces a directory with metadata but no result file.

    Raises ``FileExistsError`` (from ``Path.mkdir``) if the directory
    already exists.
    """
    artifact_dir = root / name
    artifact_dir.mkdir()
    (artifact_dir / "meta_env.json").write_text(json.dumps(_dd_meta(conc)))
    if timestamp is not None:
        (artifact_dir / f"results_{timestamp}.json").write_text("{}")


def test_dedupe_keeps_latest_legacy_rerun(tmp_path: Path) -> None:
    """Only the rerun with the newest result timestamp survives dedupe."""
    # Three reruns of one eval plus a result-less attempt, mirroring a flaky
    # config retried until it passed.
    old, mid, new, empty = (
        "eval_minimaxm3_conc4096_b300-nv_15",
        "eval_minimaxm3_conc4096_b300-nv_16",
        "eval_minimaxm3_conc4096_b300-nv_12",
        "eval_minimaxm3_conc4096_b300-nv_03",
    )
    # Result timestamp per artifact dir; dict order is the creation order.
    stamps = {
        old: "2026-06-26T13-00-22.596040",
        mid: "2026-06-26T19-00-52.356121",
        new: "2026-06-27T04-28-31.838775",
    }
    for name, stamp in stamps.items():
        _dd_write_legacy_raw(tmp_path, name, 4096, stamp)
    _dd_write_legacy_raw(tmp_path, empty, 4096, None)
    # Aggregate rows are listed out of chronological order (old, new, mid).
    _dd_write_aggregate(
        tmp_path,
        [
            _dd_agg_row(
                4096,
                f"eval_results/{name}/results_{stamps[name]}.json",
                score,
            )
            for name, score in ((old, 0.83), (new, 0.95), (mid, 0.78))
        ],
    )

    messages = dedupe_reran_evals(tmp_path)

    assert validate_eval_artifacts(tmp_path) == []
    rows = json.loads(
        (tmp_path / "eval_results_all" / "agg_eval_all.json").read_text()
    )
    assert [r["em_strict"] for r in rows] == [0.95]
    assert (tmp_path / new).is_dir()
    for superseded in (old, mid, empty):
        assert not (tmp_path / superseded).exists()
    assert any("kept 1 of 3" in message for message in messages)


def test_dedupe_leaves_ambiguous_duplicates_for_validation(tmp_path: Path) -> None:
    """Unorderable duplicates are left alone by dedupe and fail validation."""
    # Duplicate raw identities with no result timestamps cannot be ordered, so
    # dedupe must leave them and validation must still reject them.
    for name in (
        "eval_minimaxm3_conc4096_b300-nv_01",
        "eval_minimaxm3_conc4096_b300-nv_02",
    ):
        _dd_write_legacy_raw(tmp_path, name, 4096, None)
    _dd_write_aggregate(
        tmp_path,
        [
            _dd_agg_row(
                4096,
                "eval_results/eval_minimaxm3_conc4096_b300-nv_01/x.json",
                0.9,
            )
        ],
    )

    assert dedupe_reran_evals(tmp_path) == []
    assert any("duplicate" in e for e in validate_eval_artifacts(tmp_path))


def test_dedupe_is_noop_for_clean_artifacts(tmp_path: Path) -> None:
    """With no duplicates, dedupe reports nothing and changes nothing on disk."""
    name = "eval_minimaxm3_conc4096_b300-nv_01"
    stamp = "2026-06-27T04-28-31.838775"
    _dd_write_legacy_raw(tmp_path, name, 4096, stamp)
    agg_path = _dd_write_aggregate(
        tmp_path,
        [_dd_agg_row(4096, f"eval_results/{name}/results_{stamp}.json", 0.95)],
    )
    # Snapshot the aggregate text so any rewrite by dedupe is detected.
    before = agg_path.read_text()

    assert dedupe_reran_evals(tmp_path) == []
    assert agg_path.read_text() == before
    assert (tmp_path / name).is_dir()
    assert validate_eval_artifacts(tmp_path) == []


def test_dedupe_prunes_superseded_batched_conc(tmp_path: Path) -> None:
    """An overlapping conc is pruned from the older of two batched reruns."""
    # Two batched reruns overlap on conc 32; the newer run wins that conc while
    # each run keeps the concurrencies unique to it.
    old_stamp = "2026-06-26T10-00-00.000000"
    new_stamp = "2026-06-26T20-00-00.000000"
    older = tmp_path / "eval_minimaxm3_batch_b300-nv_05"
    newer = tmp_path / "eval_minimaxm3_batch_b300-nv_09"

    def result_name(stamp: str, conc: int) -> str:
        # Per-concurrency result filename inside a batched artifact dir.
        return f"results_{stamp}_conc{conc}.json"

    for artifact_dir, concs, stamp in (
        (older, [16, 32], old_stamp),
        (newer, [32, 64], new_stamp),
    ):
        artifact_dir.mkdir()
        meta = _dd_meta(0)
        meta["eval_concs"] = concs
        meta["completed_eval_concs"] = list(concs)
        (artifact_dir / "meta_env.json").write_text(json.dumps(meta))
        for conc in concs:
            (artifact_dir / result_name(stamp, conc)).write_text("{}")

    _dd_write_aggregate(
        tmp_path,
        [
            _dd_agg_row(
                conc,
                f"eval_results/{artifact_dir.name}/{result_name(stamp, conc)}",
                score,
            )
            for artifact_dir, stamp, conc, score in (
                (older, old_stamp, 16, 0.50),
                (older, old_stamp, 32, 0.40),
                (newer, new_stamp, 32, 0.90),
                (newer, new_stamp, 64, 0.70),
            )
        ],
    )

    dedupe_reran_evals(tmp_path)

    assert validate_eval_artifacts(tmp_path) == []
    older_meta = json.loads((older / "meta_env.json").read_text())
    assert older_meta["completed_eval_concs"] == [16]
    assert not (older / result_name(old_stamp, 32)).exists()
    assert (older / result_name(old_stamp, 16)).exists()
    rows = json.loads(
        (tmp_path / "eval_results_all" / "agg_eval_all.json").read_text()
    )
    assert [r["em_strict"] for r in rows if r["conc"] == 32] == [0.90]
Loading