From 6b2a1272fd38e37f962c5356ae4e3b6b70e1199a Mon Sep 17 00:00:00 2001
From: Robin
Date: Wed, 27 May 2026 13:17:19 +0200
Subject: [PATCH] fix: data race in loginInboundConn during Forge login relay

loginInboundConn's mutable state (outstandingResponses map,
loginMessagesToSend deque, isLoginEventFired, onAllMessagesHandled) is
accessed from two goroutines: SendLoginPluginMessage runs on
event-handler / relay goroutines while handleLoginPluginResponse runs on
the client read loop. The fields had no synchronization, producing a
data race on the outstandingResponses map (write at
SendLoginPluginMessage vs read/delete + len() in
handleLoginPluginResponse), reproducible under `go test -race` via
TestModernForgeIntegration_FullJoinFlow.

Guard the shared fields with a mutex. External callbacks (the message
consumer and onAllMessagesHandled) and connection I/O (WritePacket /
BufferPacket / Flush) are invoked without the lock held to avoid
re-entrant deadlocks, since a consumer may call back into
SendLoginPluginMessage. Behavior is otherwise unchanged: the all-handled
callback still fires after the consumer runs when no responses remain.

Adds TestLoginInboundConn_ConcurrentSendAndResponse, a focused
regression test that races the two methods and fails under -race before
the fix.
--- pkg/edition/java/proxy/login_inbound.go | 68 +++++++++++++++----- pkg/edition/java/proxy/login_inbound_test.go | 43 +++++++++++++ 2 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 pkg/edition/java/proxy/login_inbound_test.go diff --git a/pkg/edition/java/proxy/login_inbound.go b/pkg/edition/java/proxy/login_inbound.go index 80e090cd2..e0914bb15 100644 --- a/pkg/edition/java/proxy/login_inbound.go +++ b/pkg/edition/java/proxy/login_inbound.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "sync" "github.com/gammazero/deque" "go.minekube.com/common/minecraft/component" @@ -43,9 +44,16 @@ type Inbound interface { } type loginInboundConn struct { - delegate *initialInbound + delegate *initialInbound + sequenceCounter atomic.Int32 + + // mu guards the fields below, which are accessed both from event-handler + // goroutines (SendLoginPluginMessage) and the client read loop + // (handleLoginPluginResponse). External callbacks (the message consumer, + // onAllMessagesHandled) and connection I/O are always invoked without the + // lock held to avoid re-entrant deadlocks. 
+ mu sync.Mutex outstandingResponses map[int]MessageConsumer - sequenceCounter atomic.Int32 loginMessagesToSend deque.Deque[*packet.LoginPluginMessage] isLoginEventFired bool onAllMessagesHandled func() error @@ -94,46 +102,70 @@ func (l *loginInboundConn) SendLoginPluginMessage(identifier message.ChannelIden } id := int(l.sequenceCounter.Inc()) - l.outstandingResponses[id] = consumer - msg := &packet.LoginPluginMessage{ ID: id, Channel: identifier.ID(), Data: contents, } - if l.isLoginEventFired { + + l.mu.Lock() + l.outstandingResponses[id] = consumer + fired := l.isLoginEventFired + if !fired { + l.loginMessagesToSend.PushBack(msg) + } + l.mu.Unlock() + + if fired { return l.delegate.WritePacket(msg) } - l.loginMessagesToSend.PushBack(msg) return nil } func (l *loginInboundConn) handleLoginPluginResponse(res *packet.LoginPluginResponse) (err error) { + l.mu.Lock() consumer, ok := l.outstandingResponses[res.ID] if !ok { + l.mu.Unlock() return nil } delete(l.outstandingResponses, res.ID) + l.mu.Unlock() - defer func() { - if len(l.outstandingResponses) == 0 && l.onAllMessagesHandled != nil { - err = errors.Join(err, l.onAllMessagesHandled()) - } - }() + // Invoke the consumer without the lock held; it may call back into + // SendLoginPluginMessage (which also takes the lock). if res.Success { - return consumer.OnMessageResponse(res.Data) + err = consumer.OnMessageResponse(res.Data) + } else { + err = consumer.OnMessageResponse(nil) } - return consumer.OnMessageResponse(nil) + + // After the consumer ran (it may have queued more messages), fire the + // all-handled callback if nothing is outstanding. 
+ l.mu.Lock() + done := len(l.outstandingResponses) == 0 + onAllMessagesHandled := l.onAllMessagesHandled + l.mu.Unlock() + if done && onAllMessagesHandled != nil { + err = errors.Join(err, onAllMessagesHandled()) + } + return err } func (l *loginInboundConn) loginEventFired(onAllMessagesHandled func() error) error { + l.mu.Lock() l.isLoginEventFired = true l.onAllMessagesHandled = onAllMessagesHandled - if l.loginMessagesToSend.Len() == 0 { + msgs := make([]*packet.LoginPluginMessage, 0, l.loginMessagesToSend.Len()) + for l.loginMessagesToSend.Len() != 0 { + msgs = append(msgs, l.loginMessagesToSend.PopFront()) + } + l.mu.Unlock() + + if len(msgs) == 0 { return onAllMessagesHandled() } - for l.loginMessagesToSend.Len() != 0 { - msg := l.loginMessagesToSend.PopFront() + for _, msg := range msgs { if err := l.delegate.BufferPacket(msg); err != nil { return err } @@ -145,7 +177,9 @@ func (l *loginInboundConn) loginEventFired(onAllMessagesHandled func() error) er // Called before the Modern Forge login relay to prevent the PreLogin // completion callback from re-firing when relay responses are processed. 
func (l *loginInboundConn) clearOnAllMessagesHandled() { + l.mu.Lock() l.onAllMessagesHandled = nil + l.mu.Unlock() } func (l *loginInboundConn) disconnect(reason component.Component) error { @@ -154,7 +188,9 @@ func (l *loginInboundConn) disconnect(reason component.Component) error { } func (l *loginInboundConn) cleanup() { + l.mu.Lock() l.loginMessagesToSend.Clear() l.outstandingResponses = map[int]MessageConsumer{} l.onAllMessagesHandled = nil + l.mu.Unlock() } diff --git a/pkg/edition/java/proxy/login_inbound_test.go b/pkg/edition/java/proxy/login_inbound_test.go new file mode 100644 index 000000000..118c739d1 --- /dev/null +++ b/pkg/edition/java/proxy/login_inbound_test.go @@ -0,0 +1,43 @@ +package proxy + +import ( + "sync" + "testing" + + "go.minekube.com/gate/pkg/edition/java/proto/packet" + "go.minekube.com/gate/pkg/edition/java/proxy/message" +) + +type funcMessageConsumer func([]byte) error + +func (f funcMessageConsumer) OnMessageResponse(b []byte) error { return f(b) } + +// loginInboundConn is driven from two goroutines during a Forge login relay: the +// relay/backend goroutine calls SendLoginPluginMessage while the client read loop +// calls handleLoginPluginResponse. Its shared state must be safe for that. +// Run with -race to detect unsynchronized access. +func TestLoginInboundConn_ConcurrentSendAndResponse(t *testing.T) { + l := newTestLoginInboundConn(&testMinecraftConn{}) + id, err := message.ChannelIdentifierFrom("test:channel") + if err != nil { + t.Fatal(err) + } + consumer := funcMessageConsumer(func([]byte) error { return nil }) + + const n = 2000 + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + _ = l.SendLoginPluginMessage(id, []byte{0x01}, consumer) + } + }() + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + _ = l.handleLoginPluginResponse(&packet.LoginPluginResponse{ID: i, Success: true}) + } + }() + wg.Wait() +}