-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpeer.go
More file actions
152 lines (130 loc) · 3.24 KB
/
peer.go
File metadata and controls
152 lines (130 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package raft
import (
"bytes"
"encoding/json"
"io/ioutil"
"net"
"os"
"path/filepath"
"sync"
)
const (
jsonPeerPath = "peers.json"
)
// PeerStore provides an interface for persistent storage and
// retrieval of peers. We use a seperate interface than StableStore
// since the peers may need to be editted by a human operator. For example,
// in a two node cluster, the failure of either node requires human intervention
// since consensus is impossible.
type PeerStore interface {
// Peers returns the list of known peers.
Peers() ([]net.Addr, error)
// SetPeers sets the list of known peers. This is invoked when a peer is
// added or removed.
SetPeers([]net.Addr) error
Init() error
}
// StaticPeers is used to provide a static list of peers.
type StaticPeers struct {
sync.RWMutex
StaticPeers []net.Addr
}
func NewStaticPeers(peers []net.Addr) *StaticPeers {
return &StaticPeers{
StaticPeers: peers,
}
}
func (s *StaticPeers) Len() int {
s.RLock()
defer s.RUnlock()
return len(s.StaticPeers)
}
// Peers implements the PeerStore interface.
func (s *StaticPeers) Peers() ([]net.Addr, error) {
s.RLock()
defer s.RUnlock()
return s.StaticPeers, nil
}
// SetPeers implements the PeerStore interface.
func (s *StaticPeers) SetPeers(p []net.Addr) error {
s.Lock()
defer s.Unlock()
s.StaticPeers = p
return nil
}
// Init implements the PeerStore interface.
func (s *StaticPeers) Init() error {
// this is just for testing
s.Lock()
defer s.Unlock()
s.StaticPeers = s.StaticPeers[:0]
return nil
}
// JSONPeers is used to provide peer persistence on disk in the form
// of a JSON file. This allows human operators to manipulate the file.
type JSONPeers struct {
l sync.Mutex
path string
trans Transport
}
// NewJSONPeers creates a new JSONPeers store. Requires a transport
// to handle the serialization of network addresses
func NewJSONPeers(base string, trans Transport) *JSONPeers {
path := filepath.Join(base, jsonPeerPath)
store := &JSONPeers{
path: path,
trans: trans,
}
return store
}
// Peers implements the PeerStore interface.
func (j *JSONPeers) Peers() ([]net.Addr, error) {
j.l.Lock()
defer j.l.Unlock()
// Read the file
buf, err := ioutil.ReadFile(j.path)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
// Check for no peers
if len(buf) == 0 {
return nil, nil
}
// Decode the peers
var peerSet []string
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&peerSet); err != nil {
return nil, err
}
// Deserialize each peer
var peers []net.Addr
for _, p := range peerSet {
peers = append(peers, j.trans.DecodePeer([]byte(p)))
}
return peers, nil
}
// SetPeers implements the PeerStore interface.
func (j *JSONPeers) SetPeers(peers []net.Addr) error {
j.l.Lock()
defer j.l.Unlock()
// Encode each peer
var peerSet []string
for _, p := range peers {
peerSet = append(peerSet, string(j.trans.EncodePeer(p)))
}
// Convert to JSON
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
if err := enc.Encode(peerSet); err != nil {
return err
}
// Write out as JSON
return ioutil.WriteFile(j.path, buf.Bytes(), 0755)
}
// Truncates the peer file
func (j *JSONPeers) Init() error {
var buf bytes.Buffer
j.l.Lock()
defer j.l.Unlock()
return ioutil.WriteFile(j.path, buf.Bytes(), 0755)
}