Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update UI when deal sealing state changes #395

Merged
merged 3 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea
return nil, err
}

return newDealResolver(deal, r.dealsDB, r.logsDB), nil
return newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi), nil
}

type dealsArgs struct {
Expand Down Expand Up @@ -117,7 +117,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver

resolvers := make([]*dealResolver, 0, len(deals))
for _, deal := range deals {
resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB))
resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB, r.spApi))
}

return &dealListResolver{
Expand Down Expand Up @@ -154,7 +154,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
select {
case <-ctx.Done():
return nil, ctx.Err()
case net <- newDealResolver(deal, r.dealsDB, r.logsDB):
case net <- newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi):
}

// Updates to deal state are broadcast on pubsub. Pipe these updates to the
Expand All @@ -167,8 +167,11 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
}
return nil, xerrors.Errorf("%s: subscribing to deal updates: %w", args.ID, err)
}
sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB}
go sub.Pipe(ctx, net)
sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
go func() {
sub.Pipe(ctx, net) // blocks until connection is closed
close(net)
}()

return net, nil
}
Expand Down Expand Up @@ -203,7 +206,7 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error)
case evti := <-sub.Out():
// Pipe the deal to the new deal channel
di := evti.(types.ProviderDealState)
rsv := newDealResolver(&di, r.dealsDB, r.logsDB)
rsv := newDealResolver(&di, r.dealsDB, r.logsDB, r.spApi)
totalCount, err := r.dealsDB.Count(ctx)
if err != nil {
log.Errorf("getting total deal count: %w", err)
Expand Down Expand Up @@ -293,14 +296,16 @@ type dealResolver struct {
transferred uint64
dealsDB *db.DealsDB
logsDB *db.LogsDB
spApi sealingpipeline.API
}

func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB) *dealResolver {
func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
return &dealResolver{
ProviderDealState: *deal,
transferred: uint64(deal.NBytesReceived),
dealsDB: dealsDB,
logsDB: logsDB,
spApi: spApi,
}
}

Expand Down Expand Up @@ -422,7 +427,7 @@ func (dr *dealResolver) Stage() string {
return dr.ProviderDealState.Checkpoint.String()
}

func (dr *dealResolver) Message() string {
func (dr *dealResolver) Message(ctx context.Context) string {
switch dr.Checkpoint {
case dealcheckpoints.Accepted:
if dr.IsOffline {
Expand All @@ -446,7 +451,7 @@ func (dr *dealResolver) Message() string {
case dealcheckpoints.AddedPiece:
return "Announcing"
case dealcheckpoints.IndexedAndAnnounced:
return "Sealing"
return dr.sealingState(ctx)
case dealcheckpoints.Complete:
switch dr.Err {
case "":
Expand All @@ -459,6 +464,15 @@ func (dr *dealResolver) Message() string {
return dr.ProviderDealState.Checkpoint.String()
}

func (dr *dealResolver) sealingState(ctx context.Context) string {
si, err := dr.spApi.SectorsStatus(ctx, dr.SectorID, false)
if err != nil {
return "Sealer: Sealing"
}

return "Sealer: " + string(si.State)
}

func (dr *dealResolver) Logs(ctx context.Context) ([]*logsResolver, error) {
logs, err := dr.logsDB.Logs(ctx, dr.ProviderDealState.DealUuid)
if err != nil {
Expand Down Expand Up @@ -513,10 +527,11 @@ type subLastUpdate struct {
sub event.Subscription
dealsDB *db.DealsDB
logsDB *db.LogsDB
spApi sealingpipeline.API
}

func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
// When the connection ends, unsubscribe
// When the connection ends, unsubscribe from deal update events
defer s.sub.Close()

var lastUpdate interface{}
Expand All @@ -525,34 +540,45 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
select {
case <-ctx.Done():
return
case update := <-s.sub.Out():
case update, ok := <-s.sub.Out():
if !ok {
// Stop listening for updates when the subscription is closed
return
}
lastUpdate = update
}

// Each update supersedes the one before it, so read all pending
// updates that are queued up behind the first one, and only save
// the very last
select {
case update := <-s.sub.Out():
lastUpdate = update
case update, ok := <-s.sub.Out():
if ok {
lastUpdate = update
}
default:
}

// Attempt to send the update to the network. If the network is
// blocked, and another update arrives on the subscription,
// override the latest update.
updates := s.sub.Out()
loop:
for {
di := lastUpdate.(types.ProviderDealState)
rsv := newDealResolver(&di, s.dealsDB, s.logsDB)
rsv := newDealResolver(&di, s.dealsDB, s.logsDB, s.spApi)

select {
case <-ctx.Done():
return
case net <- rsv:
break loop
case update := <-s.sub.Out():
lastUpdate = update
case update, ok := <-updates:
if ok {
lastUpdate = update
} else {
updates = nil
}
}
}
}
Expand Down
130 changes: 108 additions & 22 deletions storagemarket/deal_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,28 @@ import (
"os"
"time"

"github.com/filecoin-project/dagstore"

"github.com/filecoin-project/go-fil-markets/piecestore"

"github.com/filecoin-project/go-fil-markets/stores"

acrypto "github.com/filecoin-project/go-state-types/crypto"

"github.com/filecoin-project/boost/transport"
"github.com/filecoin-project/go-state-types/abi"

"golang.org/x/xerrors"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"

"github.com/filecoin-project/boost/storagemarket/types"
transporttypes "github.com/filecoin-project/boost/transport/types"

"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/boost/transport"
transporttypes "github.com/filecoin-project/boost/transport/types"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-commp-utils/writer"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/stores"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
acrypto "github.com/filecoin-project/go-state-types/crypto"
lapi "github.com/filecoin-project/lotus/api"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"golang.org/x/xerrors"
)

const (
Expand Down Expand Up @@ -111,10 +108,13 @@ func (p *Provider) doDeal(deal *types.ProviderDealState, dh *dealHandler) {
// to wait for deal completion/slashing and update the state in DB accordingly.
p.cleanupDealLogged(deal)
p.dealLogger.Infow(deal.DealUuid, "finished deal cleanup after successful execution")

// Watch the sealing status of the deal and fire events for each change
p.fireSealingUpdateEvents(dh, pub, deal.DealUuid, deal.SectorID)
p.cleanupDealHandler(deal.DealUuid)

// TODO
// Watch deal on chain and change state in DB and emit notifications.
// Given that cleanup deal above also gets rid of the deal handler, subscriptions to deal updates from here on
// will fail, we can look into it when we implement deal completion.
}

func (p *Provider) execDealUptoAddPiece(ctx context.Context, pub event.Emitter, deal *types.ProviderDealState, dh *dealHandler) *dealMakingError {
Expand Down Expand Up @@ -596,6 +596,84 @@ func (p *Provider) indexAndAnnounce(ctx context.Context, pub event.Emitter, deal
return p.updateCheckpoint(pub, deal, dealcheckpoints.IndexedAndAnnounced)
}

// fireSealingUpdateEvents periodically checks the sealing status of the deal
// and fires events for each change
func (p *Provider) fireSealingUpdateEvents(dh *dealHandler, pub event.Emitter, dealUuid uuid.UUID, sectorNum abi.SectorNumber) {
var lastSealingState lapi.SectorState
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
checkStatus := func(force bool) lapi.SectorState {
// To avoid overloading the sealing service, only get the sector status
// if there's at least one subscriber to the event that will be published
if !force && !dh.hasActiveSubscribers() {
return ""
}

// Get the sector status
si, err := p.sps.SectorsStatus(p.ctx, sectorNum, false)
if err == nil && si.State != lastSealingState {
lastSealingState = si.State

// Sector status has changed, fire an update event
deal, err := p.dealsDB.ByID(p.ctx, dealUuid)
if err != nil {
log.Errorf("getting deal %s with sealing update: %w", dealUuid, err)
return si.State
}

p.fireEventDealUpdate(pub, deal)
}
return si.State
}

// Check status immediately
state := checkStatus(true)
if isFinalSealingState(state) {
return
}

// Check status every second
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
count := 0
forceCount := 60
for {
select {
case <-p.ctx.Done():
return
case <-ticker.C:
count++
// Force a status check every forceCount seconds, even if there
// are no subscribers (so that we can stop checking altogether
// if the sector reaches a final sealing state)
state := checkStatus(count >= forceCount)
if count >= forceCount {
count = 0
}

if isFinalSealingState(state) {
return
}
}
}
}

func isFinalSealingState(state lapi.SectorState) bool {
switch sealing.SectorState(state) {
case
sealing.Proving,
sealing.Available,
sealing.UpdateActivating,
sealing.ReleaseSectorKey,
sealing.Removed,
sealing.Removing,
sealing.Terminating,
sealing.TerminateWait,
sealing.TerminateFinality,
sealing.TerminateFailed:
return true
}
return false
}

func (p *Provider) failDeal(pub event.Emitter, deal *types.ProviderDealState, err error) {
// Update state in DB with error
deal.Checkpoint = dealcheckpoints.Complete
Expand Down Expand Up @@ -631,12 +709,8 @@ func (p *Provider) cleanupDeal(deal *types.ProviderDealState) {
_ = os.Remove(deal.InboundFilePath)
}

// close and clean up the deal handler
dh := p.getDealHandler(deal.DealUuid)
if dh != nil {
dh.transferCancelled(errors.New("deal cleaned up"))
dh.close()
p.delDealHandler(deal.DealUuid)
if deal.Checkpoint == dealcheckpoints.Complete {
p.cleanupDealHandler(deal.DealUuid)
}

done := make(chan struct{}, 1)
Expand All @@ -655,6 +729,18 @@ func (p *Provider) cleanupDeal(deal *types.ProviderDealState) {
}
}

// cleanupDealHandler closes and cleans up the deal handler
func (p *Provider) cleanupDealHandler(dealUuid uuid.UUID) {
dh := p.getDealHandler(dealUuid)
if dh == nil {
return
}

dh.transferCancelled(errors.New("deal cleaned up"))
dh.close()
p.delDealHandler(dealUuid)
}

func (p *Provider) fireEventDealNew(deal *types.ProviderDealState) {
if err := p.newDealPS.NewDeals.Emit(*deal); err != nil {
p.dealLogger.Warnw(deal.DealUuid, "publishing new deal event", "err", err.Error())
Expand Down
Loading