-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnode.go
497 lines (435 loc) · 18.4 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
/*
Package kshaka is a pure Go implementation of the CASPaxos consensus protocol.
Its name is derived from the Kenyan hip hop group, Kalamashaka.
"CASPaxos is a replicated state machine (RSM) protocol. Unlike Raft and Multi-Paxos,
it doesn't use leader election and log replication, thus avoiding associated complexity.
Its symmetric peer-to-peer approach achieves optimal commit latency in wide-area networks
and doesn't cause transient unavailability when any ⌊(N−1)/2⌋ of N nodes crash." - The CASPaxos whitepaper, https://github.com/rystsov/caspaxos/blob/master/latex/caspaxos.pdf
Example usage:
package main
import (
"fmt"
"github.com/hashicorp/raft-boltdb"
"github.com/komuw/kshaka"
)
func main() {
// The store should ideally be persisted to disk.
// Any store that implements the hashicorp/raft StableStore interface will suffice.
boltStore, err := raftboltdb.NewBoltStore("/tmp/bolt.db")
if err != nil {
panic(err)
}
// The function that will be applied by CASPaxos.
// This will be applied to the current value stored
// under the key passed into the Propose method of the proposer.
var setFunc = func(val []byte) kshaka.ChangeFunction {
return func(current []byte) ([]byte, error) {
return val, nil
}
}
// Note that, in practice, nodes ideally should be
// in different machines each with its own store.
node1 := kshaka.NewNode(1, boltStore)
node2 := kshaka.NewNode(2, boltStore)
node3 := kshaka.NewNode(3, boltStore)
transport1 := &kshaka.InmemTransport{Node: node1}
transport2 := &kshaka.InmemTransport{Node: node2}
transport3 := &kshaka.InmemTransport{Node: node3}
node1.AddTransport(transport1)
node2.AddTransport(transport2)
node3.AddTransport(transport3)
kshaka.MingleNodes(node1, node2, node3)
key := []byte("name")
val := []byte("Masta-Ace")
// make a proposition; consensus via CASPaxos will happen
newstate, err := node2.Propose(key, setFunc(val))
if err != nil {
fmt.Printf("err: %v", err)
}
fmt.Printf("\n newstate: %v \n", newstate)
}
TODO: add system design here.
*/
package kshaka
import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"github.com/pkg/errors"
)
// stableStoreNotFoundErr is the error text that Prepare and Accept compare
// against err.Error() of an acceptorStore.Get failure to recognise a key that
// has not been stored yet.
const stableStoreNotFoundErr = "not found"
// Node satisfies the ProposerAcceptor interface.
// A Node is both a proposer and an acceptor. Most people will be interacting with a Node instead of a Proposer/Acceptor.
// Note: the fields acceptorStore, Trans and nodes should not be left at their nil/default values.
type Node struct {
	// ID should be unique to each node in the cluster.
	ID uint64
	// Metadata holds free-form labels for the node, eg name=myNode, env=production.
	Metadata map[string]string
	// Ballot is the ballot this node sends when it acts as a proposer.
	// Its Counter is incremented at the start of every prepare round.
	Ballot Ballot
	// nodes are the acceptors that this node sends prepare and accept requests to.
	nodes []*Node
	// In general the "prepare" and "accept" operations affecting the same key should be mutually exclusive.
	// How to achieve this is an implementation detail.
	// eg in Gryadka it doesn't matter because the operations are implemented as Redis's stored procedures and Redis is single threaded. - Denis Rystsov
	// The embedded mutex protects the acceptor state (acceptorStore); Prepare and Accept hold it for their whole duration.
	sync.Mutex
	// acceptorStore is a StableStore implementation for durable state.
	// For every key it holds the accepted value, plus the accepted and promised
	// ballots under acceptedBallotKey(key) and promisedBallotKey(key).
	acceptorStore StableStore
	// Trans is the transport through which proposers reach this node:
	// they call Trans.TransportPrepare and Trans.TransportAccept on each acceptor.
	Trans Transport
}
// NewNode returns a Node identified by ID that keeps its acceptor state in store.
// The returned node has no transport and knows no other nodes yet.
func NewNode(ID uint64, store StableStore) *Node {
	return &Node{
		ID:            ID,
		acceptorStore: store,
	}
}
// removeDuplicatesNodes returns a new slice containing the first occurrence of
// every node ID found in a, in their original order. nil entries are skipped.
// The input slice is not modified.
func removeDuplicatesNodes(a []*Node) []*Node {
	result := make([]*Node, 0, len(a))
	seen := make(map[uint64]struct{}, len(a))
	for _, val := range a {
		if val == nil {
			continue
		}
		if _, ok := seen[val.ID]; ok {
			continue
		}
		seen[val.ID] = struct{}{}
		result = append(result, val)
	}
	return result
}

// MingleNodes lets each node know about the others, including itself.
// Every given node ends up with the given nodes followed by the nodes it
// already knew, de-duplicated by ID. nil nodes are ignored.
func MingleNodes(nodes ...*Node) {
	for _, n := range nodes {
		if n == nil {
			continue
		}
		// Build the combined list in a fresh slice. Appending to nodes directly
		// would write into the caller's backing array whenever MingleNodes is
		// called as MingleNodes(s...) with a slice that has spare capacity.
		combined := make([]*Node, 0, len(nodes)+len(n.nodes))
		combined = append(combined, nodes...)
		combined = append(combined, n.nodes...)
		n.nodes = removeDuplicatesNodes(combined)
	}
}
// AddTransport sets t as the node's transport, replacing any transport that was set before.
func (n *Node) AddTransport(t Transport) {
	n.Trans = t
}
// AddMetadata sets the node's metadata, eg name=myNode, env=production.
// The given map replaces any metadata set before and is stored as-is (it is not copied).
func (n *Node) AddMetadata(metadata map[string]string) {
	n.Metadata = metadata
}
// incBallot monotonically increases the node's Ballot by adding one to its Counter.
func (n *Node) incBallot() {
	n.Ballot.Counter++
}
// Propose is the entry point clients use to submit a change function to this proposer.
// key identifies the value to change and changeFunc is applied to the current
// value (contents) stored under that key.
//
// It runs the prepare phase followed by the accept phase and returns the new
// state, or the error of whichever phase failed. Progress and errors are also
// printed to standard output.
func (n *Node) Propose(key []byte, changeFunc ChangeFunction) ([]byte, error) {
	// Phase 1 (prepare): learn the current state from the acceptors.
	current, prepareErr := n.sendPrepare(key)
	if prepareErr != nil {
		fmt.Printf("error: %+v\n", prepareErr)
		return nil, prepareErr
	}
	fmt.Printf("currentState: %+v %+v\n", current, string(current))

	// Phase 2 (accept): apply changeFunc and have the acceptors store the result.
	updated, acceptErr := n.sendAccept(key, current, changeFunc)
	if acceptErr != nil {
		fmt.Printf("error: %+v\n", acceptErr)
		return nil, acceptErr
	}
	fmt.Printf("newState: %+v %+v\n", updated, string(updated))

	return updated, nil
}
// sendPrepare runs the prepare phase on behalf of this proposer.
//
// The proposer generates a Ballot number, B, and sends "prepare" messages containing that number to the acceptors.
// It then waits for F + 1 confirmations, where F = (N - 1) / 2 is the number of acceptor failures that can be tolerated.
// If all replies from acceptors contain the empty value, then the proposer defines the current state as ∅,
// otherwise it picks the value of the tuple with the highest Ballot number.
//
// It returns the current state for key. An error is returned when there are
// fewer than minimumNoAcceptors acceptors, when key equals the reserved
// accepted-ballot key, or when fewer than F + 1 acceptors confirmed. In the
// last case n.Ballot.Counter is moved past the highest conflicting ballot that
// was reported, so that the next attempt uses a larger ballot.
func (n *Node) sendPrepare(key []byte) ([]byte, error) {
	var (
		noAcceptors         = len(n.nodes)
		F                   = (noAcceptors - 1) / 2 // number of failures we can tolerate
		quorum              = F + 1                 // confirmations required; never modified
		highBallotConfirm   Ballot
		highBallotConflict  = n.Ballot
		currentState        []byte
		numberConfirmations int
	)

	if noAcceptors < minimumNoAcceptors {
		return nil, fmt.Errorf("number of acceptors:%v is less than required minimum of:%v", noAcceptors, minimumNoAcceptors)
	}
	if bytes.Equal(key, acceptedBallotKey(key)) {
		return nil, fmt.Errorf("the key:%v is reserved for storing kshaka internal state. chose another key", acceptedBallotKey(key))
	}

	n.incBallot()
	// Snapshot the ballot for the goroutines below: some of them may still be
	// running after this function has returned (we stop waiting once a quorum
	// is reached) and must not read n.Ballot while it is being changed.
	ballot := n.Ballot

	type prepareResult struct {
		acceptedState AcceptorState
		err           error
	}
	// Buffered so that acceptors answering after we stopped waiting never block.
	prepareResultChan := make(chan prepareResult, noAcceptors)
	for _, a := range n.nodes {
		go func(a *Node) {
			acceptedState, err := a.Trans.TransportPrepare(ballot, key)
			prepareResultChan <- prepareResult{acceptedState, err}
		}(a)
	}

	// Collect replies until a quorum has confirmed or every acceptor has answered.
	for received := 0; received < noAcceptors && numberConfirmations < quorum; received++ {
		res := <-prepareResultChan
		if res.err != nil {
			// conflict occurred: remember the highest ballot any acceptor reported.
			for _, seen := range []Ballot{res.acceptedState.AcceptedBallot, res.acceptedState.PromisedBallot} {
				if seen.Counter > highBallotConflict.Counter {
					highBallotConflict = seen
				}
			}
			continue
		}

		// confirmation occurred: keep the value accepted under the highest ballot.
		numberConfirmations++
		if res.acceptedState.AcceptedBallot.Counter >= highBallotConfirm.Counter {
			highBallotConfirm = res.acceptedState.AcceptedBallot
			currentState = res.acceptedState.State
		}
	}

	// we didn't get F+1 confirmations
	if numberConfirmations < quorum {
		n.Ballot.Counter = highBallotConflict.Counter + 1
		return nil, fmt.Errorf("confirmations:%v is less than required minimum of:%v", numberConfirmations, quorum)
	}
	return currentState, nil
}
// sendAccept runs the accept phase on behalf of this proposer.
//
// Proposer applies the f function to the current state and sends the result, new state,
// along with the generated Ballot number B (an "accept" message) to the acceptors.
// Proposer waits for the F + 1 confirmations, where F = (N - 1) / 2.
// Proposer returns the new state to the client.
//
// An error is returned when there are fewer than minimumNoAcceptors acceptors,
// when key equals the reserved accepted-ballot key, when changeFunc fails, or
// when fewer than F + 1 acceptors confirmed. In the last case n.Ballot.Counter
// is moved past the highest ballot seen (never backwards), so that the next
// attempt uses a larger ballot.
func (n *Node) sendAccept(key []byte, currentState []byte, changeFunc ChangeFunction) ([]byte, error) {
	/*
		Yes, acceptors should store tuple (promised Ballot, accepted Ballot and an accepted value) per key.
		Proposers, unlike acceptors, may use the same Ballot number sequence.
		If we split a sequence of unique and increasing Ballot numbers into several subsequences then any of them remains unique and increasing, so it's fine.
		- Rystsov
	*/
	var (
		noAcceptors = len(n.nodes)
		F           = (noAcceptors - 1) / 2 // number of failures we can tolerate
		quorum      = F + 1                 // confirmations required; never modified
		// Snapshot the ballot: goroutines that answer after a quorum was
		// reached may outlive this call and must not read n.Ballot while it changes.
		ballot = n.Ballot
		// Start from our own ballot so a failed round can only move the counter forwards.
		highBallotConflict  = ballot
		numberConfirmations int
	)

	// probably we shouldn't call this method, sendAccept, if we havent called prepare yet and it is finished
	if noAcceptors < minimumNoAcceptors {
		return nil, fmt.Errorf("number of acceptors:%v is less than required minimum of:%v", noAcceptors, minimumNoAcceptors)
	}
	if bytes.Equal(key, acceptedBallotKey(key)) {
		return nil, fmt.Errorf("the key:%v is reserved for storing kshaka internal state. chose another key", acceptedBallotKey(key))
	}

	newState, err := changeFunc(currentState)
	if err != nil {
		return nil, errors.Wrap(err, fmt.Sprintf("unable to apply the ChangeFunction to value at key:%v", key))
	}
	// TODO: if newState == nil should we save it, or return error??
	// think about this some more

	type acceptResult struct {
		acceptedState AcceptorState
		err           error
	}
	// Buffered so that acceptors answering after we stopped waiting never block.
	acceptResultChan := make(chan acceptResult, noAcceptors)
	for _, a := range n.nodes {
		go func(a *Node) {
			acceptedState, err := a.Trans.TransportAccept(ballot, key, newState)
			acceptResultChan <- acceptResult{acceptedState, err}
		}(a)
	}

	// Collect replies until a quorum has confirmed or every acceptor has answered.
	for received := 0; received < noAcceptors && numberConfirmations < quorum; received++ {
		res := <-acceptResultChan
		if res.err == nil {
			// confirmation occurred.
			numberConfirmations++
			continue
		}
		// conflict occurred: remember the highest ballot any acceptor reported.
		for _, seen := range []Ballot{res.acceptedState.AcceptedBallot, res.acceptedState.PromisedBallot} {
			if seen.Counter > highBallotConflict.Counter {
				highBallotConflict = seen
			}
		}
	}

	// we didn't get F+1 confirmations
	if numberConfirmations < quorum {
		n.Ballot.Counter = highBallotConflict.Counter + 1
		return nil, fmt.Errorf("confirmations:%v is less than required minimum of:%v", numberConfirmations, quorum)
	}
	return newState, nil
}
// Prepare handles the prepare phase for an acceptor (node).
//
// The acceptor answers with a conflict (a non-nil error) if it has already
// accepted or promised a Ballot whose Counter is greater than b's; the returned
// AcceptorState then carries the ballots and value it holds. Otherwise it
// persists b as its promise for key and returns a confirmation carrying the
// accepted value and its Ballot, both of which are empty when nothing has been
// accepted for key yet. Store failures are returned as errors as well.
func (n *Node) Prepare(b Ballot, key []byte) (AcceptorState, error) {
	// TODO: these locks are supposed to be per key, not method wide.
	n.Lock()
	defer n.Unlock()

	// The value currently stored under key; a missing key means no value yet.
	state, err := n.acceptorStore.Get(key)
	switch {
	case err == nil:
		// found
	case err.Error() == stableStoreNotFoundErr:
		// see: issues/10
		// TODO: do better
		state = nil
	default:
		return AcceptorState{}, errors.Wrap(err, fmt.Sprintf("unable to get state for key:%v from acceptor:%v", key, n.ID))
	}

	// The Ballot under which that value was accepted, if any.
	var acceptedBallot Ballot
	rawAccepted, err := n.acceptorStore.Get(acceptedBallotKey(key))
	switch {
	case err == nil:
		// found
	case err.Error() == stableStoreNotFoundErr:
		// unfortunate way of handling errors
		// TODO: do better
		rawAccepted = nil
	default:
		return AcceptorState{State: state}, errors.Wrap(err, fmt.Sprintf("unable to get acceptedBallot of acceptor:%v", n.ID))
	}
	if len(rawAccepted) > 0 {
		// ie we found an accepted Ballot
		if err = gob.NewDecoder(bytes.NewReader(rawAccepted)).Decode(&acceptedBallot); err != nil {
			return AcceptorState{State: state}, errors.Wrap(err, fmt.Sprintf("unable to get acceptedBallot of acceptor:%v", n.ID))
		}
		// TODO: also take into account the Node ID to resolve tie-breaks
		if acceptedBallot.Counter > b.Counter {
			return AcceptorState{AcceptedBallot: acceptedBallot, State: state}, fmt.Errorf("submitted Ballot:%v is less than Ballot:%v of acceptor:%v", b, acceptedBallot, n.ID)
		}
	}

	// The Ballot this acceptor last promised, if any.
	var promisedBallot Ballot
	rawPromised, err := n.acceptorStore.Get(promisedBallotKey(key))
	switch {
	case err == nil:
		// found
	case err.Error() == stableStoreNotFoundErr:
		// unfortunate way of handling errors
		// TODO: do better
		rawPromised = nil
	default:
		return AcceptorState{State: state, AcceptedBallot: acceptedBallot}, errors.Wrap(err, fmt.Sprintf("unable to get promisedBallot of acceptor:%v", n.ID))
	}
	if len(rawPromised) > 0 {
		// ie we found a promised Ballot
		if err = gob.NewDecoder(bytes.NewReader(rawPromised)).Decode(&promisedBallot); err != nil {
			return AcceptorState{State: state, AcceptedBallot: acceptedBallot}, errors.Wrap(err, fmt.Sprintf("unable to get promisedBallot of acceptor:%v", n.ID))
		}
		// TODO: also take into account the Node ID to resolve tie-breaks
		if promisedBallot.Counter > b.Counter {
			return AcceptorState{PromisedBallot: promisedBallot, AcceptedBallot: acceptedBallot, State: state}, fmt.Errorf("submitted Ballot:%v is less than Ballot:%v of acceptor:%v", b, promisedBallot, n.ID)
		}
	}

	// No conflict: record b as the new promise.
	// TODO: this should be flushed to disk
	var encoded bytes.Buffer
	if err = gob.NewEncoder(&encoded).Encode(b); err != nil {
		return AcceptorState{AcceptedBallot: acceptedBallot, State: state, PromisedBallot: promisedBallot}, errors.Wrap(err, fmt.Sprintf("unable to encode Ballot:%v", b))
	}
	if err = n.acceptorStore.Set(promisedBallotKey(key), encoded.Bytes()); err != nil {
		return AcceptorState{AcceptedBallot: acceptedBallot, State: state, PromisedBallot: promisedBallot}, errors.Wrap(err, fmt.Sprintf("unable to flush Ballot:%v to disk", b))
	}

	return AcceptorState{AcceptedBallot: acceptedBallot, State: state, PromisedBallot: b}, nil
}
// Accept handles the accept phase for an acceptor (node).
//
// The acceptor answers with a conflict (a non-nil error) if it has already
// accepted or promised a Ballot whose Counter is greater than b's; the returned
// AcceptorState then carries the ballots and value it holds. Otherwise it
// erases the promise, marks the received tuple (Ballot number, value) as the
// accepted value and returns a confirmation. Store failures are returned as
// errors as well.
//
// The State field of the returned AcceptorState is the value that was stored
// under key before this call.
// NOTE(review): on success this pairs the new AcceptedBallot with the previous
// value rather than newState — confirm whether callers rely on that.
func (n *Node) Accept(b Ballot, key []byte, newState []byte) (AcceptorState, error) {
	/*
		Yes, acceptors should store tuple (promised Ballot, accepted Ballot and an accepted value) per key.
		Proposers, unlike acceptors, may use the same Ballot number sequence.
		If we split a sequence of unique and increasing Ballot numbers into several subsequences then any of them remains unique and increasing, so it's fine.
		- Rystsov
	*/

	// we still need to lock even when using a StableStore as the store of state.
	// this is because, someone may provide us with non-concurrent safe StableStore
	n.Lock()
	defer n.Unlock()

	state, err := n.acceptorStore.Get(key)
	if err != nil && err.Error() == stableStoreNotFoundErr {
		// unfortunate way of handling errors
		// TODO: do better
		state, err = nil, nil
	}
	if err != nil {
		return AcceptorState{}, errors.Wrap(err, fmt.Sprintf("unable to get state for key:%v from acceptor:%v", key, n.ID))
	}

	acceptedBallotBytes, err := n.acceptorStore.Get(acceptedBallotKey(key))
	if err != nil && err.Error() == stableStoreNotFoundErr {
		// unfortunate way of handling errors
		// TODO: do better
		acceptedBallotBytes, err = nil, nil
	}
	if err != nil {
		return AcceptorState{State: state}, errors.Wrap(err, fmt.Sprintf("unable to get acceptedBallot of acceptor:%v", n.ID))
	}

	var acceptedBallot Ballot
	if len(acceptedBallotBytes) != 0 {
		// ie we found an accepted Ballot
		err = gob.NewDecoder(bytes.NewReader(acceptedBallotBytes)).Decode(&acceptedBallot)
		if err != nil {
			return AcceptorState{State: state}, errors.Wrap(err, fmt.Sprintf("unable to get acceptedBallot of acceptor:%v", n.ID))
		}
		// TODO: also take into account the Node ID to resolve tie-breaks
		if acceptedBallot.Counter > b.Counter {
			return AcceptorState{AcceptedBallot: acceptedBallot, State: state}, fmt.Errorf("submitted Ballot:%v is less than Ballot:%v of acceptor:%v", b, acceptedBallot, n.ID)
		}
	}

	promisedBallotBytes, err := n.acceptorStore.Get(promisedBallotKey(key))
	if err != nil && err.Error() == stableStoreNotFoundErr {
		// unfortunate way of handling errors
		// TODO: do better
		promisedBallotBytes, err = nil, nil
	}
	if err != nil {
		return AcceptorState{State: state, AcceptedBallot: acceptedBallot}, errors.Wrap(err, fmt.Sprintf("unable to get promisedBallot of acceptor:%v", n.ID))
	}

	var promisedBallot Ballot
	if len(promisedBallotBytes) != 0 {
		// ie we found a promised Ballot
		err = gob.NewDecoder(bytes.NewReader(promisedBallotBytes)).Decode(&promisedBallot)
		if err != nil {
			return AcceptorState{State: state, AcceptedBallot: acceptedBallot}, errors.Wrap(err, fmt.Sprintf("unable to get promisedBallot of acceptor:%v", n.ID))
		}
		// TODO: also take into account the Node ID to resolve tie-breaks
		if promisedBallot.Counter > b.Counter {
			return AcceptorState{PromisedBallot: promisedBallot, AcceptedBallot: acceptedBallot, State: state}, fmt.Errorf("submitted Ballot:%v is less than Ballot:%v of acceptor:%v", b, promisedBallot, n.ID)
		}
	}

	// Encode the Ballot before touching the store, so that an encoding failure
	// leaves the promise and the accepted tuple exactly as they were.
	var ballotBuffer bytes.Buffer
	err = gob.NewEncoder(&ballotBuffer).Encode(b)
	if err != nil {
		return AcceptorState{AcceptedBallot: acceptedBallot, State: state, PromisedBallot: promisedBallot}, errors.Wrap(err, fmt.Sprintf("unable to encode Ballot:%v", b))
	}

	// erase promised Ballot
	err = n.acceptorStore.Set(promisedBallotKey(key), nil)
	if err != nil {
		return AcceptorState{AcceptedBallot: acceptedBallot, State: state, PromisedBallot: promisedBallot}, errors.Wrap(err, fmt.Sprintf("unable to erase Ballot:%v", b))
	}

	// TODO. NB: it is possible, from the following logic, for an acceptor to accept a Ballot
	// but not accept the new state/value. ie if the call to acceptorStore.Set(acceptedBallotKey, ballotBuffer.Bytes()) succeeds
	// but acceptorStore.Set(key, newState) fails.
	// we should think about the ramifications of that for a second.
	err = n.acceptorStore.Set(acceptedBallotKey(key), ballotBuffer.Bytes())
	if err != nil {
		return AcceptorState{AcceptedBallot: acceptedBallot, State: state}, errors.Wrap(err, fmt.Sprintf("unable to flush Ballot:%v to disk", b))
	}

	err = n.acceptorStore.Set(key, newState)
	if err != nil {
		// Report the value that could not be written (newState), not the old one.
		return AcceptorState{AcceptedBallot: b, State: state}, errors.Wrap(err, fmt.Sprintf("unable to flush the new state:%v to disk", newState))
	}

	return AcceptorState{AcceptedBallot: b, State: state}, nil
}