-
Notifications
You must be signed in to change notification settings - Fork 502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
exp/lighthorizon: Unify map-reduce and single-process index builders #4423
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,35 +4,174 @@ import ( | |
"context" | ||
"fmt" | ||
"io" | ||
"math" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/stellar/go/historyarchive" | ||
"github.com/stellar/go/ingest" | ||
"github.com/stellar/go/ingest/ledgerbackend" | ||
"github.com/stellar/go/support/errors" | ||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/go/toid" | ||
"github.com/stellar/go/xdr" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// Module is a way to process data and store it into an index. | ||
func BuildIndices( | ||
ctx context.Context, | ||
sourceUrl string, // where is raw txmeta coming from? | ||
targetUrl string, // where should the resulting indices go? | ||
networkPassphrase string, | ||
startLedger, endLedger uint32, | ||
modules []string, | ||
workerCount int, | ||
) error { | ||
indexStore, err := Connect(targetUrl) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Simple file os access | ||
source, err := historyarchive.ConnectBackend( | ||
sourceUrl, | ||
historyarchive.ConnectOptions{ | ||
Context: ctx, | ||
NetworkPassphrase: networkPassphrase, | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source) | ||
defer ledgerBackend.Close() | ||
|
||
if endLedger == 0 { | ||
latest, err := ledgerBackend.GetLatestLedgerSequence(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
endLedger = latest | ||
} | ||
|
||
ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive | ||
parallel := max(1, workerCount) | ||
|
||
startTime := time.Now() | ||
log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)", | ||
startLedger, endLedger, ledgerCount) | ||
log.Infof("Using %d workers", parallel) | ||
|
||
// Create a bunch of workers that process ledgers a checkpoint range at a | ||
// time (better than a ledger at a time to minimize flushes). | ||
wg, ctx := errgroup.WithContext(ctx) | ||
ch := make(chan historyarchive.Range, parallel) | ||
|
||
indexBuilder := NewIndexBuilder(indexStore, ledgerBackend, networkPassphrase) | ||
for _, part := range modules { | ||
switch part { | ||
case "transactions": | ||
indexBuilder.RegisterModule(ProcessTransaction) | ||
case "accounts": | ||
indexBuilder.RegisterModule(ProcessAccounts) | ||
case "accounts_unbacked": | ||
indexBuilder.RegisterModule(ProcessAccountsWithoutBackend) | ||
default: | ||
return fmt.Errorf("Unknown module: %s", part) | ||
} | ||
} | ||
|
||
// Submit the work to the channels, breaking up the range into individual | ||
// checkpoint ranges. | ||
go func() { | ||
// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0 | ||
nextCheckpoint := (((startLedger / 64) * 64) + 63) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Ideal use case for |
||
|
||
ledger := startLedger | ||
nextLedger := min(endLedger, ledger+(nextCheckpoint-startLedger)) | ||
for ledger <= endLedger { | ||
chunk := historyarchive.Range{Low: ledger, High: nextLedger} | ||
log.Debugf("Submitted [%d, %d] for work", chunk.Low, chunk.High) | ||
ch <- chunk | ||
|
||
ledger = nextLedger + 1 | ||
nextLedger = min(endLedger, ledger+63) // don't exceed upper bound | ||
} | ||
|
||
close(ch) | ||
}() | ||
|
||
processed := uint64(0) | ||
for i := 0; i < parallel; i++ { | ||
wg.Go(func() error { | ||
for ledgerRange := range ch { | ||
count := (ledgerRange.High - ledgerRange.Low) + 1 | ||
nprocessed := atomic.AddUint64(&processed, uint64(count)) | ||
|
||
log.Debugf("Working on checkpoint range [%d, %d]", | ||
ledgerRange.Low, ledgerRange.High) | ||
|
||
// Assertion for testing | ||
if ledgerRange.High != endLedger && (ledgerRange.High+1)%64 != 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
log.Fatalf("Upper ledger isn't a checkpoint: %v", ledgerRange) | ||
} | ||
|
||
err = indexBuilder.Build(ctx, ledgerRange) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime) | ||
|
||
// Upload indices once per checkpoint to save memory | ||
if err := indexStore.Flush(); err != nil { | ||
return errors.Wrap(err, "flushing indices failed") | ||
} | ||
} | ||
return nil | ||
}) | ||
} | ||
|
||
if err := wg.Wait(); err != nil { | ||
return errors.Wrap(err, "one or more workers failed") | ||
} | ||
|
||
printProgress("Reading ledgers", uint64(ledgerCount), uint64(ledgerCount), startTime) | ||
|
||
// Assertion for testing | ||
if processed != uint64(ledgerCount) { | ||
log.Fatalf("processed %d but expected %d", processed, ledgerCount) | ||
} | ||
|
||
log.Infof("Processed %d ledgers via %d workers", processed, parallel) | ||
log.Infof("Uploading indices to %s", targetUrl) | ||
if err := indexStore.Flush(); err != nil { | ||
return errors.Wrap(err, "flushing indices failed") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Module is a way to process ingested data and shove it into an index store. | ||
type Module func( | ||
idx Store, | ||
indexStore Store, | ||
ledger xdr.LedgerCloseMeta, | ||
checkpoint uint32, | ||
transaction ingest.LedgerTransaction, | ||
) error | ||
|
||
// IndexBuilder contains everything needed to build indices from ledger ranges. | ||
type IndexBuilder struct { | ||
store Store | ||
history ledgerbackend.HistoryArchiveBackend | ||
history *ledgerbackend.HistoryArchiveBackend | ||
networkPassphrase string | ||
|
||
modules []Module | ||
} | ||
|
||
func NewIndexBuilder( | ||
indexStore Store, | ||
backend ledgerbackend.HistoryArchiveBackend, | ||
backend *ledgerbackend.HistoryArchiveBackend, | ||
networkPassphrase string, | ||
) *IndexBuilder { | ||
return &IndexBuilder{ | ||
|
@@ -51,18 +190,23 @@ func (builder *IndexBuilder) RegisterModule(module Module) { | |
// RunModules executes all of the registered modules on the given ledger. | ||
func (builder *IndexBuilder) RunModules( | ||
ledger xdr.LedgerCloseMeta, | ||
checkpoint uint32, | ||
tx ingest.LedgerTransaction, | ||
) error { | ||
for _, module := range builder.modules { | ||
if err := module(builder.store, ledger, checkpoint, tx); err != nil { | ||
if err := module(builder.store, ledger, tx); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Build sequentially creates indices for each ledger in the given range based | ||
// on the registered modules. | ||
// | ||
// TODO: We can probably optimize this by doing GetLedger in parallel with the | ||
// ingestion & index building, since the network will be idle during the latter | ||
// portion. | ||
func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error { | ||
for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ { | ||
ledger, err := builder.history.GetLedger(ctx, ledgerSeq) | ||
|
@@ -71,8 +215,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi | |
return err | ||
} | ||
|
||
checkpoint := (ledgerSeq / 64) + 1 | ||
|
||
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta( | ||
builder.networkPassphrase, ledger) | ||
if err != nil { | ||
|
@@ -87,7 +229,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi | |
return err | ||
} | ||
|
||
if err := builder.RunModules(ledger, checkpoint, tx); err != nil { | ||
if err := builder.RunModules(ledger, tx); err != nil { | ||
return err | ||
} | ||
} | ||
|
@@ -99,7 +241,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi | |
func ProcessTransaction( | ||
indexStore Store, | ||
ledger xdr.LedgerCloseMeta, | ||
_ uint32, | ||
tx ingest.LedgerTransaction, | ||
) error { | ||
return indexStore.AddTransactionToIndexes( | ||
|
@@ -110,10 +251,10 @@ func ProcessTransaction( | |
|
||
func ProcessAccounts( | ||
indexStore Store, | ||
_ xdr.LedgerCloseMeta, | ||
checkpoint uint32, | ||
ledger xdr.LedgerCloseMeta, | ||
tx ingest.LedgerTransaction, | ||
) error { | ||
checkpoint := (ledger.LedgerSequence() / 64) + 1 | ||
allParticipants, err := getParticipants(tx) | ||
if err != nil { | ||
return err | ||
|
@@ -148,6 +289,48 @@ func ProcessAccounts( | |
|
||
return nil | ||
} | ||
|
||
func ProcessAccountsWithoutBackend( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was no code doing that, so I didn't add it 🤷♂️ This new module was ripped straight from the existing variant, and you can see here that transactions were added with a backend in the original version. |
||
indexStore Store, | ||
ledger xdr.LedgerCloseMeta, | ||
tx ingest.LedgerTransaction, | ||
) error { | ||
checkpoint := (ledger.LedgerSequence() / 64) + 1 | ||
allParticipants, err := getParticipants(tx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_all", allParticipants) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
paymentsParticipants, err := getPaymentParticipants(tx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_payments", paymentsParticipants) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if tx.Result.Successful() { | ||
err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_all", allParticipants) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_payments", paymentsParticipants) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func getPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, error) { | ||
return participantsForOperations(transaction, true) | ||
} | ||
|
@@ -263,13 +446,16 @@ func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayment | |
// Requires meta | ||
// sponsor, err := operation.getSponsor() | ||
// if err != nil { | ||
// return nil, err | ||
// return nil, err | ||
// } | ||
// if sponsor != nil { | ||
// otherParticipants = append(otherParticipants, *sponsor) | ||
// otherParticipants = append(otherParticipants, *sponsor) | ||
// } | ||
} | ||
|
||
// FIXME: This could probably be a set rather than a list, since there's no | ||
// reason to track a participating account more than once if they are | ||
// participants across multiple operations. | ||
return participants, nil | ||
} | ||
|
||
|
@@ -292,3 +478,47 @@ func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string { | |
} | ||
return []string{} | ||
} | ||
|
||
func printProgress(prefix string, done, total uint64, startTime time.Time) { | ||
// This should never happen, more of a runtime assertion for now. | ||
// We can remove it when production-ready. | ||
if done > total { | ||
panic(fmt.Errorf("error for %s: done > total (%d > %d)", | ||
prefix, done, total)) | ||
} | ||
|
||
progress := float64(done) / float64(total) | ||
elapsed := time.Since(startTime) | ||
|
||
// Approximate based on how many ledgers are left and how long this much | ||
// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this | ||
// assumes consistent ledger load). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we could improve the estimation by counting ops instead of ledgers? |
||
remaining := (float64(elapsed) / float64(done)) * float64(total-done) | ||
|
||
var remainingStr string | ||
if math.IsInf(remaining, 0) || math.IsNaN(remaining) { | ||
remainingStr = "unknown" | ||
} else { | ||
remainingStr = time.Duration(remaining).Round(time.Millisecond).String() | ||
} | ||
|
||
log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix, | ||
100*progress, done, total, | ||
elapsed.Round(time.Millisecond), | ||
remainingStr, | ||
) | ||
} | ||
|
||
func min(a, b uint32) uint32 { | ||
if a < b { | ||
return a | ||
} | ||
return b | ||
} | ||
|
||
func max(a, b int) int { | ||
if a > b { | ||
return a | ||
} | ||
return b | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it worthwhile to use stronger type like enum to represent module types as they increase in number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, because these are supposed to be plug-and-play. Neither their execution order nor enum value would ever matter.