controlled indexing after migration (#288)
* controlled indexing after migration

* shared max counter, ipni complete

* limit count
LexLuthr authored Oct 18, 2024
1 parent 6297fc2 commit dd3ecf1
Showing 6 changed files with 143 additions and 27 deletions.
6 changes: 4 additions & 2 deletions cmd/curio/tasks/tasks.go
@@ -239,8 +239,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}
}

- indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg)
- ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg)
+ idxMax := taskhelp.Max(8)
+
+ indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
+ ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
activeTasks = append(activeTasks, indexingTask, ipniTask)

if cfg.HTTP.Enable {
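
Both constructors now receive the same idxMax value, so indexing and IPNI tasks draw on one shared budget of eight slots rather than eight each. A minimal sketch of the idea behind such a shared counter (illustrative only; the names Limiter, Max, TryAcquire, and Release here are assumptions, not curio's actual taskhelp API):

package main

import (
	"fmt"
	"sync/atomic"
)

// Limiter is a shared counting cap: every task type holding a reference
// to the same Limiter competes for the same pool of slots.
type Limiter struct {
	limit  int32
	active int32
}

func Max(n int32) *Limiter { return &Limiter{limit: n} }

// TryAcquire claims a slot if one is free; a caller that fails leaves the
// task unclaimed and retries on a later scheduling pass.
func (l *Limiter) TryAcquire() bool {
	for {
		cur := atomic.LoadInt32(&l.active)
		if cur >= l.limit {
			return false
		}
		if atomic.CompareAndSwapInt32(&l.active, cur, cur+1) {
			return true
		}
	}
}

// Release returns a slot when a task finishes.
func (l *Limiter) Release() { atomic.AddInt32(&l.active, -1) }

func main() {
	shared := Max(8)
	// Indexing and IPNI would each TryAcquire on this same value, so
	// their combined in-flight work never exceeds 8.
	fmt.Println(shared.TryAcquire())
}
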
88 changes: 88 additions & 0 deletions harmony/harmonydb/sql/20241017-market-mig-indexing.sql
@@ -0,0 +1,88 @@
-- All indexing task entries are made into this table
-- and then copied over to market_mk12_deal_pipeline for a controlled migration
CREATE TABLE market_mk12_deal_pipeline_migration (
uuid TEXT NOT NULL PRIMARY KEY,
sp_id BIGINT NOT NULL,
piece_cid TEXT NOT NULL,
piece_size BIGINT NOT NULL, -- padded size
raw_size BIGINT DEFAULT NULL,
sector BIGINT DEFAULT NULL,
reg_seal_proof INT DEFAULT NULL,
sector_offset BIGINT DEFAULT NULL, -- padded offset
should_announce BOOLEAN NOT NULL
);

CREATE OR REPLACE FUNCTION migrate_deal_pipeline_entries()
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
-- Counts for existing entries and tasks
num_entries_ready_for_indexing INTEGER;
num_pending_ipni_tasks INTEGER;
num_entries_needed INTEGER;
num_entries_to_move INTEGER;
cnt INTEGER;
moved_rows RECORD;
BEGIN
-- Step 1: Check if the migration table has entries
SELECT COUNT(*) INTO cnt FROM market_mk12_deal_pipeline_migration LIMIT 16;
IF cnt = 0 THEN
RETURN;
END IF;

-- Step 2: Count entries ready for indexing in the pipeline table
SELECT COUNT(*) INTO num_entries_ready_for_indexing
FROM market_mk12_deal_pipeline
WHERE sealed = TRUE AND should_index = TRUE AND indexed = FALSE LIMIT 16;

-- Step 3: Count pending IPNI tasks
SELECT COUNT(*) INTO num_pending_ipni_tasks
FROM harmony_task
WHERE name = 'IPNI' AND owner_id IS NULL LIMIT 16;

-- Step 4: Calculate how many entries we need to reach 16
num_entries_needed := 16 - num_entries_ready_for_indexing;

-- If we already have 16 or more entries ready, no need to move more
IF num_entries_needed <= 0 THEN
RETURN;
END IF;

-- Step 5: Calculate how many entries we can move without exceeding 16 pending IPNI tasks
num_entries_to_move := LEAST(num_entries_needed, 16 - num_pending_ipni_tasks);

-- Limit by the number of entries available in the migration table
SELECT COUNT(*) INTO cnt FROM market_mk12_deal_pipeline_migration LIMIT 16;
num_entries_to_move := LEAST(num_entries_to_move, cnt);

-- If no entries to move after calculations, exit
IF num_entries_to_move <= 0 THEN
RETURN;
END IF;

-- Move entries from the migration table to the pipeline table
FOR moved_rows IN
SELECT uuid, sp_id, piece_cid, piece_size, raw_size, sector, reg_seal_proof, sector_offset, should_announce
FROM market_mk12_deal_pipeline_migration
LIMIT num_entries_to_move
LOOP
-- Insert into the pipeline table
INSERT INTO market_mk12_deal_pipeline (
uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset,
sealed, should_index, indexing_created_at, announce
) VALUES (
moved_rows.uuid, moved_rows.sp_id, TRUE, moved_rows.piece_cid, moved_rows.piece_size, moved_rows.raw_size, FALSE,
TRUE, TRUE, TRUE, moved_rows.sector, moved_rows.reg_seal_proof, moved_rows.sector_offset,
TRUE, TRUE, NOW() AT TIME ZONE 'UTC', moved_rows.should_announce
) ON CONFLICT (uuid) DO NOTHING;
-- Remove the entry from the migration table
DELETE FROM market_mk12_deal_pipeline_migration WHERE uuid = moved_rows.uuid;
END LOOP;

RAISE NOTICE 'Moved % entries to the pipeline table.', num_entries_to_move;
END;
$$;
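
The poller in storage_market.go (below) invokes this function on every pass; the LEAST/LIMIT arithmetic keeps at most 16 deals sitting in the indexing stage and at most 16 unclaimed IPNI tasks at any time. It can also be run by hand to step the migration along or check progress, for example:

-- Move up to one batch into the live pipeline, then see what remains.
SELECT migrate_deal_pipeline_entries();
SELECT COUNT(*) AS remaining FROM market_mk12_deal_pipeline_migration;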


10 changes: 5 additions & 5 deletions tasks/gc/pipeline_meta_gc.go
@@ -34,11 +34,6 @@ func (s *PipelineGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
if err := s.cleanupMK12DealPipeline(); err != nil {
return false, xerrors.Errorf("cleanupMK12DealPipeline: %w", err)
}

- if err := s.cleanupMK12DealPipeline(); err != nil {
-     return false, xerrors.Errorf("cleanupMK12DealPipeline: %w", err)
- }

if err := s.cleanupUnseal(); err != nil {
return false, xerrors.Errorf("cleanupUnseal: %w", err)

@@ -173,6 +168,11 @@ func (s *PipelineGC) cleanupMK12DealPipeline() error {
return xerrors.Errorf("failed to clean up sealed deals: %w", err)
}

+ _, err = s.db.Exec(ctx, `DELETE FROM ipni_task WHERE complete = TRUE;`)
+ if err != nil {
+     return xerrors.Errorf("failed to clean up indexing tasks: %w", err)
+ }

return nil
}

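
The DELETE added above is the garbage-collection half of the complete flag introduced in task_ipni.go below: tasks mark themselves complete transactionally, and this periodic sweep clears the finished rows afterwards. To watch the backlog drain, a query along these lines works (illustrative, not part of the commit):

SELECT complete, COUNT(*) FROM ipni_task GROUP BY complete;
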
4 changes: 3 additions & 1 deletion tasks/indexing/task_indexing.go
@@ -37,9 +37,10 @@ type IndexingTask struct {
cfg *config.CurioConfig
insertConcurrency int
insertBatchSize int
+ max taskhelp.Limiter
}

- func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig) *IndexingTask {
+ func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig, max taskhelp.Limiter) *IndexingTask {

return &IndexingTask{
db: db,
@@ -49,6 +50,7 @@ func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore
cfg: cfg,
insertConcurrency: cfg.Market.StorageMarketConfig.Indexing.InsertConcurrency,
insertBatchSize: cfg.Market.StorageMarketConfig.Indexing.InsertBatchSize,
+ max: max,
}
}

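
The diff does not show IndexingTask's TypeDetails hunk, but given the Max wiring visible for IPNITask further down, the new field presumably reaches harmonytask the same way; roughly (a sketch, not the committed code; the Name value and omitted fields are assumptions):

func (i *IndexingTask) TypeDetails() harmonytask.TaskTypeDetails {
	return harmonytask.TaskTypeDetails{
		Name: "Indexing",
		// Same limiter instance handed to NewIPNITask, so the two task
		// types together stay within taskhelp.Max(8).
		Max: i.max,
		// ... remaining fields unchanged
	}
}
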
34 changes: 24 additions & 10 deletions tasks/indexing/task_ipni.go
@@ -29,6 +29,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/passcall"
"github.com/filecoin-project/curio/lib/pieceprovider"
@@ -46,30 +47,33 @@ type IPNITask struct {
pieceProvider *pieceprovider.PieceProvider
sc *ffi.SealCalls
cfg *config.CurioConfig
+ max taskhelp.Limiter
}

- func NewIPNITask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig) *IPNITask {
+ func NewIPNITask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore.IndexStore, pieceProvider *pieceprovider.PieceProvider, cfg *config.CurioConfig, max taskhelp.Limiter) *IPNITask {

return &IPNITask{
db: db,
indexStore: indexStore,
pieceProvider: pieceProvider,
sc: sc,
cfg: cfg,
+ max: max,
}
}

func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()

var tasks []struct {
- SPID   int64                   `db:"sp_id"`
- Sector abi.SectorNumber        `db:"sector"`
- Proof  abi.RegisteredSealProof `db:"reg_seal_proof"`
- Offset int64                   `db:"sector_offset"`
- CtxID  []byte                  `db:"context_id"`
- Rm     bool                    `db:"is_rm"`
- Prov   string                  `db:"provider"`
+ SPID     int64                   `db:"sp_id"`
+ Sector   abi.SectorNumber        `db:"sector"`
+ Proof    abi.RegisteredSealProof `db:"reg_seal_proof"`
+ Offset   int64                   `db:"sector_offset"`
+ CtxID    []byte                  `db:"context_id"`
+ Rm       bool                    `db:"is_rm"`
+ Prov     string                  `db:"provider"`
+ Complete bool                    `db:"complete"`
}

err = I.db.Select(ctx, &tasks, `SELECT
@@ -79,7 +83,8 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
sector_offset,
context_id,
is_rm,
- provider
+ provider,
+ complete
FROM
ipni_task
WHERE
@@ -94,6 +99,10 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b

task := tasks[0]

+ if task.Complete {
+     return true, nil
+ }

var pi abi.PieceInfo
err = pi.UnmarshalCBOR(bytes.NewReader(task.CtxID))
if err != nil {
@@ -219,14 +228,18 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
return false, xerrors.Errorf("converting advertisement to link: %w", err)
}

- n, err := I.db.Exec(ctx, `SELECT insert_ad_and_update_head($1, $2, $3, $4, $5, $6, $7)`,
+ _, err = tx.Exec(`SELECT insert_ad_and_update_head($1, $2, $3, $4, $5, $6, $7)`,
ad.(cidlink.Link).Cid.String(), adv.ContextID, adv.IsRm, adv.Provider, strings.Join(adv.Addresses, "|"),
adv.Signature, adv.Entries.String())

if err != nil {
return false, xerrors.Errorf("adding advertisement to the database: %w", err)
}

+ n, err := tx.Exec(`UPDATE ipni_task SET complete = true WHERE task_id = $1`, taskID)
+ if err != nil {
+     return false, xerrors.Errorf("failed to mark IPNI task complete: %w", err)
+ }
if n != 1 {
return false, xerrors.Errorf("updated %d rows", n)
}
@@ -306,6 +319,7 @@ func (I *IPNITask) TypeDetails() harmonytask.TaskTypeDetails {
IAmBored: passcall.Every(5*time.Minute, func(taskFunc harmonytask.AddTaskFunc) error {
return I.schedule(context.Background(), taskFunc)
}),
+ Max: I.max,
}
}

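
Together, these changes give each IPNI task at-most-once publish semantics: the advertisement insert and the complete flag commit in one transaction, and a re-delivered task short-circuits on the flag before doing any work. A condensed sketch of the pattern using plain pgx (curio goes through its harmonydb transaction wrapper instead, and the one-argument insert_ad_and_update_head call below is hypothetical):

package ipnisketch

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// publishOnce stores the advertisement and marks the task row complete in
// a single transaction. A crash before commit leaves both undone; on retry
// the task sees complete = TRUE and returns done without republishing.
func publishOnce(ctx context.Context, tx pgx.Tx, taskID int64, adCid string) error {
	// Hypothetical one-argument call; the real function also takes the
	// context ID, is_rm flag, provider, addresses, signature, and entries head.
	if _, err := tx.Exec(ctx, `SELECT insert_ad_and_update_head($1)`, adCid); err != nil {
		return fmt.Errorf("adding advertisement: %w", err)
	}
	tag, err := tx.Exec(ctx, `UPDATE ipni_task SET complete = true WHERE task_id = $1`, taskID)
	if err != nil {
		return fmt.Errorf("marking task complete: %w", err)
	}
	if tag.RowsAffected() != 1 {
		return fmt.Errorf("updated %d rows", tag.RowsAffected())
	}
	return nil
}
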
28 changes: 19 additions & 9 deletions tasks/storage-market/storage_market.go
@@ -160,24 +160,23 @@ func (d *CurioStorageDealMarket) runPoller(ctx context.Context) {

func (d *CurioStorageDealMarket) poll(ctx context.Context) {

+ d.createIndexingTaskForMigratedDeals(ctx)
/*
FULL DEAL FLOW:
Online:
1. Make an entry for each online deal in market_mk12_deal_pipeline
2. For online deals - keep checking if piecePark is complete
4. Create commP task for online deal
- 5. Once commP is complete, add the deal using pieceIngest
+ 5. Once commP is complete, send PSD and find the allocated deal ID
+ 6. Add the deal using pieceIngest
Offline:
1. Make an entry for each offline deal in market_mk12_deal_pipeline
- 2. Offline deal would not be started. It will have 2 triggers
- A. We find a pieceCID <> URL binding
- B. User manually imports the data using a file (will need piecePark)
- 3. Check if piece is parked for offline deal triggered manually
- 4. Create commP task for offline deals
- A. If we have piecePark then do local commP
- B. Do streaming commP if we have URL
- 5. Once commP is complete, add the deal using pieceIngest
+ 2. Offline deal would not be started till we find a pieceCID <> URL binding
+ 3. Create commP task for offline deals
+ A. Do streaming commP
+ 5. Once commP is complete, send PSD and find the allocated deal ID
+ 6. Add the deal using pieceIngest
*/
for module, miners := range d.miners {
if module == mk12Str {
@@ -653,3 +652,14 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli
log.Infof("Added deal %s to sector %d at %d", deal.UUID, info.Sector, info.Offset)
return nil
}

+ func (d *CurioStorageDealMarket) createIndexingTaskForMigratedDeals(ctx context.Context) {
+     // Call the migration function and get the number of rows moved
+     var rowsMoved int
+     err := d.db.QueryRow(ctx, "SELECT migrate_deal_pipeline_entries()").Scan(&rowsMoved)
+     if err != nil {
+         log.Errorf("Error creating indexing tasks for migrated deals: %s", err)
+         return
+     }
+     log.Debugf("Successfully created indexing tasks for %d migrated deals", rowsMoved)
+ }
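
One mismatch worth flagging: migrate_deal_pipeline_entries() is declared RETURNS VOID, yet the caller above scans its result into an integer. If the driver refuses to scan a void value, the smallest reconciliation is on the SQL side; a sketch (not the committed schema):

-- Sketch: report how many rows were moved so Scan(&rowsMoved) has an
-- integer to read; the bare RETURNs in the body would become RETURN 0.
CREATE OR REPLACE FUNCTION migrate_deal_pipeline_entries()
    RETURNS INTEGER
    LANGUAGE plpgsql
AS $$
BEGIN
    -- ... body as in the migration file above ...
    RETURN num_entries_to_move;
END;
$$;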
