package raft

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"time"
)

const (
	// maxFailureScale and failureWait bound the exponential backoff
	// applied between retries after replication failures.
	maxFailureScale = 12
	failureWait     = 10 * time.Millisecond
)

var (
	// ErrLogNotFound indicates a given log entry is not available.
	ErrLogNotFound = errors.New("log not found")
)

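// backoff is referenced below but defined elsewhere in the package
// (util.go in upstream hashicorp/raft). A minimal sketch of the capped
// exponential backoff it is assumed to implement:
//
//	func backoff(base time.Duration, round, limit uint64) time.Duration {
//		power := min(round, limit)
//		for power > 2 {
//			base *= 2
//			power--
//		}
//		return base
//	}
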
// followerReplication is the state the leader tracks for each
// follower it is replicating to.
type followerReplication struct {
	peer     net.Addr
	inflight *inflight

	stopCh    chan uint64   // signals shutdown, carrying a best-effort max index to flush
	triggerCh chan struct{} // notifies that new entries are ready to replicate

	currentTerm uint64
	matchIndex  uint64 // highest index known to be replicated on the follower
	nextIndex   uint64 // next log index to send to the follower

	lastContact     time.Time
	lastContactLock sync.RWMutex

	failures uint64 // consecutive failures, used for backoff

	notifyCh   chan struct{}   // forces an immediate heartbeat
	notify     []*verifyFuture // verify futures waiting on a heartbeat round
	notifyLock sync.Mutex

	// stepDown is used to indicate to the leader that we
	// should step down based on information from a follower.
	stepDown chan struct{}
}

// notifyAll is used to notify all the waiting verify futures
// if the follower believes we are still the leader.
func (s *followerReplication) notifyAll(leader bool) {
	// Clear the waiting notifies, minimizing lock time
	s.notifyLock.Lock()
	n := s.notify
	s.notify = nil
	s.notifyLock.Unlock()

	// Submit our votes
	for _, v := range n {
		v.vote(leader)
	}
}

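// vote is a method on verifyFuture defined elsewhere (future.go in upstream
// hashicorp/raft). It is assumed to tally one response per follower and fire
// the future once a quorum confirms, or any follower denies, leadership,
// roughly:
//
//	func (v *verifyFuture) vote(leader bool) {
//		v.voteLock.Lock()
//		defer v.voteLock.Unlock()
//		if v.notifyCh == nil {
//			return // already notified
//		}
//		if leader {
//			v.votes++
//			if v.votes >= v.quorumSize {
//				v.notifyCh <- v
//				v.notifyCh = nil
//			}
//		} else {
//			v.notifyCh <- v
//			v.notifyCh = nil
//		}
//	}
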
// LastContact returns the time of last contact.
func (s *followerReplication) LastContact() time.Time {
	s.lastContactLock.RLock()
	last := s.lastContact
	s.lastContactLock.RUnlock()
	return last
}

// setLastContact sets the last contact to the current time.
func (s *followerReplication) setLastContact() {
	s.lastContactLock.Lock()
	s.lastContact = time.Now()
	s.lastContactLock.Unlock()
}

// replicate is a long-running routine that is used to manage
// the process of replicating logs to our followers.
func (r *Raft) replicate(s *followerReplication) {
	// Start an async heartbeat routine
	stopHeartbeat := make(chan struct{})
	defer close(stopHeartbeat)
	r.goFunc(func() { r.heartbeat(s, stopHeartbeat) })

	shouldStop := false
	for !shouldStop {
		select {
		case maxIndex := <-s.stopCh:
			// Make a best effort to replicate up to this index
			if maxIndex > 0 {
				r.replicateTo(s, maxIndex)
			}
			return
		case <-s.triggerCh:
			shouldStop = r.replicateTo(s, r.getLastLogIndex())
		case <-randomTimeout(r.conf.CommitTimeout):
			shouldStop = r.replicateTo(s, r.getLastLogIndex())
		}
	}
}

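// randomTimeout is defined elsewhere in the package (util.go in upstream
// hashicorp/raft). It is assumed to return a channel that fires after the
// given duration plus up to that much jitter, so peers don't sync up:
//
//	func randomTimeout(minVal time.Duration) <-chan time.Time {
//		if minVal == 0 {
//			return nil
//		}
//		extra := time.Duration(rand.Int63()) % minVal
//		return time.After(minVal + extra)
//	}
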
// replicateTo is used to replicate the logs up to a given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
	// Create the base request
	var l Log
	var req AppendEntriesRequest
	var resp AppendEntriesResponse
	var maxIndex uint64

START:
	// Prevent an excessive retry rate on errors
	if s.failures > 0 {
		select {
		case <-time.After(backoff(failureWait, s.failures, maxFailureScale)):
		case <-r.shutdownCh:
		}
	}

	req = AppendEntriesRequest{
		Term:              s.currentTerm,
		Leader:            r.trans.EncodePeer(r.localAddr),
		LeaderCommitIndex: r.getCommitIndex(),
	}

	// Get the previous log entry based on the nextIndex.
	// Guard for the first index, since there is no 0 log entry.
	// Guard against the previous index being a snapshot as well.
	if s.nextIndex == 1 {
		req.PrevLogEntry = 0
		req.PrevLogTerm = 0
	} else if (s.nextIndex - 1) == r.getLastSnapshotIndex() {
		req.PrevLogEntry = r.getLastSnapshotIndex()
		req.PrevLogTerm = r.getLastSnapshotTerm()
	} else {
		if err := r.logs.GetLog(s.nextIndex-1, &l); err != nil {
			if err == ErrLogNotFound {
				goto SEND_SNAP
			}
			r.logger.Errorf("raft: Failed to get log at index %d: %v",
				s.nextIndex-1, err)
			return
		}

		// Set the previous index and term
		req.PrevLogEntry = l.Index
		req.PrevLogTerm = l.Term
	}

	// Append up to MaxAppendEntries or up to the lastIndex
	req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries)
	maxIndex = min(s.nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex)
	for i := s.nextIndex; i <= maxIndex; i++ {
		oldLog := new(Log)
		if err := r.logs.GetLog(i, oldLog); err != nil {
			if err == ErrLogNotFound {
				goto SEND_SNAP
			}
			r.logger.Errorf("raft: Failed to get log at index %d: %v", i, err)
			return
		}
		req.Entries = append(req.Entries, oldLog)
	}

	// Make the RPC call
	if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
		r.logger.Errorf("raft: Failed to AppendEntries to %v: %v", s.peer, err)
		s.failures++
		return
	}

	// Check for a newer term, stop running
	if resp.Term > req.Term {
		r.logger.Errorf("raft: peer %v has newer term, stopping replication", s.peer)
		s.notifyAll(false) // No longer leader
		asyncNotifyCh(s.stepDown)
		return true
	}

	// Update the last contact
	s.setLastContact()

	// Update s based on success
	if resp.Success {
		// Mark any inflight logs as committed
		s.inflight.CommitRange(s.nextIndex, maxIndex)

		// Update the indexes
		s.matchIndex = maxIndex
		s.nextIndex = maxIndex + 1

		// Clear any failures
		s.failures = 0

		// Notify still leader
		s.notifyAll(true)
	} else {
		// The follower rejected the entries; back nextIndex off toward
		// the follower's last log and retry with older entries
		s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1)
		s.matchIndex = s.nextIndex - 1
		s.failures++
		r.logger.Infof("raft: AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, s.nextIndex)
	}

CHECK_MORE:
	// Check if there are more logs to replicate
	if s.nextIndex <= lastIndex {
		goto START
	}
	return

	// SEND_SNAP is used when we fail to get a log, usually because the
	// follower is too far behind, and we must ship a snapshot down instead
SEND_SNAP:
	stop, err := r.sendLatestSnapshot(s)

	// Check if we should stop
	if stop {
		return true
	}

	// Check for an error
	if err != nil {
		r.logger.Errorf("raft: Failed to send snapshot to %v: %v", s.peer, err)
		return
	}

	// Check if there is more to replicate
	goto CHECK_MORE
}

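// min and max are assumed to be the package's uint64 helpers (util.go in
// upstream hashicorp/raft); on Go 1.21+ the builtins behave the same way:
//
//	func min(a, b uint64) uint64 {
//		if a <= b {
//			return a
//		}
//		return b
//	}
//
//	func max(a, b uint64) uint64 {
//		if a >= b {
//			return a
//		}
//		return b
//	}
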
// sendLatestSnapshot is used to send the latest snapshot we have
// down to our follower.
func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
	// Get the snapshots
	snapshots, err := r.snapshots.List()
	if err != nil {
		r.logger.Errorf("raft: Failed to list snapshots: %v", err)
		return false, err
	}

	// Check we have at least a single snapshot
	if len(snapshots) == 0 {
		return false, fmt.Errorf("no snapshots found")
	}

	// Open the most recent snapshot
	snapID := snapshots[0].ID
	meta, snapshot, err := r.snapshots.Open(snapID)
	if err != nil {
		r.logger.Errorf("raft: Failed to open snapshot %v: %v", snapID, err)
		return false, err
	}
	defer snapshot.Close()

	// Setup the request
	req := InstallSnapshotRequest{
		Term:         s.currentTerm,
		Leader:       r.trans.EncodePeer(r.localAddr),
		LastLogIndex: meta.Index,
		LastLogTerm:  meta.Term,
		Peers:        meta.Peers,
		Size:         meta.Size,
	}

	// Make the call
	var resp InstallSnapshotResponse
	r.logger.Infof("raft: Sending snapshot %v to %v", snapID, s.peer)
	if err := r.trans.InstallSnapshot(s.peer, &req, &resp, snapshot); err != nil {
		r.logger.Errorf("raft: Failed to install snapshot %v: %v", snapID, err)
		s.failures++
		return false, err
	}

	// Check for a newer term, stop running
	if resp.Term > req.Term {
		r.logger.Errorf("raft: peer %v has newer term, stopping replication", s.peer)
		s.notifyAll(false) // No longer leader
		asyncNotifyCh(s.stepDown)
		return true, nil
	}

	// Update the last contact
	s.setLastContact()

	// Check for success
	if resp.Success {
		// Mark any inflight logs as committed
		s.inflight.CommitRange(s.matchIndex+1, meta.Index)

		// Update the indexes
		s.matchIndex = meta.Index
		s.nextIndex = s.matchIndex + 1

		// Clear any failures
		s.failures = 0

		// Notify we are still leader
		s.notifyAll(true)
	} else {
		s.failures++
		r.logger.Infof("raft: InstallSnapshot to %v rejected", s.peer)
	}
	return false, nil
}

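// asyncNotifyCh is assumed to be the package helper (util.go in upstream
// hashicorp/raft) that performs a non-blocking send, so replication never
// blocks waiting on the leader loop:
//
//	func asyncNotifyCh(ch chan struct{}) {
//		select {
//		case ch <- struct{}{}:
//		default:
//		}
//	}
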
// heartbeat is used to periodically invoke AppendEntries on a peer
// to ensure they don't time out. This is done asynchronously from
// replicate(), since that routine could potentially be blocked on disk IO.
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
	var failures uint64
	req := AppendEntriesRequest{
		Term:   s.currentTerm,
		Leader: r.trans.EncodePeer(r.localAddr),
	}
	var resp AppendEntriesResponse
	for {
		// Wait for the next heartbeat interval or forced notify
		select {
		case <-s.notifyCh:
		case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
		case <-stopCh:
			return
		}

		if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
			r.logger.Errorf("raft: Failed to heartbeat to %v: %v", s.peer, err)
			failures++
			select {
			case <-time.After(backoff(failureWait, failures, maxFailureScale)):
			case <-stopCh:
			}
		} else {
			s.setLastContact()
			failures = 0
			s.notifyAll(resp.Success)
		}
	}
}
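
// For orientation, the leader is assumed to wire one of these routines up
// per follower roughly as follows. This is a sketch only: the actual
// startReplication lives in the leader code, and r.leaderState here is an
// assumed field; the followerReplication fields mirror this file:
//
//	func (r *Raft) startReplication(peer net.Addr) {
//		s := &followerReplication{
//			peer:        peer,
//			inflight:    r.leaderState.inflight,
//			stopCh:      make(chan uint64, 1),
//			triggerCh:   make(chan struct{}, 1),
//			currentTerm: r.getCurrentTerm(),
//			nextIndex:   r.getLastLogIndex() + 1,
//			lastContact: time.Now(),
//			notifyCh:    make(chan struct{}, 1),
//			stepDown:    r.leaderState.stepDown,
//		}
//		r.leaderState.replState[peer.String()] = s
//		r.goFunc(func() { r.replicate(s) })
//	}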