Skip to content

Commit 6fef647

Browse files
GiGurra and claude committed
Fix notifier Ping called with cancelled context
In waitOnce, the inner context is cancelled by drainErrChan to interrupt WaitForNotification, but the subsequent Ping call was using that same cancelled context, meaning it would always fail with context.Canceled.

Save a reference to the parent (still-live) context before creating the inner cancellable context, and use it for the Ping call.

Also adds a testPingInterval field to avoid the 5s wait in tests, and extends ListenerMock with a pingFunc for testing Ping behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c449b62 commit 6fef647

2 files changed

Lines changed: 66 additions & 4 deletions

File tree

internal/notifier/notifier.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package notifier
22

33
import (
4+
"cmp"
45
"context"
56
"errors"
67
"fmt"
@@ -78,7 +79,8 @@ type Notifier struct {
7879
baseservice.BaseService
7980
startstop.BaseStartStop
8081

81-
disableSleep bool // for tests only; disable sleep on exponential backoff
82+
disableSleep bool // for tests only; disable sleep on exponential backoff
83+
testPingInterval time.Duration // for tests only; override the 5s ping interval
8284
listener riverdriver.Listener
8385
notificationBuf chan *riverdriver.Notification
8486
testSignals notifierTestSignals
@@ -345,6 +347,12 @@ func (n *Notifier) waitOnce(ctx context.Context) error {
345347
n.waitCancel()
346348
})
347349

350+
// Save a reference to the parent context before creating the inner
351+
// cancellable context. The inner context is cancelled by drainErrChan to
352+
// interrupt WaitForNotification, but we still need a live context for the
353+
// Ping health check afterward.
354+
pingCtx := ctx
355+
348356
ctx, cancel := context.WithCancel(ctx)
349357
defer cancel()
350358

@@ -382,7 +390,8 @@ func (n *Notifier) waitOnce(ctx context.Context) error {
382390
return nil
383391
}
384392

385-
needPingCtx, needPingCancel := context.WithTimeout(ctx, 5*time.Second)
393+
pingInterval := cmp.Or(n.testPingInterval, 5*time.Second)
394+
needPingCtx, needPingCancel := context.WithTimeout(ctx, pingInterval)
386395
defer needPingCancel()
387396

388397
// * Wait for notifications
@@ -397,8 +406,10 @@ func (n *Notifier) waitOnce(ctx context.Context) error {
397406
if err := drainErrChan(); err != nil {
398407
return err
399408
}
400-
// Ping the conn to see if it's still alive
401-
if err := n.listener.Ping(ctx); err != nil {
409+
// Ping the conn to see if it's still alive. Use pingCtx (the parent
410+
// context) because the inner ctx was cancelled by drainErrChan above
411+
// to interrupt WaitForNotification.
412+
if err := n.listener.Ping(pingCtx); err != nil {
402413
return err
403414
}
404415

internal/notifier/notifier_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,51 @@ func TestNotifier(t *testing.T) {
531531
require.EqualError(t, notifier.testSignals.BackoffError.WaitOrTimeout(), "error during wait")
532532
})
533533

534+
t.Run("PingUsesNonCancelledContext", func(t *testing.T) {
535+
t.Parallel()
536+
537+
notifier, _ := setup(t, nil)
538+
539+
// Use a very short ping interval so the test doesn't take 5 seconds.
540+
notifier.testPingInterval = 50 * time.Millisecond
541+
542+
var (
543+
pingCtxCancelled bool
544+
pingCalled = make(chan struct{})
545+
pingOnce sync.Once
546+
)
547+
548+
listenerMock := NewListenerMock(notifier.listener)
549+
listenerMock.waitForNotificationFunc = func(ctx context.Context) (*riverdriver.Notification, error) {
550+
// Block until the context is cancelled (which happens when
551+
// drainErrChan runs after the ping interval elapses).
552+
<-ctx.Done()
553+
return nil, ctx.Err()
554+
}
555+
listenerMock.pingFunc = func(ctx context.Context) error {
556+
pingOnce.Do(func() {
557+
pingCtxCancelled = ctx.Err() != nil
558+
close(pingCalled)
559+
})
560+
return nil
561+
}
562+
notifier.listener = listenerMock
563+
564+
start(t, notifier)
565+
566+
notifier.testSignals.ListeningBegin.WaitOrTimeout()
567+
568+
select {
569+
case <-pingCalled:
570+
case <-time.After(5 * time.Second):
571+
require.FailNow(t, "Timed out waiting for Ping to be called")
572+
}
573+
574+
require.False(t, pingCtxCancelled,
575+
"Ping should receive a non-cancelled context; the inner context is "+
576+
"cancelled to interrupt WaitForNotification, but Ping needs a live context")
577+
})
578+
534579
t.Run("StillFunctionalAfterMainLoopFailure", func(t *testing.T) {
535580
t.Parallel()
536581

@@ -584,6 +629,7 @@ type ListenerMock struct {
584629

585630
connectFunc func(ctx context.Context) error
586631
listenFunc func(ctx context.Context, topic string) error
632+
pingFunc func(ctx context.Context) error
587633
waitForNotificationFunc func(ctx context.Context) (*riverdriver.Notification, error)
588634
}
589635

@@ -593,6 +639,7 @@ func NewListenerMock(listener riverdriver.Listener) *ListenerMock {
593639

594640
connectFunc: listener.Connect,
595641
listenFunc: listener.Listen,
642+
pingFunc: listener.Ping,
596643
waitForNotificationFunc: listener.WaitForNotification,
597644
}
598645
}
@@ -605,6 +652,10 @@ func (l *ListenerMock) Listen(ctx context.Context, topic string) error {
605652
return l.listenFunc(ctx, topic)
606653
}
607654

655+
func (l *ListenerMock) Ping(ctx context.Context) error {
656+
return l.pingFunc(ctx)
657+
}
658+
608659
func (l *ListenerMock) WaitForNotification(ctx context.Context) (*riverdriver.Notification, error) {
609660
return l.waitForNotificationFunc(ctx)
610661
}

0 commit comments

Comments
 (0)