From 51c436df94fe7a70dbbc3e2d7ae8dcfd8a508e13 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Thu, 17 Nov 2016 05:24:38 -0500 Subject: [PATCH] improve consensus test --- broker/broker.go | 94 ++++++++++++++++++++++++++++++++----------- broker/broker_test.go | 91 ++++++++++++++++++++++++++++++++--------- cluster/broker.go | 13 ++++++ 3 files changed, 154 insertions(+), 44 deletions(-) create mode 100644 cluster/broker.go diff --git a/broker/broker.go b/broker/broker.go index 22624c96..e6e8a7ed 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -22,6 +22,8 @@ const ( const ( addPartition CmdType = iota + addBroker + removeBroker ) type CmdType int @@ -48,9 +50,18 @@ type Options struct { DataDir string BindAddr string LogDir string + ID int - numPartitions int - transport raft.Transport + DefaultNumPartitions int + Brokers []*cluster.Broker + + transport raft.Transport +} + +type broker struct { + ID int `json:"id"` + Host string `json:"host"` + Port int `json:"port"` } type Broker struct { @@ -58,8 +69,10 @@ type Broker struct { mu sync.Mutex + // state for fsm partitions []*cluster.TopicPartition topics map[string][]*cluster.TopicPartition + peers []*cluster.Broker peerStore raft.PeerStore transport raft.Transport @@ -76,9 +89,17 @@ func New(options Options) *Broker { } func (s *Broker) Open() error { - conf := raft.DefaultConfig() + host, port, err := net.SplitHostPort(s.BindAddr) + if err != nil { + return err + } + s.Brokers = append(s.Brokers, &cluster.Broker{ + Host: host, + Port: port, + ID: s.ID, + }) - conf.EnableSingleNode = true + conf := raft.DefaultConfig() addr, err := net.ResolveTCPAddr("tcp", s.BindAddr) if err != nil { @@ -88,11 +109,25 @@ func (s *Broker) Open() error { if s.transport == nil { s.transport, err = raft.NewTCPTransport(s.BindAddr, addr, 3, timeout, os.Stderr) if err != nil { - return errors.Wrap(err, "tcp transport failede") + return errors.Wrap(err, "tcp transport failed") } } s.peerStore = raft.NewJSONPeers(s.DataDir, s.transport) + os.MkdirAll(s.DataDir, 0755) + + if len(s.Brokers) == 1 { + conf.EnableSingleNode = true + } else { + var peers []string + for _, b := range s.Brokers { + peers = append(peers, b.Addr()) + } + err = s.peerStore.SetPeers(peers) + if err != nil { + return errors.Wrap(err, "set peers failed") + } + } snapshots, err := raft.NewFileSnapshotStore(s.DataDir, 2, os.Stderr) if err != nil { @@ -117,22 +152,14 @@ func (s *Broker) Close() error { return s.raft.Shutdown().Error() } -func (s *Broker) IsController() bool { - return s.raft.State() == raft.Leader +func (s *Broker) IsController() (bool, error) { + return s.raft.State() == raft.Leader, nil } func (s *Broker) ControllerID() string { return s.raft.Leader() } -func (s *Broker) BrokerID() string { - return s.transport.LocalAddr() -} - -func (s *Broker) Brokers() ([]string, error) { - return s.peerStore.Peers() -} - func (s *Broker) Partitions() ([]*cluster.TopicPartition, error) { return s.partitions, nil } @@ -156,18 +183,21 @@ func (s *Broker) Partition(topic string, partition int32) (*cluster.TopicPartiti func (s *Broker) NumPartitions() (int, error) { // TODO: need to get to get from store - if s.numPartitions == 0 { + if s.DefaultNumPartitions == 0 { return 4, nil } else { - return s.numPartitions, nil + return s.DefaultNumPartitions, nil } - } func (s *Broker) AddPartition(partition cluster.TopicPartition) error { return s.apply(addPartition, partition) } +func (s *Broker) AddBroker(broker cluster.Broker) error { + return s.apply(addBroker, broker) +} + func (s *Broker) apply(cmdType CmdType, data interface{}) error { c, err := newCommand(cmdType, data) if err != nil { @@ -197,11 +227,17 @@ func (s *Broker) addPartition(partition *cluster.TopicPartition) { } } +func (s *Broker) addBroker(broker *cluster.Broker) { + s.mu.Lock() + defer s.mu.Unlock() + s.Brokers = append(s.Brokers, broker) +} + func (s *Broker) IsLeaderOfPartition(partition *cluster.TopicPartition) bool { // TODO: switch this to a map for perf for _, p := range s.topics[partition.Topic] { if p.Partition == partition.Partition { - if partition.Leader == s.BrokerID() { + if partition.Leader == s.ID { return true } break @@ -218,7 +254,7 @@ func (s *Broker) Topics() []string { return topics } -func (s *Broker) Join(addr string) error { +func (s *Broker) Join(id int, addr string) error { f := s.raft.AddPeer(addr) return f.Error() } @@ -229,6 +265,16 @@ func (s *Broker) Apply(l *raft.Log) interface{} { panic(errors.Wrap(err, "json unmarshal failed")) } switch c.Cmd { + case addBroker: + broker := new(cluster.Broker) + b, err := c.Data.MarshalJSON() + if err != nil { + panic(errors.Wrap(err, "json unmarshal failed")) + } + if err := json.Unmarshal(b, broker); err != nil { + panic(errors.Wrap(err, "json unmarshal failed")) + } + s.addBroker(broker) case addPartition: p := new(cluster.TopicPartition) b, err := c.Data.MarshalJSON() @@ -256,7 +302,7 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error { return err } - brokers, err := s.Brokers() + brokers := s.Brokers if err != nil { return err } @@ -265,9 +311,9 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error { partition := cluster.TopicPartition{ Partition: int32(i), Topic: topic, - Leader: broker, - PreferredLeader: broker, - Replicas: []string{broker}, + Leader: broker.ID, + PreferredLeader: broker.ID, + Replicas: []int{broker.ID}, } if err := s.AddPartition(partition); err != nil { return err diff --git a/broker/broker_test.go b/broker/broker_test.go index a0bc7b16..28f372e8 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -3,6 +3,7 @@ package broker import ( "io/ioutil" "os" + "path/filepath" "testing" "time" @@ -13,12 +14,25 @@ import ( func TestStoreOpen(t *testing.T) { DataDir, _ := ioutil.TempDir("", "storetest") defer os.RemoveAll(DataDir) - BindAddr := "127.0.0.1:4000" + + bind0 := "127.0.0.1:4000" + bind1 := "127.0.0.1:4001" + bind2 := "127.0.0.1:4002" s0 := New(Options{ - DataDir: DataDir, - BindAddr: BindAddr, - numPartitions: 2, + DataDir: filepath.Join(DataDir, "0"), + BindAddr: bind0, + ID: 0, + DefaultNumPartitions: 2, + Brokers: []*cluster.Broker{{ + Host: "127.0.0.1", + Port: "4001", + ID: 1, + }, { + Host: "127.0.0.1", + Port: "4002", + ID: 2, + }}, }) assert.NotNil(t, s0) @@ -26,42 +40,79 @@ func TestStoreOpen(t *testing.T) { assert.NoError(t, err) defer s0.Close() - _, err = s0.WaitForLeader(10 * time.Second) - assert.NoError(t, err) - - DataDir, _ = ioutil.TempDir("", "storetest") - defer os.RemoveAll(DataDir) - BindAddr = "127.0.0.1:4001" s1 := New(Options{ - DataDir: DataDir, - BindAddr: BindAddr, - numPartitions: 2, + DataDir: filepath.Join(DataDir, "1"), + BindAddr: bind1, + ID: 1, + DefaultNumPartitions: 2, + Brokers: []*cluster.Broker{{ + Host: "127.0.0.1", + Port: "4000", + ID: 0, + }, { + Host: "127.0.0.1", + Port: "4002", + ID: 2, + }}, }) err = s1.Open() assert.NoError(t, err) defer s1.Close() - err = s0.Join(s1.BrokerID()) + s2 := New(Options{ + DataDir: filepath.Join(DataDir, "2"), + BindAddr: bind2, + ID: 2, + DefaultNumPartitions: 2, + Brokers: []*cluster.Broker{{ + Host: "127.0.0.1", + Port: "4000", + ID: 0, + }, { + Host: "127.0.0.1", + Port: "4001", + ID: 1, + }}, + }) + err = s2.Open() + assert.NoError(t, err) + defer s2.Close() + + l, err := s0.WaitForLeader(10 * time.Second) assert.NoError(t, err) tp := cluster.TopicPartition{ Topic: "test", Partition: 0, - Leader: s0.BrokerID(), - PreferredLeader: s0.BrokerID(), - Replicas: []string{s0.BrokerID()}, + Leader: s0.ID, + PreferredLeader: s0.ID, + Replicas: []int{s0.ID}, + } + + var peer, leader *Broker + bs := []*Broker{s0, s1, s2} + for _, b := range bs { + if b.BindAddr == l { + leader = b + } else { + peer = b + } } - err = s0.AddPartition(tp) + err = leader.AddPartition(tp) + assert.NoError(t, err) + + err = s0.WaitForAppliedIndex(2, 10*time.Second) assert.NoError(t, err) isLeader := s0.IsLeaderOfPartition(&tp) assert.True(t, isLeader) - err = s1.WaitForAppliedIndex(2, 10*time.Second) + err = peer.WaitForAppliedIndex(2, 10*time.Second) assert.NoError(t, err) - ps, err := s1.Partitions() + // check that consensus was made to peer + ps, err := peer.Partitions() assert.NoError(t, err) for _, p := range ps { assert.Equal(t, tp.Topic, p.Topic) diff --git a/cluster/broker.go b/cluster/broker.go new file mode 100644 index 00000000..b643a9f7 --- /dev/null +++ b/cluster/broker.go @@ -0,0 +1,13 @@ +package cluster + +import "fmt" + +type Broker struct { + ID int `json:"id"` + Host string `json:"host"` + Port string `json:"port"` +} + +func (b *Broker) Addr() string { + return fmt.Sprintf("%s:%s", b.Host, b.Port) +}