Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions nemo_curator/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any

from nemo_curator.core.utils import ignore_ray_head_node
Expand Down Expand Up @@ -52,8 +53,22 @@ def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool
self.ignore_head_node = ignore_head_node or ignore_ray_head_node()

@abstractmethod
def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> None:
"""Execute the pipeline."""
def execute(
    self,
    stages: list["ProcessingStage"],
    initial_tasks: list[Task] | None = None,
    checkpoint_path: str | Path | None = None,
) -> None:
    """Execute the pipeline.

    Abstract entry point; each backend executor provides its own override.

    Args:
        stages: Execution stages to run.
        initial_tasks: Initial tasks. Empty list / ``EmptyTask`` is used when ``None``.
        checkpoint_path: If provided, lineage records (parents, children, type,
            completed flag) for every task that flows through the pipeline are
            persisted to an LMDB file at this path. The file is owned by a
            single Ray actor and is safe to place on NFS/Lustre. When ``None``,
            no lineage is persisted.

    Returns:
        Annotated as ``None`` on this abstract method.
        NOTE(review): the Ray actor-pool, Ray Data and Xenna overrides are
        annotated ``-> list[Task]``; confirm which contract is intended and
        align the annotations.
    """


class BaseStageAdapter:
Expand Down
27 changes: 25 additions & 2 deletions nemo_curator/backends/ray_actor_pool/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import uuid
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np
Expand All @@ -25,6 +26,7 @@
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.backends.utils import RayStageSpecKeys, execute_setup_on_node, register_loguru_serializer
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.utils.lineage_store import LINEAGE_ACTOR_NAME, LineageWriterActor

from .adapter import RayActorPoolStageAdapter
from .raft_adapter import RayActorPoolRAFTAdapter
Expand Down Expand Up @@ -78,12 +80,19 @@ def __init__(
self.show_progress = show_progress
self.progress_interval = progress_interval

def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> list[Task]: # noqa: PLR0912
def execute( # noqa: PLR0912, PLR0915, C901
self,
stages: list["ProcessingStage"],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task]:
"""Execute the pipeline stages using ActorPool.

Args:
stages: List of processing stages to execute
initial_tasks: Initial tasks to process (can be None for empty start)
checkpoint_path: If provided, spawn a :class:`LineageWriterActor` that
records the task DAG to LMDB at this path for the duration of the run.

Returns:
List of final processed tasks
Expand All @@ -93,10 +102,19 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N

session_id = uuid.uuid4().bytes

lineage_actor = None
try:
# Initialize Ray and register loguru serializer
register_loguru_serializer()
ray.init(ignore_reinit_error=True, runtime_env=_parse_runtime_env(self.config.get("runtime_env", {})))
if checkpoint_path is not None:
absolute_checkpoint_path = str(Path(checkpoint_path).absolute())
lineage_actor = LineageWriterActor.options(
name=LINEAGE_ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
).remote(path=absolute_checkpoint_path)
logger.info(f"Spawned LineageWriterActor; checkpoint at {absolute_checkpoint_path}")

# Execute setup on node for all stages BEFORE processing begins
execute_setup_on_node(stages, ignore_head_node=self.ignore_head_node)
Expand Down Expand Up @@ -157,9 +175,14 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
# Return final results directly - no need for ray.get()
final_results = current_tasks or []
logger.info(f"\nPipeline completed. Final results: {len(final_results)} tasks")

return final_results
finally:
if lineage_actor is not None:
try:
ray.get(lineage_actor.close.remote())
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to close LineageWriterActor: {e}")
ray.kill(lineage_actor)
# Clean up all Ray resources including named actors
logger.info("Shutting down Ray to clean up all resources...")
ray.shutdown()
Expand Down
26 changes: 25 additions & 1 deletion nemo_curator/backends/ray_data/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import TYPE_CHECKING, Any

import ray
Expand All @@ -21,6 +22,7 @@
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.backends.utils import execute_setup_on_node, register_loguru_serializer
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.utils.lineage_store import LINEAGE_ACTOR_NAME, LineageWriterActor

from .adapter import RayDataStageAdapter

Expand All @@ -41,12 +43,19 @@ class RayDataExecutor(BaseExecutor):
def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool = False):
super().__init__(config, ignore_head_node)

def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | None = None) -> list[Task]:
def execute(
self,
stages: list["ProcessingStage"],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task]:
"""Execute the pipeline stages using Ray Data.

Args:
stages (list[ProcessingStage]): List of processing stages to execute
initial_tasks (list[Task], optional): Initial tasks to process (can be None for empty start)
checkpoint_path (str | Path, optional): If provided, spawn a :class:`LineageWriterActor`
that records the task DAG to LMDB at this path for the duration of the run.

Returns:
list[Task]: List of final processed tasks
Expand All @@ -60,6 +69,7 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
# Initialize with initial tasks if provided, otherwise start with EmptyTask
tasks: list[Task] = initial_tasks or [EmptyTask]
output_tasks: list[Task] = []
lineage_actor = None
# When runtime_env with pip is used, Ray's pip plugin sets up per-stage virtualenvs
# lazily on first task dispatch by cloning the current virtualenv. The NeMo Curator
# container's /opt/venv is created with `uv venv --seed` so pip is available in clones.
Expand All @@ -69,6 +79,14 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
ray.init(
ignore_reinit_error=True, runtime_env={"env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": ""}}
)
if checkpoint_path is not None:
absolute_checkpoint_path = str(Path(checkpoint_path).absolute())
lineage_actor = LineageWriterActor.options(
name=LINEAGE_ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
).remote(path=absolute_checkpoint_path)
logger.info(f"Spawned LineageWriterActor; checkpoint at {absolute_checkpoint_path}")

# Convert tasks to dataset
current_dataset = self._tasks_to_dataset(tasks)
Expand Down Expand Up @@ -97,6 +115,12 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
output_tasks = self._dataset_to_tasks(current_dataset)
logger.info(f"Pipeline completed. Final results: {len(output_tasks)} tasks")
finally:
if lineage_actor is not None:
try:
ray.get(lineage_actor.close.remote())
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to close LineageWriterActor: {e}")
ray.kill(lineage_actor)
# This ensures we unset all the env vars set above during initialize and kill the pending actors.
ray.shutdown()
return output_tasks
Expand Down
27 changes: 26 additions & 1 deletion nemo_curator/backends/xenna/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Any

import ray
Expand All @@ -24,6 +25,7 @@
from nemo_curator.backends.xenna.adapter import create_named_xenna_stage_adapter
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.utils.lineage_store import LINEAGE_ACTOR_NAME, LineageWriterActor


class XennaExecutor(BaseExecutor):
Expand Down Expand Up @@ -59,12 +61,20 @@ def __init__(self, config: dict[str, Any] | None = None, ignore_head_node: bool
"autoscale_interval_s": 180,
}

def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | None = None) -> list[Task]:
def execute(
self,
stages: list[ProcessingStage],
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task]:
"""Execute the pipeline using Cosmos-Xenna.

Args:
stages (list[ProcessingStage]): The stages to run
initial_tasks (list[Task], optional): The initial tasks to run. Empty list of Task is used if not provided.
checkpoint_path (str | Path, optional): If provided, spawn a :class:`LineageWriterActor`
that records the task DAG (parents, children, type, completed flag) to LMDB at
this path for the duration of the run.

Returns:
list[Task]: List of output tasks from the pipeline
Expand Down Expand Up @@ -134,6 +144,7 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non
# Log pipeline configuration
logger.info(f"Execution mode: {exec_mode.name}")

lineage_actor = None
try:
register_loguru_serializer()
# Prevent Ray from overriding accelerator env vars when num_gpus=0, letting Xenna manage them instead.
Expand All @@ -146,13 +157,27 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non
}
},
)
if checkpoint_path is not None:
absolute_checkpoint_path = str(Path(checkpoint_path).absolute())
lineage_actor = LineageWriterActor.options(
name=LINEAGE_ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
).remote(path=absolute_checkpoint_path)
logger.info(f"Spawned LineageWriterActor; checkpoint at {absolute_checkpoint_path}")
# Run the pipeline (this will re-initialize ray but that'll be a no-op and the ray.init above will take precedence)
results = pipelines_v1.run_pipeline(pipeline_spec)
logger.info(f"Pipeline completed successfully with {len(results) if results else 0} output tasks")
except Exception as e:
logger.error(f"Pipeline execution failed: {e}")
raise
finally:
if lineage_actor is not None:
try:
ray.get(lineage_actor.close.remote())
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to close LineageWriterActor: {e}")
ray.kill(lineage_actor)
# This ensures we unset all the env vars set above during initialize and kill the pending actors.
ray.shutdown()
return results if results else []
Expand Down
31 changes: 28 additions & 3 deletions nemo_curator/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import Any

from loguru import logger

from nemo_curator.backends.base import BaseExecutor
from nemo_curator.stages.base import CompositeStage, ProcessingStage
from nemo_curator.stages.base import CompositeStage, ProcessingStage, assign_root_lineage
from nemo_curator.tasks import Task


Expand Down Expand Up @@ -80,6 +81,15 @@ def build(self) -> None:
self.stages = execution_stages
self.decomposition_info = decomposition_info

# 3. Flag the terminal execution stage so its default process_batch can
# incrementally mark emitted leaves completed via the lineage store.
# Reset all stages first so re-builds and instances reused across
# pipelines do not leak a stale True.
for stage in self.stages:
stage._is_terminal_stage = False
if self.stages:
self.stages[-1]._is_terminal_stage = True

def _decompose_stages(
self, stages: list[ProcessingStage | CompositeStage]
) -> tuple[list[ProcessingStage], dict[str, list[str]]]:
Expand Down Expand Up @@ -174,18 +184,31 @@ def describe(self) -> str:

return "\n".join(lines)

def run(self, executor: BaseExecutor | None = None, initial_tasks: list[Task] | None = None) -> list[Task] | None:
def run(
self,
executor: BaseExecutor | None = None,
initial_tasks: list[Task] | None = None,
checkpoint_path: str | Path | None = None,
) -> list[Task] | None:
"""Run the pipeline.

Args:
executor (BaseExecutor): Executor to use
initial_tasks (list[Task], optional): Initial tasks to start the pipeline with. Defaults to None.
checkpoint_path (str | Path, optional): If provided, a single LMDB file at this path
records lineage (parents, children, task type, completed flag) for every task that
flows through the pipeline, keyed by ``_udid``. Owned by one Ray actor, so the file
may live on NFS/Lustre. When omitted, no lineage is persisted.

Returns:
list[Task] | None: List of tasks
"""
self.build()

if checkpoint_path is not None:
checkpoint_path = Path(checkpoint_path).absolute()
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

if executor is None:
from nemo_curator.backends.xenna import XennaExecutor

Expand All @@ -212,4 +235,6 @@ def run(self, executor: BaseExecutor | None = None, initial_tasks: list[Task] |
"The executor will schedule GPU stages on GPUs not held by Serve."
)

return executor.execute(self.stages, initial_tasks)
if initial_tasks:
assign_root_lineage(initial_tasks)
return executor.execute(self.stages, initial_tasks, checkpoint_path=checkpoint_path)
Loading
Loading