diff --git a/blockstore/net.go b/blockstore/net.go index fa8b24591fb..c627cae09aa 100644 --- a/blockstore/net.go +++ b/blockstore/net.go @@ -19,12 +19,12 @@ import ( type NetRPCReqType byte const ( - NRpcHas NetRPCReqType = iota - NRpcGet NetRPCReqType = iota - NRpcGetSize NetRPCReqType = iota - NRpcPut NetRPCReqType = iota - NRpcDelete NetRPCReqType = iota - NRpcList NetRPCReqType = iota + NRpcHas NetRPCReqType = iota + NRpcGet + NRpcGetSize + NRpcPut + NRpcDelete + NRpcList // todo cancel req ) @@ -32,16 +32,16 @@ const ( type NetRPCRespType byte const ( - NRpcOK NetRPCRespType = iota - NRpcErr NetRPCRespType = iota - NRpcMore NetRPCRespType = iota + NRpcOK NetRPCRespType = iota + NRpcErr + NRpcMore ) type NetRPCErrType byte const ( - NRpcErrGeneric NetRPCErrType = iota - NRpcErrNotFound NetRPCErrType = iota + NRpcErrGeneric NetRPCErrType = iota + NRpcErrNotFound ) type NetRpcReq struct { @@ -87,7 +87,7 @@ type NetworkStore struct { closed chan struct{} closeLk sync.Mutex - onClose func() + onClose []func() } func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore { @@ -143,7 +143,7 @@ func (n *NetworkStore) OnClose(cb func()) { case <-n.closed: cb() default: - n.onClose = cb + n.onClose = append(n.onClose, cb) } } @@ -154,7 +154,9 @@ func (n *NetworkStore) receive() { close(n.closed) if n.onClose != nil { - n.onClose() + for _, f := range n.onClose { + f() + } } }() @@ -203,6 +205,7 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) n.respLk.Lock() if n.respMap == nil { + n.respLk.Unlock() return 0, nil, xerrors.Errorf("netstore closed") } n.respMap[rid] = respCh @@ -218,22 +221,24 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) var rbuf bytes.Buffer // todo buffer pool if err := req.MarshalCBOR(&rbuf); err != nil { n.respLk.Lock() + defer n.respLk.Unlock() + if n.respMap == nil { return 0, nil, xerrors.Errorf("netstore closed") } delete(n.respMap, rid) - n.respLk.Unlock() return 0, nil, err } if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil { n.respLk.Lock() + defer n.respLk.Unlock() + if n.respMap == nil { return 0, nil, xerrors.Errorf("netstore closed") } delete(n.respMap, rid) - n.respLk.Unlock() return 0, nil, err } @@ -260,10 +265,10 @@ func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid } else { err = xerrors.Errorf("block not found, but cid was null") } - default: - err = xerrors.Errorf("unknown error type") case NRpcErrGeneric: err = xerrors.Errorf("generic error") + default: + err = xerrors.Errorf("unknown error type") } return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err) diff --git a/blockstore/net_serve.go b/blockstore/net_serve.go index 25226c5c503..f2a63f3813b 100644 --- a/blockstore/net_serve.go +++ b/blockstore/net_serve.go @@ -19,6 +19,7 @@ type NetworkStoreHandler struct { bs Blockstore } +// NOTE: This code isn't yet hardened to accept untrusted input. See TODOs here and in net.go func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler { ns := &NetworkStoreHandler{ msgStream: mss, diff --git a/node/impl/client/client.go b/node/impl/client/client.go index ea2dd2c0d44..25c8f8d9c50 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -1030,6 +1030,7 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo root, dagSpec.selector, func(node format.Node) error { + // if we're exporting merkle proofs for this dag, export all nodes read by the traversal if dagSpec.exportAll { lk.Lock() defer lk.Unlock()