Skip to content

Commit

Permalink
Make market deals work (#289)
Browse files Browse the repository at this point in the history
* webui: Ask UI

* webui market: Prices in tib/mo

* webui market: Fix price calc

* webui market: Working ask setting

* fix libp2p safehandle

* more fixes to get deals working

* fix market navlink

* last fix to create_indexing_task
  • Loading branch information
magik6k authored Oct 18, 2024
1 parent d97e21d commit 6297fc2
Show file tree
Hide file tree
Showing 16 changed files with 879 additions and 76 deletions.
6 changes: 3 additions & 3 deletions harmony/harmonydb/sql/20240731-market-migration.sql
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ CREATE TABLE market_mk12_deal_pipeline (

complete BOOLEAN NOT NULL DEFAULT FALSE,

created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),

constraint market_mk12_deal_pipeline_identity_key unique (uuid)
);

Expand All @@ -201,7 +203,7 @@ BEGIN
FROM
%I ssp
JOIN
market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_num = dp.sector
market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_number = dp.sector
WHERE
ssp.task_id_move_storage = $1', sealing_table);
ELSIF sealing_table = 'sectors_snap_pipeline' THEN
Expand Down Expand Up @@ -236,8 +238,6 @@ FOR pms IN EXECUTE query USING task_id

EXCEPTION
WHEN OTHERS THEN
-- Rollback the transaction and raise the exception for Go to catch
ROLLBACK;
RAISE EXCEPTION 'Failed to create indexing task: %', SQLERRM;
END;
$$ LANGUAGE plpgsql;
Expand Down
20 changes: 12 additions & 8 deletions market/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,15 @@ const providerReadDeadline = 10 * time.Second
const providerWriteDeadline = 10 * time.Second

func SafeHandle(h network.StreamHandler) network.StreamHandler {
defer func() {
if r := recover(); r != nil {
netlog.Error("panic occurred", "stack", debug.Stack())
}
}()
return func(stream network.Stream) {
defer func() {
if r := recover(); r != nil {
netlog.Error("panic occurred\n", string(debug.Stack()))
}
}()

return h
h(stream)
}
}

// DealProvider listens for incoming deal proposals over libp2p
Expand Down Expand Up @@ -456,7 +458,7 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) {
reqLog.Infow("received deal proposal")
startExec := time.Now()

var res *mk12.ProviderDealRejectionInfo
var res mk12.ProviderDealRejectionInfo

if lo.Contains(p.disabledMiners, proposal.ClientDealProposal.Proposal.Provider) {
reqLog.Infow("Deal rejected as libp2p is disabled for provider", "deal", proposal.DealUUID, "provider", proposal.ClientDealProposal.Proposal.Provider)
Expand All @@ -466,11 +468,13 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) {
// Start executing the deal.
// Note: This method just waits for the deal to be accepted, it doesn't
// wait for deal execution to complete.
res, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer())
eres, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer())
reqLog.Debugw("processed deal proposal accept")
if err != nil {
reqLog.Warnw("deal proposal failed", "err", err, "reason", res.Reason)
}

res = *eres
}

// Log the response
Expand Down
2 changes: 1 addition & 1 deletion market/mk12/mk12.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (m *MK12) processDeal(ctx context.Context, deal *ProviderDealState) (*Provi

_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, piece_cid, piece_size, offline, url, raw_size, should_index, announce)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (uuid) DO NOTHING`,
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, pieceIDUrl, deal.Transfer.Size,
deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, pieceIDUrl.String(), deal.Transfer.Size,
deal.FastRetrieval, deal.AnnounceToIPNI)
if err != nil {
return false, xerrors.Errorf("inserting deal into deal pipeline: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions market/storageingest/deal_ingest_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

const IdealEndEpochBuffer = 2 * builtin.EpochsInDay
const MaxEndEpochBufferUnverified = 180 * builtin.EpochsInDay

// assuming that snap takes up to 20min to get to submitting the message we want to avoid sectors from deadlines which will
// become immutable in the next 20min (40 epochs)
Expand Down Expand Up @@ -297,6 +298,8 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, tx *harmo
vd.tmax = alloc.TermMax

maxExpiration = int64(head.Height() + alloc.TermMax)
} else {
maxExpiration = int64(piece.DealSchedule.EndEpoch) + MaxEndEpochBufferUnverified
}
propJson, err = json.Marshal(piece.PieceActivationManifest)
if err != nil {
Expand Down
110 changes: 69 additions & 41 deletions tasks/storage-market/storage_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,33 @@ type CurioStorageDealMarket struct {
}

type MK12Pipeline struct {
UUID string `db:"uuid"`
SpID int64 `db:"sp_id"`
Started bool `db:"started"`
PieceCid string `db:"piece_cid"`
Offline bool `db:"offline"`
Downloaded bool `db:"downloaded"`
RawSize int64 `db:"raw_size"`
URL string `db:"url"`
Headers json.RawMessage `db:"headers"`

CommTaskID *int64 `db:"commp_task_id"`
AfterCommp bool `db:"after_commp"`
PSDWaitTime time.Time `db:"psd_wait_time"`
PSDTaskID *int64 `db:"psd_task_id"`
AfterPSD bool `db:"after_psd"`
FindDealTaskID *int64 `db:"find_deal_task_id"`
AfterFindDeal bool `db:"after_find_deal"`
Sector *int64 `db:"sector"`
Offset *int64 `db:"sector_offset"`
UUID string `db:"uuid"`
SpID int64 `db:"sp_id"`

// started after data download
Started bool `db:"started"`
PieceCid string `db:"piece_cid"`
Offline bool `db:"offline"` // data is not downloaded before starting the deal
RawSize int64 `db:"raw_size"`
URL *string `db:"url"`
Headers json.RawMessage `db:"headers"`

// commP task
CommTaskID *int64 `db:"commp_task_id"`
AfterCommp bool `db:"after_commp"`

// PSD task
PSDWaitTime *time.Time `db:"psd_wait_time"` // set in commp to now
PSDTaskID *int64 `db:"psd_task_id"`
AfterPSD bool `db:"after_psd"`

// Find Deal task (just looks at the chain for the deal ID)
FindDealTaskID *int64 `db:"find_deal_task_id"`
AfterFindDeal bool `db:"after_find_deal"`

// Sector the deal was assigned into
Sector *int64 `db:"sector"`
Offset *int64 `db:"sector_offset"`
}

func NewCurioStorageDealMarket(miners []address.Address, db *harmonydb.DB, cfg *config.CurioConfig, sc *ffi.SealCalls, mapi storageMarketAPI) *CurioStorageDealMarket {
Expand Down Expand Up @@ -208,12 +216,14 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {
p.after_psd as after_psd,
p.find_deal_task_id as find_deal_task_id,
p.after_find_deal as after_find_deal,
p.psd_wait_time as psd_wait_time
p.psd_wait_time as psd_wait_time,
p.sector as sector,
p.sector_offset as sector_offset
FROM
market_mk12_deal_pipeline p
LEFT JOIN
market_mk12_deals b ON p.uuid = b.uuid
WHERE p.started = TRUE
ORDER BY b.start_epoch ASC;`)

if err != nil {
Expand All @@ -232,7 +242,7 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {
deal := deal
err := d.processMk12Deal(ctx, deal)
if err != nil {
log.Errorf("%w", err)
log.Errorf("process deal: %s", err)
}
}
}
Expand All @@ -242,8 +252,8 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
// Try to mark the deal as started
if !deal.Started {
// Check if download is finished and update the deal state in DB
if deal.URL != "" {
goUrl, err := url.Parse(deal.URL)
if deal.URL != nil && *deal.URL != "" {
goUrl, err := url.Parse(*deal.URL)
if err != nil {
return xerrors.Errorf("UUID: %s parsing data URL: %w", deal.UUID, err)
}
Expand Down Expand Up @@ -271,7 +281,6 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
return xerrors.Errorf("failed to mark deal %s as started: %w", deal.UUID, err)
}
log.Infof("UUID: %s deal started successfully", deal.UUID)
return nil
}
}
} else {
Expand Down Expand Up @@ -317,6 +326,17 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P

// Create Find Deal task
if deal.Started && deal.AfterCommp && deal.AfterPSD && !deal.AfterFindDeal && deal.FindDealTaskID == nil {
var executed bool
err := d.db.QueryRow(ctx, `SELECT EXISTS(SELECT TRUE FROM market_mk12_deals d
INNER JOIN message_waits mw ON mw.signed_message_cid = d.publish_cid
WHERE mw.executed_tsk_cid IS NOT NULL AND d.uuid = $1)`, deal.UUID).Scan(&executed)
if err != nil {
return xerrors.Errorf("UUID: %s: checking if the message is executed: %w", deal.UUID, err)
}
if !executed {
return nil
}

d.adders[pollerFindDeal].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET find_deal_task_id = $1
Expand All @@ -337,7 +357,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
if deal.AfterFindDeal && deal.Sector == nil && deal.Offset == nil {
err := d.ingestDeal(ctx, deal)
if err != nil {
return err
return xerrors.Errorf("ingest deal: %w", err)
}
}
return nil
Expand All @@ -361,7 +381,7 @@ type MarketMK12Deal struct {
PublishCid string `db:"publish_cid"`
FastRetrieval bool `db:"fast_retrieval"`
AnnounceToIpni bool `db:"announce_to_ipni"`
Error string `db:"error"`
Error *string `db:"error"`
}

func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, deal string, pcid string) error {
Expand Down Expand Up @@ -461,15 +481,15 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
dm := make(map[int64]queue)

for _, deal := range deals {
if deal.Started && deal.AfterCommp && !deal.AfterPSD && deal.PSDTaskID == nil {
if deal.Started && deal.AfterCommp && deal.PSDWaitTime != nil && !deal.AfterPSD && deal.PSDTaskID == nil {
// Check if the spID is already in the map
if q, exists := dm[deal.SpID]; exists {
// Append the UUID to the deals list
q.deals = append(q.deals, deal.UUID)

// Update the time if the current deal's time is older
if deal.PSDWaitTime.Before(q.t) {
q.t = deal.PSDWaitTime
q.t = *deal.PSDWaitTime
}

// Update the map with the new queue
Expand All @@ -478,7 +498,7 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
// Add a new entry to the map if spID is not present
dm[deal.SpID] = queue{
deals: []string{deal.UUID},
t: deal.PSDWaitTime,
t: *deal.PSDWaitTime,
}
}
}
Expand All @@ -488,19 +508,29 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg

for _, q := range dm {
if q.t.Add(time.Duration(publishPeriod)).After(time.Now()) || uint64(len(q.deals)) > maxDeals {
if q.t.Add(time.Duration(publishPeriod)).Before(time.Now()) || uint64(len(q.deals)) > maxDeals {
d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
WHERE uuid = ANY($2) AND started = TRUE AND after_commp = TRUE
AND psd_task_id IS NULL`, id, q.deals)
if err != nil {
return false, xerrors.Errorf("updating deal pipeline: %w", err)
var n int
for _, deal := range q.deals {
u, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
WHERE uuid = $2 AND started = TRUE AND after_commp = TRUE
AND psd_task_id IS NULL`, id, deal)
if err != nil {
return false, xerrors.Errorf("updating deal pipeline: %w", err)
}
n += u
}

if n > 0 {
log.Infof("PSD task created for %d deals %s", n, q.deals)
}

return n > 0, nil
})
} else {
log.Infow("PSD task not created as the time is not yet reached", "time", q.t.Add(time.Duration(publishPeriod)), "deals", q.deals)
}
log.Infof("PSD task created successfully for deals %s", q.deals)
}
return nil
}
Expand Down Expand Up @@ -530,13 +560,11 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli
publish_cid,
fast_retrieval,
announce_to_ipni,
url,
url_headers,
error
FROM market_mk12_deals
WHERE uuid = $1;`, deal.UUID)
if err != nil {
return false, xerrors.Errorf("failed to get MK12 deals from DB")
return false, xerrors.Errorf("failed to get MK12 deals from DB: %w", err)
}

if len(dbdeals) != 1 {
Expand Down Expand Up @@ -573,7 +601,7 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli
KeepUnsealed: true,
}

dealUrl, err := url.Parse(deal.URL)
dealUrl, err := url.Parse(*deal.URL)
if err != nil {
return false, xerrors.Errorf("UUID: %s: %w", deal.UUID, err)
}
Expand Down
16 changes: 11 additions & 5 deletions tasks/storage-market/task_commp.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
UUID string `db:"uuid"`
URL *string `db:"url"`
Headers json.RawMessage `db:"headers"`
Size *int64 `db:"raw_size"`
RawSize int64 `db:"raw_size"`
}

err = c.db.Select(ctx, &pieces, `SELECT uuid, url, headers, raw_size, piece_cid
err = c.db.Select(ctx, &pieces, `SELECT uuid, url, headers, raw_size, piece_cid, piece_size
FROM market_mk12_deal_pipeline WHERE commp_task_id = $1`, taskID)

if err != nil {
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
reader = resp.Body
}

pReader, _ := padreader.New(reader, uint64(*piece.Size))
pReader, pSz := padreader.New(reader, uint64(piece.RawSize))

defer func() {
_ = closer.Close()
Expand All @@ -164,8 +164,8 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("copy into commp writer: %w", err)
}

if written != *piece.Size {
return false, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the file size %d", written, piece.Size)
if written != int64(pSz) {
return false, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the file size %d", written, pSz)
}

calculatedCommp, err := w.Sum()
Expand Down Expand Up @@ -223,6 +223,12 @@ func (c *CommpTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Task
// ParkPiece should be scheduled on same node which has the piece
// Remote HTTP ones can be scheduled on any node

if true {
// TODO make this a setting
id := ids[0]
return &id, nil
}

ctx := context.Background()

var tasks []struct {
Expand Down
2 changes: 1 addition & 1 deletion tasks/storage-market/task_find_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (f *FindDealTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
err = f.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid,
executed_rcpt_exitcode, executed_rcpt_gas_used
FROM message_waits
WHERE signed_message_cid AND executed_tsk_epoch IS NOT NULL`, bd.PublishCid)
WHERE signed_message_cid = $1 AND executed_tsk_epoch IS NOT NULL`, bd.PublishCid)
if err != nil {
fdLog.Errorw("failed to query message_waits", "error", err)
}
Expand Down
Loading

0 comments on commit 6297fc2

Please sign in to comment.