feat: reconcile config with runtime distributed state #162
Merged
…ted state

Extract shared env var mappings into utils/env_utils.py (single source of truth for cli.py and runtime reconciliation) and add a reconcile_config() function that patches the Arguments dataclass in-place with actual runtime values from the Accelerator and cluster environment variables.

Called once in load_trainer() after Accelerator creation, before any consumer (model adapter, trainer, logger) reads config, so that config.to_dict() everywhere reflects the true distributed state (num_processes, process_index, num_machines, gpus_per_node, etc.). The logger module remains completely decoupled from accelerate.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
This PR centralizes multi-node environment variable handling and ensures the runtime Arguments config reflects the actual distributed state after Accelerator initialization, so downstream consumers (especially logging) observe consistent, correct values.
Changes:
- Added `utils/env_utils.py` with shared env-var mapping/lookup and a `reconcile_config()` helper to patch runtime distributed fields into `Arguments`.
- Updated `load_trainer()` to call `reconcile_config()` immediately after creating the `Accelerator`.
- Updated CLI and multinode docs to rely on the shared env-var mapping utilities and document runtime reconciliation behavior.
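For readers who have not opened the diff, the flow described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the env var names inside `ENV_VAR_MAPPINGS`, the reduced `Arguments` field set, the optional `environ` parameter, and the `SimpleNamespace` stand-in for the `Accelerator` are all assumptions made so the snippet runs on its own.

```python
import os
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Mapping, Optional

# Hypothetical mapping: logical key -> candidate env var names, in priority order.
# The real names used by the PR are not shown in this conversation.
ENV_VAR_MAPPINGS = {
    "master_ip": ("MASTER_ADDR", "MASTER_IP"),
    "master_port": ("MASTER_PORT",),
    "num_machines": ("NNODES",),
}


def env_lookup(key: str, environ: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the first non-empty value among the env vars mapped to `key`."""
    environ = os.environ if environ is None else environ
    for name in ENV_VAR_MAPPINGS.get(key, ()):
        value = environ.get(name)
        if value:
            return value
    return None


@dataclass
class Arguments:
    # Reduced, illustrative subset of the config fields.
    num_processes: int = 1
    process_index: Optional[int] = None
    main_process_ip: Optional[str] = None


def reconcile_config(config, accelerator, environ=None):
    """Patch `config` in place with values observed at runtime."""
    # Values reported by the accelerator are treated as authoritative.
    config.num_processes = accelerator.num_processes
    config.process_index = accelerator.process_index
    # Cluster env vars fill in what the accelerator does not expose.
    master_ip = env_lookup("master_ip", environ)
    if master_ip is not None:
        config.main_process_ip = master_ip
    return config


# Stand-in for an Accelerator created inside load_trainer().
fake_accelerator = SimpleNamespace(num_processes=8, process_index=3)
config = reconcile_config(Arguments(), fake_accelerator, {"MASTER_ADDR": "10.0.0.1"})
```

Because the patch happens in place and before any consumer reads the config, later readers only ever see the reconciled values and do not need to import `accelerate` themselves.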
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `src/flow_factory/utils/env_utils.py` | Introduces shared env var mapping + `reconcile_config()` to sync config with runtime distributed metadata. |
| `src/flow_factory/trainers/loader.py` | Calls `reconcile_config()` after Accelerator creation so subsequent components read reconciled values. |
| `src/flow_factory/hparams/args.py` | Adds runtime distributed fields (process indices, machine metadata, master IP) to `Arguments`. |
| `src/flow_factory/cli.py` | Switches CLI env detection to use shared `ENV_VAR_MAPPINGS` / `env_lookup`. |
| `multinode_examples/README.md` | Documents that configs are reconciled at runtime for accurate logged metadata. |
Comments suppressed due to low confidence (3)
src/flow_factory/utils/env_utils.py:93
- When `num_machines` is set, `gpus_per_node` is computed via integer division (`num_processes // num_machines`) without validating that `num_processes` is divisible by `num_machines`. If the values ever disagree, this will silently truncate and log incorrect metadata; consider checking for a remainder and either raising/logging, or preferring an explicit `gpus_per_node` env var when present.

            num_machines = int(num_machines_str)
            config.num_machines = num_machines
            if num_machines > 0:
                config.gpus_per_node = accelerator.num_processes // num_machines
        except ValueError:
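One way the remainder check suggested in this comment could look is sketched below. The helper name `derive_gpus_per_node` and the choice to return `None` rather than raise are assumptions for illustration; the PR itself does not contain this function.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def derive_gpus_per_node(num_processes: int, num_machines: int) -> Optional[int]:
    """Return processes per machine, or None when the split is not exact."""
    if num_machines <= 0:
        return None
    quotient, remainder = divmod(num_processes, num_machines)
    if remainder != 0:
        # Refuse to record a truncated value as if it were accurate metadata.
        logger.warning(
            "num_processes=%d is not divisible by num_machines=%d; "
            "leaving gpus_per_node unset",
            num_processes,
            num_machines,
        )
        return None
    return quotient
```

Returning `None` keeps the field out of the logged metadata instead of recording a wrong number; raising would be the stricter alternative if a mismatch should abort the run.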
src/flow_factory/utils/env_utils.py:80
- Invalid integer values in cluster env vars are silently ignored (`except ValueError: pass`). This makes misconfiguration hard to debug and can leave defaults in place without any indication; consider logging a warning (including the env var name/value) when parsing fails.

        port_str = env_lookup("master_port")
        if port_str is not None:
            try:
                config.main_process_port = int(port_str)
            except ValueError:
                pass
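A small helper along the following lines would surface the bad value instead of swallowing it. The name `parse_int_env` and the explicit `environ` argument are hypothetical; they are used here only so the sketch is self-contained.

```python
import logging
from typing import Mapping, Optional

logger = logging.getLogger(__name__)


def parse_int_env(name: str, environ: Mapping[str, str]) -> Optional[int]:
    """Parse an integer env var; warn and return None when it is malformed."""
    raw = environ.get(name)
    if raw is None:
        return None
    try:
        return int(raw)
    except ValueError:
        # Include both the variable name and the offending value in the log.
        logger.warning("Ignoring %s=%r: not a valid integer", name, raw)
        return None
```

A caller would then assign the config field only when the result is not `None`, so the default still survives a bad value but the log explains why.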
src/flow_factory/cli.py:48
- `num_machines_str` comes from env vars and is cast with `int(...)` without any error handling. If a scheduler sets a non-integer value, the CLI will crash before printing a helpful message; consider wrapping this parse (and the other `int(...)` parses in this function) with a try/except that logs a clear error and falls back to `{}`.

    master_ip = env_lookup("master_ip")
    num_machines_str = env_lookup("num_machines")
    # Consider it a multi-node environment only when both master_ip and num_machines > 1 are present
    if not master_ip or not num_machines_str:
        return {}
    num_machines = int(num_machines_str)
    if num_machines <= 1:
        return {}
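The fallback suggested here might be sketched as below. The function name `detect_multinode`, the env var names `MASTER_ADDR` and `NNODES`, and the keys of the returned dict are assumptions; the real CLI goes through `env_lookup` and may return different fields.

```python
import logging
from typing import Any, Dict, Mapping

logger = logging.getLogger(__name__)


def detect_multinode(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Return multi-node settings, or {} when they are absent or unusable."""
    master_ip = environ.get("MASTER_ADDR")      # hypothetical env var name
    num_machines_str = environ.get("NNODES")    # hypothetical env var name
    if not master_ip or not num_machines_str:
        return {}
    try:
        num_machines = int(num_machines_str)
    except ValueError:
        # Report the bad value clearly, then behave like a single-node run.
        logger.error("NNODES=%r is not an integer; assuming single node", num_machines_str)
        return {}
    if num_machines <= 1:
        return {}
    return {"main_process_ip": master_ip, "num_machines": num_machines}
```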
Comment on lines +68 to +72
    # Authoritative values from accelerator
    config.num_processes = accelerator.num_processes
    config.mixed_precision = accelerator.mixed_precision  # type: ignore[assignment]
    config.process_index = accelerator.process_index
    config.local_process_index = accelerator.local_process_index
…pe: ignore Address review feedback: replace `type: ignore[assignment]` with a runtime check against the declared Literal values. If Accelerate ever returns an unexpected string, the original config value is preserved rather than silently overwritten. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
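The runtime check described in this commit message could take roughly the following shape. The `MixedPrecision` alias, its member values, and the helper name `set_mixed_precision` are illustrative assumptions; the Literal actually declared on `Arguments` is not shown in this conversation.

```python
from dataclasses import dataclass
from typing import Literal, cast, get_args

# Assumed set of declared values, for illustration only.
MixedPrecision = Literal["no", "fp16", "bf16"]


@dataclass
class Arguments:
    mixed_precision: MixedPrecision = "no"


def set_mixed_precision(config: Arguments, reported: str) -> bool:
    """Copy `reported` into the config only if it is a declared Literal value."""
    if reported in get_args(MixedPrecision):
        # cast() documents the narrowing for the type checker; it has no runtime effect.
        config.mixed_precision = cast(MixedPrecision, reported)
        return True
    # Unexpected string: keep the original config value untouched.
    return False


config = Arguments(mixed_precision="fp16")
accepted = set_mixed_precision(config, "bf16")
rejected = set_mixed_precision(config, "int4")
```

`get_args` reads the allowed strings from the Literal itself, so the check cannot drift from the type annotation.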
…ad of type: ignore" This reverts commit 9bb09f6.
87003697 pushed a commit to 87003697/Flow-Factory that referenced this pull request Jun 2, 2026
Brings trellis2 branch up-to-date with origin/main, integrating:
- PR X-GenGroup#170: DiffusionOPD on-policy distillation trainer
- PR X-GenGroup#168: Multi-dataset training with per-source reward routing
- PR X-GenGroup#165: GenEval reward + unified trainer sampling pipeline
- PR X-GenGroup#163: hparams/training_args.py → package split
- PR X-GenGroup#162: Reconcile config with runtime distributed state
- PRs X-GenGroup#121,X-GenGroup#146-X-GenGroup#161: CRD algorithm, Docker CUDA 12.9, HF resume, etc.

Conflict resolutions:
- trainers/registry.py: merged both sides (trellis2_grpo/nft + crd/opd)
- rewards/registry.py: merged both sides (trellis2 rewards + geneval)
- hparams/training_args.py: deleted (accept main's package split), added trellis2_grpo/nft entries to _registry.py
- trainers/grpo.py: removed duplicate evaluate() (unified in BaseTrainer)
- trainers/nft.py: adopted generate_samples(), removed duplicate evaluate()
- trainers/abc.py: added _extra_eval_inference_kwargs() hook to BaseTrainer evaluate() so Trellis2TrainerMixin can inject stages/render_kwargs
- samples/samples.py: merged source/source_id fields with trellis2 additions
- rewards/reward_processor.py: merged source-aware gating with trellis2's _store_reward_extra_info
- data_utils/loader.py: merged multi-source pipeline with image_3d dataset
- data_utils/sampler_loader.py: merged refactored parameters
- hparams/args.py: merged multi-source alignment with trellis2's group-aligned
- guidance/rewards.md: merged both (PickScore_TextImage_Sum + GenEval)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- Extract shared env var mappings into `utils/env_utils.py` as a single source of truth (previously duplicated in `cli.py` and a now-removed `logger/runtime_config.py`)
- Add `reconcile_config()` that patches the `Arguments` dataclass in-place with actual runtime values from Accelerator + cluster env vars
- Called once in `load_trainer()` after Accelerator creation, before any consumer (model adapter, trainer, logger) reads config, so `config.to_dict()` everywhere reflects the true distributed state (`num_processes`, `process_index`, `num_machines`, `gpus_per_node`, etc.)
- The logger module remains completely decoupled from `accelerate`

Test plan
- `from flow_factory.utils.env_utils import reconcile_config` imports cleanly
- `from flow_factory.logger import load_logger` works without accelerate at import time
- `Arguments().to_dict()` excludes runtime fields when they are `None`
- … `num_processes`
- … (`num_machines`, `gpus_per_node`, `machine_rank`, `main_process_ip`) appear in logged config

🤖 Generated with Claude Code
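The test-plan item about `to_dict()` could be checked with something like the sketch below. The `Arguments` class here is a stripped-down stand-in and the `RUNTIME_FIELDS` tuple is a hypothetical name; how the real `to_dict()` filters its output is not shown in this conversation.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

# Hypothetical list of fields that are only known after reconciliation.
RUNTIME_FIELDS = ("process_index", "num_machines", "gpus_per_node",
                  "machine_rank", "main_process_ip")


@dataclass
class Arguments:
    num_processes: int = 1
    process_index: Optional[int] = None
    num_machines: Optional[int] = None
    gpus_per_node: Optional[int] = None
    machine_rank: Optional[int] = None
    main_process_ip: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the config, dropping runtime fields that are still None."""
        return {
            key: value
            for key, value in asdict(self).items()
            if not (key in RUNTIME_FIELDS and value is None)
        }


before = Arguments().to_dict()
after = Arguments(num_processes=16, num_machines=2, gpus_per_node=8).to_dict()
```

Under these assumptions, an unreconciled config serializes without the runtime keys, and they appear once reconciliation has filled them in.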