Skip to content

Commit

Permalink
improve consensus test
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Nov 17, 2016
1 parent 54fbf5f commit 51c436d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 44 deletions.
94 changes: 70 additions & 24 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (

const (
addPartition CmdType = iota
addBroker
removeBroker
)

type CmdType int
Expand All @@ -48,18 +50,29 @@ type Options struct {
DataDir string
BindAddr string
LogDir string
ID int

numPartitions int
transport raft.Transport
DefaultNumPartitions int
Brokers []*cluster.Broker

transport raft.Transport
}

type broker struct {
ID int `json:"id"`
Host string `json:"host"`
Port int `json:"port"`
}

type Broker struct {
Options

mu sync.Mutex

// state for fsm
partitions []*cluster.TopicPartition
topics map[string][]*cluster.TopicPartition
peers []*cluster.Broker

peerStore raft.PeerStore
transport raft.Transport
Expand All @@ -76,9 +89,17 @@ func New(options Options) *Broker {
}

func (s *Broker) Open() error {
conf := raft.DefaultConfig()
host, port, err := net.SplitHostPort(s.BindAddr)
if err != nil {
return err
}
s.Brokers = append(s.Brokers, &cluster.Broker{
Host: host,
Port: port,
ID: s.ID,
})

conf.EnableSingleNode = true
conf := raft.DefaultConfig()

addr, err := net.ResolveTCPAddr("tcp", s.BindAddr)
if err != nil {
Expand All @@ -88,11 +109,25 @@ func (s *Broker) Open() error {
if s.transport == nil {
s.transport, err = raft.NewTCPTransport(s.BindAddr, addr, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failede")
return errors.Wrap(err, "tcp transport failed")
}
}

s.peerStore = raft.NewJSONPeers(s.DataDir, s.transport)
os.MkdirAll(s.DataDir, 0755)

if len(s.Brokers) == 1 {
conf.EnableSingleNode = true
} else {
var peers []string
for _, b := range s.Brokers {
peers = append(peers, b.Addr())
}
err = s.peerStore.SetPeers(peers)
if err != nil {
return errors.Wrap(err, "set peers failed")
}
}

snapshots, err := raft.NewFileSnapshotStore(s.DataDir, 2, os.Stderr)
if err != nil {
Expand All @@ -117,22 +152,14 @@ func (s *Broker) Close() error {
return s.raft.Shutdown().Error()
}

func (s *Broker) IsController() bool {
return s.raft.State() == raft.Leader
func (s *Broker) IsController() (bool, error) {
return s.raft.State() == raft.Leader, nil
}

func (s *Broker) ControllerID() string {
return s.raft.Leader()
}

func (s *Broker) BrokerID() string {
return s.transport.LocalAddr()
}

func (s *Broker) Brokers() ([]string, error) {
return s.peerStore.Peers()
}

func (s *Broker) Partitions() ([]*cluster.TopicPartition, error) {
return s.partitions, nil
}
Expand All @@ -156,18 +183,21 @@ func (s *Broker) Partition(topic string, partition int32) (*cluster.TopicPartiti

func (s *Broker) NumPartitions() (int, error) {
// TODO: need to get to get from store
if s.numPartitions == 0 {
if s.DefaultNumPartitions == 0 {
return 4, nil
} else {
return s.numPartitions, nil
return s.DefaultNumPartitions, nil
}

}

func (s *Broker) AddPartition(partition cluster.TopicPartition) error {
return s.apply(addPartition, partition)
}

func (s *Broker) AddBroker(broker cluster.Broker) error {
return s.apply(addBroker, broker)
}

func (s *Broker) apply(cmdType CmdType, data interface{}) error {
c, err := newCommand(cmdType, data)
if err != nil {
Expand Down Expand Up @@ -197,11 +227,17 @@ func (s *Broker) addPartition(partition *cluster.TopicPartition) {
}
}

func (s *Broker) addBroker(broker *cluster.Broker) {
s.mu.Lock()
defer s.mu.Unlock()
s.Brokers = append(s.Brokers, broker)
}

func (s *Broker) IsLeaderOfPartition(partition *cluster.TopicPartition) bool {
// TODO: switch this to a map for perf
for _, p := range s.topics[partition.Topic] {
if p.Partition == partition.Partition {
if partition.Leader == s.BrokerID() {
if partition.Leader == s.ID {
return true
}
break
Expand All @@ -218,7 +254,7 @@ func (s *Broker) Topics() []string {
return topics
}

func (s *Broker) Join(addr string) error {
func (s *Broker) Join(id int, addr string) error {
f := s.raft.AddPeer(addr)
return f.Error()
}
Expand All @@ -229,6 +265,16 @@ func (s *Broker) Apply(l *raft.Log) interface{} {
panic(errors.Wrap(err, "json unmarshal failed"))
}
switch c.Cmd {
case addBroker:
broker := new(cluster.Broker)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, broker); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.addBroker(broker)
case addPartition:
p := new(cluster.TopicPartition)
b, err := c.Data.MarshalJSON()
Expand Down Expand Up @@ -256,7 +302,7 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error {
return err
}

brokers, err := s.Brokers()
brokers := s.Brokers
if err != nil {
return err
}
Expand All @@ -265,9 +311,9 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error {
partition := cluster.TopicPartition{
Partition: int32(i),
Topic: topic,
Leader: broker,
PreferredLeader: broker,
Replicas: []string{broker},
Leader: broker.ID,
PreferredLeader: broker.ID,
Replicas: []int{broker.ID},
}
if err := s.AddPartition(partition); err != nil {
return err
Expand Down
91 changes: 71 additions & 20 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package broker
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

Expand All @@ -13,55 +14,105 @@ import (
func TestStoreOpen(t *testing.T) {
DataDir, _ := ioutil.TempDir("", "storetest")
defer os.RemoveAll(DataDir)
BindAddr := "127.0.0.1:4000"

bind0 := "127.0.0.1:4000"
bind1 := "127.0.0.1:4001"
bind2 := "127.0.0.1:4002"

s0 := New(Options{
DataDir: DataDir,
BindAddr: BindAddr,
numPartitions: 2,
DataDir: filepath.Join(DataDir, "0"),
BindAddr: bind0,
ID: 0,
DefaultNumPartitions: 2,
Brokers: []*cluster.Broker{{
Host: "127.0.0.1",
Port: "4001",
ID: 1,
}, {
Host: "127.0.0.1",
Port: "4002",
ID: 2,
}},
})
assert.NotNil(t, s0)

err := s0.Open()
assert.NoError(t, err)
defer s0.Close()

_, err = s0.WaitForLeader(10 * time.Second)
assert.NoError(t, err)

DataDir, _ = ioutil.TempDir("", "storetest")
defer os.RemoveAll(DataDir)
BindAddr = "127.0.0.1:4001"
s1 := New(Options{
DataDir: DataDir,
BindAddr: BindAddr,
numPartitions: 2,
DataDir: filepath.Join(DataDir, "1"),
BindAddr: bind1,
ID: 1,
DefaultNumPartitions: 2,
Brokers: []*cluster.Broker{{
Host: "127.0.0.1",
Port: "4000",
ID: 0,
}, {
Host: "127.0.0.1",
Port: "4002",
ID: 2,
}},
})
err = s1.Open()
assert.NoError(t, err)
defer s1.Close()

err = s0.Join(s1.BrokerID())
s2 := New(Options{
DataDir: filepath.Join(DataDir, "2"),
BindAddr: bind2,
ID: 2,
DefaultNumPartitions: 2,
Brokers: []*cluster.Broker{{
Host: "127.0.0.1",
Port: "4000",
ID: 0,
}, {
Host: "127.0.0.1",
Port: "4001",
ID: 1,
}},
})
err = s2.Open()
assert.NoError(t, err)
defer s2.Close()

l, err := s0.WaitForLeader(10 * time.Second)
assert.NoError(t, err)

tp := cluster.TopicPartition{
Topic: "test",
Partition: 0,
Leader: s0.BrokerID(),
PreferredLeader: s0.BrokerID(),
Replicas: []string{s0.BrokerID()},
Leader: s0.ID,
PreferredLeader: s0.ID,
Replicas: []int{s0.ID},
}

var peer, leader *Broker
bs := []*Broker{s0, s1, s2}
for _, b := range bs {
if b.BindAddr == l {
leader = b
} else {
peer = b
}
}

err = s0.AddPartition(tp)
err = leader.AddPartition(tp)
assert.NoError(t, err)

err = s0.WaitForAppliedIndex(2, 10*time.Second)
assert.NoError(t, err)

isLeader := s0.IsLeaderOfPartition(&tp)
assert.True(t, isLeader)

err = s1.WaitForAppliedIndex(2, 10*time.Second)
err = peer.WaitForAppliedIndex(2, 10*time.Second)
assert.NoError(t, err)

ps, err := s1.Partitions()
// check that consensus was made to peer
ps, err := peer.Partitions()
assert.NoError(t, err)
for _, p := range ps {
assert.Equal(t, tp.Topic, p.Topic)
Expand Down
13 changes: 13 additions & 0 deletions cluster/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cluster

import "fmt"

type Broker struct {
ID int `json:"id"`
Host string `json:"host"`
Port string `json:"port"`
}

func (b *Broker) Addr() string {
return fmt.Sprintf("%s:%s", b.Host, b.Port)
}

0 comments on commit 51c436d

Please sign in to comment.