diff --git a/gql/resolver.go b/gql/resolver.go index 51c464a81..c11f1fae6 100644 --- a/gql/resolver.go +++ b/gql/resolver.go @@ -89,7 +89,7 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea return nil, err } - return newDealResolver(deal, r.dealsDB, r.logsDB), nil + return newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi), nil } type dealsArgs struct { @@ -117,7 +117,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver resolvers := make([]*dealResolver, 0, len(deals)) for _, deal := range deals { - resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB)) + resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB, r.spApi)) } return &dealListResolver{ @@ -154,7 +154,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID }) select { case <-ctx.Done(): return nil, ctx.Err() - case net <- newDealResolver(deal, r.dealsDB, r.logsDB): + case net <- newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi): } // Updates to deal state are broadcast on pubsub. Pipe these updates to the @@ -167,8 +167,11 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID }) } return nil, xerrors.Errorf("%s: subscribing to deal updates: %w", args.ID, err) } - sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB} - go sub.Pipe(ctx, net) + sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi} + go func() { + sub.Pipe(ctx, net) // blocks until connection is closed + close(net) + }() return net, nil } @@ -203,7 +206,7 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error) case evti := <-sub.Out(): // Pipe the deal to the new deal channel di := evti.(types.ProviderDealState) - rsv := newDealResolver(&di, r.dealsDB, r.logsDB) + rsv := newDealResolver(&di, r.dealsDB, r.logsDB, r.spApi) totalCount, err := r.dealsDB.Count(ctx) if err != nil { log.Errorf("getting total deal count: %w", err) @@ -293,14 +296,16 @@ type dealResolver struct { transferred uint64 dealsDB *db.DealsDB logsDB *db.LogsDB + spApi sealingpipeline.API } -func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB) *dealResolver { +func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver { return &dealResolver{ ProviderDealState: *deal, transferred: uint64(deal.NBytesReceived), dealsDB: dealsDB, logsDB: logsDB, + spApi: spApi, } } @@ -422,7 +427,7 @@ func (dr *dealResolver) Stage() string { return dr.ProviderDealState.Checkpoint.String() } -func (dr *dealResolver) Message() string { +func (dr *dealResolver) Message(ctx context.Context) string { switch dr.Checkpoint { case dealcheckpoints.Accepted: if dr.IsOffline { @@ -446,7 +451,7 @@ func (dr *dealResolver) Message() string { case dealcheckpoints.AddedPiece: return "Announcing" case dealcheckpoints.IndexedAndAnnounced: - return "Sealing" + return dr.sealingState(ctx) case dealcheckpoints.Complete: switch dr.Err { case "": @@ -459,6 +464,15 @@ func (dr *dealResolver) Message() string { return dr.ProviderDealState.Checkpoint.String() } +func (dr *dealResolver) sealingState(ctx context.Context) string { + si, err := dr.spApi.SectorsStatus(ctx, dr.SectorID, false) + if err != nil { + return "Sealer: Sealing" + } + + return "Sealer: " + string(si.State) +} + func (dr *dealResolver) Logs(ctx context.Context) ([]*logsResolver, error) { logs, err := dr.logsDB.Logs(ctx, dr.ProviderDealState.DealUuid) if err != nil { @@ -513,10 +527,11 @@ type subLastUpdate struct { sub event.Subscription dealsDB *db.DealsDB logsDB *db.LogsDB + spApi sealingpipeline.API } func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) { - // When the connection ends, unsubscribe + // When the connection ends, unsubscribe from deal update events defer s.sub.Close() var lastUpdate interface{} @@ -525,7 +540,11 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) { select { case <-ctx.Done(): return - case update := <-s.sub.Out(): + case update, ok := <-s.sub.Out(): + if !ok { + // Stop listening for updates when the subscription is closed + return + } lastUpdate = update } @@ -533,26 +552,33 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) { // updates that are queued up behind the first one, and only save // the very last select { - case update := <-s.sub.Out(): - lastUpdate = update + case update, ok := <-s.sub.Out(): + if ok { + lastUpdate = update + } default: } // Attempt to send the update to the network. If the network is // blocked, and another update arrives on the subscription, // override the latest update. + updates := s.sub.Out() loop: for { di := lastUpdate.(types.ProviderDealState) - rsv := newDealResolver(&di, s.dealsDB, s.logsDB) + rsv := newDealResolver(&di, s.dealsDB, s.logsDB, s.spApi) select { case <-ctx.Done(): return case net <- rsv: break loop - case update := <-s.sub.Out(): - lastUpdate = update + case update, ok := <-updates: + if ok { + lastUpdate = update + } else { + updates = nil + } } } } diff --git a/storagemarket/deal_execution.go b/storagemarket/deal_execution.go index edf281bd3..98c08dbdb 100644 --- a/storagemarket/deal_execution.go +++ b/storagemarket/deal_execution.go @@ -8,31 +8,28 @@ import ( "os" "time" - "github.com/filecoin-project/dagstore" - - "github.com/filecoin-project/go-fil-markets/piecestore" - - "github.com/filecoin-project/go-fil-markets/stores" - - acrypto "github.com/filecoin-project/go-state-types/crypto" - - "github.com/filecoin-project/boost/transport" - "github.com/filecoin-project/go-state-types/abi" - - "golang.org/x/xerrors" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/boost/storagemarket/types" - transporttypes "github.com/filecoin-project/boost/transport/types" - "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + "github.com/filecoin-project/boost/transport" + transporttypes "github.com/filecoin-project/boost/transport/types" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-commp-utils/writer" commcid "github.com/filecoin-project/go-fil-commcid" commp "github.com/filecoin-project/go-fil-commp-hashhash" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + acrypto "github.com/filecoin-project/go-state-types/crypto" + lapi "github.com/filecoin-project/lotus/api" + "github.com/google/uuid" "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" "github.com/libp2p/go-eventbus" "github.com/libp2p/go-libp2p-core/event" + "golang.org/x/xerrors" ) const ( @@ -111,10 +108,13 @@ func (p *Provider) doDeal(deal *types.ProviderDealState, dh *dealHandler) { // to wait for deal completion/slashing and update the state in DB accordingly. p.cleanupDealLogged(deal) p.dealLogger.Infow(deal.DealUuid, "finished deal cleanup after successful execution") + + // Watch the sealing status of the deal and fire events for each change + p.fireSealingUpdateEvents(dh, pub, deal.DealUuid, deal.SectorID) + p.cleanupDealHandler(deal.DealUuid) + // TODO // Watch deal on chain and change state in DB and emit notifications. - // Given that cleanup deal above also gets rid of the deal handler, subscriptions to deal updates from here on - // will fail, we can look into it when we implement deal completion. } func (p *Provider) execDealUptoAddPiece(ctx context.Context, pub event.Emitter, deal *types.ProviderDealState, dh *dealHandler) *dealMakingError { @@ -596,6 +596,84 @@ func (p *Provider) indexAndAnnounce(ctx context.Context, pub event.Emitter, deal return p.updateCheckpoint(pub, deal, dealcheckpoints.IndexedAndAnnounced) } +// fireSealingUpdateEvents periodically checks the sealing status of the deal +// and fires events for each change +func (p *Provider) fireSealingUpdateEvents(dh *dealHandler, pub event.Emitter, dealUuid uuid.UUID, sectorNum abi.SectorNumber) { + var lastSealingState lapi.SectorState + checkStatus := func(force bool) lapi.SectorState { + // To avoid overloading the sealing service, only get the sector status + // if there's at least one subscriber to the event that will be published + if !force && !dh.hasActiveSubscribers() { + return "" + } + + // Get the sector status + si, err := p.sps.SectorsStatus(p.ctx, sectorNum, false) + if err == nil && si.State != lastSealingState { + lastSealingState = si.State + + // Sector status has changed, fire an update event + deal, err := p.dealsDB.ByID(p.ctx, dealUuid) + if err != nil { + log.Errorf("getting deal %s with sealing update: %w", dealUuid, err) + return si.State + } + + p.fireEventDealUpdate(pub, deal) + } + return si.State + } + + // Check status immediately + state := checkStatus(true) + if isFinalSealingState(state) { + return + } + + // Check status every second + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + count := 0 + forceCount := 60 + for { + select { + case <-p.ctx.Done(): + return + case <-ticker.C: + count++ + // Force a status check every forceCount seconds, even if there + // are no subscribers (so that we can stop checking altogether + // if the sector reaches a final sealing state) + state := checkStatus(count >= forceCount) + if count >= forceCount { + count = 0 + } + + if isFinalSealingState(state) { + return + } + } + } +} + +func isFinalSealingState(state lapi.SectorState) bool { + switch sealing.SectorState(state) { + case + sealing.Proving, + sealing.Available, + sealing.UpdateActivating, + sealing.ReleaseSectorKey, + sealing.Removed, + sealing.Removing, + sealing.Terminating, + sealing.TerminateWait, + sealing.TerminateFinality, + sealing.TerminateFailed: + return true + } + return false +} + func (p *Provider) failDeal(pub event.Emitter, deal *types.ProviderDealState, err error) { // Update state in DB with error deal.Checkpoint = dealcheckpoints.Complete @@ -631,12 +709,8 @@ func (p *Provider) cleanupDeal(deal *types.ProviderDealState) { _ = os.Remove(deal.InboundFilePath) } - // close and clean up the deal handler - dh := p.getDealHandler(deal.DealUuid) - if dh != nil { - dh.transferCancelled(errors.New("deal cleaned up")) - dh.close() - p.delDealHandler(deal.DealUuid) + if deal.Checkpoint == dealcheckpoints.Complete { + p.cleanupDealHandler(deal.DealUuid) } done := make(chan struct{}, 1) @@ -655,6 +729,18 @@ func (p *Provider) cleanupDeal(deal *types.ProviderDealState) { } } +// cleanupDealHandler closes and cleans up the deal handler +func (p *Provider) cleanupDealHandler(dealUuid uuid.UUID) { + dh := p.getDealHandler(dealUuid) + if dh == nil { + return + } + + dh.transferCancelled(errors.New("deal cleaned up")) + dh.close() + p.delDealHandler(dealUuid) +} + func (p *Provider) fireEventDealNew(deal *types.ProviderDealState) { if err := p.newDealPS.NewDeals.Emit(*deal); err != nil { p.dealLogger.Warnw(deal.DealUuid, "publishing new deal event", "err", err.Error()) diff --git a/storagemarket/deal_handler.go b/storagemarket/deal_handler.go index e40b03902..fee705711 100644 --- a/storagemarket/deal_handler.go +++ b/storagemarket/deal_handler.go @@ -31,14 +31,74 @@ type dealHandler struct { transferMu sync.Mutex transferFinished bool transferErr error + + activeSubsLk sync.RWMutex + activeSubs map[*updatesSubscription]struct{} +} + +func newDealHandler(ctx context.Context, dealUuid uuid.UUID) *dealHandler { + // Create a deal handler + bus := eventbus.NewBus() + + transferCtx, cancel := context.WithCancel(ctx) + return &dealHandler{ + providerCtx: ctx, + dealUuid: dealUuid, + bus: bus, + + transferCtx: transferCtx, + transferCancel: cancel, + transferDone: make(chan error, 1), + + activeSubs: make(map[*updatesSubscription]struct{}), + } +} + +// updatesSubscription wraps event.Subscription so that we can add an onClose +// callback +type updatesSubscription struct { + event.Subscription + onClose func(*updatesSubscription) } +func (s *updatesSubscription) Close() error { + s.onClose(s) + return s.Subscription.Close() +} + +// subscribeUpdates subscribes to deal status updates func (d *dealHandler) subscribeUpdates() (event.Subscription, error) { sub, err := d.bus.Subscribe(new(types.ProviderDealState), eventbus.BufSize(256)) if err != nil { return nil, fmt.Errorf("failed to create deal update subscriber to %s: %w", d.dealUuid, err) } - return sub, nil + + // create an updatesSubscription that will delete itself from the map of + // all update subscriptions when it is closed + updatesSub := &updatesSubscription{ + Subscription: sub, + onClose: func(s *updatesSubscription) { + d.activeSubsLk.Lock() + defer d.activeSubsLk.Unlock() + delete(d.activeSubs, s) + }, + } + + // Add the updatesSubscription to the map of all update subscriptions + d.activeSubsLk.Lock() + defer d.activeSubsLk.Unlock() + d.activeSubs[updatesSub] = struct{}{} + + return updatesSub, nil +} + +// hasActiveSubscribers indicates if anyone is subscribed to updates. +// This is useful if we want to check if anyone is listening before doing an +// expensive operation to publish an event. +func (d *dealHandler) hasActiveSubscribers() bool { + d.activeSubsLk.RLock() + defer d.activeSubsLk.RUnlock() + return len(d.activeSubs) > 0 } // TransferCancelledByUser returns true if the user explicitly cancelled the transfer by calling `dealhandler.cancelTransfer()` diff --git a/storagemarket/provider.go b/storagemarket/provider.go index 28e668c2e..182d4ec2a 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -397,19 +397,7 @@ func (p *Provider) checkForDealAcceptance(ds *types.ProviderDealState, dh *dealH } func (p *Provider) mkAndInsertDealHandler(dealUuid uuid.UUID) *dealHandler { - // Create a deal handler - bus := eventbus.NewBus() - - transferCtx, cancel := context.WithCancel(p.ctx) - dh := &dealHandler{ - providerCtx: p.ctx, - dealUuid: dealUuid, - bus: bus, - - transferCtx: transferCtx, - transferCancel: cancel, - transferDone: make(chan error, 1), - } + dh := newDealHandler(p.ctx, dealUuid) p.dhsMu.Lock() defer p.dhsMu.Unlock() @@ -457,18 +445,12 @@ func (p *Provider) Start() ([]*dealHandler, error) { var dhs []*dealHandler for _, d := range pds { d := d - if d.Checkpoint >= dealcheckpoints.IndexedAndAnnounced { - continue - } dh := p.mkAndInsertDealHandler(d.DealUuid) p.wg.Add(1) dhs = append(dhs, dh) go func() { - defer func() { - p.wg.Done() - log.Infow("finished running deal", "id", d.DealUuid) - }() + defer p.wg.Done() // If it's an offline deal, and the deal data hasn't yet been // imported, just wait for the SP operator to import the data @@ -477,8 +459,17 @@ func (p *Provider) Start() ([]*dealHandler, error) { return } + // Check if deal is already proving + if d.Checkpoint >= dealcheckpoints.IndexedAndAnnounced { + si, err := p.sps.SectorsStatus(p.ctx, d.SectorID, false) + if err != nil || isFinalSealingState(si.State) { + return + } + } + p.dealLogger.Infow(d.DealUuid, "resuming deal on boost restart", "checkpoint on resumption", d.Checkpoint.String()) p.doDeal(d, dh) + log.Infow("finished running deal", "id", d.DealUuid) }() } @@ -536,7 +527,7 @@ func (p *Provider) SubscribeNewDeals() (event.Subscription, error) { return p.newDealPS.subscribe() } -// SubscribeNewDeals subscribes to updates to a deal +// SubscribeDealUpdates subscribes to updates to a deal func (p *Provider) SubscribeDealUpdates(dealUuid uuid.UUID) (event.Subscription, error) { dh := p.getDealHandler(dealUuid) if dh == nil { diff --git a/storagemarket/provider_test.go b/storagemarket/provider_test.go index 8ff09c6b4..93db1d75e 100644 --- a/storagemarket/provider_test.go +++ b/storagemarket/provider_test.go @@ -16,19 +16,9 @@ import ( "testing" "time" - piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" - "github.com/filecoin-project/go-fil-markets/shared_testutil" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - - logging "github.com/ipfs/go-log/v2" - - "github.com/libp2p/go-libp2p-core/host" - - "golang.org/x/sync/errgroup" - "github.com/filecoin-project/boost/db" "github.com/filecoin-project/boost/fundmanager" + mock_sealingpipeline "github.com/filecoin-project/boost/sealingpipeline/mock" "github.com/filecoin-project/boost/storagemanager" "github.com/filecoin-project/boost/storagemarket/smtestutil" "github.com/filecoin-project/boost/storagemarket/types" @@ -36,25 +26,31 @@ import ( "github.com/filecoin-project/boost/testutil" "github.com/filecoin-project/boost/transport/httptransport" types2 "github.com/filecoin-project/boost/transport/types" - "github.com/filecoin-project/lotus/extern/sector-storage/storiface" - - mock_sealingpipeline "github.com/filecoin-project/boost/sealingpipeline/mock" "github.com/filecoin-project/go-address" + piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" + "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" acrypto "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api" lotusmocks "github.com/filecoin-project/lotus/api/mocks" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/golang/mock/gomock" "github.com/google/uuid" "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + logging "github.com/ipfs/go-log/v2" carv2 "github.com/ipld/go-car/v2" "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestSimpleDealHappy(t *testing.T) { @@ -95,6 +91,10 @@ func TestSimpleDealHappy(t *testing.T) { td.waitForAndAssert(t, ctx, dealcheckpoints.AddedPiece) harness.EventuallyAssertNoTagged(t, ctx) + // expect Proving event to be fired + err := td.waitForSealingState(api.SectorState(sealing.Proving)) + require.NoError(t, err) + // assert logs lgs, err := harness.Provider.logsDB.Logs(ctx, td.params.DealUUID) require.NoError(t, err) @@ -686,6 +686,9 @@ func NewHarness(t *testing.T, ctx context.Context, opts ...harnessOpt) *Provider ph.MockSealingPipelineAPI.EXPECT().SectorsSummary(gomock.Any()).Return(sealingpipelineStatus, nil).AnyTimes() + secInfo := api.SectorInfo{State: api.SectorState(sealing.Proving)} + ph.MockSealingPipelineAPI.EXPECT().SectorsStatus(gomock.Any(), gomock.Any(), false).Return(secInfo, nil).AnyTimes() + ph.DAGStore = dagStore return ph @@ -1089,6 +1092,28 @@ LOOP: return nil } +func (td *testDeal) waitForSealingState(secState api.SectorState) error { + if td.sub == nil { + return errors.New("no subcription for deal") + } + + for i := range td.sub.Out() { + st := i.(types.ProviderDealState) + if len(st.Err) != 0 { + return errors.New(st.Err) + } + si, err := td.ph.MockSealingPipelineAPI.SectorsStatus(context.Background(), st.SectorID, false) + if err != nil { + return err + } + if si.State == secState { + return nil + } + } + + return fmt.Errorf("did not reach sealing state %s", secState) +} + func (td *testDeal) updateWithRestartedProvider(ph *ProviderHarness) *testDealBuilder { old := td.stubOutput