Add cloud text I/O benchmark#2008
Conversation
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Greptile SummaryThis PR adds a new
Confidence Score: 4/5Safe to merge after fixing the document-count metric; all other structural patterns are consistent with the existing benchmark suite. The benchmark's two headline metrics — total_documents and throughput_docs_per_sec — will silently report file counts instead of row counts because the pipeline's final stage returns FileGroupTask whose num_items counts files, not documents. The rest of the script (pipeline construction, compression handling, result persistence, exit-code propagation) is correct. benchmarking/scripts/cloud_text_io_benchmark.py — specifically the total_documents calculation at line 134. Important Files Changed
Sequence DiagramsequenceDiagram
participant CLI as main()
participant RB as run_benchmark()
participant BP as _build_pipeline()
participant P as Pipeline
participant R as JsonlReader / ParquetReader
participant W as JsonlWriter / ParquetWriter
participant WBR as write_benchmark_results()
CLI->>RB: run_benchmark(args)
RB->>BP: _build_pipeline(input_path, output_path, format, ...)
BP->>R: instantiate reader
BP->>W: instantiate writer (compression merged into write_kwargs)
BP-->>RB: Pipeline
RB->>P: pipeline.run(executor)
P->>R: process() → DocumentBatch tasks
R-->>P: DocumentBatch[]
P->>W: process(DocumentBatch) → FileGroupTask
W-->>P: "FileGroupTask[] (data = [file_path])"
P-->>RB: tasks: FileGroupTask[]
Note over RB: task.num_items = len(file_paths) ≠ document count
RB-->>CLI: results dict
CLI->>WBR: write params.json / metrics.json / tasks.pkl
Reviews (2): Last reviewed commit: "Ensure cloud text I/O benchmark writes f..." | Re-trigger Greptile |
| args = parser.parse_args() | ||
| results = run_benchmark(args) | ||
| write_benchmark_results(results, args.benchmark_results_path) | ||
| return 0 if results["metrics"]["is_success"] else 1 |
There was a problem hiding this comment.
Benchmark results not written on setup failures
write_benchmark_results is only called when run_benchmark returns successfully. Errors that occur before pipeline.run() — such as json.JSONDecodeError from malformed --read-kwargs-json, TypeError from _json_arg, or ValueError from setup_executor — will propagate uncaught, leaving no result artefact behind. Other benchmarks in the same directory (e.g. modifier_benchmark.py) guard against this by initialising results with a failure default and wrapping the call in try/finally so write_benchmark_results is always invoked. Consider adopting the same pattern here.
| read_kwargs=read_kwargs, | ||
| write_kwargs=write_kwargs, | ||
| ) | ||
| executor = setup_executor(args.executor, config=executor_config or None) |
There was a problem hiding this comment.
executor_config or None silently coerces an explicit empty JSON object (--executor-config-json='{}') into None, while read_kwargs and write_kwargs keep their empty-dict values and are passed through as-is. A user passing '{}' for the executor config would not see it treated differently from omitting the flag entirely, which is inconsistent with how the other two kwargs are handled. Consider passing executor_config directly (and letting setup_executor handle an empty dict), or documenting the coercion explicitly.
| executor = setup_executor(args.executor, config=executor_config or None) | |
| executor = setup_executor(args.executor, config=executor_config if executor_config else None) |
|
Addressed the Greptile feedback by updating Validation:
|
Description
Adds a cloud text I/O benchmark script for JSONL and Parquet datasets. The benchmark reads from a configurable local or cloud URI, writes to a configurable output URI, records CPU/GPU labels through the benchmark parameters, and reports document throughput plus local output throughput when available.
Fixes #1521
Usage
python benchmarking/scripts/cloud_text_io_benchmark.py \ --benchmark-results-path=/tmp/cloud-io-results \ --input-path=s3://bucket/text-jsonl \ --output-path=s3://bucket/curator-cloud-io-output \ --format=jsonl \ --compression=gzip \ --executor=ray_data \ --device-label=gpu \ --read-kwargs-json='{"storage_options": {}}'Checklist
Testing performed:
ruff format benchmarking/scripts/cloud_text_io_benchmark.pyruff check benchmarking/scripts/cloud_text_io_benchmark.pypython3 -m py_compile benchmarking/scripts/cloud_text_io_benchmark.py