From 9cb52ad6777175929b36bc98569b8bc4e12fc634 Mon Sep 17 00:00:00 2001
From: George Kudrayvtsev
Date: Fri, 3 Jun 2022 10:54:34 -0700
Subject: [PATCH] Unify map-reduce and single-process index builders

---
 exp/lighthorizon/index/builder.go            | 260 ++++++++++++-
 exp/lighthorizon/index/cmd/batch/map/main.go | 383 ++++---------------
 exp/lighthorizon/index/cmd/single/main.go    | 170 +-------
 3 files changed, 323 insertions(+), 490 deletions(-)

diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go
index cacdb17580..0fd554308b 100644
--- a/exp/lighthorizon/index/builder.go
+++ b/exp/lighthorizon/index/builder.go
@@ -4,27 +4,166 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math"
+	"sync/atomic"
+	"time"
 
 	"github.com/stellar/go/historyarchive"
 	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/ingest/ledgerbackend"
+	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/support/log"
 	"github.com/stellar/go/toid"
 	"github.com/stellar/go/xdr"
+	"golang.org/x/sync/errgroup"
 )
 
-// Module is a way to process data and store it into an index.
+func BuildIndices(
+	ctx context.Context,
+	sourceUrl string, // where is raw txmeta coming from?
+	targetUrl string, // where should the resulting indices go?
+	networkPassphrase string,
+	startLedger, endLedger uint32,
+	modules []string,
+	workerCount int,
+) error {
+	indexStore, err := Connect(targetUrl)
+	if err != nil {
+		return err
+	}
+
+	// Simple file os access
+	source, err := historyarchive.ConnectBackend(
+		sourceUrl,
+		historyarchive.ConnectOptions{
+			Context:           ctx,
+			NetworkPassphrase: networkPassphrase,
+		},
+	)
+	if err != nil {
+		return err
+	}
+
+	ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
+	defer ledgerBackend.Close()
+
+	if endLedger == 0 {
+		latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
+		if err != nil {
+			return err
+		}
+		endLedger = latest
+	}
+
+	ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
+	parallel := max(1, workerCount)
+
+	startTime := time.Now()
+	log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
+		startLedger, endLedger, ledgerCount)
+	log.Infof("Using %d workers", parallel)
+
+	// Create a bunch of workers that process ledgers a checkpoint range at a
+	// time (better than a ledger at a time to minimize flushes).
+	wg, ctx := errgroup.WithContext(ctx)
+	ch := make(chan historyarchive.Range, parallel)
+
+	indexBuilder := NewIndexBuilder(indexStore, ledgerBackend, networkPassphrase)
+	for _, part := range modules {
+		switch part {
+		case "transactions":
+			indexBuilder.RegisterModule(ProcessTransaction)
+		case "accounts":
+			indexBuilder.RegisterModule(ProcessAccounts)
+		case "accounts_unbacked":
+			indexBuilder.RegisterModule(ProcessAccountsWithoutBackend)
+		default:
+			return fmt.Errorf("Unknown module: %s", part)
+		}
+	}
+
+	// Submit the work to the channels, breaking up the range into individual
+	// checkpoint ranges.
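+	//
+	// For example, with startLedger=5 and endLedger=200, the loop below
+	// submits [5, 63], [64, 127], [128, 191], and [192, 200]: every chunk
+	// but the last ends on a checkpoint ledger, and the last is clamped to
+	// endLedger.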
+	go func() {
+		// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
+		nextCheckpoint := (((startLedger / 64) * 64) + 63)
+
+		ledger := startLedger
+		nextLedger := min(endLedger, ledger+(nextCheckpoint-startLedger))
+		for ledger <= endLedger {
+			chunk := historyarchive.Range{Low: ledger, High: nextLedger}
+			log.Debugf("Submitted [%d, %d] for work", chunk.Low, chunk.High)
+			ch <- chunk
+
+			ledger = nextLedger + 1
+			nextLedger = min(endLedger, ledger+63) // don't exceed upper bound
+		}
+
+		close(ch)
+	}()
+
+	processed := uint64(0)
+	for i := 0; i < parallel; i++ {
+		wg.Go(func() error {
+			for ledgerRange := range ch {
+				count := (ledgerRange.High - ledgerRange.Low) + 1
+				nprocessed := atomic.AddUint64(&processed, uint64(count))
+
+				log.Debugf("Working on checkpoint range [%d, %d]",
+					ledgerRange.Low, ledgerRange.High)
+
+				// Assertion for testing
+				if ledgerRange.High != endLedger && (ledgerRange.High+1)%64 != 0 {
+					log.Fatalf("Upper ledger isn't a checkpoint: %v", ledgerRange)
+				}
+
+				// Scope err to this goroutine: assigning the enclosing
+				// function's err from parallel workers is a data race.
+				if err := indexBuilder.Build(ctx, ledgerRange); err != nil {
+					return err
+				}
+
+				printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
+
+				// Upload indices once per checkpoint to save memory
+				if err := indexStore.Flush(); err != nil {
+					return errors.Wrap(err, "flushing indices failed")
+				}
+			}
+			return nil
+		})
+	}
+
+	if err := wg.Wait(); err != nil {
+		return errors.Wrap(err, "one or more workers failed")
+	}
+
+	printProgress("Reading ledgers", uint64(ledgerCount), uint64(ledgerCount), startTime)
+
+	// Assertion for testing
+	if processed != uint64(ledgerCount) {
+		log.Fatalf("processed %d but expected %d", processed, ledgerCount)
+	}
+
+	log.Infof("Processed %d ledgers via %d workers", processed, parallel)
+	log.Infof("Uploading indices to %s", targetUrl)
+	if err := indexStore.Flush(); err != nil {
+		return errors.Wrap(err, "flushing indices failed")
+	}
+
+	return nil
+}
+
+// Module is a way to process ingested data and shove it into an index store.
 type Module func(
-	idx Store,
+	indexStore Store,
 	ledger xdr.LedgerCloseMeta,
-	checkpoint uint32,
 	transaction ingest.LedgerTransaction,
 ) error
 
 // IndexBuilder contains everything needed to build indices from ledger ranges.
 type IndexBuilder struct {
 	store             Store
-	history           ledgerbackend.HistoryArchiveBackend
+	history           *ledgerbackend.HistoryArchiveBackend
 	networkPassphrase string
 
 	modules []Module
@@ -32,7 +171,7 @@ type IndexBuilder struct {
 
 func NewIndexBuilder(
 	indexStore Store,
-	backend ledgerbackend.HistoryArchiveBackend,
+	backend *ledgerbackend.HistoryArchiveBackend,
 	networkPassphrase string,
 ) *IndexBuilder {
 	return &IndexBuilder{
@@ -51,11 +190,10 @@ func (builder *IndexBuilder) RegisterModule(module Module) {
 
 // RunModules executes all of the registered modules on the given ledger.
 func (builder *IndexBuilder) RunModules(
 	ledger xdr.LedgerCloseMeta,
-	checkpoint uint32,
 	tx ingest.LedgerTransaction,
 ) error {
 	for _, module := range builder.modules {
-		if err := module(builder.store, ledger, checkpoint, tx); err != nil {
+		if err := module(builder.store, ledger, tx); err != nil {
 			return err
 		}
 	}
@@ -63,6 +201,12 @@ func (builder *IndexBuilder) RunModules(
 	return nil
 }
 
+// Build sequentially creates indices for each ledger in the given range based
+// on the registered modules.
+//
+// TODO: We can probably optimize this by doing GetLedger in parallel with the
+// ingestion & index building, since the network will be idle during the latter
+// portion.
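+//
+// A sketch of that optimization (not implemented here): a producer goroutine
+// could prefetch upcoming ledgers into a small buffered channel while this
+// loop ingests the current one, overlapping network I/O with CPU-bound work.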
func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error { for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ { ledger, err := builder.history.GetLedger(ctx, ledgerSeq) @@ -71,8 +215,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi return err } - checkpoint := (ledgerSeq / 64) + 1 - reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta( builder.networkPassphrase, ledger) if err != nil { @@ -87,7 +229,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi return err } - if err := builder.RunModules(ledger, checkpoint, tx); err != nil { + if err := builder.RunModules(ledger, tx); err != nil { return err } } @@ -99,7 +241,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi func ProcessTransaction( indexStore Store, ledger xdr.LedgerCloseMeta, - _ uint32, tx ingest.LedgerTransaction, ) error { return indexStore.AddTransactionToIndexes( @@ -110,10 +251,10 @@ func ProcessTransaction( func ProcessAccounts( indexStore Store, - _ xdr.LedgerCloseMeta, - checkpoint uint32, + ledger xdr.LedgerCloseMeta, tx ingest.LedgerTransaction, ) error { + checkpoint := (ledger.LedgerSequence() / 64) + 1 allParticipants, err := getParticipants(tx) if err != nil { return err @@ -148,6 +289,48 @@ func ProcessAccounts( return nil } + +func ProcessAccountsWithoutBackend( + indexStore Store, + ledger xdr.LedgerCloseMeta, + tx ingest.LedgerTransaction, +) error { + checkpoint := (ledger.LedgerSequence() / 64) + 1 + allParticipants, err := getParticipants(tx) + if err != nil { + return err + } + + err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_all", allParticipants) + if err != nil { + return err + } + + paymentsParticipants, err := getPaymentParticipants(tx) + if err != nil { + return err + } + + err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_payments", paymentsParticipants) + if err != nil { + return err + } + + if tx.Result.Successful() { + err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_all", allParticipants) + if err != nil { + return err + } + + err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_payments", paymentsParticipants) + if err != nil { + return err + } + } + + return nil +} + func getPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, error) { return participantsForOperations(transaction, true) } @@ -263,13 +446,16 @@ func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayment // Requires meta // sponsor, err := operation.getSponsor() // if err != nil { - // return nil, err + // return nil, err // } // if sponsor != nil { - // otherParticipants = append(otherParticipants, *sponsor) + // otherParticipants = append(otherParticipants, *sponsor) // } } + // FIXME: This could probably be a set rather than a list, since there's no + // reason to track a participating account more than once if they are + // participants across multiple operations. return participants, nil } @@ -292,3 +478,47 @@ func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string { } return []string{} } + +func printProgress(prefix string, done, total uint64, startTime time.Time) { + // This should never happen, more of a runtime assertion for now. + // We can remove it when production-ready. 
+	if done > total {
+		panic(fmt.Errorf("error for %s: done > total (%d > %d)",
+			prefix, done, total))
+	}
+
+	progress := float64(done) / float64(total)
+	elapsed := time.Since(startTime)
+
+	// Approximate based on how many ledgers are left and how long this much
+	// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
+	// assumes consistent ledger load).
+	remaining := (float64(elapsed) / float64(done)) * float64(total-done)
+
+	var remainingStr string
+	if math.IsInf(remaining, 0) || math.IsNaN(remaining) {
+		remainingStr = "unknown"
+	} else {
+		remainingStr = time.Duration(remaining).Round(time.Millisecond).String()
+	}
+
+	log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix,
+		100*progress, done, total,
+		elapsed.Round(time.Millisecond),
+		remainingStr,
+	)
+}
+
+func min(a, b uint32) uint32 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func max(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
diff --git a/exp/lighthorizon/index/cmd/batch/map/main.go b/exp/lighthorizon/index/cmd/batch/map/main.go
index bd49f7d50b..bd48849422 100644
--- a/exp/lighthorizon/index/cmd/batch/map/main.go
+++ b/exp/lighthorizon/index/cmd/batch/map/main.go
@@ -3,352 +3,109 @@ package main
 import (
 	"context"
 	"fmt"
-	"io"
 	"os"
 	"strconv"
-	"sync/atomic"
-	"time"
 
-	"github.com/aws/aws-sdk-go/aws"
 	"github.com/stellar/go/exp/lighthorizon/index"
 	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/network"
+	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/support/log"
-	"github.com/stellar/go/toid"
-	"github.com/stellar/go/xdr"
-	"golang.org/x/sync/errgroup"
 )
 
-var (
-	// Should we use runtime.NumCPU() for a reasonable default?
-	parallel = uint32(20)
-)
+type BatchConfig struct {
+	historyarchive.Range
+	TxMetaSourceUrl, TargetUrl string
+}
 
-func main() {
-	log.SetLevel(log.InfoLevel)
-	startTime := time.Now()
+const (
+	batchSizeEnv       = "BATCH_SIZE"
+	jobIndexEnv        = "AWS_BATCH_JOB_ARRAY_INDEX"
+	firstCheckpointEnv = "FIRST_CHECKPOINT"
+	txmetaSourceUrlEnv = "TXMETA_SOURCE"
+	indexTargetUrlEnv  = "INDEX_TARGET"
 
-	jobIndexString := os.Getenv("AWS_BATCH_JOB_ARRAY_INDEX")
-	if jobIndexString == "" {
-		panic("AWS_BATCH_JOB_ARRAY_INDEX env required")
-	}
+	s3BucketName = "sdf-txmeta-pubnet"
+)
 
-	jobIndex, err := strconv.ParseUint(jobIndexString, 10, 64)
+func NewS3BatchConfig() (*BatchConfig, error) {
+	jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
 	if err != nil {
-		panic(err)
+		return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
 	}
 
-	firstCheckpointString := os.Getenv("FIRST_CHECKPOINT")
-	firstCheckpoint, err := strconv.ParseUint(firstCheckpointString, 10, 64)
-	if err != nil {
-		panic(err)
+	url := fmt.Sprintf("s3://%s/job_%d?region=%s", s3BucketName, jobIndex, "us-east-1")
+	if err := os.Setenv(indexTargetUrlEnv, url); err != nil {
+		return nil, err
 	}
 
-	batchSizeString := os.Getenv("BATCH_SIZE")
-	batchSize, err := strconv.ParseUint(batchSizeString, 10, 64)
-	if err != nil {
-		panic(err)
-	}
+	return NewBatchConfig()
+}
 
-	startCheckpoint := uint32(firstCheckpoint + batchSize*jobIndex)
-	endCheckpoint := startCheckpoint + uint32(batchSize) - 1
+func NewBatchConfig() (*BatchConfig, error) {
+	targetUrl := os.Getenv(indexTargetUrlEnv)
+	if targetUrl == "" {
+		return nil, errors.New("required parameter: " + indexTargetUrlEnv)
+	}
 
-	indexStore, err := index.NewS3Store(
-		&aws.Config{Region: aws.String("us-east-1")},
-		fmt.Sprintf("job_%d", jobIndex),
-		
parallel, - ) + jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32) if err != nil { - panic(err) + return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv) } - historyArchive, err := historyarchive.Connect( - "s3://history.stellar.org/prd/core-live/core_live_001", - historyarchive.ConnectOptions{ - NetworkPassphrase: network.PublicNetworkPassphrase, - S3Region: "eu-west-1", - UnsignedRequests: true, - }, - ) + firstCheckpoint, err := strconv.ParseUint(os.Getenv(firstCheckpointEnv), 10, 32) if err != nil { - panic(err) + return nil, errors.Wrap(err, "invalid parameter "+firstCheckpointEnv) } - - all := endCheckpoint - startCheckpoint - - ctx := context.Background() - wg, _ := errgroup.WithContext(ctx) - - ch := make(chan uint32, parallel) - - go func() { - for i := startCheckpoint; i <= endCheckpoint; i++ { - ch <- i - } - close(ch) - }() - - processed := uint64(0) - for i := uint32(0); i < parallel; i++ { - wg.Go(func() error { - for checkpoint := range ch { - - startLedger := checkpoint * 64 - if startLedger == 0 { - startLedger = 1 - } - endLedger := checkpoint*64 + 64 - 1 - - log.Info("Processing checkpoint ", checkpoint, " ledgers ", startLedger, endLedger) - - ledgers, err := historyArchive.GetLedgers(startLedger, endLedger) - if err != nil { - log.WithField("error", err).Error("error getting ledgers") - ch <- checkpoint - continue - } - - for i := startLedger; i <= endLedger; i++ { - ledger, ok := ledgers[i] - if !ok { - return fmt.Errorf("no ledger %d", i) - } - - resultMeta := make([]xdr.TransactionResultMeta, len(ledger.TransactionResult.TxResultSet.Results)) - for i, result := range ledger.TransactionResult.TxResultSet.Results { - resultMeta[i].Result = result - } - - closeMeta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: ledger.Header, - TxSet: ledger.Transaction.TxSet, - TxProcessing: resultMeta, - }, - } - - reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(network.PublicNetworkPassphrase, closeMeta) - if err != nil { - return err - } - - for { - tx, err := reader.Read() - if err != nil { - if err == io.EOF { - break - } - return err - } - - indexStore.AddTransactionToIndexes( - toid.New(int32(closeMeta.LedgerSequence()), int32(tx.Index), 0).ToInt64(), - tx.Result.TransactionHash, - ) - - allParticipants, err := participantsForOperations(tx, false) - if err != nil { - return err - } - - err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_all", allParticipants) - if err != nil { - return err - } - - paymentsParticipants, err := participantsForOperations(tx, true) - if err != nil { - return err - } - - err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_payments", paymentsParticipants) - if err != nil { - return err - } - - if tx.Result.Successful() { - allParticipants, err := participantsForOperations(tx, false) - if err != nil { - return err - } - - err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_all", allParticipants) - if err != nil { - return err - } - - paymentsParticipants, err := participantsForOperations(tx, true) - if err != nil { - return err - } - - err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_payments", paymentsParticipants) - if err != nil { - return err - } - } - } - } - - nprocessed := atomic.AddUint64(&processed, 1) - - if nprocessed%100 == 0 { - log.Infof( - "Reading checkpoints... 
- %.2f%% - elapsed: %s, remaining: %s", - (float64(nprocessed)/float64(all))*100, - time.Since(startTime).Round(1*time.Second), - (time.Duration(int64(time.Since(startTime))*int64(all)/int64(nprocessed)) - time.Since(startTime)).Round(1*time.Second), - ) - } - } - return nil - }) + if (firstCheckpoint+1)%64 != 0 { + return nil, fmt.Errorf("invalid checkpoint: %d", firstCheckpoint) } - if err := wg.Wait(); err != nil { - panic(err) - } - log.Infof("Uploading accounts") - if err := indexStore.FlushAccounts(); err != nil { - panic(err) - } - log.Infof("Uploading indexes") - if err := indexStore.Flush(); err != nil { - panic(err) + batchSize, err := strconv.ParseUint(os.Getenv(batchSizeEnv), 10, 32) + if err != nil { + return nil, errors.Wrap(err, "invalid parameter "+batchSizeEnv) } -} -func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) { - var participants []string + sourceUrl := os.Getenv(txmetaSourceUrlEnv) + if sourceUrl == "" { + return nil, errors.New("required parameter " + txmetaSourceUrlEnv) + } - for opindex, operation := range transaction.Envelope.Operations() { - opSource := operation.SourceAccount - if opSource == nil { - txSource := transaction.Envelope.SourceAccount() - opSource = &txSource - } + log.Debugf("%s: %d", batchSizeEnv, batchSize) + log.Debugf("%s: %d", jobIndexEnv, jobIndex) + log.Debugf("%s: %d", firstCheckpointEnv, firstCheckpoint) + log.Debugf("%s: %v", txmetaSourceUrlEnv, sourceUrl) - switch operation.Body.Type { - case xdr.OperationTypeCreateAccount, - xdr.OperationTypePayment, - xdr.OperationTypePathPaymentStrictReceive, - xdr.OperationTypePathPaymentStrictSend, - xdr.OperationTypeAccountMerge: - participants = append(participants, opSource.Address()) - default: - if onlyPayments { - continue - } - participants = append(participants, opSource.Address()) - } + startCheckpoint := uint32(firstCheckpoint + batchSize*jobIndex) + endCheckpoint := startCheckpoint + uint32(batchSize) - 1 + return &BatchConfig{ + Range: historyarchive.Range{Low: startCheckpoint, High: endCheckpoint}, + TxMetaSourceUrl: sourceUrl, + TargetUrl: targetUrl, + }, nil +} - switch operation.Body.Type { - case xdr.OperationTypeCreateAccount: - participants = append(participants, operation.Body.MustCreateAccountOp().Destination.Address()) - case xdr.OperationTypePayment: - participants = append(participants, operation.Body.MustPaymentOp().Destination.ToAccountId().Address()) - case xdr.OperationTypePathPaymentStrictReceive: - participants = append(participants, operation.Body.MustPathPaymentStrictReceiveOp().Destination.ToAccountId().Address()) - case xdr.OperationTypePathPaymentStrictSend: - participants = append(participants, operation.Body.MustPathPaymentStrictSendOp().Destination.ToAccountId().Address()) - case xdr.OperationTypeManageBuyOffer: - // the only direct participant is the source_account - case xdr.OperationTypeManageSellOffer: - // the only direct participant is the source_account - case xdr.OperationTypeCreatePassiveSellOffer: - // the only direct participant is the source_account - case xdr.OperationTypeSetOptions: - // the only direct participant is the source_account - case xdr.OperationTypeChangeTrust: - // the only direct participant is the source_account - case xdr.OperationTypeAllowTrust: - participants = append(participants, operation.Body.MustAllowTrustOp().Trustor.Address()) - case xdr.OperationTypeAccountMerge: - participants = append(participants, operation.Body.MustDestination().ToAccountId().Address()) - case 
xdr.OperationTypeInflation: - // the only direct participant is the source_account - case xdr.OperationTypeManageData: - // the only direct participant is the source_account - case xdr.OperationTypeBumpSequence: - // the only direct participant is the source_account - case xdr.OperationTypeCreateClaimableBalance: - for _, c := range operation.Body.MustCreateClaimableBalanceOp().Claimants { - participants = append(participants, c.MustV0().Destination.Address()) - } - case xdr.OperationTypeClaimClaimableBalance: - // the only direct participant is the source_account - case xdr.OperationTypeBeginSponsoringFutureReserves: - participants = append(participants, operation.Body.MustBeginSponsoringFutureReservesOp().SponsoredId.Address()) - case xdr.OperationTypeEndSponsoringFutureReserves: - // Failed transactions may not have a compliant sandwich structure - // we can rely on (e.g. invalid nesting or a being operation with the wrong sponsoree ID) - // and thus we bail out since we could return incorrect information. - if transaction.Result.Successful() { - sponsoree := transaction.Envelope.SourceAccount().ToAccountId().Address() - if operation.SourceAccount != nil { - sponsoree = operation.SourceAccount.Address() - } - operations := transaction.Envelope.Operations() - for i := int(opindex) - 1; i >= 0; i-- { - if beginOp, ok := operations[i].Body.GetBeginSponsoringFutureReservesOp(); ok && - beginOp.SponsoredId.Address() == sponsoree { - participants = append(participants, beginOp.SponsoredId.Address()) - } - } - } - case xdr.OperationTypeRevokeSponsorship: - op := operation.Body.MustRevokeSponsorshipOp() - switch op.Type { - case xdr.RevokeSponsorshipTypeRevokeSponsorshipLedgerEntry: - participants = append(participants, getLedgerKeyParticipants(*op.LedgerKey)...) - case xdr.RevokeSponsorshipTypeRevokeSponsorshipSigner: - participants = append(participants, op.Signer.AccountId.Address()) - // We don't add signer as a participant because a signer can be arbitrary account. - // This can spam successful operations history of any account. 
-		}
-	case xdr.OperationTypeClawback:
-		op := operation.Body.MustClawbackOp()
-		participants = append(participants, op.From.ToAccountId().Address())
-	case xdr.OperationTypeClawbackClaimableBalance:
-		// the only direct participant is the source_account
-	case xdr.OperationTypeSetTrustLineFlags:
-		op := operation.Body.MustSetTrustLineFlagsOp()
-		participants = append(participants, op.Trustor.Address())
-	case xdr.OperationTypeLiquidityPoolDeposit:
-		// the only direct participant is the source_account
-	case xdr.OperationTypeLiquidityPoolWithdraw:
-		// the only direct participant is the source_account
-	default:
-		return nil, fmt.Errorf("unknown operation type: %s", operation.Body.Type)
-	}
+func main() {
+	// log.SetLevel(log.DebugLevel)
+	log.SetLevel(log.InfoLevel)
 
-	// Requires meta
-	// sponsor, err := operation.getSponsor()
-	// if err != nil {
-	//	return nil, err
-	// }
-	// if sponsor != nil {
-	//	otherParticipants = append(otherParticipants, *sponsor)
-	// }
+	batch, err := NewBatchConfig()
+	if err != nil {
+		panic(err)
 	}
 
-	return participants, nil
-}
-
-func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
-	var result []string
-	switch ledgerKey.Type {
-	case xdr.LedgerEntryTypeAccount:
-		result = append(result, ledgerKey.Account.AccountId.Address())
-	case xdr.LedgerEntryTypeClaimableBalance:
-		// nothing to do
-	case xdr.LedgerEntryTypeData:
-		result = append(result, ledgerKey.Data.AccountId.Address())
-	case xdr.LedgerEntryTypeOffer:
-		result = append(result, ledgerKey.Offer.SellerId.Address())
-	case xdr.LedgerEntryTypeTrustline:
-		result = append(result, ledgerKey.TrustLine.AccountId.Address())
+	log.Infof("Uploading ledger range [%d, %d] to %s",
+		batch.Range.Low, batch.Range.High, batch.TargetUrl)
+
+	if err := index.BuildIndices(
+		context.Background(),
+		batch.TxMetaSourceUrl,
+		batch.TargetUrl,
+		network.TestNetworkPassphrase,
+		batch.Low, batch.High,
+		[]string{"transactions", "accounts_unbacked"},
+		1,
+	); err != nil {
+		panic(err)
 	}
-	return result
 }
diff --git a/exp/lighthorizon/index/cmd/single/main.go b/exp/lighthorizon/index/cmd/single/main.go
index 8d95c35315..374e5c1204 100644
--- a/exp/lighthorizon/index/cmd/single/main.go
+++ b/exp/lighthorizon/index/cmd/single/main.go
@@ -3,19 +3,12 @@ package main
 import (
 	"context"
 	"flag"
-	"fmt"
-	"math"
 	"runtime"
 	"strings"
-	"sync/atomic"
-	"time"
 
 	"github.com/stellar/go/exp/lighthorizon/index"
-	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest/ledgerbackend"
 	"github.com/stellar/go/network"
 	"github.com/stellar/go/support/log"
-	"golang.org/x/sync/errgroup"
 )
 
 func main() {
@@ -33,159 +26,19 @@ func main() {
 	flag.Parse()
 	log.SetLevel(log.InfoLevel)
 
-	ctx := context.Background()
-
-	indexStore, err := index.Connect(*targetUrl)
-	if err != nil {
-		panic(err)
-	}
-
-	// Simple file os access
-	source, err := historyarchive.ConnectBackend(
+	err := index.BuildIndices(
+		context.Background(),
 		*sourceUrl,
-		historyarchive.ConnectOptions{
-			Context:           context.Background(),
-			NetworkPassphrase: *networkPassphrase,
-		},
+		*targetUrl,
+		*networkPassphrase,
+		uint32(max(*start, 2)),
+		uint32(*end),
+		strings.Split(*modules, ","),
+		*workerCount,
 	)
 	if err != nil {
 		panic(err)
 	}
-	ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
-	defer ledgerBackend.Close()
-
-	startTime := time.Now()
-
-	startLedger := uint32(max(*start, 2))
-	endLedger := uint32(*end)
-	if endLedger < 0 {
-		latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
-		if err != nil {
-			panic(err)
-		}
-		
endLedger = latest - } - ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive - parallel := max(1, *workerCount) - - log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)", - startLedger, endLedger, ledgerCount) - log.Infof("Using %d workers", parallel) - - // Create a bunch of workers that process ledgers a checkpoint range at a - // time (better than a ledger at a time to minimize flushes). - wg, ctx := errgroup.WithContext(ctx) - ch := make(chan historyarchive.Range, parallel) - - indexBuilder := index.NewIndexBuilder(indexStore, *ledgerBackend, *networkPassphrase) - for _, part := range strings.Split(*modules, ",") { - switch part { - case "transactions": - indexBuilder.RegisterModule(index.ProcessTransaction) - case "accounts": - indexBuilder.RegisterModule(index.ProcessAccounts) - default: - panic(fmt.Errorf("Unknown module: %s", part)) - } - } - - // Submit the work to the channels, breaking up the range into checkpoints. - go func() { - // Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0 - nextCheckpoint := (((startLedger / 64) * 64) + 63) - - ledger := startLedger - nextLedger := ledger + (nextCheckpoint - startLedger) - for ledger <= endLedger { - ch <- historyarchive.Range{Low: ledger, High: nextLedger} - - ledger = nextLedger + 1 - // Ensure we don't exceed the upper ledger bound - nextLedger = uint32(min(int(endLedger), int(ledger+63))) - } - - close(ch) - }() - - processed := uint64(0) - for i := 0; i < parallel; i++ { - wg.Go(func() error { - for ledgerRange := range ch { - count := (ledgerRange.High - ledgerRange.Low) + 1 - nprocessed := atomic.AddUint64(&processed, uint64(count)) - - log.Debugf("Working on checkpoint range %+v", ledgerRange) - - // Assertion for testing - if ledgerRange.High != endLedger && - (ledgerRange.High+1)%64 != 0 { - log.Fatalf("Uh oh: bad range") - } - - err = indexBuilder.Build(ctx, ledgerRange) - if err != nil { - return err - } - - printProgress("Reading ledgers", - nprocessed, uint64(ledgerCount), startTime) - - // Upload indices once per checkpoint to save memory - if err := indexStore.Flush(); err != nil { - return err - } - } - return nil - }) - } - - if err := wg.Wait(); err != nil { - panic(err) - } - - printProgress("Reading ledgers", - uint64(ledgerCount), uint64(ledgerCount), startTime) - - // Assertion for testing - if processed != uint64(ledgerCount) { - log.Fatalf("processed %d but expected %d", processed, ledgerCount) - } - - log.Infof("Processed %d ledgers via %d workers", processed, parallel) - log.Infof("Uploading indices to %s", *targetUrl) - if err := indexStore.Flush(); err != nil { - panic(err) - } -} - -func printProgress(prefix string, done, total uint64, startTime time.Time) { - // This should never happen, more of a runtime assertion for now. - // We can remove it when production-ready. - if done > total { - panic(fmt.Errorf("error for %s: done > total (%d > %d)", - prefix, done, total)) - } - - progress := float64(done) / float64(total) - elapsed := time.Since(startTime) - - // Approximate based on how many ledgers are left and how long this much - // progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this - // assumes consistent ledger load). 
- remaining := (float64(elapsed) / float64(done)) * float64(total-done) - - var remainingStr string - if math.IsInf(remaining, 0) || math.IsNaN(remaining) { - remainingStr = "unknown" - } else { - remainingStr = time.Duration(remaining).Round(time.Millisecond).String() - } - - log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix, - 100*progress, done, total, - elapsed.Round(time.Millisecond), - remainingStr, - ) } func max(a, b int) int { @@ -194,10 +47,3 @@ func max(a, b int) int { } return b } - -func min(a, b int) int { - if a < b { - return a - } - return b -}
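
Usage sketch (not part of the patch): with the unified entrypoint above, both
commands become thin wrappers around index.BuildIndices. The snippet below
drives it directly; the file:// paths are hypothetical, and any URL that
historyarchive.ConnectBackend accepts (e.g. s3://) should work the same way.
For the batch entrypoint, the equivalent inputs come from environment
variables instead: for example, FIRST_CHECKPOINT=63, BATCH_SIZE=64, and
AWS_BATCH_JOB_ARRAY_INDEX=2 yield startCheckpoint = 63 + 64*2 = 191 and the
ledger range [191, 254].

	package main

	import (
		"context"

		"github.com/stellar/go/exp/lighthorizon/index"
		"github.com/stellar/go/network"
	)

	func main() {
		// Ledgers 2 through 127 span two checkpoint ranges: [2, 63] and [64, 127].
		err := index.BuildIndices(
			context.Background(),
			"file:///tmp/txmeta",  // raw txmeta source (hypothetical path)
			"file:///tmp/indices", // index output (hypothetical path)
			network.TestNetworkPassphrase,
			2, 127, // start and end ledgers, inclusive
			[]string{"transactions", "accounts"},
			4, // worker count
		)
		if err != nil {
			panic(err)
		}
	}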