From dd3ecf15f3fee93f12092f3298b027a1a0c4063f Mon Sep 17 00:00:00 2001 From: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Date: Fri, 18 Oct 2024 14:50:12 +0400 Subject: [PATCH] controlled indexing after migration (#288) * controlled indexing after migration * shared max counter, ipni complete * limit count --- cmd/curio/tasks/tasks.go | 6 +- .../sql/20241017-market-mig-indexing.sql | 88 +++++++++++++++++++ tasks/gc/pipeline_meta_gc.go | 10 +-- tasks/indexing/task_indexing.go | 4 +- tasks/indexing/task_ipni.go | 34 ++++--- tasks/storage-market/storage_market.go | 28 ++++-- 6 files changed, 143 insertions(+), 27 deletions(-) create mode 100644 harmony/harmonydb/sql/20241017-market-mig-indexing.sql diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 20ad06d7e..5f36a8030 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -239,8 +239,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } - indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg) - ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg) + idxMax := taskhelp.Max(8) + + indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax) + ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax) activeTasks = append(activeTasks, indexingTask, ipniTask) if cfg.HTTP.Enable { diff --git a/harmony/harmonydb/sql/20241017-market-mig-indexing.sql b/harmony/harmonydb/sql/20241017-market-mig-indexing.sql new file mode 100644 index 000000000..11675c618 --- /dev/null +++ b/harmony/harmonydb/sql/20241017-market-mig-indexing.sql @@ -0,0 +1,88 @@ +-- All indexing task entries are made into this table +-- and then copied over to market_mk12_deal_pipeline for a controlled migration +CREATE TABLE market_mk12_deal_pipeline_migration ( + uuid TEXT NOT NULL PRIMARY KEY, + sp_id BIGINT NOT NULL, + piece_cid TEXT NOT NULL, + piece_size BIGINT NOT NULL, -- padded size + raw_size BIGINT DEFAULT NULL, + sector BIGINT 
DEFAULT NULL, + reg_seal_proof INT DEFAULT NULL, + sector_offset BIGINT DEFAULT NULL, -- padded offset + should_announce BOOLEAN NOT NULL +); + +CREATE OR REPLACE FUNCTION migrate_deal_pipeline_entries() +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + -- Counts for existing entries and tasks + num_entries_ready_for_indexing INTEGER; + num_pending_ipni_tasks INTEGER; + num_entries_needed INTEGER; + num_entries_to_move INTEGER; + cnt INTEGER; + moved_rows RECORD; +BEGIN + -- Step 1: Check if the migration table has entries + SELECT COUNT(*) INTO cnt FROM market_mk12_deal_pipeline_migration LIMIT 16; + IF cnt = 0 THEN + RETURN; + END IF; + + -- Step 2: Count entries ready for indexing in the pipeline table + SELECT COUNT(*) INTO num_entries_ready_for_indexing + FROM market_mk12_deal_pipeline + WHERE sealed = TRUE AND should_index = TRUE AND indexed = FALSE LIMIT 16; + + -- Step 3: Count pending IPNI tasks + SELECT COUNT(*) INTO num_pending_ipni_tasks + FROM harmony_task + WHERE name = 'IPNI' AND owner_id IS NULL LIMIT 16; + + -- Step 4: Calculate how many entries we need to reach 16 + num_entries_needed := 16 - num_entries_ready_for_indexing; + + -- If we already have 16 or more entries ready, no need to move more + IF num_entries_needed <= 0 THEN + RETURN; + END IF; + + -- Step 5: Calculate how many entries we can move without exceeding 16 pending IPNI tasks + num_entries_to_move := LEAST(num_entries_needed, 16 - num_pending_ipni_tasks); + + -- Limit by the number of entries available in the migration table + SELECT COUNT(*) INTO cnt FROM market_mk12_deal_pipeline_migration LIMIT 16; + num_entries_to_move := LEAST(num_entries_to_move, cnt); + + -- If no entries to move after calculations, exit + IF num_entries_to_move <= 0 THEN + RETURN; + END IF; + + -- Move entries from the migration table to the pipeline table + FOR moved_rows IN + SELECT uuid, sp_id, piece_cid, piece_size, raw_size, sector, reg_seal_proof, sector_offset, should_announce + FROM 
market_mk12_deal_pipeline_migration + LIMIT num_entries_to_move + LOOP + -- Insert into the pipeline table + INSERT INTO market_mk12_deal_pipeline ( + uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, + sealed, should_index, indexing_created_at, announce + ) VALUES ( + moved_rows.uuid, moved_rows.sp_id, TRUE, moved_rows.piece_cid, moved_rows.piece_size, moved_rows.raw_size, FALSE, + TRUE, TRUE, TRUE, moved_rows.sector, moved_rows.reg_seal_proof, moved_rows.sector_offset, + TRUE, TRUE, NOW() AT TIME ZONE 'UTC', moved_rows.should_announce + ) ON CONFLICT (uuid) DO NOTHING; + -- Remove the entry from the migration table + DELETE FROM market_mk12_deal_pipeline_migration WHERE uuid = moved_rows.uuid; + END LOOP; + + RAISE NOTICE 'Moved % entries to the pipeline table.', num_entries_to_move; +END; +$$; + + diff --git a/tasks/gc/pipeline_meta_gc.go b/tasks/gc/pipeline_meta_gc.go index 157b27a7f..b6788db52 100644 --- a/tasks/gc/pipeline_meta_gc.go +++ b/tasks/gc/pipeline_meta_gc.go @@ -34,11 +34,6 @@ func (s *PipelineGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done if err := s.cleanupMK12DealPipeline(); err != nil { return false, xerrors.Errorf("cleanupMK12DealPipeline: %w", err) } - - if err := s.cleanupMK12DealPipeline(); err != nil { - return false, xerrors.Errorf("cleanupMK12DealPipeline: %w", err) - } - if err := s.cleanupUnseal(); err != nil { return false, xerrors.Errorf("cleanupUnseal: %w", err) @@ -173,6 +168,11 @@ func (s *PipelineGC) cleanupMK12DealPipeline() error { return xerrors.Errorf("failed to clean up sealed deals: %w", err) } + _, err = s.db.Exec(ctx, `DELETE FROM ipni_task WHERE complete = TRUE;`) + if err != nil { + return xerrors.Errorf("failed to clean up indexing tasks: %w", err) + } + return nil } diff --git a/tasks/indexing/task_indexing.go b/tasks/indexing/task_indexing.go index 1a0821793..75d5fd8a5 100644 --- 
a/tasks/indexing/task_indexing.go +++ b/tasks/indexing/task_indexing.go @@ -37,9 +37,10 @@ type IndexingTask struct { cfg *config.CurioConfig insertConcurrency int insertBatchSize int + max taskhelp.Limiter } -func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig) *IndexingTask { +func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig, max taskhelp.Limiter) *IndexingTask { return &IndexingTask{ db: db, @@ -49,6 +50,7 @@ func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore cfg: cfg, insertConcurrency: cfg.Market.StorageMarketConfig.Indexing.InsertConcurrency, insertBatchSize: cfg.Market.StorageMarketConfig.Indexing.InsertBatchSize, + max: max, } } diff --git a/tasks/indexing/task_ipni.go b/tasks/indexing/task_ipni.go index bbf64bf15..33ffb1aa2 100644 --- a/tasks/indexing/task_ipni.go +++ b/tasks/indexing/task_ipni.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp" "github.com/filecoin-project/curio/lib/ffi" "github.com/filecoin-project/curio/lib/passcall" "github.com/filecoin-project/curio/lib/pieceprovider" @@ -46,9 +47,10 @@ type IPNITask struct { pieceProvider *pieceprovider.PieceProvider sc *ffi.SealCalls cfg *config.CurioConfig + max taskhelp.Limiter } -func NewIPNITask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig) *IPNITask { +func NewIPNITask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig, max taskhelp.Limiter) *IPNITask { 
return &IPNITask{ db: db, @@ -56,6 +58,7 @@ func NewIPNITask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.Ind pieceProvider: pieceProvider, sc: sc, cfg: cfg, + max: max, } } @@ -63,13 +66,14 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b ctx := context.Background() var tasks []struct { - SPID int64 `db:"sp_id"` - Sector abi.SectorNumber `db:"sector"` - Proof abi.RegisteredSealProof `db:"reg_seal_proof"` - Offset int64 `db:"sector_offset"` - CtxID []byte `db:"context_id"` - Rm bool `db:"is_rm"` - Prov string `db:"provider"` + SPID int64 `db:"sp_id"` + Sector abi.SectorNumber `db:"sector"` + Proof abi.RegisteredSealProof `db:"reg_seal_proof"` + Offset int64 `db:"sector_offset"` + CtxID []byte `db:"context_id"` + Rm bool `db:"is_rm"` + Prov string `db:"provider"` + Complete bool `db:"complete"` } err = I.db.Select(ctx, &tasks, `SELECT @@ -79,7 +83,8 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b sector_offset, context_id, is_rm, - provider + provider, + complete FROM ipni_task WHERE @@ -94,6 +99,10 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b task := tasks[0] + if task.Complete { + return true, nil + } + var pi abi.PieceInfo err = pi.UnmarshalCBOR(bytes.NewReader(task.CtxID)) if err != nil { @@ -219,7 +228,7 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b return false, xerrors.Errorf("converting advertisement to link: %w", err) } - n, err := I.db.Exec(ctx, `SELECT insert_ad_and_update_head($1, $2, $3, $4, $5, $6, $7)`, + _, err = tx.Exec(`SELECT insert_ad_and_update_head($1, $2, $3, $4, $5, $6, $7)`, ad.(cidlink.Link).Cid.String(), adv.ContextID, adv.IsRm, adv.Provider, strings.Join(adv.Addresses, "|"), adv.Signature, adv.Entries.String()) @@ -227,6 +236,10 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b return false, xerrors.Errorf("adding advertisement to the database: 
%w", err) } + n, err := tx.Exec(`UPDATE ipni_task SET complete = true WHERE task_id = $1`, taskID) + if err != nil { + return false, xerrors.Errorf("failed to mark IPNI task complete: %w", err) + } if n != 1 { return false, xerrors.Errorf("updated %d rows", n) } @@ -306,6 +319,7 @@ func (I *IPNITask) TypeDetails() harmonytask.TaskTypeDetails { IAmBored: passcall.Every(5*time.Minute, func(taskFunc harmonytask.AddTaskFunc) error { return I.schedule(context.Background(), taskFunc) }), + Max: I.max, } } diff --git a/tasks/storage-market/storage_market.go b/tasks/storage-market/storage_market.go index ca381ff3f..e0fba3cf1 100644 --- a/tasks/storage-market/storage_market.go +++ b/tasks/storage-market/storage_market.go @@ -160,24 +160,23 @@ func (d *CurioStorageDealMarket) runPoller(ctx context.Context) { func (d *CurioStorageDealMarket) poll(ctx context.Context) { + d.createIndexingTaskForMigratedDeals(ctx) /* FULL DEAL FLOW: Online: 1. Make an entry for each online deal in market_mk12_deal_pipeline 2. For online deals - keep checking if piecePark is complete 4. Create commP task for online deal - 5. Once commP is complete, add the deal using pieceIngest + 5. Once commP is complete, send PSD and find the allocated deal ID + 6. Add the deal using pieceIngest Offline: 1. Make an entry for each online deal in market_mk12_deal_pipeline - 2. Offline deal would not be started. It will have 2 triggers - A. We find a pieceCID <> URL binding - B. User manually imports the data using a file (will need piecePark) - 3. Check if piece is parked for offline deal triggered manually - 4. Create commP task for offline deals - A. If we have piecePark then do local commP - B. Do streaming commP if we have URL - 5. Once commP is complete, add the deal using pieceIngest + 2. Offline deal would not be started till we find a pieceCID <> URL binding + 3. Create commP task for offline deals + A. Do streaming commP + 5. Once commP is complete, send PSD and find the allocated deal ID + 6. 
Add the deal using pieceIngest
 	*/
 	for module, miners := range d.miners {
 		if module == mk12Str {
@@ -653,3 +652,15 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli
 	log.Infof("Added deal %s to sector %d at %d", deal.UUID, info.Sector, info.Offset)
 	return nil
 }
+
+func (d *CurioStorageDealMarket) createIndexingTaskForMigratedDeals(ctx context.Context) {
+	// migrate_deal_pipeline_entries() is declared RETURNS VOID, so there is no
+	// row count to scan; use Exec and rely on the function's RAISE NOTICE for counts.
+	_, err := d.db.Exec(ctx, "SELECT migrate_deal_pipeline_entries()")
+	if err != nil {
+		// log.Errorf is not an error-wrapping formatter, so use %v rather than %w
+		log.Errorf("Error creating indexing tasks for migrated deals: %v", err)
+		return
+	}
+	log.Debugf("Successfully created indexing tasks for migrated deals")
+}