fix(model_gateway): reclaim per-worker mutation lock for absent worker ids #1684
slin1237 wants to merge 2 commits
…r ids

apply_if_revision, transition_status_inner, and replace_inner each insert a per-worker entry into worker_mutation_locks before confirming the worker exists, but the only cleanup lives in remove_inner and is gated on the worker having been present. A mutation call for an already-removed id (e.g. a late health-probe completion driving apply_if_revision) re-inserts a lock that is never reclaimed, so the map grows without bound.

Add a drop_lock_if_orphaned helper invoked on the worker-absent return path of all three methods. It removes the entry via remove_if with an Arc::ptr_eq + strong_count == 2 predicate evaluated under the DashMap shard write lock, so it drops the entry only when it is still the exact Arc this call created and unshared. This stays atomic against a concurrent insert reusing the key and never drops a lock a live mutation needs; the entry is reclaimed only when the worker is truly absent, not on a revision mismatch.

Signed-off-by: Simo Lin <25425177+slin1237@users.noreply.github.com>
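The leak and its reclaim can be illustrated with a toy model. This is only a sketch under stated assumptions, not the registry's code: `ToyRegistry`, `mutate`, and `lock_count` are hypothetical names, a `std::sync::Mutex<HashMap>` stands in for the DashMap shard write lock, and `std::sync::Mutex` replaces `parking_lot::Mutex` so the example needs nothing outside the standard library.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the registry's worker id type.
type WorkerId = u64;

/// Toy registry (assumption): std mutexes replace DashMap and parking_lot.
#[derive(Default)]
pub struct ToyRegistry {
    workers: Mutex<HashSet<WorkerId>>,
    locks: Mutex<HashMap<WorkerId, Arc<Mutex<()>>>>,
}

impl ToyRegistry {
    pub fn register(&self, id: WorkerId) {
        self.workers.lock().unwrap().insert(id);
    }

    /// Remove the entry only if it is the exact Arc this call holds and
    /// nobody else shares it (one reference in the map, one in the caller).
    fn drop_lock_if_orphaned(&self, id: WorkerId, lock: &Arc<Mutex<()>>) {
        let mut locks = self.locks.lock().unwrap();
        let orphaned = locks.get(&id).map_or(false, |existing| {
            Arc::ptr_eq(existing, lock) && Arc::strong_count(lock) == 2
        });
        if orphaned {
            locks.remove(&id);
        }
    }

    /// Models a mutation: the lock entry is inserted before the existence
    /// check. `reclaim` toggles the fix. Returns whether the worker exists.
    pub fn mutate(&self, id: WorkerId, reclaim: bool) -> bool {
        let lock = self
            .locks
            .lock()
            .unwrap()
            .entry(id)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();
        let _guard = lock.lock().unwrap();
        let present = self.workers.lock().unwrap().contains(&id);
        if !present && reclaim {
            self.drop_lock_if_orphaned(id, &lock);
        }
        present
    }

    pub fn lock_count(&self) -> usize {
        self.locks.lock().unwrap().len()
    }
}
```

In this model, calling `mutate(id, false)` for 32 distinct absent ids leaves 32 entries behind, while `mutate(id, true)` leaves none; a present worker keeps its entry in both cases.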
Walkthrough: fixes a per-worker mutation lock leak in … (change: per-worker mutation lock orphan reclaim).
Code Review
This pull request introduces a mechanism to reclaim per-worker mutation locks for absent or removed workers in WorkerRegistry to prevent memory leaks, specifically handling cases like late health probes. It adds a helper method drop_lock_if_orphaned and integrates it into replace, apply_if_revision, and transition_status when a worker is not found, alongside new unit tests. The review feedback correctly points out that drop_lock_if_orphaned is missing from the absent/abort paths of remove_inner, which could still lead to a memory leak if remove is called on an already-removed worker. Additionally, it is suggested to expand the unit tests to verify that duplicate remove calls do not leak locks.
```rust
fn drop_lock_if_orphaned(&self, worker_id: &WorkerId, lock: &Arc<parking_lot::Mutex<()>>) {
    self.worker_mutation_locks
        .remove_if(worker_id, |_, existing| {
            Arc::ptr_eq(existing, lock) && Arc::strong_count(lock) == 2
        });
}
```
While drop_lock_if_orphaned is correctly called in the absent paths of replace_inner, apply_if_revision, and transition_status_inner, it is currently missing from the absent/abort paths of remove_inner.
If remove_inner is called for an already-removed or absent worker (or if it aborts due to an origin mismatch while the worker is absent), it will insert a lock entry via .entry().or_insert_with(...) but return None without reclaiming it, leading to a memory leak.
To fix this, remove_inner should also call drop_lock_if_orphaned when returning None on those paths:
```rust
fn remove_inner(
    &self,
    worker_id: &WorkerId,
    expect_origin: Option<WorkerOrigin>,
) -> Option<Arc<dyn Worker>> {
    // ...
    if let Some(expected) = expect_origin {
        if self.origin_of(worker_id) != Some(expected) {
            // ...
            if !self.workers.contains_key(worker_id) {
                self.drop_lock_if_orphaned(worker_id, &lock);
            }
            return None;
        }
    }
    if let Some((_, worker)) = self.workers.remove(worker_id) {
        // ...
        Some(worker)
    } else {
        self.drop_lock_if_orphaned(worker_id, &lock);
        None
    }
}
```

```rust
        .build(),
);
let worker_id = registry.register(worker).unwrap();
registry.remove(&worker_id);
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 86387ba9f7
```rust
fn drop_lock_if_orphaned(&self, worker_id: &WorkerId, lock: &Arc<parking_lot::Mutex<()>>) {
    self.worker_mutation_locks
        .remove_if(worker_id, |_, existing| {
            Arc::ptr_eq(existing, lock) && Arc::strong_count(lock) == 2
```
Reclaim shared orphan locks after the last waiter
When an absent-worker mutation shares this Arc with a concurrent remove(&id) for the same already-removed worker id, the strong count is greater than 2 so this helper leaves the map entry behind. The waiting remove_inner call then takes the same mutex, finds no worker, and returns None without any orphan-lock cleanup, so the lock entry remains forever; this preserves the leak for concurrent late probe/status/replace work plus duplicate remove calls on removed ids. Either make every absent path participate in the cleanup or ensure the final waiter removes the orphaned entry.
Clean, focused fix for the per-worker mutation-lock leak. The drop_lock_if_orphaned helper is well-designed — the Arc::ptr_eq + strong_count == 2 guard under the shard write lock correctly handles concurrent callers (last waiter cleans up) and re-registration races. Tests cover both the leak scenario and the over-removal guard. LGTM.
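The re-registration guard mentioned in this review can be sketched in isolation. The code below is an illustration under assumptions, not the PR's implementation: `LockMap`, `lock_for`, and `reregistration_scenario` are hypothetical, and a `std::sync::Mutex<HashMap>` replaces the DashMap shard lock. It shows why `Arc::ptr_eq` matters: a caller holding a stale `Arc` must not remove a fresh entry that was inserted under the same key.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for worker_mutation_locks (std types, not DashMap).
type LockMap = Mutex<HashMap<u64, Arc<Mutex<()>>>>;

fn lock_for(map: &LockMap, id: u64) -> Arc<Mutex<()>> {
    map.lock()
        .unwrap()
        .entry(id)
        .or_insert_with(|| Arc::new(Mutex::new(())))
        .clone()
}

/// Same predicate as the PR's helper; returns whether the entry was removed.
fn drop_lock_if_orphaned(map: &LockMap, id: u64, lock: &Arc<Mutex<()>>) -> bool {
    let mut locks = map.lock().unwrap();
    let orphaned = locks
        .get(&id)
        .map_or(false, |e| Arc::ptr_eq(e, lock) && Arc::strong_count(lock) == 2);
    if orphaned {
        locks.remove(&id);
    }
    orphaned
}

/// Returns (stale caller removed something, fresh entry is still in the map).
pub fn reregistration_scenario() -> (bool, bool) {
    let map: LockMap = Mutex::new(HashMap::new());
    let stale = lock_for(&map, 7); // Arc held by an absent-worker mutation
    map.lock().unwrap().remove(&7); // entry dropped by someone else
    let fresh = lock_for(&map, 7); // same key, new Arc (re-registration)
    // ptr_eq(stale, fresh) is false, so the fresh entry must survive.
    let removed = drop_lock_if_orphaned(&map, 7, &stale);
    let still_fresh = map
        .lock()
        .unwrap()
        .get(&7)
        .map_or(false, |e| Arc::ptr_eq(e, &fresh));
    (removed, still_fresh)
}
```

In this model the stale caller removes nothing and the fresh entry stays in place, which is the over-removal case the pointer-identity check is meant to prevent.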
Description

Problem

The worker registry keeps a per-worker mutation-lock map (worker_mutation_locks) to serialize register/replace/remove/status mutations for a given worker id. Three mutation methods (apply_if_revision, transition_status_inner, replace_inner) acquire the lock with worker_mutation_locks.entry(id).or_insert_with(...) before checking that the worker exists. The only cleanup is in remove_inner, gated on the worker having been present. So any mutation call for an id that is no longer registered re-inserts a lock entry that is never reclaimed. This is reachable in production: a health probe that completes after its worker was removed calls registry.apply_if_revision (worker/manager.rs:479), re-creating the lock entry for the dead id every time, so the map grows unbounded over the process lifetime.

Solution

Add a private helper drop_lock_if_orphaned that the three methods call on their worker-absent return path (and only there, not on a revision mismatch, where the worker still exists). It removes the entry with DashMap::remove_if using Arc::ptr_eq(existing, lock) && Arc::strong_count(lock) == 2. Because remove_if evaluates the predicate while holding the shard write lock, the check is atomic against a concurrent register_inner / remove_inner that clones the same Arc: if any other party holds a clone, the strong count exceeds 2 and the entry is left in place; otherwise only the map and this call hold it, so removal is safe and cannot strand a lock that a concurrent live mutation is using. The existing serialization (lock held across the index diff + event emit) is untouched.

Changes

- Add WorkerRegistry::drop_lock_if_orphaned(worker_id, lock), which conditionally removes an orphaned per-worker mutation lock under the shard lock (Arc::ptr_eq + strong_count == 2 guard).
- Call it on the worker-absent path of replace_inner, apply_if_revision, and transition_status_inner; convert the latter two from ? to an explicit match so the absent branch reclaims the lock without firing on a revision mismatch.
- Add a #[cfg(test)] pub(crate) mutation_lock_count() accessor.

Test Plan

- test_mutation_on_absent_worker_does_not_leak_lock: register W, remove it (assert mutation_lock_count() == 0), then call apply_if_revision / transition_status / replace 32× each for W's removed id, plus apply_if_revision 32× for a never-registered id; assert the count stays 0. Without the fix the map grows (observed len 2).
- test_revision_mismatch_keeps_lock_for_present_worker: register W, call apply_if_revision with a stale revision; assert it returns None and mutation_lock_count() == 1, proving the reclaim fires only for truly absent workers and never over-removes a live worker's lock.

Authoritative gate (sccache disabled, RUSTC_WRAPPER=""):

Checklist

- cargo +nightly fmt passes
- cargo clippy --all-targets --all-features -- -D warnings passes
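The distinction the description draws between a worker-absent return and a revision mismatch can be sketched as an explicit match. This is a toy model under assumptions, not the PR's code: `ToyRegistry`, its `u64` revisions, and the choice to create the lock entry in `register` are all hypothetical, and `std::sync::Mutex<HashMap>` stands in for DashMap and `parking_lot`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Toy registry (assumption): worker id -> revision, plus per-worker locks.
#[derive(Default)]
pub struct ToyRegistry {
    revisions: Mutex<HashMap<u64, u64>>,
    locks: Mutex<HashMap<u64, Arc<Mutex<()>>>>,
}

impl ToyRegistry {
    fn lock_for(&self, id: u64) -> Arc<Mutex<()>> {
        self.locks
            .lock()
            .unwrap()
            .entry(id)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    }

    /// Assumed here: registration creates the lock entry and starts at revision 0.
    pub fn register(&self, id: u64) {
        let lock = self.lock_for(id);
        let _guard = lock.lock().unwrap();
        self.revisions.lock().unwrap().insert(id, 0);
    }

    fn drop_lock_if_orphaned(&self, id: u64, lock: &Arc<Mutex<()>>) {
        let mut locks = self.locks.lock().unwrap();
        let orphaned = locks
            .get(&id)
            .map_or(false, |e| Arc::ptr_eq(e, lock) && Arc::strong_count(lock) == 2);
        if orphaned {
            locks.remove(&id);
        }
    }

    /// Bumps the revision if `expected` matches; returns the new revision.
    pub fn apply_if_revision(&self, id: u64, expected: u64) -> Option<u64> {
        let lock = self.lock_for(id);
        let _guard = lock.lock().unwrap();
        let current = self.revisions.lock().unwrap().get(&id).copied();
        match current {
            // Worker truly absent: reclaim the entry this call just created.
            None => {
                self.drop_lock_if_orphaned(id, &lock);
                None
            }
            // Revision mismatch: the worker still exists, so keep its lock.
            Some(rev) if rev != expected => None,
            Some(rev) => {
                self.revisions.lock().unwrap().insert(id, rev + 1);
                Some(rev + 1)
            }
        }
    }

    pub fn lock_count(&self) -> usize {
        self.locks.lock().unwrap().len()
    }
}
```

Both the absent case and the mismatch case return `None` to the caller, which is why a plain `?` on the lookup cannot tell them apart; the explicit match is what lets only the absent branch reclaim the lock.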