Add unique and deterministic id generation #1993
Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
Greptile SummaryThis PR introduces
Confidence Score: 2/5
Not safe to merge: the test file has a broken import that prevents all pipeline tests from running, and the root-lineage assignment is fragile under task reuse. The affected files are tests/pipelines/test_pipelines.py (broken import) and nemo_curator/pipeline/pipeline.py (inline root-lineage loop that is neither exported nor guarded against reuse).
Sequence Diagram
sequenceDiagram
participant Caller
participant Pipeline
participant Task
participant ProcessingStage
participant assign_child_lineage
Caller->>Pipeline: run(initial_tasks)
Pipeline->>Pipeline: build()
loop each root task[i]
Pipeline->>Task: _set_lineage([], i)
Task-->>Pipeline: "_lineage_path="i", _udid=sha256("i")[:32]"
end
Pipeline->>Executor: execute(stages, initial_tasks)
loop each stage
Executor->>ProcessingStage: process_batch(tasks)
loop each task
ProcessingStage->>ProcessingStage: process(task) → result
ProcessingStage->>assign_child_lineage: ([task._lineage_path], result)
loop each child[i] where _udid is empty
assign_child_lineage->>Task: _set_lineage(parent_paths, i)
Task-->>assign_child_lineage: _lineage_path, _udid set
end
assign_child_lineage-->>ProcessingStage: [children with lineage]
end
ProcessingStage-->>Executor: results
end
Executor-->>Caller: final tasks
Reviews (5): Last reviewed commit: "Assign ids at for initial tasks"
| def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None: | ||
| # DAG structure does. Clear cache directories when changing config. | ||
| parts = [*[p for p in parent_lineage_paths if p], str(child_index)] | ||
| self._lineage_path = "_".join(parts) | ||
| self._udid = hashlib.sha256(self._lineage_path.encode()).hexdigest()[:32] |
Lineage path collision: the _ separator is ambiguous for multi-parent joins
_set_lineage joins all parent paths with _ and then appends the child index with _. Because _ is also the separator used within a parent's own path, the resulting string is not injective.
Concrete collision: both of these produce "3_0_7":
- Single-parent chain: parent with path "3_0" at child index 7 → _set_lineage(["3_0"], 7) → parts = ["3_0", "7"] → "3_0_7"
- Fan-in: two parents with paths ["3", "0"] at child index 7 → _set_lineage(["3", "0"], 7) → parts = ["3", "0", "7"] → "3_0_7"
The PR description's own example ("3_0_7" = 4th root, 1st child, 8th grandchild) is itself ambiguous by this encoding. Any pipeline that combines deep sequential chains with multi-parent fan-in stages is at risk of two structurally distinct tasks sharing the same _udid, directly undermining the caching and file-naming goals this PR is designed to serve. A fix would use an unambiguous encoding for the multi-parent case — for example, hashing the ordered list of parent paths with a null-byte separator before appending the child index, rather than flattening them all with _.
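A minimal sketch of the collision described in this comment, plus one possible unambiguous encoding. `flat_lineage` mirrors the join quoted in the diff; `structured_lineage` and `short_hash` are hypothetical names for illustration only, and JSON serialisation stands in for the null-byte scheme the comment suggests. Neither is claimed to be what the PR should adopt.

```python
import hashlib
import json


def flat_lineage(parent_paths: list[str], child_index: int) -> str:
    """Mirror of the quoted diff: flatten parents and index with '_'."""
    parts = [*[p for p in parent_paths if p], str(child_index)]
    return "_".join(parts)


def structured_lineage(parent_paths: list[str], child_index: int) -> str:
    """Hypothetical alternative: serialise the parent list so boundaries survive."""
    return json.dumps([parent_paths, child_index], separators=(",", ":"))


def short_hash(path: str) -> str:
    """First 32 hex characters of the SHA-256 digest, as in the quoted diff."""
    return hashlib.sha256(path.encode()).hexdigest()[:32]


# Single-parent chain versus two-parent fan-in.
chain = flat_lineage(["3_0"], 7)
fan_in = flat_lineage(["3", "0"], 7)
flat_collides = chain == fan_in

# The structured encoding keeps the two cases apart.
structured_collides = structured_lineage(["3_0"], 7) == structured_lineage(["3", "0"], 7)

print(chain, fan_in, flat_collides, structured_collides)
```

Because the hash is taken over the path string, any two inputs that flatten to the same string also share the same 32-character id.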
| def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None: | ||
| # DAG structure does. Clear cache directories when changing config. | ||
| parts = [*[p for p in parent_lineage_paths if p], str(child_index)] |
The comment is clearly truncated: "DAG structure does." reads as the tail of a sentence, not the beginning. Based on the PR description, the intended message is that hyperparameter changes don't change _udid; only structural (DAG-shape) changes do.
| def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None: | |
| # DAG structure does. Clear cache directories when changing config. | |
| parts = [*[p for p in parent_lineage_paths if p], str(child_index)] | |
| def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None: | |
| # Hyperparameter changes do not invalidate `_udid` — only DAG structure does. | |
| # Clear cache directories when changing stage config. | |
| parts = [*[p for p in parent_lineage_paths if p], str(child_index)] |
| Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived | ||
| from ``(parent_paths, i)`` so that the same pipeline run twice on the same | ||
| inputs produces byte-identical task IDs. Call this from any custom | ||
| ``process_batch`` override to keep outputs consistent with the rest of the | ||
| pipeline. |
The docstring mentions _uuid, but the function never touches _uuid; it only sets _lineage_path and _udid. Referencing _uuid here will mislead implementers of custom process_batch overrides who are deciding which field to use for stable identifiers.
| Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived | |
| from ``(parent_paths, i)`` so that the same pipeline run twice on the same | |
| inputs produces byte-identical task IDs. Call this from any custom | |
| ``process_batch`` override to keep outputs consistent with the rest of the | |
| pipeline. | |
| Each surviving ``children[i]`` gets ``_lineage_path`` and ``_udid`` derived | |
| from ``(parent_paths, i)`` so that the same pipeline run twice on the same | |
| inputs produces byte-identical task IDs. Call this from any custom | |
| ``process_batch`` override to keep outputs consistent with the rest of the | |
| pipeline. |
| outputs.extend(assign_child_lineage([task._lineage_path], raw)) | ||
| return outputs | ||
|
|
||
| Outputs that skip this step will carry empty ``_uuid``/``_lineage_path``. |
Same _uuid / _udid mix-up: _uuid is always a random uuid4 and is never empty; the field that will be empty when lineage is skipped is _udid.
| Outputs that skip this step will carry empty ``_uuid``/``_lineage_path``. | |
| Outputs that skip this step will carry empty ``_udid``/``_lineage_path``. |
|
/ok to test c1eb896 |
|
@VibhuJawa @praateekmahajan @sarahyurick Could you please review this PR when you get a chance? |
VibhuJawa
left a comment
The logic looks sound to me, but I will really lean on @praateekmahajan because I am not familiar with this side of the codebase.
|
/claude review |
| results.extend(result) | ||
| else: | ||
| results.append(result) | ||
| results.extend(assign_child_lineage([task._lineage_path], result)) |
Bug: _udid collisions when multiple root tasks enter a 1:1 first stage.
All root tasks start with _lineage_path = "". The _set_lineage helper filters out empty parent paths, so every 1:1 child gets _lineage_path = "0" regardless of which root task it came from:
# Two root tasks (both _lineage_path=""), 1:1 stage:
# root_a → assign_child_lineage([""], child_a) → child_a._lineage_path = "0"
# root_b → assign_child_lineage([""], child_b) → child_b._lineage_path = "0"  # collision!
The PR description's own example ("3_0_7" = "4th root task, …") implies root tasks should carry their batch index, but nothing assigns it. The executors pass initial_tasks straight into the first stage.
Fix: assign root lineage before the first stage runs, e.g. in Pipeline.run or in each executor:
for i, task in enumerate(initial_tasks):
    task._set_lineage([], i)
The tests only cover single-root-task scenarios, so this isn't caught today. A test with 2+ root tasks through a 1:1 stage would reproduce it immediately.
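The scenario in this comment can be reproduced with a toy stand-in. `ToyTask` and `one_to_one_stage` are hypothetical names that only model the lineage join quoted in the diff; they are not the real Task or stage classes. This is a sketch of the kind of test the comment asks for.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Stand-in for Task; only the lineage path is modelled."""

    name: str
    lineage_path: str = field(default="")

    def set_lineage(self, parent_paths: list[str], child_index: int) -> None:
        # Same join as the quoted diff: empty parent paths are dropped.
        parts = [*[p for p in parent_paths if p], str(child_index)]
        self.lineage_path = "_".join(parts)


def one_to_one_stage(parent: ToyTask) -> ToyTask:
    """A 1:1 stage: every parent yields exactly one child at index 0."""
    child = ToyTask(name=parent.name + "-out")
    child.set_lineage([parent.lineage_path], 0)
    return child


# Roots without lineage: both children end up with the same path.
roots = [ToyTask("a"), ToyTask("b")]
unrooted = [one_to_one_stage(t).lineage_path for t in roots]

# Roots numbered up front, as the suggested fix does: the paths diverge.
roots = [ToyTask("a"), ToyTask("b")]
for i, task in enumerate(roots):
    task.set_lineage([], i)
rooted = [one_to_one_stage(t).lineage_path for t in roots]

print(unrooted, rooted)
```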
There was a problem hiding this comment.
I am also a bit tripped up about the counting part, but I think the concern about the collision is wrong. I don't see a collision happening because either:
- we start with an [EmptyTask], so it produces e.g. a list of FileGroupTasks, which will be handled correctly
- we start with something else, in which case it is assigned a root lineage in pipeline.py
But perhaps I am thinking too narrowly about the possibilities of scenario 1.
Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
|
/ok to test f806a55 |
sarahyurick
left a comment
Logic LGTM. Left a minor comment.
|
@sarahyurick could you please approve? @praateekmahajan could you please review? |
| # It is propagated to children and hashed into `_udid`, the deterministic | ||
| # task id. | ||
| _lineage_path: str = field(init=False, default="") | ||
| _udid: str = field(init=False, default="") |
Instead of adding udid on top of uuid and task_id, I think we can just keep one of them and remove the others. I believe uuid is only used by add id, and I'm pretty sure it'll be unaffected too.
There was a problem hiding this comment.
Some of the stages use the uuid so I couldn't touch it. Task_id is an integer number and not sure if we should touch it.
There was a problem hiding this comment.
Can you share which ones? I can help decide if we can yank them. I would prefer not to add new _id fields. Similarly for task_id: I want to avoid, as much as possible, creating new fields without deprecating similar old fields.
There was a problem hiding this comment.
Ok, will use task_id and won't add the _udid. Will update the code.
Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Onur Yilmaz <35306097+oyilmaz-nvidia@users.noreply.github.com>
| ] | ||
|
|
||
|
|
||
| def _drive(pipeline: Pipeline, initial_tasks: list[Task]) -> list[Task]: |
I think this test might better live inside tests/backends/test_integration.py within backends, or you can create a new one.
Instead of having drive as the functionality, let's just run the pipeline the way users run it, without interfacing with this lower-level functionality, primarily because if the contract changes tomorrow, we don't want to be testing things with a lower-level utility.
| def test_pipeline_udid_deterministic_across_runs(): | ||
| def run_once() -> tuple[list[str], list[str]]: | ||
| pipeline = Pipeline(name="det", stages=[_Repeat(times=2), _Repeat(times=3)]) | ||
| pipeline.build() |
build() is a no-op so far; AI continues to consider it important. Same as above, let's test how users run our pipelines, i.e. Pipeline(...).run(), and then check the output tasks.
|
|
||
| Pipeline topology: | ||
|
|
||
| Input ─▶ FanOut(3) ─▶ Passthrough ─▶ FanIn ─▶ Passthrough ─▶ Output |
super nit : How is ruff passing on these characters?
Can we keep it to Input -> FanOut(3) -> Passthrough -> FanIn...
There was a problem hiding this comment.
Not sure but it's passing :D
| # Drive stage-by-stage so we can inspect each intermediate set of tasks. | ||
| after_fanout = BaseStageAdapter(pipeline.stages[0]).process_batch([root]) | ||
| after_passthrough_1 = BaseStageAdapter(pipeline.stages[1]).process_batch(after_fanout) | ||
| after_fanin = BaseStageAdapter(pipeline.stages[2]).process_batch(after_passthrough_1) | ||
| after_passthrough_2 = BaseStageAdapter(pipeline.stages[3]).process_batch(after_fanin) |
I think there is a bug here: if you do pipeline.run(), your FanIn stage isn't technically a fan-in.
Once you modify your tests to follow what I shared above, you'll see that the output tasks wouldn't be what you expect.
For it to be a fan-in, you need to define the batch_size property on the stage. That is a good reason to move the tests to follow a pattern similar to tests/backends/test_integration.py.
I could be wrong, but some of these fan-outs and fan-ins might already be there with some other properties being tested. So if we can reuse existing tests while covering the same functionality, I'd prefer that 100%.
| return False | ||
| parts = [*[p for p in parent_lineage_paths if p], str(child_index)] | ||
| self._lineage_path = "_".join(parts) | ||
| self._udid = hashlib.sha256(self._lineage_path.encode()).hexdigest()[:32] |
Maybe move this to tasks/utils.py (or a separate staticmethod here in this class), and then also have a test in test_tasks.py that we indeed do the same hashing we expect and that we pass lineage_path to it.
| def _sha256_32(s: str) -> str: | ||
| return hashlib.sha256(s.encode()).hexdigest()[:32] |
Related to my comment on the staticmethod for the hashing.
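One way the two comments above could be read: the hashing becomes a single static helper, and the test checks it against an independently computed SHA-256 prefix. `LineageHasher` and `udid_from_lineage` are hypothetical names; the review does not fix a name or a final location for the helper.

```python
import hashlib


class LineageHasher:
    """Hypothetical home for the hashing step; the name is illustrative only."""

    @staticmethod
    def udid_from_lineage(lineage_path: str) -> str:
        # Same truncation as the quoted diff: first 32 hex chars of SHA-256.
        return hashlib.sha256(lineage_path.encode()).hexdigest()[:32]


def test_udid_matches_sha256_prefix() -> None:
    path = "3_0_7"
    expected = hashlib.sha256(path.encode()).hexdigest()[:32]
    # The helper must hash exactly the lineage path it is given.
    assert LineageHasher.udid_from_lineage(path) == expected
    assert len(expected) == 32


test_udid_matches_sha256_prefix()
```

With one helper, the production code and the tests cannot drift apart on the truncation length or the input encoding.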
| results.extend(result) | ||
| else: | ||
| results.append(result) | ||
| results.extend(assign_child_lineage([task._lineage_path], result)) |
Can we not have this functionality inside stages/base.py:process_batch?
Users of batch-style stages often override it, so they will be at risk of not calling assign_child_lineage.
I believe another nemo_curator/backends/base.py:process_batch would work.
There was a problem hiding this comment.
@praateekmahajan Not sure how another process_batch would work here. That function is called by the executors and there is a contract between the user stages and executors.
This is the reason I have this assign_child_lineage as a function and if the user doesn't call it, the _udid won't be generated hence I can warn the user if the resumability is enabled later on.
There was a problem hiding this comment.
Oh, I misunderstood it. The reason I couldn't do it is that base stage class returns the flattened list of tasks and I lose the parent child relationship.
So, I need an update in the ProcessingStage.process_batch, and I don't think we'll be able to do it since many users have already been using it.
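A toy illustration of the constraint described in this reply, assuming a stage whose outputs carry no hint of their parent. `process`, `flattened_batch`, and `grouped_batch` are hypothetical stand-ins, not the real stage or adapter API.

```python
def process(task: str) -> list[str]:
    """Toy fan-out: task 'a' yields two children, anything else yields one."""
    return ["child", "child"] if task == "a" else ["child"]


def flattened_batch(tasks: list[str]) -> list[str]:
    """Returns one flat list, the shape the reply says the base stage returns."""
    results: list[str] = []
    for task in tasks:
        results.extend(process(task))
    return results


def grouped_batch(tasks: list[str]) -> list[tuple[str, list[str]]]:
    """Hypothetical alternative shape that keeps each parent with its children."""
    return [(task, process(task)) for task in tasks]


# Two different parent orders give identical flat output, so a caller that
# only sees the flat list cannot tell which parent produced which child.
same_flat_output = flattened_batch(["a", "b"]) == flattened_batch(["b", "a"])

# The grouped shape still distinguishes them.
same_grouped_output = grouped_batch(["a", "b"]) == grouped_batch(["b", "a"])

print(same_flat_output, same_grouped_output)
```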
Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
| from nemo_curator.backends.base import BaseStageAdapter | ||
| from nemo_curator.pipeline.pipeline import Pipeline | ||
| from nemo_curator.stages.base import ProcessingStage | ||
| from nemo_curator.stages.base import ProcessingStage, assign_child_lineage, assign_root_lineage |
assign_root_lineage is imported but never defined
nemo_curator.stages.base exports only assign_child_lineage — assign_root_lineage does not exist anywhere in the codebase. This import fails at collection time with ImportError: cannot import name 'assign_root_lineage' from 'nemo_curator.stages.base', which prevents every test in this file (including the pre-existing ones) from running. The root-lineage assignment lives inline in Pipeline.run() via a bare loop over task._set_lineage([], i), but it is never extracted into an exported helper function that tests can call directly.
| if initial_tasks: | ||
| # Assign deterministic root-level lineage to initial pipeline tasks | ||
| for i, task in enumerate(initial_tasks): | ||
| task._set_lineage([], i) |
Root lineage assignment only in run(): re-entrant pipelines silently corrupt lineage
pipeline.run() calls build() internally (line 187), which replaces self.stages with the decomposed execution stages. If a caller passes the same Task objects to run() a second time (e.g. for a retry or second pipeline sharing inputs), the if self._udid: return False guard in _set_lineage silently preserves the lineage from the first run. The tasks keep their first-run position even if their index in initial_tasks changed, silently producing wrong _udid values for all downstream children without any error.
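The reuse problem can be sketched with a stand-in that has the same early-return guard. `ToyTask` is hypothetical, and the `overwrite` flag is an assumption added purely for illustration; the review does not propose a specific remedy.

```python
import hashlib
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Stand-in for Task with the guard described in the comment."""

    lineage_path: str = field(default="")
    udid: str = field(default="")

    def set_lineage(
        self, parent_paths: list[str], child_index: int, *, overwrite: bool = False
    ) -> bool:
        # Guard from the comment: an already-assigned id is silently preserved.
        # `overwrite` is a hypothetical escape hatch, not part of the PR.
        if self.udid and not overwrite:
            return False
        parts = [*[p for p in parent_paths if p], str(child_index)]
        self.lineage_path = "_".join(parts)
        self.udid = hashlib.sha256(self.lineage_path.encode()).hexdigest()[:32]
        return True


a, b = ToyTask(), ToyTask()

# First run: a is at index 0, b at index 1.
for i, task in enumerate([a, b]):
    task.set_lineage([], i)
first_run = (a.lineage_path, b.lineage_path)

# Second run with the same objects in swapped order: the guard keeps old paths.
for i, task in enumerate([b, a]):
    task.set_lineage([], i)
stale = (a.lineage_path, b.lineage_path)

# Forcing reassignment makes the paths follow the new positions.
for i, task in enumerate([b, a]):
    task.set_lineage([], i, overwrite=True)
refreshed = (a.lineage_path, b.lineage_path)

print(first_run, stale, refreshed)
```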
Deterministic task identifiers via DAG-path lineage
Why
Task._uuid is generated by uuid.uuid4() on construction, so running the same pipeline twice on the same inputs produces a completely different set of IDs. That makes any kind of caching, checkpointing, or "resume from intermediate output" impossible, and it already affects real code: dedup stages name Parquet files after _uuid (minhash.py:309, buckets_to_edges.py:83, kmeans.py:235), and AddId prefixes document IDs with it (add_id.py:71).
What changed
Two new fields on Task, derived from the task's path through the pipeline DAG:
| Field | Description |
| --- | --- |
| _uuid | uuid.uuid4(). Existing call sites keep working. |
| _lineage_path | "3_0_7" = 4th root, 1st child, 8th grandchild. |
| _udid | sha256(_lineage_path)[:32]: 32-char hex, deterministic across runs. Use this for cache keys / output filenames. |
Lineage assignment happens at one chokepoint: the default ProcessingStage.process_batch calls a new module-level helper, nemo_curator.stages.base.assign_child_lineage(parent_paths, result), on the output of each process() call. Stages that override process_batch are responsible for calling the helper themselves (documented in the docstring); if they forget, outputs have empty _lineage_path/_udid, which is a loud, easy-to-catch failure rather than a silent mis-id.
Examples
| _lineage_path | _udid |
| --- | --- |
| "3" (root task, 4th output of first stage) | 4e07408562bedb8b60ce05c1decfe3ad |
| "3_0" (its single child) | sha256("3_0")[:32] |
| "3_0_7" (further descendant) | sha256("3_0_7")[:32] |
| "0_0_1_0_2_0_0" (fan-in of 3 parents + idx 0) | sha256(...)[:32] |
Files
| File | Change |
| --- | --- |
| nemo_curator/tasks/tasks.py | _lineage_path and _udid fields + _set_lineage(parents, idx) helper on Task. _uuid untouched. |
| nemo_curator/stages/base.py | assign_child_lineage(parent_paths, result) helper; default process_batch delegates to it; docstring updates the override contract. |
| tests/tasks/test_tasks.py | test_lineage_path_and_udid_format, test_fanout_udid_from_empty_root, test_udid_deterministic_across_runs. Original _uuid uniqueness test kept as-is. |
| tests/pipelines/test_pipelines.py | test_pipeline_udid_deterministic_across_runs and test_pipeline_udid_fanout_passthrough_fanin_passthrough (4-stage topology: fan-out → passthrough → fan-in → passthrough; exercises the multi-parent code path). |
Trade-offs documented in code
- Hyperparameter changes do not change _udid. Same DAG shape ⇒ same _udid. Clear the cache or version output dirs when changing stage config.
- Stages that override process_batch must call assign_child_lineage themselves. No magic safety net.
- _uuid stays random. Anything reading _uuid today keeps the same per-run behaviour; new code that needs stability reads _udid.
Test plan
- test_fanout_tasks_have_unique_uuid still passes (back-compat for _uuid confirmed).
- tests/stages/text/io/writer/ untouched and still pass; their mock_uuid4.call_count assertions remain valid because _uuid is unchanged.
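A toy sketch of the motivation in the description: ids from uuid.uuid4() differ between runs, while ids hashed from the lineage path repeat. `udid` and `run_toy_pipeline` are hypothetical helpers that model a roots → fan-out DAG; they do not use the real Pipeline or Task classes.

```python
import hashlib
import uuid


def udid(lineage_path: str) -> str:
    """32-char hex id derived from the lineage path, as in the description."""
    return hashlib.sha256(lineage_path.encode()).hexdigest()[:32]


def run_toy_pipeline(num_roots: int, fan_out: int) -> list[tuple[str, str]]:
    """Return (random id, deterministic id) for every leaf of the toy DAG."""
    leaves: list[tuple[str, str]] = []
    for root_index in range(num_roots):
        root_path = str(root_index)
        for child_index in range(fan_out):
            child_path = f"{root_path}_{child_index}"
            leaves.append((uuid.uuid4().hex, udid(child_path)))
    return leaves


first = run_toy_pipeline(num_roots=2, fan_out=3)
second = run_toy_pipeline(num_roots=2, fan_out=3)

# Random ids change on every run; lineage-derived ids do not.
random_ids_match = [r for r, _ in first] == [r for r, _ in second]
stable_ids_match = [d for _, d in first] == [d for _, d in second]

print(random_ids_match, stable_ids_match)
```

Only the deterministic column would be usable as a cache key or output filename across runs, which is the use the description assigns to _udid.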