diff --git a/.github/configs/amd-master.yaml b/.github/configs/amd-master.yaml index 3caa5faae..fc3e84419 100644 --- a/.github/configs/amd-master.yaml +++ b/.github/configs/amd-master.yaml @@ -2748,3 +2748,170 @@ minimaxm3-fp8-mi325x-vllm-mtp: - { tp: 8, conc-start: 1, conc-end: 128, spec-decoding: mtp } - { tp: 8, ep: 8, conc-start: 256, conc-end: 256, spec-decoding: mtp } - { tp: 8, ep: 8, dp-attn: true, conc-start: 256, conc-end: 256, spec-decoding: mtp } + +# MiniMax-M3 MXFP8 MI355X vLLM disaggregated (prefill/decode) sweep on the +# day-zero ROCm image. Four P/D layouts (TP8+TP8, TP4+TP8, TP4+TP4, 2x TP4+TP8) +# validate the MoRI-IO KV-transfer disagg pipeline end-to-end for M3. Layered on +# the MoRI-patch-removal infra (#1585). No EP (ep: 1 everywhere); MoE experts are +# TP-sharded as in the single-node M3 TP8 recipe. Per-worker serve flags live in +# benchmarks/multi_node/amd_utils/models_vllm.yaml (MiniMax-M3-MXFP8). +minimaxm3-fp8-mi355x-vllm-disagg: + image: vllm/vllm-openai-rocm:nightly-556bc4e3a089378e9df2482659898192da18db15 + model: MiniMaxAI/MiniMax-M3-MXFP8 + model-prefix: minimaxm3 + runner: mi355x-disagg + precision: fp8 + framework: vllm-disagg + multinode: true + disagg: true + scenarios: + fixed-seq-len: + - isl: 1024 + osl: 1024 + search-space: + - spec-decoding: "none" + conc-list: [ 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024 ] + prefill: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=1" + decode: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # Asymmetric 1P TP4 + 1D TP8 (smaller prefill, full-node decode) across + # conc 1,2,4,8,16,32,64,128,256. 
+ - spec-decoding: "none" + conc-list: [ 1, 2, 4, 8, 16, 32, 64, 128, 256 ] + prefill: + num-worker: 1 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=1" + decode: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # Balanced half-node 1P TP4 + 1D TP4 at high conc 64,128,256,512,1024. + - spec-decoding: "none" + conc-list: [ 64, 128, 256, 512, 1024 ] + prefill: + num-worker: 1 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=1" + decode: + num-worker: 1 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # 2P TP4 + 1D TP8: two half-node TP4 prefill workers (PREFILL_NODES=2) + # feeding one full-node TP8 decode, at high conc 256,512,768,1024. + - spec-decoding: "none" + conc-list: [ 256, 512, 768, 1024 ] + prefill: + num-worker: 2 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=2" + decode: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # 8k1k disagg sweep across four P/D layouts (1P TP8 + 1D TP8 conc 1..1024; + # 1P TP4 + 1D TP8 conc 1..256; 1P TP4 + 1D TP4 conc 64..1024; 2P TP4 + 1D TP8 + # conc 256..1024). The multi-node eval policy (8k1k + conc >= 16) marks one + # lm-eval on the highest-max-conc layout (TP8+TP8, eval-conc=median=128) — + # validating the M3 MoRI-IO disagg pipeline's correctness end-to-end. + - isl: 8192 + osl: 1024 + search-space: + - spec-decoding: "none" + conc-list: [ 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024 ] + prefill: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=1" + decode: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # Asymmetric 1P TP4 + 1D TP8 (smaller prefill, full-node decode) across + # conc 1,2,4,8,16,32,64,128,256. 
+ - spec-decoding: "none" + conc-list: [ 1, 2, 4, 8, 16, 32, 64, 128, 256 ] + prefill: + num-worker: 1 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=1" + decode: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # Balanced half-node 1P TP4 + 1D TP4 at high conc 64,128,256,512,1024. + - spec-decoding: "none" + conc-list: [ 64, 128, 256, 512, 1024 ] + prefill: + num-worker: 1 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=1" + decode: + num-worker: 1 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" + # 2P TP4 + 1D TP8: two half-node TP4 prefill workers (PREFILL_NODES=2) + # feeding one full-node TP8 decode, at high conc 256,512,768,1024. + - spec-decoding: "none" + conc-list: [ 256, 512, 768, 1024 ] + prefill: + num-worker: 2 + tp: 4 + ep: 1 + dp-attn: false + additional-settings: + - "PREFILL_NODES=2" + decode: + num-worker: 1 + tp: 8 + ep: 1 + dp-attn: false + additional-settings: + - "DECODE_NODES=1" diff --git a/benchmarks/multi_node/amd_utils/job.slurm b/benchmarks/multi_node/amd_utils/job.slurm index 01a5bd386..977bcaecc 100755 --- a/benchmarks/multi_node/amd_utils/job.slurm +++ b/benchmarks/multi_node/amd_utils/job.slurm @@ -80,7 +80,6 @@ if [[ "${MORI_CONN_PATCH:-auto}" != "skip" ]] \ export EXTRA_DOCKER_MOUNTS echo "[job.slurm] auto-applied MoRI conn.py overlay: ${_MORI_PATCH_FILE}" fi - xP="${xP:-1}" yD="${yD:-1}" @@ -315,8 +314,10 @@ export IS_MULTINODE="${IS_MULTINODE:-false}" SANITIZED_USER=$(echo "$USER_NAME" | tr -c 'a-zA-Z0-9_.-' '_') export DOCKER_CONT_NAME="container_${ENGINE}_${SANITIZED_USER}_${MODEL_NAME}_${SLURM_JOB_ID}" -# vLLM external router container -VLLM_ROUTER_IMAGE="${VLLM_ROUTER_IMAGE:-vllm/vllm-router:nightly-20260511-e667ebb}" +# vLLM external router container. 
+# NOTE: vllm/vllm-router only retains ~16 recent nightlies on Docker Hub; older +# dated tags are garbage-collected (manifest unknown) +VLLM_ROUTER_IMAGE="${VLLM_ROUTER_IMAGE:-vllm/vllm-router:nightly-20260617-e667ebb}" ROUTER_CONT_NAME="router_vllm_${SANITIZED_USER}_${SLURM_JOB_ID}" export RUN_FILE_FULL="$WS_PATH/${RUN_FILE}" @@ -401,7 +402,6 @@ if [[ "$ENGINE" == "vllm-disagg" ]]; then -e UCX_LOG_LEVEL=warn -e HSA_ENABLE_SDMA=1 -e PROXY_STREAM_IDLE_TIMEOUT=\${PROXY_STREAM_IDLE_TIMEOUT:-300} - -e VLLM_MORIIO_CONNECTOR_READ_MODE=\${VLLM_MORIIO_CONNECTOR_READ_MODE:-1} -e PYTHONPYCACHEPREFIX=/tmp/pycache ) elif [[ "$ENGINE" == "atom-disagg" ]]; then diff --git a/benchmarks/multi_node/amd_utils/models_vllm.yaml b/benchmarks/multi_node/amd_utils/models_vllm.yaml index b051de8d9..a566fe449 100644 --- a/benchmarks/multi_node/amd_utils/models_vllm.yaml +++ b/benchmarks/multi_node/amd_utils/models_vllm.yaml @@ -26,15 +26,15 @@ amd-Llama-3.3-70B-Instruct-FP8-KV: Kimi-K2.5-MXFP4: prefill_flags: "--tensor-parallel-size 8 --compilation-config '{\"cudagraph_mode\":\"PIECEWISE\"}' --no-enable-prefix-caching --block-size 1 --gpu-memory-utilization 0.90 --mm-encoder-tp-mode data" - decode_flags: "--tensor-parallel-size 8 --enable-expert-parallel --all2all-backend mori --compilation-config '{\"cudagraph_mode\":\"PIECEWISE\"}' --no-enable-prefix-caching --block-size 1 --gpu-memory-utilization 0.90 --mm-encoder-tp-mode data" + decode_flags: "--tensor-parallel-size 8 --enable-expert-parallel --all2all-backend mori_low_latency --compilation-config '{\"cudagraph_mode\":\"PIECEWISE\"}' --no-enable-prefix-caching --block-size 1 --gpu-memory-utilization 0.90 --mm-encoder-tp-mode data" env: "VLLM_USE_V1=1 VLLM_ROCM_USE_AITER=1 VLLM_ROCM_USE_AITER_PAGED_ATTN=0 VLLM_ROCM_USE_AITER_RMSNORM=1 VLLM_USE_AITER_TRITON_SILU_MUL=0 VLLM_ENGINE_READY_TIMEOUT_S=3600" hf_dir: "models--amd--Kimi-K2.5-MXFP4" MiniMax-M2.5: # AITER fused-MoE kernel fmoe_bf16_blockscaleFp8_g1u1_vs_silu_32x384 for gfx950 
writes OOB when run with MiniMax's shapes at M=8K(=num batched tokens), crashing vllm during AITER warmup. # Set token budget to 4k to avoid using that shape, instead of disabling AITER_MOE. - prefill_flags: "--max-num-batched-tokens 4K --tensor-parallel-size 8 --enable-expert-parallel --all2all-backend mori --no-enable-prefix-caching --gpu-memory-utilization 0.95 --block-size 32" - decode_flags: "--max-num-batched-tokens 4K --tensor-parallel-size 8 --enable-expert-parallel --all2all-backend mori --no-enable-prefix-caching --gpu-memory-utilization 0.95 --block-size 32" + prefill_flags: "--max-num-batched-tokens 4K --tensor-parallel-size 8 --enable-expert-parallel --all2all-backend mori_low_latency --no-enable-prefix-caching --gpu-memory-utilization 0.95 --block-size 32" + decode_flags: "--max-num-batched-tokens 4K --tensor-parallel-size 8 --enable-expert-parallel --all2all-backend mori_low_latency --no-enable-prefix-caching --gpu-memory-utilization 0.95 --block-size 32" env: "VLLM_USE_V1=1 VLLM_ROCM_USE_AITER=1 VLLM_ROCM_QUICK_REDUCE_QUANTIZATION=INT4 VLLM_ENGINE_READY_TIMEOUT_S=3600 VLLM_ROCM_SHUFFLE_KV_CACHE_LAYOUT=1" hf_dir: "models--MiniMaxAI--MiniMax-M2.5" @@ -42,3 +42,14 @@ gpt-oss-120b: prefill_flags: "--tensor-parallel-size 8" decode_flags: "--tensor-parallel-size 8" env: "VLLM_USE_V1=1 VLLM_ROCM_USE_AITER=1 VLLM_ROCM_USE_AITER_TRITON_BF16_GEMM=0 VLLM_USE_AITER_UNIFIED_ATTENTION=1 VLLM_ROCM_USE_AITER_MHA=0 ROCM_TRITON_MOE_PRESHUFFLE_SCALES=0" + +MiniMax-M3-MXFP8: + # MiniMax-M3 MXFP8 disagg, no EP. The --tensor-parallel-size 8 below is just a + # placeholder: server_vllm.sh sed-rewrites it to PREFILL_TP_SIZE/DECODE_TP_SIZE + # from the master-config prefill/decode tp (the sweep mixes TP8 and TP4 layouts). + # --block-size 128 is mandatory (MSA sparse/index cache); text-only benchmark + # so --language-model-only frees the vision encoder. gfx950 uses FP8 KV cache. 
+ prefill_flags: "--tensor-parallel-size 8 --block-size 128 --language-model-only --kv-cache-dtype fp8 --attention-backend TRITON_ATTN --no-enable-prefix-caching --gpu-memory-utilization 0.90 --tool-call-parser minimax_m3 --reasoning-parser minimax_m3 --enable-auto-tool-choice" + decode_flags: "--tensor-parallel-size 8 --block-size 128 --language-model-only --kv-cache-dtype fp8 --attention-backend TRITON_ATTN --no-enable-prefix-caching --gpu-memory-utilization 0.90 --tool-call-parser minimax_m3 --reasoning-parser minimax_m3 --enable-auto-tool-choice" + env: "VLLM_USE_V1=1 VLLM_ROCM_USE_AITER=1 VLLM_USE_BREAKABLE_CUDAGRAPH=0 VLLM_ENGINE_READY_TIMEOUT_S=3600" + hf_dir: "models--MiniMaxAI--MiniMax-M3-MXFP8" diff --git a/benchmarks/multi_node/amd_utils/patches/README.md b/benchmarks/multi_node/amd_utils/patches/README.md index d9b5de79d..765d571b2 100644 --- a/benchmarks/multi_node/amd_utils/patches/README.md +++ b/benchmarks/multi_node/amd_utils/patches/README.md @@ -1,16 +1,23 @@ -# In-tree sglang patches for the MoRI PD-disagg path - -This directory carries small Python overlays that get bind-mounted over -the upstream sglang source inside the docker container at runtime. -They are needed because some sglang releases ship known bugs in the -MoRI disaggregation backend that block our benchmark + accuracy -configs. - -The mount is wired through the `EXTRA_DOCKER_MOUNTS` env var that -`job.slurm` consumes (an opt-in `${EXTRA_DOCKER_MOUNTS:-}` after the -existing `-v` block). The local-test driver scripts under -`scripts/sglang_disagg/` pre-set this env var to the path of the -relevant overlay; CI runners that need the patch can do the same. +# In-tree patches for the MoRI / MoRIIO PD-disagg path + +This directory carries small overlays that fix up the engine source inside +the docker container at runtime. 
They are needed because some published +images ship known bugs in the (MoRI / MoRIIO) disaggregation backend that +block our benchmark + accuracy configs — so we can keep reusing the +**stock image** instead of rebuilding a patched one. + +- `mori_conn.py` — single-file overlay (bind-mounted) for the **sglang** + MoRI backend. + +> Note: the vLLM MoRIIO `minimax-m3` overlay (`moriio/`) was retired once the +> upstream fixes (vLLM #46039 / #46290 / #46332) shipped in the ROCm nightly +> image; `minimaxm3-fp8-mi355x-vllm-disagg` now runs the stock nightly directly. + +The `mori_conn.py` overlay is wired through the `EXTRA_DOCKER_MOUNTS` env +var that `job.slurm` consumes (an opt-in `${EXTRA_DOCKER_MOUNTS:-}` after +the existing `-v` block). The local-test driver scripts under +`scripts/sglang_disagg/` pre-set this env var to the path of the relevant +overlay; CI runners that need the patch can do the same. ## `mori_conn.py` diff --git a/benchmarks/multi_node/amd_utils/server_vllm.sh b/benchmarks/multi_node/amd_utils/server_vllm.sh index d61fe0359..f02b1cd56 100755 --- a/benchmarks/multi_node/amd_utils/server_vllm.sh +++ b/benchmarks/multi_node/amd_utils/server_vllm.sh @@ -256,7 +256,7 @@ if [ "$NODE_RANK" -eq 0 ]; then --served-model-name ${SERVED_MODEL} \ --port $SERVER_PORT \ --trust-remote-code \ - --kv-transfer-config '{\"kv_connector\": \"MoRIIOConnector\", \"kv_role\": \"kv_producer\", \"kv_connector_extra_config\": {\"proxy_ip\": \"${NODE0_ADDR}\", \"proxy_ping_port\": \"${PROXY_PING_PORT}\", \"http_port\": \"${SERVER_PORT}\"}}' \ + --kv-transfer-config '{\"kv_connector\": \"MoRIIOConnector\", \"kv_role\": \"kv_producer\", \"kv_connector_extra_config\": {\"proxy_ip\": \"${NODE0_ADDR}\", \"proxy_ping_port\": \"${PROXY_PING_PORT}\", \"http_port\": \"${SERVER_PORT}\", \"read_mode\": true}}' \ ${PREFILL_SERVER_CONFIG}" if [[ "$DRY_RUN" -eq 1 ]]; then @@ -422,7 +422,7 @@ elif [ "$NODE_RANK" -gt 0 ] && [ "$NODE_RANK" -lt "$xP" ]; then --served-model-name 
${SERVED_MODEL} \ --port $SERVER_PORT \ --trust-remote-code \ - --kv-transfer-config '{\"kv_connector\": \"MoRIIOConnector\", \"kv_role\": \"kv_producer\", \"kv_connector_extra_config\": {\"proxy_ip\": \"${NODE0_ADDR}\", \"proxy_ping_port\": \"${PROXY_PING_PORT}\", \"http_port\": \"${SERVER_PORT}\"}}' \ + --kv-transfer-config '{\"kv_connector\": \"MoRIIOConnector\", \"kv_role\": \"kv_producer\", \"kv_connector_extra_config\": {\"proxy_ip\": \"${NODE0_ADDR}\", \"proxy_ping_port\": \"${PROXY_PING_PORT}\", \"http_port\": \"${SERVER_PORT}\", \"read_mode\": true}}' \ ${PREFILL_SERVER_CONFIG}" if [[ "$DRY_RUN" -eq 1 ]]; then @@ -478,7 +478,7 @@ else --served-model-name ${SERVED_MODEL} \ --port $SERVER_PORT \ --trust-remote-code \ - --kv-transfer-config '{\"kv_connector\": \"MoRIIOConnector\", \"kv_role\": \"kv_consumer\", \"kv_connector_extra_config\": {\"proxy_ip\": \"${NODE0_ADDR}\", \"proxy_ping_port\": \"${PROXY_PING_PORT}\", \"http_port\": \"${SERVER_PORT}\"}}' \ + --kv-transfer-config '{\"kv_connector\": \"MoRIIOConnector\", \"kv_role\": \"kv_consumer\", \"kv_connector_extra_config\": {\"proxy_ip\": \"${NODE0_ADDR}\", \"proxy_ping_port\": \"${PROXY_PING_PORT}\", \"http_port\": \"${SERVER_PORT}\", \"read_mode\": true}}' \ ${DECODE_SERVER_CONFIG}" if [[ "$DRY_RUN" -eq 1 ]]; then diff --git a/benchmarks/multi_node/amd_utils/setup_deps.sh b/benchmarks/multi_node/amd_utils/setup_deps.sh index add2e3fa5..35eaf17dc 100644 --- a/benchmarks/multi_node/amd_utils/setup_deps.sh +++ b/benchmarks/multi_node/amd_utils/setup_deps.sh @@ -3,8 +3,8 @@ # setup_deps.sh — Install missing disagg dependencies at container start. 
# # Dispatched by $ENGINE (set by server.sh dispatcher): -# vllm-disagg -> vLLM/MoRI-IO patches + UCX/RIXL path exports -# (base image: vllm/vllm-openai-rocm:v0.18.0) +# vllm-disagg -> recipe deps + amd-quark + UCX/RIXL path exports +# (base image: vllm/vllm-openai-rocm:nightly) # sglang-disagg -> SGLang aiter gluon patch + per-model installs # (base image: lmsysorg/sglang-rocm:v0.5.12-rocm720-mi35x-*) # @@ -79,556 +79,6 @@ install_amd_quark() { _SETUP_INSTALLED+=("amd-quark") } -# --------------------------------------------------------------------------- -# 8. Patch vLLM MoRI-IO save_kv_layer busy-spin (C128 tail-batch deadlock) -# In WRITE mode, save_kv_layer spins forever waiting for the handshake -# callback to set write_ready_flags. This blocks the model worker thread, -# preventing it from responding to EngineCore shm_broadcast, causing a -# TimeoutError cascade and crash. -# Patch: add time.sleep(0.001) and a 30s timeout to yield CPU and prevent -# the model worker from deadlocking. -# --------------------------------------------------------------------------- -patch_moriio_save_kv_timeout() { - python3 -c ' -import os, sys - -try: - import vllm.distributed.kv_transfer.kv_connector.v1.moriio.moriio_connector as mc - f = mc.__file__ - src = open(f).read() - - # Already patched? 
- if "[PATCHED] save_kv_layer timeout" in src: - print("[SETUP] save_kv_layer timeout patch already applied") - sys.exit(0) - - old = """ while True: - if ( - self._ready_requests.empty() - and remote_engine_id not in self.write_ready_flags - ): - continue""" - - if old not in src: - print("[SETUP] WARN: save_kv_layer busy-spin pattern not found, skipping patch") - sys.exit(0) - - new = """ # [PATCHED] save_kv_layer — null guard + timeout + sleep - if remote_engine_id is None: - return - import time as _time, os as _os - _wait_start = _time.monotonic() - _SAVE_KV_TIMEOUT = float(_os.environ.get("VLLM_MORIIO_HANDSHAKE_TIMEOUT", "30")) - while True: - if ( - self._ready_requests.empty() - and remote_engine_id not in self.write_ready_flags - ): - _elapsed = _time.monotonic() - _wait_start - if _elapsed > _SAVE_KV_TIMEOUT: - import logging as _logging - _logging.getLogger("vllm.moriio").warning( - "[HANGFIX] save_kv_layer: timeout (%.1fs) waiting for " - "write_ready_flags[%s], breaking to unblock model " - "worker", _elapsed, remote_engine_id) - break - _time.sleep(0.001) - continue""" - - new_src = src.replace(old, new) - if new_src == src: - print("[SETUP] WARN: replacement had no effect") - sys.exit(0) - - open(f, "w").write(new_src) - print("[SETUP] Patched save_kv_layer: null guard + timeout + sleep") -except Exception as e: - print(f"[SETUP] WARN patch save_kv_layer: {e}", file=sys.stderr) -' - _SETUP_INSTALLED+=("MoRIIO-save-kv-timeout-patch") -} - -# --------------------------------------------------------------------------- -# 9. Patch MoRIIO waiting_for_transfer_complete with bounded timeout -# The original status.Wait() blocks forever if an RDMA completion never -# arrives (e.g., NIC queue saturation at C256). This replaces the unbounded -# wait with a polling loop using status.Succeeded() + configurable timeout. -# Also adds error handling to the write worker loop so a single failed -# transfer doesn't kill the background thread. 
-# --------------------------------------------------------------------------- -patch_moriio_transfer_timeout() { - python3 -c ' -import os, sys, textwrap - -try: - import vllm.distributed.kv_transfer.kv_connector.v1.moriio.moriio_engine as me - f = me.__file__ - src = open(f).read() - - if "[PATCHED] transfer completion timeout" in src: - print("[SETUP] transfer completion timeout patch already applied") - sys.exit(0) - - # --- Patch 1: Replace waiting_for_transfer_complete with polling + timeout --- - old_wait = """ def waiting_for_transfer_complete(self): - if not self.transfer_status: - return - - transfers_to_wait = [] - with self.lock: - transfers_to_wait = self.transfer_status[:] - self.transfer_status.clear() - - for status in transfers_to_wait: - try: - status.Wait() - if not status.Succeeded(): - logger.error( - "Transfer failed: %s, Code: %s", status.Message(), status.Code() - ) - raise TransferError("MoRIIO transfer failed!") - except Exception as e: - logger.error("Transfer %s failed: %s", status, e) - raise""" - - new_wait = """ def waiting_for_transfer_complete(self): - # [PATCHED] transfer completion timeout — bounded polling loop - import time as _time, os as _os - if not self.transfer_status: - return - - _timeout = float(_os.environ.get("VLLM_MORIIO_TRANSFER_TIMEOUT", "120")) - - transfers_to_wait = [] - with self.lock: - transfers_to_wait = self.transfer_status[:] - self.transfer_status.clear() - - _start = _time.monotonic() - remaining = list(transfers_to_wait) - _polls = 0 - _completed = 0 - - while remaining: - _elapsed = _time.monotonic() - _start - if _elapsed > _timeout: - logger.error( - "[HANGFIX] transfer_timeout elapsed=%.1fs " - "pending=%d/%d completed=%d polls=%d " - "action=raise_transfer_error", - _elapsed, len(remaining), len(transfers_to_wait), - _completed, _polls, - ) - raise TransferError( - f"RDMA transfer timeout after {_elapsed:.1f}s, " - f"{len(remaining)}/{len(transfers_to_wait)} pending" - ) - - still_waiting = [] - for 
status in remaining: - try: - if status.Succeeded(): - _completed += 1 - continue - still_waiting.append(status) - except Exception as e: - logger.error( - "[HANGFIX] transfer_poll_error error=%s", e) - raise TransferError( - f"Transfer failed during poll: {e}" - ) from e - - remaining = still_waiting - if remaining: - _time.sleep(0.005) - _polls += 1 - if _polls % 2000 == 0: - logger.warning( - "[HANGFIX] transfer_wait pending=%d " - "completed=%d elapsed=%.1fs timeout=%.0fs", - len(remaining), _completed, - _time.monotonic() - _start, _timeout, - )""" - - if old_wait not in src: - print("[SETUP] WARN: waiting_for_transfer_complete pattern not found") - sys.exit(0) - - new_src = src.replace(old_wait, new_wait) - - # --- Patch 2: Add error handling + cleanup to _write_worker_loop --- - old_loop = """ self._execute_write_task(task)""" - - new_loop = """ try: - self._execute_write_task(task) - except Exception as _e: - logger.error( - "[HANGFIX] req=%s write_task_failed error=%s " - "action=cleanup_and_mark_done", - task.request_id, _e, - ) - try: - _wr = self.worker.moriio_wrapper - with _wr.lock: - _wr.done_req_ids.append(task.request_id) - _wr.done_remote_allocate_req_dict.pop( - task.request_id, None - ) - except Exception: - pass""" - - if old_loop in new_src: - new_src = new_src.replace(old_loop, new_loop, 1) - else: - print("[SETUP] WARN: _write_worker_loop pattern not found for error handling") - - # --- Patch 3: Add deferred task timeout to _process_deferred_tasks --- - old_deferred = """ def _process_deferred_tasks(self) -> None: - \"\"\"Process tasks that were previously deferred.\"\"\" - if not self._deferred_tasks: - return - - still_deferred: list[WriteTask] = [] - for task in self._deferred_tasks: - if self._is_remote_ready(task): - self._execute_write_task(task) - else: - still_deferred.append(task) - - self._deferred_tasks = still_deferred""" - - new_deferred = """ def _process_deferred_tasks(self) -> None: - \"\"\"Process tasks that were previously 
deferred.\"\"\" - # [PATCHED] deferred task timeout — prune stale tasks - import time as _time, os as _os - if not self._deferred_tasks: - return - - _DEFER_TIMEOUT = float( - _os.environ.get("VLLM_MORIIO_DEFER_TIMEOUT", "60")) - - still_deferred: list[WriteTask] = [] - for task in self._deferred_tasks: - _age = _time.monotonic() - getattr(task, "_defer_ts", _time.monotonic()) - if _age > _DEFER_TIMEOUT: - logger.error( - "[HANGFIX] req=%s deferred_task_expired age=%.1fs " - "action=drop_and_mark_done", - task.request_id, _age, - ) - try: - _wr = self.worker.moriio_wrapper - with _wr.lock: - _wr.done_req_ids.append(task.request_id) - _wr.done_remote_allocate_req_dict.pop( - task.request_id, None) - except Exception: - pass - continue - if self._is_remote_ready(task): - try: - self._execute_write_task(task) - except Exception as _e: - logger.error( - "[HANGFIX] req=%s deferred_write_failed error=%s", - task.request_id, _e, - ) - try: - _wr = self.worker.moriio_wrapper - with _wr.lock: - _wr.done_req_ids.append(task.request_id) - _wr.done_remote_allocate_req_dict.pop( - task.request_id, None) - except Exception: - pass - else: - still_deferred.append(task) - - self._deferred_tasks = still_deferred""" - - if old_deferred in new_src: - new_src = new_src.replace(old_deferred, new_deferred, 1) - else: - print("[SETUP] WARN: _process_deferred_tasks pattern not found") - - # --- Patch 4: Stamp defer time when task is deferred --- - old_defer_add = """ self._deferred_tasks.append(task)""" - new_defer_add = """ import time as _time2 - if not hasattr(task, "_defer_ts"): - task._defer_ts = _time2.monotonic() - self._deferred_tasks.append(task)""" - if old_defer_add in new_src: - new_src = new_src.replace(old_defer_add, new_defer_add, 1) - else: - print("[SETUP] WARN: deferred task timestamp patch target not found") - - open(f, "w").write(new_src) - print("[SETUP] Patched: transfer timeout + writer error handling") - -except Exception as e: - print(f"[SETUP] WARN patch 
transfer_timeout: {e}", file=sys.stderr) -' - _SETUP_INSTALLED+=("MoRIIO-transfer-timeout-patch") -} - -# --------------------------------------------------------------------------- -# 10. Patch MoRIIO start_load_kv busy-spin (same pattern as save_kv_layer) -# The READ-mode spin loop in start_load_kv has the same unbounded-spin -# issue as save_kv_layer. Add timeout + sleep + null guard. -# --------------------------------------------------------------------------- -patch_moriio_load_kv_timeout() { - python3 -c ' -import os, sys - -try: - import vllm.distributed.kv_transfer.kv_connector.v1.moriio.moriio_connector as mc - f = mc.__file__ - src = open(f).read() - - if "[PATCHED] start_load_kv timeout" in src: - print("[SETUP] start_load_kv timeout patch already applied") - sys.exit(0) - - old = """ while True: - if ( - self._ready_requests.empty() - and remote_engine_id not in self.load_ready_flag - and wait_handshake_readd_req - ): - continue""" - - if old not in src: - print("[SETUP] WARN: start_load_kv busy-spin pattern not found, skipping") - sys.exit(0) - - new = """ # [PATCHED] start_load_kv timeout — prevent model worker deadlock - if remote_engine_id is None and not wait_handshake_readd_req: - self._reqs_to_send.update(metadata.reqs_to_send) - return - import time as _time, os as _os - _wait_start = _time.monotonic() - _LOAD_KV_TIMEOUT = float(_os.environ.get("VLLM_MORIIO_HANDSHAKE_TIMEOUT", "30")) - while True: - if ( - self._ready_requests.empty() - and remote_engine_id not in self.load_ready_flag - and wait_handshake_readd_req - ): - if _time.monotonic() - _wait_start > _LOAD_KV_TIMEOUT: - import logging as _logging - _logging.getLogger("vllm.moriio").warning( - "[HANGFIX] start_load_kv: timeout (%.1fs) waiting for " - "load_ready_flag[%s]", _time.monotonic() - _wait_start, - remote_engine_id) - break - _time.sleep(0.001) - continue""" - - new_src = src.replace(old, new) - if new_src == src: - print("[SETUP] WARN: start_load_kv replacement had no effect") 
- sys.exit(0) - - open(f, "w").write(new_src) - print("[SETUP] Patched start_load_kv busy-spin with timeout + sleep") -except Exception as e: - print(f"[SETUP] WARN patch start_load_kv: {e}", file=sys.stderr) -' - _SETUP_INSTALLED+=("MoRIIO-load-kv-timeout-patch") -} - -# --------------------------------------------------------------------------- -# 11. Fix READ-mode scheduler assertion in _update_from_kv_xfer_finished -# vLLM asserts that a request in finished_recving must be either -# WAITING_FOR_REMOTE_KVS or finished. In READ mode the request can -# transition to RUNNING before the aggregated recv notification arrives, -# crashing the engine with AssertionError. -# (present in v0.17.1 & v0.18.0) -# --------------------------------------------------------------------------- -patch_scheduler_read_mode_fix() { - python3 -c ' -import os, sys - -try: - import vllm.v1.core.sched.scheduler as smod - f = smod.__file__ - src = open(f).read() - - if "[PATCHED] read-mode recv assertion" in src: - print("[SETUP] scheduler read-mode assertion fix already applied") - sys.exit(0) - - old_recv = """ for req_id in kv_connector_output.finished_recving or (): - logger.debug("Finished recving KV transfer for request %s", req_id) - assert req_id in self.requests - req = self.requests[req_id] - if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS: - self.finished_recving_kv_req_ids.add(req_id) - else: - assert RequestStatus.is_finished(req.status) - self._free_blocks(self.requests[req_id])""" - - new_recv = """ # [PATCHED] read-mode recv assertion — handle intermediate states - for req_id in kv_connector_output.finished_recving or (): - logger.debug("Finished recving KV transfer for request %s", req_id) - if req_id not in self.requests: - logger.debug("Request %s already removed, skipping recv", req_id) - continue - req = self.requests[req_id] - if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS: - self.finished_recving_kv_req_ids.add(req_id) - elif 
RequestStatus.is_finished(req.status): - self._free_blocks(self.requests[req_id]) - else: - logger.debug( - "Request %s recv finished but status=%s (not " - "WAITING_FOR_REMOTE_KVS or finished), skipping " - "block free — will be freed on request completion", - req_id, req.status.name)""" - - if old_recv not in src: - print("[SETUP] WARN: scheduler finished_recving pattern not found, skipping") - sys.exit(0) - - new_src = src.replace(old_recv, new_recv, 1) - - old_send = """ for req_id in kv_connector_output.finished_sending or (): - logger.debug("Finished sending KV transfer for request %s", req_id) - assert req_id in self.requests - self._free_blocks(self.requests[req_id])""" - - new_send = """ for req_id in kv_connector_output.finished_sending or (): - logger.debug("Finished sending KV transfer for request %s", req_id) - if req_id not in self.requests: - logger.debug("Request %s already removed, skipping send", req_id) - continue - self._free_blocks(self.requests[req_id])""" - - if old_send in new_src: - new_src = new_src.replace(old_send, new_send, 1) - else: - print("[SETUP] WARN: scheduler finished_sending pattern not found") - - open(f, "w").write(new_src) - print("[SETUP] Patched: scheduler _update_from_kv_xfer_finished read-mode fix") - -except Exception as e: - print(f"[SETUP] WARN patch scheduler read-mode: {e}", file=sys.stderr) -' - _SETUP_INSTALLED+=("scheduler-read-mode-fix") -} - -# --------------------------------------------------------------------------- -# 12. Idle KV block reaper for disaggregated prefill (READ mode) -# The RIXL notification path can lose `finished_sending` signals under -# high concurrency with ibv_post_send failures. This leaves KV blocks -# permanently allocated on the prefill engine even after the decode has -# finished reading. Over multiple benchmark rounds, leaked blocks -# accumulate and eventually saturate the prefill KV cache. 
-# -# Fix: instrument the scheduler's `schedule()` method to detect idle -# periods (0 running, 0 waiting for >5s) and force-free blocks for -# any remaining requests whose status is finished. -# --------------------------------------------------------------------------- -patch_prefill_idle_kv_reaper() { - python3 -c ' -import os, sys - -try: - import vllm.v1.core.sched.scheduler as smod - f = smod.__file__ - src = open(f).read() - - if "[PATCHED] idle-kv-reaper" in src: - print("[SETUP] idle KV block reaper already applied") - sys.exit(0) - - # Find the _update_from_kv_xfer_finished method end and add reaper logic - # We inject into the method that processes KV transfer completions. - marker = "[PATCHED] read-mode recv assertion" - if marker not in src: - print("[SETUP] WARN: scheduler read-mode patch not found, skipping reaper") - sys.exit(0) - - # Add reaper state initialization to __init__ - old_init_marker = "self.finished_recving_kv_req_ids" - if old_init_marker not in src: - print("[SETUP] WARN: finished_recving_kv_req_ids not found in scheduler") - sys.exit(0) - - # Find the first occurrence to insert reaper state - init_pos = src.find(old_init_marker) - # Find the line containing it - line_end = src.find("\n", init_pos) - init_line = src[init_pos:line_end] - - # Add reaper state after this line - reaper_init = init_line + """ - # [PATCHED] idle-kv-reaper state - self._idle_kv_reaper_ts = 0.0 - self._idle_kv_reaper_active = False""" - - src = src.replace(init_line, reaper_init, 1) - - # Now add the reaper logic at the end of _update_from_kv_xfer_finished - # Find the finished_sending handler we patched - send_handler = """ for req_id in kv_connector_output.finished_sending or (): - logger.debug("Finished sending KV transfer for request %s", req_id) - if req_id not in self.requests: - logger.debug("Request %s already removed, skipping send", req_id) - continue - self._free_blocks(self.requests[req_id])""" - - reaper_logic = send_handler + """ - - # [PATCHED] 
idle-kv-reaper — force-free leaked prefill KV blocks - import time as _time - _REAPER_IDLE_SECS = 5.0 - _num_running = sum(1 for r in self.requests.values() - if r.status == RequestStatus.RUNNING) - _should_reap = (_num_running == 0) - - if _should_reap: - if not self._idle_kv_reaper_active: - self._idle_kv_reaper_active = True - self._idle_kv_reaper_ts = _time.monotonic() - elif _time.monotonic() - self._idle_kv_reaper_ts > _REAPER_IDLE_SECS: - _reaped = 0 - _reap_ids = [] - for _rid, _req in list(self.requests.items()): - if RequestStatus.is_finished(_req.status): - _reap_ids.append(_rid) - for _rid in _reap_ids: - try: - _req = self.requests[_rid] - self._free_blocks(_req) - _reaped += 1 - except Exception as _e: - logger.debug("[KV-REAPER] free_blocks failed for %s: %s", _rid, _e) - if _reaped > 0: - logger.warning( - "[KV-REAPER] Force-freed blocks for %d finished " - "requests after %.1fs idle", - _reaped, _time.monotonic() - self._idle_kv_reaper_ts) - self._idle_kv_reaper_ts = _time.monotonic() - else: - self._idle_kv_reaper_active = False""" - - if send_handler in src: - src = src.replace(send_handler, reaper_logic, 1) - else: - print("[SETUP] WARN: send handler not found for reaper injection") - sys.exit(0) - - open(f, "w").write(src) - print("[SETUP] Patched: idle KV block reaper for prefill") - -except Exception as e: - print(f"[SETUP] WARN patch idle-kv-reaper: {e}", file=sys.stderr) -' - _SETUP_INSTALLED+=("idle-kv-reaper") -} - # --------------------------------------------------------------------------- # SGLang: Patch aiter gluon pa_mqa_logits — fix 2D → 3D instr_shape for # Triton ≥ 3.5. 
@@ -742,11 +192,6 @@ install_transformers_glm5() {
 if [[ "$ENGINE" == "vllm-disagg" ]]; then
     install_recipe_deps
     install_amd_quark
-    patch_moriio_save_kv_timeout
-    patch_moriio_transfer_timeout
-    patch_moriio_load_kv_timeout
-    patch_scheduler_read_mode_fix
-    patch_prefill_idle_kv_reaper
 
     # =========================================================================
     # vLLM: Export UCX/RIXL paths (persists since this file is sourced)
diff --git a/benchmarks/multi_node/amd_utils/submit.sh b/benchmarks/multi_node/amd_utils/submit.sh
index fa3d65418..fc91a78e8 100755
--- a/benchmarks/multi_node/amd_utils/submit.sh
+++ b/benchmarks/multi_node/amd_utils/submit.sh
@@ -102,7 +102,6 @@ export PROFILER_ARGS=$profiler_args
 # Engine-specific xP/yD semantics and TP exports
 if [[ "$ENGINE" == "vllm-disagg" ]]; then
     export PROXY_STREAM_IDLE_TIMEOUT=${PROXY_STREAM_IDLE_TIMEOUT:-300}
-    export VLLM_MORIIO_CONNECTOR_READ_MODE=${VLLM_MORIIO_CONNECTOR_READ_MODE:-1}
 fi
 # xP = prefill workers, yD = decode workers (may span multiple nodes)
 export xP=$PREFILL_WORKERS
diff --git a/benchmarks/multi_node/minimaxm3_fp8_mi355x_vllm-disagg.sh b/benchmarks/multi_node/minimaxm3_fp8_mi355x_vllm-disagg.sh
new file mode 100644
index 000000000..f54940e29
--- /dev/null
+++ b/benchmarks/multi_node/minimaxm3_fp8_mi355x_vllm-disagg.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+
+source "$(dirname "$0")/../benchmark_lib.sh"
+
+check_env_vars \
+    CONC_LIST \
+    ISL \
+    OSL \
+    IMAGE \
+    SPEC_DECODING \
+    MODEL_PATH \
+    PREFILL_NUM_WORKERS \
+    PREFILL_TP \
+    PREFILL_EP \
+    PREFILL_DP_ATTN \
+    DECODE_NUM_WORKERS \
+    DECODE_TP \
+    DECODE_EP \
+    DECODE_DP_ATTN \
+    PREFILL_NODES \
+    DECODE_NODES \
+    RANDOM_RANGE_RATIO \
+    FRAMEWORK
+
+if [[ -n "$SLURM_JOB_ID" ]]; then
+    echo "JOB $SLURM_JOB_ID running on $SLURMD_NODENAME"
+fi
+
+set -x
+
+cd "$GITHUB_WORKSPACE/benchmarks/multi_node/amd_utils" || exit 1
+
+export TIME_LIMIT="08:00:00"
+# MiniMax-M3 MXFP8 (~414 GB) is pre-staged in this cluster's shared HF cache
+# (/it-share/hf-hub-cache/models--MiniMaxAI--MiniMax-M3-MXFP8), not the default
+# /it-share/data the launcher sets. Point the disagg model dir there for M3 only;
+# submit.sh exports MODEL_DIR=$MODEL_PATH and job.slurm resolves the snapshot under
+# it and bind-mounts MODEL_DIR into the prefill/decode serving containers.
+export MODEL_PATH=/it-share/hf-hub-cache
+export MODEL_NAME=$MODEL_NAME
+export CONTAINER_IMAGE=$IMAGE
+
+if [[ "${PREFILL_EP:-1}" -eq 1 ]]; then
+    export PREFILL_ENABLE_EP=false
+else
+    export PREFILL_ENABLE_EP=true
+fi
+
+if [[ "$PREFILL_DP_ATTN" == "true" ]]; then
+    export PREFILL_ENABLE_DP=true
+else
+    export PREFILL_ENABLE_DP=false
+fi
+
+if [[ "${DECODE_EP:-1}" -eq 1 ]]; then
+    export DECODE_ENABLE_EP=false
+else
+    export DECODE_ENABLE_EP=true
+fi
+
+if [[ "$DECODE_DP_ATTN" == "true" ]]; then
+    export DECODE_ENABLE_DP=true
+else
+    export DECODE_ENABLE_DP=false
+fi
+
+JOB_ID=$(bash ./submit.sh $PREFILL_NODES \
+    $PREFILL_NUM_WORKERS \
+    $DECODE_NODES \
+    $DECODE_NUM_WORKERS \
+    $ISL $OSL "${CONC_LIST// /x}" inf \
+    ${PREFILL_ENABLE_EP} ${PREFILL_ENABLE_DP} \
+    ${DECODE_ENABLE_EP} ${DECODE_ENABLE_DP} \
+    ${PREFILL_TP} ${DECODE_TP} \
+    ${RANDOM_RANGE_RATIO} \
+    "${NODELIST:-}")
+
+if [[ $? -ne 0 ]]; then
+    echo "Failed to submit job" >&2
+    exit 1
+fi
+
+echo "$JOB_ID"
diff --git a/perf-changelog.yaml b/perf-changelog.yaml
index d6a5f35e4..54fb2d7dd 100644
--- a/perf-changelog.yaml
+++ b/perf-changelog.yaml
@@ -4153,3 +4153,15 @@
     - "Run the PR #1891 MiniMax-M3 MXFP8 B300 Dynamo-vLLM recipe set on top of current main."
     - "Uses the vllm/vllm-openai:minimax-m3-0618-x86_64-cu130 image and the TEP4/TEP8 8k1k topologies not covered by PR #1890."
   pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1891
+
+- config-keys:
+    - minimaxm3-fp8-mi355x-vllm-disagg
+  description:
+    - "Initial submission: MiniMax-M3 MXFP8 MI355X vLLM disaggregated (prefill/decode) smoke test on the day-zero ROCm image (vllm/vllm-openai-rocm:minimax-m3) — 1 prefill (TP8) + 1 decode (TP8) across conc 1,2,4,8,16, validating the MoRI-IO KV-transfer disagg pipeline end-to-end for M3"
+    - "Layered on the MoRI-IO patch-removal infra (#1585): uses benchmarks/multi_node/amd_utils with the runtime MoRI patches removed"
+    - "Per-worker serve flags (models_vllm.yaml MiniMax-M3-MXFP8): --block-size 128 (MSA), --language-model-only, --kv-cache-dtype fp8, --attention-backend TRITON_ATTN, minimax_m3 parsers; no EP (TP8, MoE experts TP-sharded)"
+    - "M3 disagg script points MODEL_PATH at the cluster's shared HF cache (/it-share/hf-hub-cache) where the ~414 GB MiniMax-M3-MXFP8 checkpoint is pre-staged, instead of the launcher default /it-share/data; scoped to M3 only (other disagg models keep /it-share/data)"
+    - "Sweeps conc 1,2,4,8,16,32,64,128,256,512,1024 at both 1k1k and 8k1k (1P TP8 + 1D TP8). The 8k1k point makes the multi-node eval policy (8k1k + conc >= 16) mark one lm-eval on the highest-max-conc layout (eval-conc=median), validating the disagg pipeline's correctness; run with non-canary-full-sweep-enabled so the eval entry actually runs"
+    - "Adds two asymmetric prefill/decode layouts at both 1k1k and 8k1k alongside the TP8+TP8 sweep: 1P TP4 + 1D TP8 (smaller prefill, full-node decode) at conc 1,2,4,8,16,32,64,128,256; and balanced 1P TP4 + 1D TP4 at conc 64,128,256,512,1024. Per-worker TP comes from the master-config prefill/decode tp (server_vllm.sh rewrites the models_vllm.yaml --tensor-parallel-size placeholder); no EP, dp-attn off, PREFILL_NODES=1/DECODE_NODES=1 (TP4 uses half an 8-GPU node)"
+    - "Adds a 2P TP4 + 1D TP8 layout at both 1k1k and 8k1k for high conc 256,512,768,1024: two TP4 prefill workers (num-worker 2, PREFILL_NODES=2, each TP4 on half an 8-GPU node) feeding one TP8 decode (DECODE_NODES=1); 3 nodes total"
+  pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1762