Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions nemo_curator/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

from nemo_curator.core.utils import ignore_ray_head_node
Expand Down Expand Up @@ -52,8 +53,22 @@ def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool
self.ignore_head_node = ignore_head_node or ignore_ray_head_node()

@abstractmethod
def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> None:
"""Execute the pipeline."""
def execute(
self,
stages: list["ProcessingStage"],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> None:
"""Execute the pipeline.

Args:
stages: Execution stages to run.
initial_tasks: Initial tasks. Empty list / ``EmptyTask`` is used when ``None``.
checkpoint_path: If provided, lineage records (parents, children, type,
completed flag) for every task that flows through the pipeline are
persisted to an LMDB file at this path. The file is owned by a
single Ray actor and is safe to place on NFS/Lustre.
"""


class BaseStageAdapter:
Expand Down
26 changes: 25 additions & 1 deletion nemo_curator/backends/ray_actor_pool/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import uuid
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np
Expand All @@ -25,6 +26,7 @@
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.backends.utils import RayStageSpecKeys, execute_setup_on_node, register_loguru_serializer
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.utils.lineage_store import LINEAGE_ACTOR_NAME, LineageWriterActor

from .adapter import RayActorPoolStageAdapter
from .raft_adapter import RayActorPoolRAFTAdapter
Expand Down Expand Up @@ -78,12 +80,19 @@ def __init__(
self.show_progress = show_progress
self.progress_interval = progress_interval

def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> list[Task]: # noqa: PLR0912
def execute( # noqa: PLR0912, PLR0915, C901
self,
stages: list["ProcessingStage"],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task]:
"""Execute the pipeline stages using ActorPool.

Args:
stages: List of processing stages to execute
initial_tasks: Initial tasks to process (can be None for empty start)
checkpoint_path: If provided, spawn a :class:`LineageWriterActor` that
records the task DAG to LMDB at this path for the duration of the run.

Returns:
List of final processed tasks
Expand All @@ -93,10 +102,19 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N

session_id = uuid.uuid4().bytes

lineage_actor = None
try:
# Initialize Ray and register loguru serializer
register_loguru_serializer()
ray.init(ignore_reinit_error=True, runtime_env=_parse_runtime_env(self.config.get("runtime_env", {})))
if checkpoint_path is not None:
Comment on lines 103 to +110
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: `get_if_exists=True` silently reuses an actor with a different checkpoint path

If a `LineageWriterActor` from a previous pipeline run is still alive, `get_if_exists=True` returns the existing actor without calling its constructor, so the `path=absolute_checkpoint_path` argument is silently ignored and the new run writes lineage into the old file. The same pattern appears in `ray_data/executor.py` and `xenna/executor.py`. Consider using a run-scoped unique actor name or validating the returned actor's path.

absolute_checkpoint_path = str(Path(checkpoint_path).absolute())
lineage_actor = LineageWriterActor.options(
name=LINEAGE_ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
).remote(path=absolute_checkpoint_path)
logger.info(f"Spawned LineageWriterActor; checkpoint at {absolute_checkpoint_path}")

# Execute setup on node for all stages BEFORE processing begins
execute_setup_on_node(stages, ignore_head_node=self.ignore_head_node)
Expand Down Expand Up @@ -160,6 +178,12 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N

return final_results
finally:
if lineage_actor is not None:
try:
ray.get(lineage_actor.close.remote())
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to close LineageWriterActor: {e}")
ray.kill(lineage_actor)
# Clean up all Ray resources including named actors
logger.info("Shutting down Ray to clean up all resources...")
ray.shutdown()
Expand Down
26 changes: 25 additions & 1 deletion nemo_curator/backends/ray_data/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import TYPE_CHECKING, Any

import ray
Expand All @@ -21,6 +22,7 @@
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.backends.utils import execute_setup_on_node, register_loguru_serializer
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.utils.lineage_store import LINEAGE_ACTOR_NAME, LineageWriterActor

from .adapter import RayDataStageAdapter

Expand All @@ -41,12 +43,19 @@ class RayDataExecutor(BaseExecutor):
def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool = False):
super().__init__(config, ignore_head_node)

def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> list[Task]:
def execute(
self,
stages: list["ProcessingStage"],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task]:
"""Execute the pipeline stages using Ray Data.

Args:
stages (list[ProcessingStage]): List of processing stages to execute
initial_tasks (list[Task], optional): Initial tasks to process (can be None for empty start)
checkpoint_path (str | Path, optional): If provided, spawn a :class:`LineageWriterActor`
that records the task DAG to LMDB at this path for the duration of the run.

Returns:
list[Task]: List of final processed tasks
Expand All @@ -60,6 +69,7 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
# Initialize with initial tasks if provided, otherwise start with EmptyTask
tasks: list[Task] = initial_tasks or [EmptyTask]
output_tasks: list[Task] = []
lineage_actor = None
# When runtime_env with pip is used, Ray's pip plugin sets up per-stage virtualenvs
# lazily on first task dispatch by cloning the current virtualenv. The NeMo Curator
# container's /opt/venv is created with `uv venv --seed` so pip is available in clones.
Expand All @@ -69,6 +79,14 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
ray.init(
ignore_reinit_error=True, runtime_env={"env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": ""}}
)
if checkpoint_path is not None:
absolute_checkpoint_path = str(Path(checkpoint_path).absolute())
lineage_actor = LineageWriterActor.options(
name=LINEAGE_ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
).remote(path=absolute_checkpoint_path)
logger.info(f"Spawned LineageWriterActor; checkpoint at {absolute_checkpoint_path}")

# Convert tasks to dataset
current_dataset = self._tasks_to_dataset(tasks)
Expand Down Expand Up @@ -97,6 +115,12 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
output_tasks = self._dataset_to_tasks(current_dataset)
logger.info(f"Pipeline completed. Final results: {len(output_tasks)} tasks")
finally:
if lineage_actor is not None:
try:
ray.get(lineage_actor.close.remote())
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to close LineageWriterActor: {e}")
ray.kill(lineage_actor)
# This ensures we unset all the env vars set above during initialize and kill the pending actors.
ray.shutdown()
return output_tasks
Expand Down
27 changes: 26 additions & 1 deletion nemo_curator/backends/xenna/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Any

import ray
Expand All @@ -24,6 +25,7 @@
from nemo_curator.backends.xenna.adapter import create_named_xenna_stage_adapter
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.utils.lineage_store import LINEAGE_ACTOR_NAME, LineageWriterActor


class XennaExecutor(BaseExecutor):
Expand Down Expand Up @@ -59,12 +61,20 @@ def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool
"autoscale_interval_s": 180,
}

def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | None = None) -> list[Task]:
def execute(
self,
stages: list[ProcessingStage],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task]:
"""Execute the pipeline using Cosmos-Xenna.

Args:
stages (list[ProcessingStage]): The stages to run
initial_tasks (list[Task], optional): The initial tasks to run. Empty list of Task is used if not provided.
checkpoint_path (str | Path, optional): If provided, spawn a :class:`LineageWriterActor`
that records the task DAG (parents, children, type, completed flag) to LMDB at
this path for the duration of the run.

Returns:
list[Task]: List of output tasks from the pipeline
Expand Down Expand Up @@ -134,6 +144,7 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non
# Log pipeline configuration
logger.info(f"Execution mode: {exec_mode.name}")

lineage_actor = None
try:
register_loguru_serializer()
# Prevent Ray from overriding accelerator env vars when num_gpus=0, letting Xenna manage them instead.
Expand All @@ -146,13 +157,27 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non
}
},
)
if checkpoint_path is not None:
absolute_checkpoint_path = str(Path(checkpoint_path).absolute())
lineage_actor = LineageWriterActor.options(
name=LINEAGE_ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
).remote(path=absolute_checkpoint_path)
logger.info(f"Spawned LineageWriterActor; checkpoint at {absolute_checkpoint_path}")
# Run the pipeline (this will re-initialize ray but that'll be a no-op and the ray.init above will take precedence)
results = pipelines_v1.run_pipeline(pipeline_spec)
logger.info(f"Pipeline completed successfully with {len(results) if results else 0} output tasks")
except Exception as e:
logger.error(f"Pipeline execution failed: {e}")
raise
finally:
if lineage_actor is not None:
try:
ray.get(lineage_actor.close.remote())
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to close LineageWriterActor: {e}")
ray.kill(lineage_actor)
# This ensures we unset all the env vars set above during initialize and kill the pending actors.
ray.shutdown()
return results if results else []
Expand Down
22 changes: 19 additions & 3 deletions nemo_curator/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Any

from loguru import logger

from nemo_curator.backends.base import BaseExecutor
from nemo_curator.stages.base import CompositeStage, ProcessingStage
from nemo_curator.stages.base import CompositeStage, ProcessingStage, assign_root_lineage
from nemo_curator.tasks import Task


Expand Down Expand Up @@ -174,18 +175,31 @@ def describe(self) -> str:

return "\n".join(lines)

def run(self, executor: BaseExecutor | None = None, initial_tasks: list[Task] | None = None) -> list[Task] | None:
def run(
self,
executor: BaseExecutor | None = None,
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task] | None:
"""Run the pipeline.

Args:
executor (BaseExecutor): Executor to use
initial_tasks (list[Task], optional): Initial tasks to start the pipeline with. Defaults to None.
checkpoint_path (str | Path, optional): If provided, a single LMDB file at this path
records lineage (parents, children, task type, completed flag) for every task that
flows through the pipeline, keyed by ``_udid``. Owned by one Ray actor, so the file
may live on NFS/Lustre. When omitted, no lineage is persisted.

Returns:
list[Task] | None: List of tasks
"""
self.build()

if checkpoint_path is not None:
checkpoint_path = Path(checkpoint_path).absolute()
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

if executor is None:
from nemo_curator.backends.xenna import XennaExecutor

Expand All @@ -212,4 +226,6 @@ def run(self, executor: BaseExecutor | None = None, initial_tasks: list[Task] |
"The executor will schedule GPU stages on GPUs not held by Serve."
)

return executor.execute(self.stages, initial_tasks)
if initial_tasks:
assign_root_lineage(initial_tasks)
return executor.execute(self.stages, initial_tasks, checkpoint_path=checkpoint_path)
Loading
Loading