From 3545e8065f833abc1d77de37d3f7a077d7d6bfe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 6 Nov 2022 20:00:52 +0000 Subject: [PATCH] netbs: Address review --- blockstore/net.go | 41 +++++++++++-------- blockstore/net_serve.go | 1 + markets/retrievaladapter/client_blockstore.go | 2 +- node/impl/client/client.go | 3 +- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/blockstore/net.go b/blockstore/net.go index fa8b24591fb..c627cae09aa 100644 --- a/blockstore/net.go +++ b/blockstore/net.go @@ -19,12 +19,12 @@ import ( type NetRPCReqType byte const ( - NRpcHas NetRPCReqType = iota - NRpcGet NetRPCReqType = iota - NRpcGetSize NetRPCReqType = iota - NRpcPut NetRPCReqType = iota - NRpcDelete NetRPCReqType = iota - NRpcList NetRPCReqType = iota + NRpcHas NetRPCReqType = iota + NRpcGet + NRpcGetSize + NRpcPut + NRpcDelete + NRpcList // todo cancel req ) @@ -32,16 +32,16 @@ const ( type NetRPCRespType byte const ( - NRpcOK NetRPCRespType = iota - NRpcErr NetRPCRespType = iota - NRpcMore NetRPCRespType = iota + NRpcOK NetRPCRespType = iota + NRpcErr + NRpcMore ) type NetRPCErrType byte const ( - NRpcErrGeneric NetRPCErrType = iota - NRpcErrNotFound NetRPCErrType = iota + NRpcErrGeneric NetRPCErrType = iota + NRpcErrNotFound ) type NetRpcReq struct { @@ -87,7 +87,7 @@ type NetworkStore struct { closed chan struct{} closeLk sync.Mutex - onClose func() + onClose []func() } func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore { @@ -143,7 +143,7 @@ func (n *NetworkStore) OnClose(cb func()) { case <-n.closed: cb() default: - n.onClose = cb + n.onClose = append(n.onClose, cb) } } @@ -154,7 +154,9 @@ func (n *NetworkStore) receive() { close(n.closed) if n.onClose != nil { - n.onClose() + for _, f := range n.onClose { + f() + } } }() @@ -203,6 +205,7 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) n.respLk.Lock() if n.respMap == nil { + n.respLk.Unlock() return 0, nil, xerrors.Errorf("netstore closed") } n.respMap[rid] = respCh @@ -218,22 +221,24 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) var rbuf bytes.Buffer // todo buffer pool if err := req.MarshalCBOR(&rbuf); err != nil { n.respLk.Lock() + defer n.respLk.Unlock() + if n.respMap == nil { return 0, nil, xerrors.Errorf("netstore closed") } delete(n.respMap, rid) - n.respLk.Unlock() return 0, nil, err } if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil { n.respLk.Lock() + defer n.respLk.Unlock() + if n.respMap == nil { return 0, nil, xerrors.Errorf("netstore closed") } delete(n.respMap, rid) - n.respLk.Unlock() return 0, nil, err } @@ -260,10 +265,10 @@ func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid } else { err = xerrors.Errorf("block not found, but cid was null") } - default: - err = xerrors.Errorf("unknown error type") case NRpcErrGeneric: err = xerrors.Errorf("generic error") + default: + err = xerrors.Errorf("unknown error type") } return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err) diff --git a/blockstore/net_serve.go b/blockstore/net_serve.go index 25226c5c503..f2a63f3813b 100644 --- a/blockstore/net_serve.go +++ b/blockstore/net_serve.go @@ -19,6 +19,7 @@ type NetworkStoreHandler struct { bs Blockstore } +// NOTE: This code isn't yet hardened to accept untrusted input. See TODOs here and in net.go func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler { ns := &NetworkStoreHandler{ msgStream: mss, diff --git a/markets/retrievaladapter/client_blockstore.go b/markets/retrievaladapter/client_blockstore.go index 212b47758af..37a80013aec 100644 --- a/markets/retrievaladapter/client_blockstore.go +++ b/markets/retrievaladapter/client_blockstore.go @@ -77,7 +77,7 @@ func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error { return a.sub.Done(id) } -func (a *APIBlockstoreAccessor) UseRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error { +func (a *APIBlockstoreAccessor) RegisterDealToRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error { a.accessLk.Lock() defer a.accessLk.Unlock() diff --git a/node/impl/client/client.go b/node/impl/client/client.go index ea2dd2c0d44..c29d390364e 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -849,7 +849,7 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat id := a.Retrieval.NextID() if order.RemoteStore != nil { - if err := a.ApiBlockstoreAccessor.UseRetrievalStore(id, *order.RemoteStore); err != nil { + if err := a.ApiBlockstoreAccessor.RegisterDealToRetrievalStore(id, *order.RemoteStore); err != nil { return 0, xerrors.Errorf("registering api store: %w", err) } } @@ -1030,6 +1030,7 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo root, dagSpec.selector, func(node format.Node) error { + // if we're exporting merkle proofs for this dag, export all nodes read by the traversal if dagSpec.exportAll { lk.Lock() defer lk.Unlock()