This repository was archived by the owner on Mar 3, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathStream.go
More file actions
133 lines (108 loc) · 3.02 KB
/
Stream.go
File metadata and controls
133 lines (108 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package packet
import (
	"encoding/binary"
	"errors"
	"io"
	"net"
	"sync/atomic"
)
// Stream is a bidirectional packet stream on top of a network connection.
// Packets read from the connection are delivered on Incoming, and packets
// sent to Outgoing are written to the connection.
type Stream struct {
	// Receive-only and send-only views of the internal channels below.
	Incoming <-chan *Packet
	Outgoing chan<- *Packet

	// Bidirectional channels backing Incoming and Outgoing.
	in, out chan *Packet

	// connection stores the connection most recently passed to SetConnection.
	connection atomic.Value

	// closeWriter carries the reader's signal that the writer should stop.
	closeWriter chan struct{}

	// onError is called with every IO error hit while reading or writing.
	onError func(IOError)
}
// NewStream returns a stream that has no connection yet. Its incoming and
// outgoing packet channels are each buffered to hold channelBufferSize
// packets, and its error callback does nothing until OnError replaces it.
func NewStream(channelBufferSize int) *Stream {
	incoming := make(chan *Packet, channelBufferSize)
	outgoing := make(chan *Packet, channelBufferSize)

	// Incoming and Outgoing are directional views of the very same
	// channels: users can only receive from the former and only send
	// to the latter.
	return &Stream{
		Incoming:    incoming,
		Outgoing:    outgoing,
		in:          incoming,
		out:         outgoing,
		closeWriter: make(chan struct{}),
		onError:     func(IOError) {},
	}
}
// Connection returns the connection most recently passed to SetConnection,
// or nil if no connection has been set yet.
func (stream *Stream) Connection() net.Conn {
	// The comma-ok assertion also covers the case where nothing has
	// been stored yet and Load returns nil.
	stored, ok := stream.connection.Load().(*net.Conn)

	if !ok {
		return nil
	}

	return *stored
}

// SetConnection sets the connection that the stream uses and starts
// a reader and a writer goroutine for it. It can be called multiple
// times on a single stream, effectively allowing you to hot-swap
// connections in failure cases. The previously set connection is not
// closed by this call. It panics if connection is nil.
func (stream *Stream) SetConnection(connection net.Conn) {
	if connection == nil {
		panic(errors.New("SetConnection using nil connection"))
	}

	// atomic.Value panics when successive Store calls use different
	// concrete types, which would happen when swapping e.g. a TCP
	// connection for a UDP or TLS one. Storing a *net.Conn keeps the
	// stored concrete type identical for every kind of connection.
	stream.connection.Store(&connection)

	go stream.read(connection)
	go stream.write(connection)
}
// OnError registers the function that is invoked whenever reading from
// or writing to the connection fails. It panics if callback is nil.
func (stream *Stream) OnError(callback func(IOError)) {
	if callback == nil {
		err := errors.New("OnError using nil callback")
		panic(err)
	}

	stream.onError = callback
}
// Close closes the current connection and then closes the incoming
// packet channel, so that receivers on Incoming see the stream end.
// The error returned by the connection's Close is discarded.
// Close must be called at most once, because closing the channel
// a second time panics.
func (stream *Stream) Close() {
	connection := stream.Connection()
	_ = connection.Close()
	close(stream.in)
}
// read receives packets from connection until an error occurs and
// forwards each of them to the incoming channel. On the wire, a packet
// is a 1-byte type, followed by the payload length as a big-endian int64,
// followed by the payload itself.
//
// Any error is reported to the onError callback and ends the routine.
// A connection that ends between two packets is reported as io.EOF,
// one that ends in the middle of a payload as io.ErrUnexpectedEOF.
// This function is meant to be called as a concurrent goroutine.
func (stream *Stream) read(connection net.Conn) {
	// Tell the writer to stop once reading has ended. The channel is
	// unbuffered, so this blocks until a writer receives the signal.
	// NOTE(review): if the writer already returned after a write error,
	// nobody receives here and this goroutine stays blocked.
	defer func() {
		stream.closeWriter <- struct{}{}
	}()

	var length int64
	typeBuffer := make([]byte, 1)

	for {
		// io.ReadFull keeps reading until the byte has arrived, so a
		// Read that returns (0, nil) is never mistaken for a packet type.
		if _, err := io.ReadFull(connection, typeBuffer); err != nil {
			stream.onError(IOError{connection, err})
			return
		}

		if err := binary.Read(connection, binary.BigEndian, &length); err != nil {
			stream.onError(IOError{connection, err})
			return
		}

		// The length is supplied by the peer. A negative value would
		// make the allocation below panic, so treat it as an IO error.
		// NOTE(review): no upper bound is enforced on the length.
		if length < 0 {
			stream.onError(IOError{connection, errors.New("packet: negative length in header")})
			return
		}

		data := make([]byte, length)

		// io.ReadFull only fails when the payload is incomplete, so a
		// final chunk delivered together with io.EOF is still accepted.
		if _, err := io.ReadFull(connection, data); err != nil {
			stream.onError(IOError{connection, err})
			return
		}

		stream.in <- New(typeBuffer[0], data)
	}
}
// write takes packets from the outgoing channel and writes each of them
// to connection. It returns when the reader signals through closeWriter,
// or after the first failed write, which is reported to the onError callback.
// This function is meant to be called as a concurrent goroutine.
func (stream *Stream) write(connection net.Conn) {
	for {
		select {
		case outgoing := <-stream.out:
			if err := outgoing.Write(connection); err != nil {
				stream.onError(IOError{connection, err})
				return
			}

		case <-stream.closeWriter:
			return
		}
	}
}