From b3eff6de5a384ff83cdc8052d1c7fd9d33a40520 Mon Sep 17 00:00:00 2001 From: fossedihelm Date: Tue, 28 Oct 2025 18:45:56 +0100 Subject: [PATCH] priority queue: properly sync the `waiters` manipulation As described in https://github.com/kubernetes-sigs/controller-runtime/issues/3363, under some circumstances `GetWithPriority` does not return the correct (expected) element. This can happen when `GetWithPriority` is called while the queue's `Ascend` has not completed yet, so that the items of the BTree are not all evaluated against the same `w.waiters.Load()` value. Protecting every manipulation of `waiters` with the lock solves the issue. Since the lock is now required, there is no need to use an `atomic.Int64` anymore. Signed-off-by: fossedihelm --- pkg/controller/priorityqueue/priorityqueue.go | 12 +++++++----- pkg/controller/priorityqueue/priorityqueue_test.go | 7 ++++++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 98df84c56b..71363f0d17 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct { get chan item[T] // waiters is the number of routines blocked in Get, we use it to determine - // if we can push items. - waiters atomic.Int64 + // if we can push items. Every manipulation has to be protected with the lock. + waiters int64 // Configurable for testing now func() time.Time @@ -269,7 +269,7 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 { + if w.waiters == 0 { // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. 
return true @@ -277,7 +277,7 @@ func (w *priorityqueue[T]) spin() { w.metrics.get(item.Key, item.Priority) w.locked.Insert(item.Key) - w.waiters.Add(-1) + w.waiters-- delete(w.items, item.Key) toDelete = append(toDelete, item) w.becameReady.Delete(item.Key) @@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) return zero, 0, true } - w.waiters.Add(1) + w.lock.Lock() + w.waiters++ + w.lock.Unlock() w.notifyItemOrWaiterAdded() diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index d0cc51f7c5..5cade57e3c 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -378,7 +378,12 @@ var _ = Describe("Controllerworkqueue", func() { }() // Verify the go routine above is now waiting for an item. - Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1))) + Eventually(func() int64 { + q.(*priorityqueue[string]).lock.Lock() + defer q.(*priorityqueue[string]).lock.Unlock() + + return q.(*priorityqueue[string]).waiters + }).Should(Equal(int64(1))) Consistently(getUnblocked).ShouldNot(BeClosed()) // shut down