From 29bca348a98459c8820f9875e82cfd5422ed530b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 20 Jun 2019 16:27:16 +0100 Subject: [PATCH] integrate the event bus into host implementations (#23) --- p2p/host/blank/blank.go | 43 ++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/p2p/host/blank/blank.go b/p2p/host/blank/blank.go index 59189782b3..b98b2e5247 100644 --- a/p2p/host/blank/blank.go +++ b/p2p/host/blank/blank.go @@ -5,12 +5,15 @@ import ( "io" "github.com/libp2p/go-libp2p-core/connmgr" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-eventbus" + logging "github.com/ipfs/go-log" ma "github.com/multiformats/go-multiaddr" @@ -21,16 +24,26 @@ var log = logging.Logger("blankhost") // BlankHost is the thinnest implementation of the host.Host interface type BlankHost struct { - n network.Network - mux *mstream.MultistreamMuxer - cmgr connmgr.ConnManager + n network.Network + mux *mstream.MultistreamMuxer + cmgr connmgr.ConnManager + eventbus event.Bus + emitters struct { + evtLocalProtocolsUpdated event.Emitter + } } func NewBlankHost(n network.Network) *BlankHost { bh := &BlankHost{ - n: n, - cmgr: &connmgr.NullConnMgr{}, - mux: mstream.NewMultistreamMuxer(), + n: n, + cmgr: &connmgr.NullConnMgr{}, + mux: mstream.NewMultistreamMuxer(), + eventbus: eventbus.NewBus(), + } + + var err error + if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { + return nil } n.SetStreamHandler(bh.newStreamHandler) @@ -98,8 +111,11 @@ func (bh *BlankHost) NewStream(ctx context.Context, p peer.ID, protos ...protoco return s, nil } -func (bh *BlankHost) RemoveStreamHandler(p protocol.ID) { - bh.Mux().RemoveHandler(string(p)) +func (bh *BlankHost) RemoveStreamHandler(pid protocol.ID) { + bh.Mux().RemoveHandler(string(pid)) + bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ + Removed: []protocol.ID{pid}, + }) } func (bh *BlankHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { @@ -109,6 +125,9 @@ func (bh *BlankHost) SetStreamHandler(pid protocol.ID, handler network.StreamHan handler(is) return nil }) + bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ + Added: []protocol.ID{pid}, + }) } func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler network.StreamHandler) { @@ -118,11 +137,13 @@ func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler(is) return nil }) + bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ + Added: []protocol.ID{pid}, + }) } // newStreamHandler is the remote-opened stream handler for network.Network func (bh *BlankHost) newStreamHandler(s network.Stream) { - protoID, handle, err := bh.Mux().Negotiate(s) if err != nil { log.Warning("protocol mux failed: %s", err) @@ -148,3 +169,7 @@ func (bh *BlankHost) Network() network.Network { func (bh *BlankHost) ConnManager() connmgr.ConnManager { return bh.cmgr } + +func (bh *BlankHost) EventBus() event.Bus { + return bh.eventbus +}