diff --git a/src/app/libp2p_helper/src/codanet.go b/src/app/libp2p_helper/src/codanet.go index b54ed689bee..65b3c20c352 100644 --- a/src/app/libp2p_helper/src/codanet.go +++ b/src/app/libp2p_helper/src/codanet.go @@ -234,7 +234,8 @@ type Helper struct { ConnectionManager *CodaConnectionManager BandwidthCounter *metrics.BandwidthCounter MsgStats *MessageStats - Seeds []peer.AddrInfo + _seeds []peer.AddrInfo + seedsMutex sync.RWMutex NodeStatus []byte HeartbeatPeer func(peer.ID) } @@ -368,6 +369,13 @@ func (h *Helper) SetGatingState(gs *CodaGatingConfig) { } } +func (h *Helper) AddSeeds(infos ...peer.AddrInfo) { + // TODO: this "_seeds" field is never read anywhere, is it needed? + h.seedsMutex.Lock() + h._seeds = append(h._seeds, infos...) + h.seedsMutex.Unlock() +} + func (gs *CodaGatingState) TrustPeer(p peer.ID) { gs.trustedPeersMutex.Lock() gs.trustedPeers[p] = struct{}{} @@ -751,7 +759,7 @@ func MakeHelper(ctx context.Context, listenOn []ma.Multiaddr, externalAddr ma.Mu ConnectionManager: connManager, BandwidthCounter: bandwidthCounter, MsgStats: &MessageStats{min: math.MaxUint64}, - Seeds: seeds, + _seeds: seeds, HeartbeatPeer: func(p peer.ID) { lanPatcher.Heartbeat(p) wanPatcher.Heartbeat(p) diff --git a/src/app/libp2p_helper/src/libp2p_helper/app.go b/src/app/libp2p_helper/src/libp2p_helper/app.go index 309fb6a2246..d5679953026 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/app.go +++ b/src/app/libp2p_helper/src/libp2p_helper/app.go @@ -8,7 +8,6 @@ import ( "math" "os" "strconv" - "sync" "time" ipc "libp2p_ipc" @@ -29,14 +28,13 @@ func newApp() *app { return &app{ P2p: nil, Ctx: ctx, - Subs: make(map[uint64]subscription), - Topics: make(map[string]*pubsub.Topic), - ValidatorMutex: &sync.Mutex{}, - Validators: make(map[uint64]*validationStatus), - Streams: make(map[uint64]net.Stream), + _subs: make(map[uint64]subscription), + _topics: make(map[string]*pubsub.Topic), + _validators: make(map[uint64]*validationStatus), + _streams: make(map[uint64]net.Stream), OutChan: outChan, Out: bufio.NewWriter(os.Stdout), - AddedPeers: []peer.AddrInfo{}, + _addedPeers: []peer.AddrInfo{}, MetricsRefreshTime: time.Minute, metricsCollectionStarted: false, metricsServer: nil, @@ -64,6 +62,151 @@ func (app *app) NextId() uint64 { return app.counter } +func (app *app) AddPeers(infos ...peer.AddrInfo) { + app.addedPeersMutex.Lock() + defer app.addedPeersMutex.Unlock() + app._addedPeers = append(app._addedPeers, infos...) +} + +func (app *app) GetAddedPeers() []peer.AddrInfo { + app.addedPeersMutex.RLock() + defer app.addedPeersMutex.RUnlock() + copyOfAddedPeers := make([]peer.AddrInfo, len(app._addedPeers)) + copy(copyOfAddedPeers, app._addedPeers) + return copyOfAddedPeers +} + +func (app *app) ResetAddedPeers() { + app.addedPeersMutex.Lock() + defer app.addedPeersMutex.Unlock() + app._addedPeers = nil +} + +func (app *app) AddStream(stream net.Stream) uint64 { + streamIdx := app.NextId() + app.streamsMutex.Lock() + defer app.streamsMutex.Unlock() + app._streams[streamIdx] = stream + return streamIdx +} + +func (app *app) CloseStream(streamId uint64) error { + app.streamsMutex.Lock() + defer app.streamsMutex.Unlock() + if stream, ok := app._streams[streamId]; ok { + delete(app._streams, streamId) + err := stream.Close() + if err != nil { + return badp2p(err) + } + return nil + } + return badRPC(errors.New("unknown stream_idx")) +} + +func (app *app) ResetStream(streamId uint64) error { + app.streamsMutex.Lock() + defer app.streamsMutex.Unlock() + if stream, ok := app._streams[streamId]; ok { + delete(app._streams, streamId) + err := stream.Reset() + if err != nil { + return badp2p(err) + } + return nil + } + return badRPC(errors.New("unknown stream_idx")) +} + +func (app *app) StreamWrite(streamId uint64, data []byte) error { + // TODO Consider using a more fine-grained locking strategy, + // not using a global mutex to lock on a message sending + app.streamsMutex.Lock() + defer app.streamsMutex.Unlock() + if stream, ok := app._streams[streamId]; ok { + n, err := stream.Write(data) + if err != nil { + // TODO check that it's correct to error out, not repeat writing + delete(app._streams, streamId) + close_err := stream.Close() + if close_err != nil { + app.P2p.Logger.Errorf("failed to close stream %d after encountering write failure (%s): %s", streamId, err.Error(), close_err.Error()) + } + return wrapError(badp2p(err), fmt.Sprintf("only wrote %d out of %d bytes", n, len(data))) + } + return nil + } + return badRPC(errors.New("unknown stream_idx")) +} + +func (app *app) AddValidator() (uint64, chan pubsub.ValidationResult) { + seqno := app.NextId() + ch := make(chan pubsub.ValidationResult) + app.validatorMutex.Lock() + defer app.validatorMutex.Unlock() + app._validators[seqno] = new(validationStatus) + app._validators[seqno].Completion = ch + return seqno, ch +} + +func (app *app) RemoveValidator(seqno uint64) { + app.validatorMutex.Lock() + defer app.validatorMutex.Unlock() + delete(app._validators, seqno) +} + +func (app *app) TimeoutValidator(seqno uint64) { + now := time.Now() + app.validatorMutex.Lock() + defer app.validatorMutex.Unlock() + app._validators[seqno].TimedOutAt = &now +} + +func (app *app) FinishValidator(seqno uint64, finish func(st *validationStatus)) bool { + app.validatorMutex.Lock() + defer app.validatorMutex.Unlock() + if st, ok := app._validators[seqno]; ok { + finish(st) + delete(app._validators, seqno) + return true + } else { + return false + } +} + +func (app *app) AddTopic(topicName string, topic *pubsub.Topic) { + app.topicsMutex.Lock() + defer app.topicsMutex.Unlock() + app._topics[topicName] = topic +} + +func (app *app) GetTopic(topicName string) (*pubsub.Topic, bool) { + app.topicsMutex.RLock() + defer app.topicsMutex.RUnlock() + topic, has := app._topics[topicName] + return topic, has +} + +func (app *app) AddSubscription(subId uint64, sub subscription) { + app.subsMutex.Lock() + defer app.subsMutex.Unlock() + app._subs[subId] = sub +} + +func (app *app) CancelSubscription(subId uint64) bool { + app.subsMutex.Lock() + defer app.subsMutex.Unlock() + + if sub, ok := app._subs[subId]; ok { + sub.Sub.Cancel() + sub.Cancel() + delete(app._subs, subId) + return true + } + + return false +} + func parseMultiaddrWithID(ma multiaddr.Multiaddr, id peer.ID) (*codaPeerInfo, error) { ipComponent, tcpMaddr := multiaddr.SplitFirst(ma) if !(ipComponent.Protocol().Code == multiaddr.P_IP4 || ipComponent.Protocol().Code == multiaddr.P_IP6) { @@ -96,6 +239,7 @@ func addrInfoOfString(maddr string) (*peer.AddrInfo, error) { return info, nil } +// Writes a message back to the OCaml node func (app *app) writeMsg(msg *capnp.Message) { if app.NoUpcalls { return diff --git a/src/app/libp2p_helper/src/libp2p_helper/config_msg.go b/src/app/libp2p_helper/src/libp2p_helper/config_msg.go index ef9e215a5c3..b992900149c 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/config_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/config_msg.go @@ -36,7 +36,7 @@ func (msg BeginAdvertisingReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, needsConfigure()) } app.SetConnectionHandlers() - for _, info := range app.AddedPeers { + for _, info := range app.GetAddedPeers() { app.P2p.Logger.Debug("Trying to connect to: ", info) err := app.P2p.Host.Connect(app.Ctx, info) if err != nil { @@ -334,7 +334,7 @@ func (msg ConfigureReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badRPC(err)) } - app.AddedPeers = append(app.AddedPeers, seeds...) + app.AddPeers(seeds...) directPeersMaList, err := m.DirectPeers() if err != nil { @@ -372,12 +372,12 @@ func (msg ConfigureReq) handle(app *app, seqno uint64) *capnp.Message { if err != nil { return mkRpcRespError(seqno, badRPC(err)) } - gatingConfig, err := readGatingConfig(gc, app.AddedPeers) + gatingConfig, err := readGatingConfig(gc, app.GetAddedPeers()) if err != nil { return mkRpcRespError(seqno, badRPC(err)) } if gc.CleanAddedPeers() { - app.AddedPeers = nil + app.ResetAddedPeers() } stateDir, err := m.Statedir() @@ -593,13 +593,13 @@ func (m SetGatingConfigReq) handle(app *app, seqno uint64) *capnp.Message { var gatingConfig *codanet.CodaGatingConfig gc, err := SetGatingConfigReqT(m).GatingConfig() if err == nil { - gatingConfig, err = readGatingConfig(gc, app.AddedPeers) + gatingConfig, err = readGatingConfig(gc, app.GetAddedPeers()) } if err != nil { return mkRpcRespError(seqno, badRPC(err)) } if gc.CleanAddedPeers() { - app.AddedPeers = nil + app.ResetAddedPeers() } app.P2p.SetGatingState(gatingConfig) diff --git a/src/app/libp2p_helper/src/libp2p_helper/config_msg_test.go b/src/app/libp2p_helper/src/libp2p_helper/config_msg_test.go index d5a6330492c..674899e1cd3 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/config_msg_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/config_msg_test.go @@ -40,7 +40,7 @@ func TestDHTDiscovery_TwoNodes(t *testing.T) { require.NoError(t, err) appB, _ := newTestApp(t, appAInfos, true) - appB.AddedPeers = appAInfos + appB.AddPeers(appAInfos...) appB.NoMDNS = true // begin appB and appA's DHT advertising diff --git a/src/app/libp2p_helper/src/libp2p_helper/data.go b/src/app/libp2p_helper/src/libp2p_helper/data.go index e07f903ef02..bf1748aaf49 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/data.go +++ b/src/app/libp2p_helper/src/libp2p_helper/data.go @@ -23,16 +23,19 @@ import ( type app struct { P2p *codanet.Helper Ctx context.Context - Subs map[uint64]subscription - Topics map[string]*pubsub.Topic - Validators map[uint64]*validationStatus - ValidatorMutex *sync.Mutex - Streams map[uint64]net.Stream - StreamsMutex sync.Mutex + _subs map[uint64]subscription + subsMutex sync.Mutex + _topics map[string]*pubsub.Topic + topicsMutex sync.RWMutex + _validators map[uint64]*validationStatus + validatorMutex sync.Mutex + _streams map[uint64]net.Stream + streamsMutex sync.Mutex Out *bufio.Writer OutChan chan *capnp.Message Bootstrapper io.Closer - AddedPeers []peer.AddrInfo + addedPeersMutex sync.RWMutex + _addedPeers []peer.AddrInfo UnsafeNoTrustIP bool MetricsRefreshTime time.Duration metricsCollectionStarted bool @@ -54,8 +57,6 @@ type app struct { type subscription struct { Sub *pubsub.Subscription - Idx uint64 - Ctx context.Context Cancel context.CancelFunc } diff --git a/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go b/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go index 38b43577279..7346b7819ac 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go @@ -40,6 +40,7 @@ var pushMesssageExtractors = map[ipc.Libp2pHelperInterface_PushMessage_Which]ext ipc.Libp2pHelperInterface_PushMessage_Which_heartbeatPeer: fromHeartbeatPeerPush, } +// Handles messages coming from the OCaml process func (app *app) handleIncomingMsg(msg *ipc.Libp2pHelperInterface_Message) { if msg.HasRpcRequest() { resp, err := func() (*capnp.Message, error) { diff --git a/src/app/libp2p_helper/src/libp2p_helper/peer_msg.go b/src/app/libp2p_helper/src/libp2p_helper/peer_msg.go index a5a88fe9edd..74daed42656 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/peer_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/peer_msg.go @@ -40,7 +40,7 @@ func (m AddPeerReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badRPC(err)) } - app.AddedPeers = append(app.AddedPeers, *info) + app.AddPeers(*info) app.P2p.GatingState().TrustPeer(info.ID) if app.Bootstrapper != nil { @@ -50,7 +50,7 @@ func (m AddPeerReq) handle(app *app, seqno uint64) *capnp.Message { app.P2p.Logger.Info("addPeer Trying to connect to: ", info) if AddPeerReqT(m).IsSeed() { - app.P2p.Seeds = append(app.P2p.Seeds, *info) + app.P2p.AddSeeds(*info) } err = app.P2p.Host.Connect(app.Ctx, *info) diff --git a/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg.go b/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg.go index 0008a8d483a..8ef176ae03e 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg.go @@ -36,9 +36,7 @@ func (m ValidationPush) handle(app *app) { return } seqno := vid.Id() - app.ValidatorMutex.Lock() - defer app.ValidatorMutex.Unlock() - if st, ok := app.Validators[seqno]; ok { + found := app.FinishValidator(seqno, func(st *validationStatus) { res := ValidationUnknown switch ValidationPushT(m).Result() { case ipc.ValidationResult_accept: @@ -54,8 +52,9 @@ func (m ValidationPush) handle(app *app) { if st.TimedOutAt != nil { app.P2p.Logger.Errorf("validation for item %d took %d seconds", seqno, time.Now().Add(validationTimeout).Sub(*st.TimedOutAt)) } - delete(app.Validators, seqno) - } else { + }) + + if !found { app.P2p.Logger.Warnf("handleValidation: validation seqno %d unknown", seqno) } } @@ -87,12 +86,12 @@ func (m PublishReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badRPC(err)) } - if topic, has = app.Topics[topicName]; !has { + if topic, has = app.GetTopic(topicName); !has { topic, err = app.P2p.Pubsub.Join(topicName) if err != nil { return mkRpcRespError(seqno, badp2p(err)) } - app.Topics[topicName] = topic + app.AddTopic(topicName, topic) } if err := topic.Publish(app.Ctx, data); err != nil { @@ -136,7 +135,7 @@ func (m SubscribeReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badp2p(err)) } - app.Topics[topicName] = topic + app.AddTopic(topicName, topic) err = app.P2p.Pubsub.RegisterTopicValidator(topicName, func(ctx context.Context, id peer.ID, msg *pubsub.Message) pubsub.ValidationResult { app.P2p.Logger.Debugf("Received gossip message on topic %s from %s", topicName, id.Pretty()) @@ -148,12 +147,7 @@ func (m SubscribeReq) handle(app *app, seqno uint64) *capnp.Message { seenAt := time.Now() - seqno := app.NextId() - ch := make(chan pubsub.ValidationResult) - app.ValidatorMutex.Lock() - app.Validators[seqno] = new(validationStatus) - app.Validators[seqno].Completion = ch - app.ValidatorMutex.Unlock() + seqno, ch := app.AddValidator() app.P2p.Logger.Info("validating a new pubsub message ...") @@ -161,17 +155,14 @@ func (m SubscribeReq) handle(app *app, seqno uint64) *capnp.Message { if err != nil && !app.UnsafeNoTrustIP { app.P2p.Logger.Errorf("failed to connect to peer %s that just sent us a pubsub message, dropping it", peer.Encode(id)) - app.ValidatorMutex.Lock() - defer app.ValidatorMutex.Unlock() - delete(app.Validators, seqno) + app.RemoveValidator(seqno) return pubsub.ValidationIgnore } deadline, ok := ctx.Deadline() if !ok { app.P2p.Logger.Errorf("no deadline set on validation context") - defer app.ValidatorMutex.Unlock() - delete(app.Validators, seqno) + app.RemoveValidator(seqno) return pubsub.ValidationIgnore } app.writeMsg(mkGossipReceivedUpcall(sender, deadline, seenAt, msg.Data, seqno, subId)) @@ -187,12 +178,7 @@ func (m SubscribeReq) handle(app *app, seqno uint64) *capnp.Message { validationTimeoutMetric.Inc() - app.ValidatorMutex.Lock() - - now := time.Now() - app.Validators[seqno].TimedOutAt = &now - - app.ValidatorMutex.Unlock() + app.TimeoutValidator(seqno) if app.UnsafeNoTrustIP { app.P2p.Logger.Info("validated anyway!") @@ -228,12 +214,11 @@ func (m SubscribeReq) handle(app *app, seqno uint64) *capnp.Message { } ctx, cancel := context.WithCancel(app.Ctx) - app.Subs[subId] = subscription{ + app.AddSubscription(subId, subscription{ Sub: sub, - Idx: subId, - Ctx: ctx, Cancel: cancel, - } + }) + go func() { for { _, err = sub.Next(ctx) @@ -268,14 +253,12 @@ func (m UnsubscribeReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badRPC(err)) } subId := subId_.Id() - if sub, ok := app.Subs[subId]; ok { - sub.Sub.Cancel() - sub.Cancel() - delete(app.Subs, subId) + if app.CancelSubscription(subId) { return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { _, err := m.NewUnsubscribe() panicOnErr(err) }) + } else { + return mkRpcRespError(seqno, badRPC(errors.New("subscription not found"))) } - return mkRpcRespError(seqno, badRPC(errors.New("subscription not found"))) } diff --git a/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg_test.go b/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg_test.go index 1dd1d4dbc99..6d8ae65579f 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/pubsub_msg_test.go @@ -27,7 +27,7 @@ func testPublishDo(t *testing.T, app *app, topic string, data []byte, rpcSeqno u _, err = respSuccess.Publish() require.NoError(t, err) - _, has := app.Topics[topic] + _, has := app._topics[topic] require.True(t, has) } @@ -55,9 +55,9 @@ func testSubscribeDo(t *testing.T, app *app, topic string, subId uint64, rpcSeqn _, err = respSuccess.Subscribe() require.NoError(t, err) - _, has := app.Topics[topic] + _, has := app._topics[topic] require.True(t, has) - _, has = app.Subs[subId] + _, has = app._subs[subId] require.True(t, has) } @@ -97,7 +97,7 @@ func TestUnsubscribe(t *testing.T) { _, err = respSuccess.Unsubscribe() require.NoError(t, err) - _, has := testApp.Subs[idx] + _, has := testApp._subs[idx] require.False(t, has) } @@ -121,7 +121,7 @@ func TestValidationPush(t *testing.T) { status := &validationStatus{ Completion: make(chan pubsub.ValidationResult), } - testApp.Validators[seqno] = status + testApp._validators[seqno] = status _, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) require.NoError(t, err) m, err := ipc.NewRootLibp2pHelperInterface_Validation(seg) @@ -133,7 +133,7 @@ func TestValidationPush(t *testing.T) { require.NoError(t, err) result := <-status.Completion require.Equal(t, pubsubValResults[i], result) - _, has := testApp.Validators[seqno] + _, has := testApp._validators[seqno] require.False(t, has) } } diff --git a/src/app/libp2p_helper/src/libp2p_helper/stream_msg.go b/src/app/libp2p_helper/src/libp2p_helper/stream_msg.go index bc4cc9ad827..c25456db3c7 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/stream_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/stream_msg.go @@ -2,13 +2,11 @@ package main import ( "context" - "fmt" "time" ipc "libp2p_ipc" capnp "capnproto.org/go/capnp/v3" - "github.com/go-errors/errors" net "github.com/libp2p/go-libp2p/core/network" peer "github.com/libp2p/go-libp2p/core/peer" protocol "github.com/libp2p/go-libp2p/core/protocol" @@ -35,12 +33,9 @@ func (m AddStreamHandlerReq) handle(app *app, seqno uint64) *capnp.Message { app.P2p.Logger.Errorf("failed to parse remote connection information, silently dropping stream: %s", err.Error()) return } - streamIdx := app.NextId() - app.StreamsMutex.Lock() - defer app.StreamsMutex.Unlock() - app.Streams[streamIdx] = stream - app.writeMsg(mkIncomingStreamUpcall(peerinfo, streamIdx, protocolId)) + streamIdx := app.AddStream(stream) handleStreamReads(app, stream, streamIdx) + app.writeMsg(mkIncomingStreamUpcall(peerinfo, streamIdx, protocolId)) }) return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { @@ -65,20 +60,14 @@ func (m CloseStreamReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badRPC(err)) } streamId := sid.Id() - app.StreamsMutex.Lock() - defer app.StreamsMutex.Unlock() - if stream, ok := app.Streams[streamId]; ok { - delete(app.Streams, streamId) - err := stream.Close() - if err != nil { - return mkRpcRespError(seqno, badp2p(err)) - } - return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { - _, err := m.NewCloseStream() - panicOnErr(err) - }) + err = app.CloseStream(streamId) + if err != nil { + return mkRpcRespError(seqno, err) } - return mkRpcRespError(seqno, badRPC(errors.New("unknown stream_idx"))) + return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { + _, err := m.NewCloseStream() + panicOnErr(err) + }) } type OpenStreamReqT = ipc.Libp2pHelperInterface_OpenStream_Request @@ -93,7 +82,6 @@ func (m OpenStreamReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, needsConfigure()) } - streamIdx := app.NextId() var peerDecoded peer.ID var protocolId string err := func() error { @@ -133,15 +121,14 @@ func (m OpenStreamReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badp2p(err)) } - app.StreamsMutex.Lock() - defer app.StreamsMutex.Unlock() - app.Streams[streamIdx] = stream + streamIdx := app.AddStream(stream) go func() { // FIXME HACK: allow time for the openStreamResult to get printed before we start inserting stream events time.Sleep(250 * time.Millisecond) // Note: It is _very_ important that we call handleStreamReads here -- this is how the "caller" side of the stream starts listening to the responses from the RPCs. Do not remove. handleStreamReads(app, stream, streamIdx) }() + return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { resp, err := m.NewOpenStream() panicOnErr(err) @@ -193,21 +180,14 @@ func (m ResetStreamReq) handle(app *app, seqno uint64) *capnp.Message { return mkRpcRespError(seqno, badRPC(err)) } streamId := sid.Id() - app.StreamsMutex.Lock() - if stream, ok := app.Streams[streamId]; ok { - delete(app.Streams, streamId) - app.StreamsMutex.Unlock() - err := stream.Reset() - if err != nil { - return mkRpcRespError(seqno, badp2p(err)) - } - return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { - _, err := m.NewResetStream() - panicOnErr(err) - }) + err = app.ResetStream(streamId) + if err != nil { + return mkRpcRespError(seqno, err) } - app.StreamsMutex.Unlock() - return mkRpcRespError(seqno, badRPC(errors.New("unknown stream_idx"))) + return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { + _, err := m.NewResetStream() + panicOnErr(err) + }) } type SendStreamReqT = ipc.Libp2pHelperInterface_SendStream_Request @@ -235,26 +215,14 @@ func (m SendStreamReq) handle(app *app, seqno uint64) *capnp.Message { } streamId := sid.Id() - // TODO Consider using a more fine-grained locking strategy, - // not using a global mutex to lock on a message sending - app.StreamsMutex.Lock() - defer app.StreamsMutex.Unlock() - if stream, ok := app.Streams[streamId]; ok { - n, err := stream.Write(data) - if err != nil { - // TODO check that it's correct to error out, not repeat writing - delete(app.Streams, streamId) - close_err := stream.Close() - if close_err != nil { - app.P2p.Logger.Errorf("failed to close stream %d after encountering write failure (%s): %s", streamId, err.Error(), close_err.Error()) - } + err = app.StreamWrite(streamId, data) - return mkRpcRespError(seqno, wrapError(badp2p(err), fmt.Sprintf("only wrote %d out of %d bytes", n, len(data)))) - } - return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { - _, err := m.NewSendStream() - panicOnErr(err) - }) + if err != nil { + return mkRpcRespError(seqno, err) } - return mkRpcRespError(seqno, badRPC(errors.New("unknown stream_idx"))) + + return mkRpcRespSuccess(seqno, func(m *ipc.Libp2pHelperInterface_RpcResponseSuccess) { + _, err := m.NewSendStream() + panicOnErr(err) + }) } diff --git a/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go b/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go index b21b6d3f263..d3621a2f788 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/stream_msg_test.go @@ -76,7 +76,7 @@ func testOpenStreamDo(t *testing.T, appA *app, appBHost host.Host, appBPort uint require.Equal(t, appA.counter, respStreamId) - _, has := appA.Streams[respStreamId] + _, has := appA._streams[respStreamId] require.True(t, has) return respStreamId @@ -110,7 +110,7 @@ func testCloseStreamDo(t *testing.T, app *app, streamId uint64, rpcSeqno uint64) _, err = respSuccess.CloseStream() require.NoError(t, err) - _, has := app.Streams[streamId] + _, has := app._streams[streamId] require.False(t, has) } @@ -173,7 +173,7 @@ func testResetStreamDo(t *testing.T, app *app, streamId uint64, rpcSeqno uint64) _, err = respSuccess.ResetStream() require.NoError(t, err) - _, has := app.Streams[streamId] + _, has := app._streams[streamId] require.False(t, has) } @@ -201,7 +201,7 @@ func testSendStreamDo(t *testing.T, app *app, streamId uint64, msgBytes []byte, _, err = respSuccess.SendStream() require.NoError(t, err) - _, has := app.Streams[streamId] + _, has := app._streams[streamId] require.True(t, has) } diff --git a/src/app/libp2p_helper/src/libp2p_helper/util_test.go b/src/app/libp2p_helper/src/libp2p_helper/util_test.go index b3e9530259d..4aa7bb3db95 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/util_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/util_test.go @@ -89,12 +89,11 @@ func newTestAppWithMaxConnsAndCtxAndGrace(t *testing.T, privkey crypto.PrivKey, return &app{ P2p: helper, Ctx: ctx, - Subs: make(map[uint64]subscription), - Topics: make(map[string]*pubsub.Topic), - ValidatorMutex: &sync.Mutex{}, - Validators: make(map[uint64]*validationStatus), - Streams: make(map[uint64]net.Stream), - AddedPeers: make([]peer.AddrInfo, 0, 512), + _subs: make(map[uint64]subscription), + _topics: make(map[string]*pubsub.Topic), + _validators: make(map[uint64]*validationStatus), + _streams: make(map[uint64]net.Stream), + _addedPeers: make([]peer.AddrInfo, 0, 512), OutChan: outChan, MetricsRefreshTime: time.Second * 2, NoUpcalls: noUpcalls,