Skip to content

Commit

Permalink
feat(storageminer): also list data transfers with no id
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Dec 22, 2021
1 parent 8a923fb commit 1078dff
Showing 1 changed file with 58 additions and 10 deletions.
68 changes: 58 additions & 10 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,13 +569,38 @@ func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mp
if !ok {
return nil, errors.New("api only works for non-mock graphsync implementation")
}

inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}

allReceivingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState)
allSendingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState)
for channelID, channel := range inProgressChannels {
if channel.OtherPeer() != mpid {
continue
}
if channel.Status() == datatransfer.Completed {
continue
}
if channel.Status() == datatransfer.Failed || channel.Status() == datatransfer.Cancelled {
continue
}
if channel.SelfPeer() == channel.Sender() {
allSendingChannels[channelID] = channel
} else {
allReceivingChannels[channelID] = channel
}
}

// gather information about active transport channels
transportChannels := gsTransport.ChannelsForPeer(mpid)
// gather information about graphsync state for peer
gsPeerState := graphsyncConcrete.PeerState(mpid)

sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState)
receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState)
sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState, allSendingChannels)
receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState, allReceivingChannels)

return &api.TransferDiagnostics{
SendingTransfers: sendingTransfers,
Expand All @@ -587,11 +612,14 @@ func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mp
// to produce detailed output on what's happening with a transfer
func (sm *StorageMinerAPI) generateTransfers(ctx context.Context,
transportChannels map[datatransfer.ChannelID]gst.ChannelGraphsyncRequests,
gsPeerState peerstate.PeerState) []*api.GraphSyncDataTransfer {
gsPeerState peerstate.PeerState,
allChannels map[datatransfer.ChannelID]datatransfer.ChannelState) []*api.GraphSyncDataTransfer {
tc := &transferConverter{
matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer),
gsDiagnostics: gsPeerState.Diagnostics(),
requestStates: gsPeerState.RequestStates,
matchedChannelIds: make(map[datatransfer.ChannelID]struct{}),
matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer),
gsDiagnostics: gsPeerState.Diagnostics(),
requestStates: gsPeerState.RequestStates,
allChannels: allChannels,
}

// iterate through all operating data transfer transport channels
Expand Down Expand Up @@ -620,10 +648,12 @@ func (sm *StorageMinerAPI) generateTransfers(ctx context.Context,
}

type transferConverter struct {
matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer
transfers []*api.GraphSyncDataTransfer
gsDiagnostics map[graphsync.RequestID][]string
requestStates graphsync.RequestStates
matchedChannelIds map[datatransfer.ChannelID]struct{}
matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer
transfers []*api.GraphSyncDataTransfer
gsDiagnostics map[graphsync.RequestID][]string
requestStates graphsync.RequestStates
allChannels map[datatransfer.ChannelID]datatransfer.ChannelState
}

// convert transfer assembles transfer and diagnostic data for a given graphsync/data-transfer request
Expand Down Expand Up @@ -657,6 +687,9 @@ func (tc *transferConverter) convertTransfer(channelID datatransfer.ChannelID, h
}
tc.transfers = append(tc.transfers, transfer)
tc.matchedRequests[requestID] = transfer
if hasChannelID {
tc.matchedChannelIds[channelID] = struct{}{}
}
}

func (tc *transferConverter) collectRemainingTransfers() {
Expand All @@ -670,6 +703,21 @@ func (tc *transferConverter) collectRemainingTransfers() {
tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false)
}
}
for channelID, channelState := range tc.allChannels {
if _, ok := tc.matchedChannelIds[channelID]; !ok {
channelID := channelID
cs := api.NewDataTransferChannel(channelState.SelfPeer(), channelState)
transfer := &api.GraphSyncDataTransfer{
RequestID: graphsync.RequestID(-1),
RequestState: "graphsync state unknown",
IsCurrentChannelRequest: false,
ChannelID: &channelID,
ChannelState: &cs,
Diagnostics: []string{"data transfer with no open transport channel, cannot determine linked graphsync request"},
}
tc.transfers = append(tc.transfers, transfer)
}
}
}

func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
Expand Down

0 comments on commit 1078dff

Please sign in to comment.