-
-
Notifications
You must be signed in to change notification settings - Fork 364
/
Copy pathraft.go
154 lines (136 loc) · 3.55 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package broker
import (
"encoding/json"
"net"
"os"
"path/filepath"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/pkg/errors"
"github.com/travisjeffery/jocko/jocko"
)
type CmdType int
const (
addPartition CmdType = iota
addBroker
removeBroker
deleteTopic
)
type command struct {
Cmd CmdType `json:"type"`
Data *json.RawMessage `json:"data"`
}
func newCommand(cmd CmdType, data interface{}) (c command, err error) {
var b []byte
b, err = json.Marshal(data)
if err != nil {
return c, err
}
r := json.RawMessage(b)
return command{
Cmd: cmd,
Data: &r,
}, nil
}
// setupRaft is used to configure and create the raft node
func (b *Broker) setupRaft() (err error) {
addr := &net.TCPAddr{IP: net.ParseIP(b.bindAddr), Port: b.raftPort}
if b.raftTransport == nil {
b.raftTransport, err = raft.NewTCPTransport(addr.String(), nil, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
}
}
path := filepath.Join(b.dataDir, raftState)
if err = os.MkdirAll(path, 0755); err != nil {
return errors.Wrap(err, "data directory mkdir failed")
}
b.raftPeers = raft.NewJSONPeers(path, b.raftTransport)
var peers []string
for _, p := range b.peers {
addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.RaftPort}
peers = append(peers, addr.String())
}
if err = b.raftPeers.SetPeers(peers); err != nil {
return err
}
snapshots, err := raft.NewFileSnapshotStore(path, 2, os.Stderr)
if err != nil {
return err
}
boltStore, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return errors.Wrap(err, "bolt store failed")
}
b.raftStore = boltStore
leaderCh := make(chan bool, 1)
b.raftLeaderCh = leaderCh
b.raftConfig.NotifyCh = leaderCh
b.raftConfig.StartAsLeader = !b.devDisableBootstrap
raft, err := raft.NewRaft(b.raftConfig, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport)
if err != nil {
if b.raftStore != nil {
b.raftStore.Close()
}
b.raftTransport.Close()
return errors.Wrap(err, "raft failed")
}
b.raft = raft
return nil
}
func (s *Broker) raftApply(cmdType CmdType, data interface{}) error {
c, err := newCommand(cmdType, data)
if err != nil {
return err
}
b, err := json.Marshal(c)
if err != nil {
return err
}
f := s.raft.Apply(b, timeout)
return f.Error()
}
func (s *Broker) Apply(l *raft.Log) interface{} {
var c command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.logger.Debug("broker/apply cmd [%d]", c.Cmd)
switch c.Cmd {
case addBroker:
broker := new(jocko.BrokerConn)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, broker); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.addBroker(broker)
case addPartition:
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, p); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := s.StartReplica(p); err != nil {
panic(errors.Wrap(err, "start replica failed"))
}
case deleteTopic:
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, p); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := s.deleteTopic(p); err != nil {
panic(errors.Wrap(err, "topic delete failed"))
}
}
return nil
}