Skip to content

Commit

Permalink
Merge pull request #13 from libp2p/feat/buffer-by-default
Browse files Browse the repository at this point in the history
Enable buffering by default
  • Loading branch information
magik6k authored Jun 25, 2019
2 parents 3643ae4 + 7750ab7 commit 0fef0d6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
3 changes: 2 additions & 1 deletion p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ var _ event.Subscription = (*sub)(nil)
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
var settings subSettings
settings := subSettings(subSettingsDefault)
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
Expand Down Expand Up @@ -184,6 +184,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
// emit(EventT{})
func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e event.Emitter, err error) {
var settings emitterSettings

for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ func (EventA) String() string {
return "Oh, Hello"
}

func TestDefaultSubIsBuffered(t *testing.T) {
bus := NewBus()
s, err := bus.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}
if cap(s.(*sub).ch) == 0 {
t.Fatalf("without any options subscribe should be buffered. was %d", cap(s.(*sub).ch))
}
}

func TestEmit(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe(new(EventA))
Expand Down
4 changes: 4 additions & 0 deletions p2p/host/eventbus/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ type subSettings struct {
buffer int
}

var subSettingsDefault = subSettings{
buffer: 16,
}

func BufSize(n int) func(interface{}) error {
return func(s interface{}) error {
s.(*subSettings).buffer = n
Expand Down

0 comments on commit 0fef0d6

Please sign in to comment.