diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index 0ac5d61be..36b5864d7 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -1,7 +1,6 @@ package main import ( - "crypto/rand" "database/sql" "encoding/json" "errors" @@ -34,7 +33,6 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" cbor "github.com/ipfs/go-ipld-cbor" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "golang.org/x/net/context" @@ -163,11 +161,6 @@ func migrate(cctx *cli.Context, repoDir string) error { return xerrors.Errorf("failed to migrate DDO deals: %w", err) } - // Migrate libp2p key - if err := generateNewKeys(ctx, maddr, hdb); err != nil { - return xerrors.Errorf("failed to migrate libp2p key: %w", err) - } - return nil } @@ -191,7 +184,11 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad deals := append(aDeals, cDeals...) - for _, deal := range deals { + for i, deal := range deals { + if i > 0 && i%100 == 0 { + fmt.Printf("Migrating Boost Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100) + } + llog := log.With("Boost Deal", deal.DealUuid.String()) // Skip deals which are before add piece if deal.Checkpoint < dealcheckpoints.AddedPiece { @@ -269,62 +266,69 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad return fmt.Errorf("deal: %s: failed to marshal headers: %s", deal.DealUuid.String(), err) } - // Add deal to HarmonyDB - if !a { - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid, + _, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // Add deal to HarmonyDB + if !a { + _, err = tx.Exec(`INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid, proposal_signature, proposal, piece_cid, piece_size, offline, verified, start_epoch, end_epoch, client_peer_id, fast_retrieval, announce_to_ipni, url, url_headers, chain_deal_id, publish_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) ON CONFLICT (uuid) DO NOTHING`, - deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(), - prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(), - deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt) - - if err != nil { - return fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err) - } - - // Mark deal added to harmonyDB - _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err) - } - } - - if !b { - // Add LID details to pieceDeal in HarmonyDB - _, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false) - if err != nil { - return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err) + deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(), + prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(), + deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt) + + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err) + } } - // Mark deal added to pieceDeal in HarmonyDB - _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err) + if !b { + // Add LID details to pieceDeal in HarmonyDB + _, err = tx.Exec(`SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err) + } + + // Mark deal added to pieceDeal in HarmonyDB + _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err) + } } - } - if !c { - var proof abi.RegisteredSealProof - err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) - if err != nil { - return fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) - } + if !c { + var proof abi.RegisteredSealProof + err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) + } - // Add deal to mk12 pipeline in Curio for indexing and announcement - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, sealed, should_index, indexing_created_at, announce) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, - deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline, - true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) - if err != nil { - return fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline, + true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + } } + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return err } + } return nil @@ -366,7 +370,10 @@ func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors return err } - for _, deal := range legacyDeals { + for i, deal := range legacyDeals { + if i > 0 && i%100 == 0 { + fmt.Printf("Migrating Legacy Deals: %d / %d (%0.2f%%)\n", i, len(legacyDeals), float64(i)/float64(len(legacyDeals))*100) + } llog := log.With("Boost Deal", deal.ProposalCid.String()) // Skip deals which do not have chain deal ID if deal.DealID == 0 { @@ -422,7 +429,7 @@ func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors prop := deal.ClientDealProposal.Proposal - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (signed_proposal_cid, sp_id, client_peer_id, + _, err = hdb.Exec(ctx, `INSERT INTO signed_proposal_cid (signed_proposal_cid, sp_id, client_peer_id, proposal_signature, proposal, piece_cid, piece_size, verified, start_epoch, end_epoch, publish_cid, chain_deal_id, fast_retrieval, created_at, sector_num) @@ -459,7 +466,10 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit return fmt.Errorf("failed to get all DDO deals: %w", err) } - for _, deal := range deals { + for i, deal := range deals { + if i > 0 && i%100 == 0 { + fmt.Printf("Migrating DDO Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100) + } llog := log.With("Boost Deal", deal.ID.String()) if deal.Err != "" && deal.Retry == types.DealRetryFatal { llog.Infow("Skipping as deal retry is fatal") @@ -505,86 +515,67 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit continue } - if !a { - // Add DDO deal to harmonyDB - _, err = hdb.Exec(ctx, `INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified, + _, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + if !a { + // Add DDO deal to harmonyDB + _, err = tx.Exec(`INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified, start_epoch, end_epoch, allocation_id, piece_cid, piece_size, fast_retrieval, announce_to_ipni) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (uuid) DO NOTHING`, - deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID, - deal.PieceCID.String(), deal.PieceSize, true, true) - - if err != nil { - return fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err) + deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID, + deal.PieceCID.String(), deal.PieceSize, true, true) + + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err) + } } - // Mark deal added to harmonyDB - _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err) - } - } - - if !b { - // Add LID details to pieceDeal in HarmonyDB - _, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false) - if err != nil { - return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err) + if !b { + // Add LID details to pieceDeal in HarmonyDB + _, err = tx.Exec(`SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err) + } + + // Mark deal added to pieceDeal in HarmonyDB + _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err) + } } - // Mark deal added to pieceDeal in HarmonyDB - _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err) - } - } + // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals + if !c { + var proof abi.RegisteredSealProof + err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) + } - // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals - if !c { - var proof abi.RegisteredSealProof - err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) - if err != nil { - return fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) - } - - // Add deal to mk12 pipeline in Curio for indexing and announcement - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, sealed, should_index, indexing_created_at, announce) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, - deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, - true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) - if err != nil { - return fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, + true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + } } + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return err } } return nil } - -func generateNewKeys(ctx context.Context, maddr address.Address, hdb *harmonydb.DB) error { - - mid, err := address.IDFromAddress(maddr) - if err != nil { - return err - } - - pk, _, err := crypto.GenerateEd25519Key(rand.Reader) - if err != nil { - return fmt.Errorf("generating private key: %w", err) - } - - kbytes, err := crypto.MarshalPrivateKey(pk) - if err != nil { - return fmt.Errorf("marshaling private key: %w", err) - } - - _, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes) - if err != nil { - return fmt.Errorf("inserting private key into libp2p table: %w", err) - } - - return nil -}