Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 52 additions & 16 deletions pkg/edition/java/proxy/login_inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"sync"

"github.com/gammazero/deque"
"go.minekube.com/common/minecraft/component"
Expand Down Expand Up @@ -43,9 +44,16 @@ type Inbound interface {
}

type loginInboundConn struct {
delegate *initialInbound
delegate *initialInbound
sequenceCounter atomic.Int32

// mu guards the fields below, which are accessed both from event-handler
// goroutines (SendLoginPluginMessage) and the client read loop
// (handleLoginPluginResponse). External callbacks (the message consumer,
// onAllMessagesHandled) and connection I/O are always invoked without the
// lock held to avoid re-entrant deadlocks.
mu sync.Mutex
outstandingResponses map[int]MessageConsumer
sequenceCounter atomic.Int32
loginMessagesToSend deque.Deque[*packet.LoginPluginMessage]
isLoginEventFired bool
onAllMessagesHandled func() error
Expand Down Expand Up @@ -94,46 +102,70 @@ func (l *loginInboundConn) SendLoginPluginMessage(identifier message.ChannelIden
}

id := int(l.sequenceCounter.Inc())
l.outstandingResponses[id] = consumer

msg := &packet.LoginPluginMessage{
ID: id,
Channel: identifier.ID(),
Data: contents,
}
if l.isLoginEventFired {

l.mu.Lock()
l.outstandingResponses[id] = consumer
fired := l.isLoginEventFired
if !fired {
l.loginMessagesToSend.PushBack(msg)
}
l.mu.Unlock()

if fired {
return l.delegate.WritePacket(msg)
}
l.loginMessagesToSend.PushBack(msg)
return nil
}

func (l *loginInboundConn) handleLoginPluginResponse(res *packet.LoginPluginResponse) (err error) {
l.mu.Lock()
consumer, ok := l.outstandingResponses[res.ID]
if !ok {
l.mu.Unlock()
return nil
}
delete(l.outstandingResponses, res.ID)
l.mu.Unlock()

defer func() {
if len(l.outstandingResponses) == 0 && l.onAllMessagesHandled != nil {
err = errors.Join(err, l.onAllMessagesHandled())
}
}()
// Invoke the consumer without the lock held; it may call back into
// SendLoginPluginMessage (which also takes the lock).
if res.Success {
return consumer.OnMessageResponse(res.Data)
err = consumer.OnMessageResponse(res.Data)
} else {
err = consumer.OnMessageResponse(nil)
}
return consumer.OnMessageResponse(nil)

// After the consumer ran (it may have queued more messages), fire the
// all-handled callback if nothing is outstanding.
l.mu.Lock()
done := len(l.outstandingResponses) == 0
onAllMessagesHandled := l.onAllMessagesHandled
l.mu.Unlock()
if done && onAllMessagesHandled != nil {
err = errors.Join(err, onAllMessagesHandled())
}
return err
}

func (l *loginInboundConn) loginEventFired(onAllMessagesHandled func() error) error {
l.mu.Lock()
l.isLoginEventFired = true
l.onAllMessagesHandled = onAllMessagesHandled
if l.loginMessagesToSend.Len() == 0 {
msgs := make([]*packet.LoginPluginMessage, 0, l.loginMessagesToSend.Len())
for l.loginMessagesToSend.Len() != 0 {
msgs = append(msgs, l.loginMessagesToSend.PopFront())
}
l.mu.Unlock()

if len(msgs) == 0 {
return onAllMessagesHandled()
}
for l.loginMessagesToSend.Len() != 0 {
msg := l.loginMessagesToSend.PopFront()
for _, msg := range msgs {
if err := l.delegate.BufferPacket(msg); err != nil {
return err
}
Expand All @@ -145,7 +177,9 @@ func (l *loginInboundConn) loginEventFired(onAllMessagesHandled func() error) er
// Called before the Modern Forge login relay to prevent the PreLogin
// completion callback from re-firing when relay responses are processed.
func (l *loginInboundConn) clearOnAllMessagesHandled() {
l.mu.Lock()
l.onAllMessagesHandled = nil
l.mu.Unlock()
}

func (l *loginInboundConn) disconnect(reason component.Component) error {
Expand All @@ -154,7 +188,9 @@ func (l *loginInboundConn) disconnect(reason component.Component) error {
}

func (l *loginInboundConn) cleanup() {
l.mu.Lock()
l.loginMessagesToSend.Clear()
l.outstandingResponses = map[int]MessageConsumer{}
l.onAllMessagesHandled = nil
l.mu.Unlock()
}
43 changes: 43 additions & 0 deletions pkg/edition/java/proxy/login_inbound_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package proxy

import (
"sync"
"testing"

"go.minekube.com/gate/pkg/edition/java/proto/packet"
"go.minekube.com/gate/pkg/edition/java/proxy/message"
)

type funcMessageConsumer func([]byte) error

func (f funcMessageConsumer) OnMessageResponse(b []byte) error { return f(b) }

// loginInboundConn is driven from two goroutines during a Forge login relay: the
// relay/backend goroutine calls SendLoginPluginMessage while the client read loop
// calls handleLoginPluginResponse. Its shared state must be safe for that.
// Run with -race to detect unsynchronized access.
func TestLoginInboundConn_ConcurrentSendAndResponse(t *testing.T) {
l := newTestLoginInboundConn(&testMinecraftConn{})
id, err := message.ChannelIdentifierFrom("test:channel")
if err != nil {
t.Fatal(err)
}
consumer := funcMessageConsumer(func([]byte) error { return nil })

const n = 2000
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < n; i++ {
_ = l.SendLoginPluginMessage(id, []byte{0x01}, consumer)
}
}()
go func() {
defer wg.Done()
for i := 0; i < n; i++ {
_ = l.handleLoginPluginResponse(&packet.LoginPluginResponse{ID: i, Success: true})
}
}()
wg.Wait()
}
Loading