Skip to content

Commit

Permalink
initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
advdv committed Jun 20, 2019
1 parent bacef9d commit a4b43f3
Show file tree
Hide file tree
Showing 17 changed files with 796 additions and 1 deletion.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
# brahms
Byzantine Resilient Gossip Protocol
This is an experimental implementation of [Brahms: Byzantine resilient random membership sampling](https://www.cs.technion.ac.il/~gabik/publications/Brahms-COMNET.pdf). It describes a byzantine resilient protocol that creates a well-connected overlay network with each member only needing to knowing at most `O(∛n)` other peers.

## TODO
- [ ] decide, use and test an actual network transport
- [ ] instead of Node ids, work with ip addresses and ports
- [ ] fix the myriad of race conditions on shared memory variables
- [ ] implement validation of the sample by probing
- [ ] implement a limited push with a small proof of work
- [ ] adjust l1 and l2 as the network grobs using an esimate as described [here](https://research.neustar.biz/2012/07/09/sketch-of-the-day-k-minimum-values/)
- [ ] fix send on closed channel bug with update calls with too short a context
76 changes: 76 additions & 0 deletions brahms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package brahms

import (
"context"
"math/rand"
"time"
)

// Brahms implements the gossip protocol and takes an old view 'v' and returns a
// new view.
func Brahms(self NID, rnd *rand.Rand, p P, to time.Duration, s *Sampler, tr Transport, pushes <-chan NID, v View) View {

// reset push/pull views (line 21)
push, pull := View{}, View{}

// perform sends and write results to these channels
cpull := make(chan View, p.βl1())
func() {
ctx, cancel := context.WithTimeout(context.Background(), to)
defer cancel()

// push our own id to peers picked from the current view (line 22)
for id := range v.Pick(rnd, p.αl1()) {
go tr.Push(ctx, self, id)
}

// send pull requests to peers picked from the current view (line 25)
for id := range v.Pick(rnd, p.βl1()) {
go tr.Pull(ctx, cpull, id)
}

// wait for time unit to be done, cancels any open pushes/pulls (line 27)
<-ctx.Done()
close(cpull)
}()

// drain the buffer of all ids pushed to us (line 28)
DRAIN:
for {
select {
case id := <-pushes:
push[id] = struct{}{}
default:
break DRAIN
}
}

// add all peers that we received as replies from our pull requests (line 32)
for pv := range cpull {
for id := range pv {

//NOTE: we divert from the paper by ignoring any pulls
if id == self {
continue
}

pull[id] = struct{}{}
}
}

// only update our view if the nr of pushed ids was not too high (line 35)
// NOTE: we divert from the paper here. We're happy to update if either pull
// or push yielded us some nodes not necessarily both.
if len(push) <= p.αl1() && (len(push) > 0 || len(pull) > 0) {

// construct our new view from what we've seen this round (line 36)
v = push.Pick(rnd, p.αl1()).
Concat(pull.Pick(rnd, p.βl1())).
Concat(s.Sample().Pick(rnd, p.γl1()))
}

// update the sampler with resuling push/pull (line 37)
s.Update(push.Concat(pull))

return v
}
84 changes: 84 additions & 0 deletions brahms_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package brahms

import (
"math/rand"
"testing"
"time"

"github.com/advanderveer/go-test"
)

func TestBrahmsNoReply(t *testing.T) {
p, _ := NewParams(0.1, 0.7, 0.2, 10, 2)
r := rand.New(rand.NewSource(1))
s := NewSampler(r, p.l2())
self := NID{0x01}

p0 := make(chan NID)
v0 := NewView(NID{0x01})
tr0 := NewMockTransport()

v1 := Brahms(self, r, p, time.Millisecond*10, s, tr0, p0, v0)

//view should be unchanged transport returned nothing
test.Equals(t, v0, v1)

//should have pushed our own nid
test.Equals(t, true, tr0.DidPush(self))

//sample should be empty
test.Equals(t, View{}, s.Sample())
}

func TestBrahmsWithJustPushes(t *testing.T) {
p, _ := NewParams(0.1, 0.7, 0.2, 10, 2)
r := rand.New(rand.NewSource(1))
s := NewSampler(r, p.l2())
self := NID{0x01}

id1 := NID{0x01, 0x02}
p0 := make(chan NID, 10)
p0 <- id1
v0 := NewView(NID{0x01})
tr0 := NewMockTransport()

// with just a pull response we update the view with just that info
v1 := Brahms(self, r, p, time.Millisecond*10, s, tr0, p0, v0)
test.Equals(t, 0, len(p0))
test.Equals(t, NewView(id1), v1)

// but the pushed id should have been added to the sample
test.Equals(t, NewView(id1), s.Sample())

t.Run("with too many pushes", func(t *testing.T) {
p1 := make(chan NID, 10)
p1 <- NID{0xaa, 0xaa}
p1 <- NID{0xbb, 0xbb} //with the given params this is too much push

v2 := Brahms(NID{0xff, 0xff}, r, p, time.Millisecond*10, s, tr0, p1, v0)

//with too many pushes the view shouldn't have changed
test.Equals(t, v2, v0)
})
}

func TestBrahmsWithPullsAndPushes(t *testing.T) {
p, _ := NewParams(0.1, 0.7, 0.2, 10, 2)
r := rand.New(rand.NewSource(1))
s := NewSampler(r, p.l2())
self := NID{0x01}
other := NID{0x02}

v0 := NewView(other)
pull1 := NID{0x01, 0x02}
push1 := NID{0x02, 0x02}
p0 := make(chan NID, 10)
p0 <- push1
tr0 := NewMockTransport()
tr0.SetPull(other, NewView(pull1, pull1))

//with both pushes and pulls the view should get updated
v1 := Brahms(self, r, p, time.Millisecond*10, s, tr0, p0, v0)
test.Equals(t, NewView(pull1, push1), v1)
test.Equals(t, NewView(pull1, push1), s.Sample())
}
57 changes: 57 additions & 0 deletions core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package brahms

import (
"math/rand"
"time"
)

// Core implements the core algorithm
type Core struct {
rnd *rand.Rand
self NID
view View
pushes chan NID
params P
sampler *Sampler
tr Transport
}

// NewCore initializes the core
func NewCore(rnd *rand.Rand, self NID, v0 View, p P, tr Transport) (a *Core) {
a = &Core{
self: self,
view: v0,
pushes: make(chan NID, 100),
params: p,
sampler: NewSampler(rnd, p.l2()),
tr: tr,
rnd: rnd,
}

//initialize the sampler with our initial view
a.sampler.Update(v0)
return
}

// @TODO probe for sample validation

// ID returns this core's id
func (h *Core) ID() NID { return h.self }

// UpdateView runs the algorithm to update the view
func (h *Core) UpdateView(to time.Duration) {
h.view = Brahms(h.self, h.rnd, h.params, to, h.sampler, h.tr, h.pushes, h.view)
}

// HandlePull responds to pulls by returning a copy of our view
func (h *Core) HandlePull() View {
return h.view.Copy()
}

// HandlePush handles incoming ids
func (h *Core) HandlePush(id NID) {
select {
case h.pushes <- id:
default: //push buffer is full, discard
}
}
91 changes: 91 additions & 0 deletions core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package brahms

import (
"bytes"
"encoding/binary"
"fmt"
"math"
"math/rand"
"path/filepath"
"testing"
"time"

"github.com/advanderveer/go-test"
)

func TestMiniNetCore(t *testing.T) {
rnd := rand.New(rand.NewSource(1))
prm, _ := NewParams(0.45, 0.45, 0.1, 100, 10)

//create a mini network with three cores
tr := NewMemNetTransport()
c1 := NewCore(rnd, NID{0x01}, NewView(NID{0x02}), prm, tr)
tr.AddCore(c1)
c2 := NewCore(rnd, NID{0x02}, NewView(NID{0x03}), prm, tr)
tr.AddCore(c2)
c3 := NewCore(rnd, NID{0x03}, NewView(), prm, tr)
tr.AddCore(c3)

// after two iterations we should have a connected graph
for i := 0; i < 2; i++ {
c1.UpdateView(time.Millisecond)
c2.UpdateView(time.Millisecond)
c3.UpdateView(time.Millisecond)
}

// view and sampler should show a connected graph
test.Equals(t, NewView(NID{0x02}, NID{0x03}), c1.view)
test.Equals(t, NewView(NID{0x02}, NID{0x03}), c1.sampler.Sample())
test.Equals(t, NewView(NID{0x01}, NID{0x03}), c2.view)
test.Equals(t, NewView(NID{0x01}, NID{0x03}), c2.sampler.Sample())
test.Equals(t, NewView(NID{0x01}, NID{0x02}), c3.view)
test.Equals(t, NewView(NID{0x01}, NID{0x02}), c3.sampler.Sample())
}

func TestLargeNetwork(t *testing.T) {
r := rand.New(rand.NewSource(1))
n := uint64(100)
q := 20
m := 2.0
l := int(math.Round(m * math.Pow(float64(n), 1.0/3)))
p, _ := NewParams(
0.45,
0.45,
0.1,
l, l,
)

tr := NewMemNetTransport()
cores := make([]*Core, 0, n)
for i := uint64(1); i <= n; i++ {
id := NID{}
other := NID{}
binary.LittleEndian.PutUint64(id[:], i)
binary.LittleEndian.PutUint64(other[:], i+1)
if i == n {
other = NID{0x01}
}

c := NewCore(r, id, NewView(other), p, tr)
tr.AddCore(c)
cores = append(cores, c)
}

for i := 0; i < q; i++ {
buf := bytes.NewBuffer(nil)
draw(t, buf, cores)
drawPNG(t, buf, fmt.Sprintf(filepath.Join("draws", "network_%d.png"), i))

for _, c := range cores {
c.UpdateView(time.Microsecond * 700)
}
}

var tot float64
for _, c := range cores {
tot += float64(len(c.view))
}

fmt.Println("l1/l2:", p.l2(), "avg:", tot/float64(len(cores)))

}
41 changes: 41 additions & 0 deletions draw_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package brahms

import (
"fmt"
"io"
"os"
"os/exec"
"testing"

"github.com/advanderveer/go-test"
)

func drawPNG(t *testing.T, buf io.Reader, name string) {
f, err := os.Create(name)
test.Ok(t, err)
defer f.Close()

cmd := exec.Command("neato", "-Tpng")
cmd.Stdin = buf
cmd.Stdout = f
test.Ok(t, cmd.Run())
}

func draw(t testing.TB, w io.Writer, cores []*Core) {
fmt.Fprintln(w, `digraph {`)
fmt.Fprintln(w, `layout=neato;`)
fmt.Fprintln(w, `overlap=scalexy;`)
fmt.Fprintln(w, `sep="+1";`)

for _, c := range cores {
fmt.Fprintf(w, "\t"+`"%.6x" [style="filled,solid",label="%.6x"`, c.ID(), c.ID())
fmt.Fprintf(w, `,fillcolor="#ffffff"`)
fmt.Fprintf(w, "]\n")

for id := range c.view {
fmt.Fprintf(w, "\t"+`"%.6x" -> "%.6x";`+"\n", c.ID(), id)
}
}

fmt.Fprintln(w, `}`)
}
1 change: 1 addition & 0 deletions draws/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.png
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/advanderveer/brahms

go 1.12

require (
github.com/advanderveer/go-test v1.0.1
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/advanderveer/go-test v1.0.1 h1:FuP/a1WOTT66sx7J6oeY+7ACmNEvBxiJh6ZdR1fc5pQ=
github.com/advanderveer/go-test v1.0.1/go.mod h1:cTvlXX1T7hQFO4xVUN2FEVnwEkhzUZzapJOi4JCoX1I=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
Loading

0 comments on commit a4b43f3

Please sign in to comment.