Skip to content

Commit

Permalink
netbs: Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 7, 2022
1 parent a47fdbb commit 3545e80
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 20 deletions.
41 changes: 23 additions & 18 deletions blockstore/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,29 @@ import (
type NetRPCReqType byte

const (
NRpcHas NetRPCReqType = iota
NRpcGet NetRPCReqType = iota
NRpcGetSize NetRPCReqType = iota
NRpcPut NetRPCReqType = iota
NRpcDelete NetRPCReqType = iota
NRpcList NetRPCReqType = iota
NRpcHas NetRPCReqType = iota
NRpcGet
NRpcGetSize
NRpcPut
NRpcDelete
NRpcList

// todo cancel req
)

type NetRPCRespType byte

const (
NRpcOK NetRPCRespType = iota
NRpcErr NetRPCRespType = iota
NRpcMore NetRPCRespType = iota
NRpcOK NetRPCRespType = iota
NRpcErr
NRpcMore
)

type NetRPCErrType byte

const (
NRpcErrGeneric NetRPCErrType = iota
NRpcErrNotFound NetRPCErrType = iota
NRpcErrGeneric NetRPCErrType = iota
NRpcErrNotFound
)

type NetRpcReq struct {
Expand Down Expand Up @@ -87,7 +87,7 @@ type NetworkStore struct {
closed chan struct{}

closeLk sync.Mutex
onClose func()
onClose []func()
}

func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore {
Expand Down Expand Up @@ -143,7 +143,7 @@ func (n *NetworkStore) OnClose(cb func()) {
case <-n.closed:
cb()
default:
n.onClose = cb
n.onClose = append(n.onClose, cb)
}
}

Expand All @@ -154,7 +154,9 @@ func (n *NetworkStore) receive() {

close(n.closed)
if n.onClose != nil {
n.onClose()
for _, f := range n.onClose {
f()
}
}
}()

Expand Down Expand Up @@ -203,6 +205,7 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte)

n.respLk.Lock()
if n.respMap == nil {
n.respLk.Unlock()
return 0, nil, xerrors.Errorf("netstore closed")
}
n.respMap[rid] = respCh
Expand All @@ -218,22 +221,24 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte)
var rbuf bytes.Buffer // todo buffer pool
if err := req.MarshalCBOR(&rbuf); err != nil {
n.respLk.Lock()
defer n.respLk.Unlock()

if n.respMap == nil {
return 0, nil, xerrors.Errorf("netstore closed")
}
delete(n.respMap, rid)
n.respLk.Unlock()

return 0, nil, err
}

if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil {
n.respLk.Lock()
defer n.respLk.Unlock()

if n.respMap == nil {
return 0, nil, xerrors.Errorf("netstore closed")
}
delete(n.respMap, rid)
n.respLk.Unlock()

return 0, nil, err
}
Expand All @@ -260,10 +265,10 @@ func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid
} else {
err = xerrors.Errorf("block not found, but cid was null")
}
default:
err = xerrors.Errorf("unknown error type")
case NRpcErrGeneric:
err = xerrors.Errorf("generic error")
default:
err = xerrors.Errorf("unknown error type")
}

return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err)
Expand Down
1 change: 1 addition & 0 deletions blockstore/net_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type NetworkStoreHandler struct {
bs Blockstore
}

// NOTE: This code isn't yet hardened to accept untrusted input. See TODOs here and in net.go
func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler {
ns := &NetworkStoreHandler{
msgStream: mss,
Expand Down
2 changes: 1 addition & 1 deletion markets/retrievaladapter/client_blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error {
return a.sub.Done(id)
}

func (a *APIBlockstoreAccessor) UseRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error {
func (a *APIBlockstoreAccessor) RegisterDealToRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error {
a.accessLk.Lock()
defer a.accessLk.Unlock()

Expand Down
3 changes: 2 additions & 1 deletion node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
id := a.Retrieval.NextID()

if order.RemoteStore != nil {
if err := a.ApiBlockstoreAccessor.UseRetrievalStore(id, *order.RemoteStore); err != nil {
if err := a.ApiBlockstoreAccessor.RegisterDealToRetrievalStore(id, *order.RemoteStore); err != nil {
return 0, xerrors.Errorf("registering api store: %w", err)
}
}
Expand Down Expand Up @@ -1030,6 +1030,7 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo
root,
dagSpec.selector,
func(node format.Node) error {
// if we're exporting merkle proofs for this dag, export all nodes read by the traversal
if dagSpec.exportAll {
lk.Lock()
defer lk.Unlock()
Expand Down

0 comments on commit 3545e80

Please sign in to comment.