Skip to content

Commit

Permalink
Update to use BufferedStorageBackend to read txmeta files
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed May 7, 2024
1 parent 834b7bb commit fdc8857
Show file tree
Hide file tree
Showing 25 changed files with 229 additions and 166 deletions.
10 changes: 5 additions & 5 deletions cmd/export_account_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ should be used in an initial data dump. In order to get account information with
the export_ledger_entry_changes command.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs)
accounts, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs)
if err != nil {
cmdLogger.Fatal("could not read accounts: ", err)
}
Expand All @@ -48,7 +48,7 @@ the export_ledger_entry_changes command.`,
}

for _, entry := range transformed {
numBytes, err := exportEntry(entry, outFile, extra)
numBytes, err := exportEntry(entry, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err))
numFailures += 1
Expand Down
10 changes: 5 additions & 5 deletions cmd/export_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ should be used in an initial data dump. In order to get account information with
the export_ledger_entry_changes command.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

accounts, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs)
accounts, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeAccount, env.ArchiveURLs)
if err != nil {
cmdLogger.Fatal("could not read accounts: ", err)
}
Expand All @@ -45,7 +45,7 @@ the export_ledger_entry_changes command.`,
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export entry: %v", err))
numFailures += 1
Expand Down
18 changes: 9 additions & 9 deletions cmd/export_all_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ This is a temporary command used to reduce the amount of requests to history arc
in order to mitigate egress costs for the entity hosting history archives.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)

allHistory, err := input.GetAllHistory(startNum, endNum, limit, env, useCaptiveCore)
allHistory, err := input.GetAllHistory(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
cmdLogger.Fatal("could not read all history: ", err)
}

cmdLogger.Info("start doing other exports")
getOperations(allHistory.Operations, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env)
getTrades(allHistory.Trades, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt")
getEffects(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env)
getTransactions(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt")
getDiagnosticEvents(allHistory.Ledgers, extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt")
getOperations(allHistory.Operations, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_operations.txt", env)
getTrades(allHistory.Trades, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_trades.txt")
getEffects(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_effects.txt", env)
getTransactions(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_transactions.txt")
getDiagnosticEvents(allHistory.Ledgers, commonArgs.Extra, cloudStorageBucket, cloudCredentials, cloudProvider, path+"exported_diagnostic_events.txt")
cmdLogger.Info("done doing other exports")
},
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/export_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ var assetsCmd = &cobra.Command{
Long: `Exports the assets that are created from payment operations over a specified ledger range`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)

outFile := mustOutFile(path)

var paymentOps []input.AssetTransformInput
var err error

if useCaptiveCore {
paymentOps, err = input.GetPaymentOperationsHistoryArchive(startNum, endNum, limit, env, useCaptiveCore)
if commonArgs.UseCaptiveCore {
paymentOps, err = input.GetPaymentOperationsHistoryArchive(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
} else {
paymentOps, err = input.GetPaymentOperations(startNum, endNum, limit, env, useCaptiveCore)
paymentOps, err = input.GetPaymentOperations(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
}
if err != nil {
cmdLogger.Fatal("could not read asset: ", err)
Expand All @@ -55,7 +55,7 @@ var assetsCmd = &cobra.Command{
}

seenIDs[transformed.AssetID] = true
numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.Error(err)
numFailures += 1
Expand Down
10 changes: 5 additions & 5 deletions cmd/export_claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ var claimableBalancesCmd = &cobra.Command{
the export_ledger_entry_changes command.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

balances, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs)
balances, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeClaimableBalance, env.ArchiveURLs)
if err != nil {
cmdLogger.Fatal("could not read balances: ", err)
}
Expand All @@ -45,7 +45,7 @@ var claimableBalancesCmd = &cobra.Command{
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export balance %+v: %v", balance, err))
numFailures += 1
Expand Down
10 changes: 5 additions & 5 deletions cmd/export_config_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ var configSettingCmd = &cobra.Command{
the export_ledger_entry_changes command.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

settings, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs)
settings, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeConfigSetting, env.ArchiveURLs)
if err != nil {
cmdLogger.Fatal("Error getting ledger entries: ", err)
}
Expand All @@ -45,7 +45,7 @@ var configSettingCmd = &cobra.Command{
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export config setting %+v: %v", setting, err))
numFailures += 1
Expand Down
10 changes: 5 additions & 5 deletions cmd/export_contract_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ var codeCmd = &cobra.Command{
the export_ledger_entry_changes command.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

codes, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs)
codes, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeContractCode, env.ArchiveURLs)
if err != nil {
cmdLogger.Fatal("Error getting ledger entries: ", err)
}
Expand All @@ -45,7 +45,7 @@ var codeCmd = &cobra.Command{
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export contract code %+v: %v", code, err))
numFailures += 1
Expand Down
10 changes: 5 additions & 5 deletions cmd/export_contract_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ var dataCmd = &cobra.Command{
the export_ledger_entry_changes command.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, _, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)
path := utils.MustBucketFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

datas, err := input.GetEntriesFromGenesis(endNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs)
datas, err := input.GetEntriesFromGenesis(commonArgs.EndNum, xdr.LedgerEntryTypeContractData, env.ArchiveURLs)
if err != nil {
cmdLogger.Fatal("Error getting ledger entries: ", err)
}
Expand All @@ -50,7 +50,7 @@ var dataCmd = &cobra.Command{
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export contract data %+v: %v", data, err))
numFailures += 1
Expand Down
10 changes: 5 additions & 5 deletions cmd/export_diagnostic_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ var diagnosticEventsCmd = &cobra.Command{
Long: `Exports the diagnostic events over a specified range to an output file.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)

transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore)
transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
cmdLogger.Fatal("could not read transactions: ", err)
}
Expand All @@ -42,7 +42,7 @@ var diagnosticEventsCmd = &cobra.Command{
continue
}
for _, diagnosticEvent := range transformed {
_, err := exportEntry(diagnosticEvent, outFile, extra)
_, err := exportEntry(diagnosticEvent, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export diagnostic event: %v", err))
numFailures += 1
Expand Down
12 changes: 6 additions & 6 deletions cmd/export_effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ var effectsCmd = &cobra.Command{
Long: "Exports the effects data over a specified range to an output file.",
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)

transactions, err := input.GetTransactions(startNum, endNum, limit, env, useCaptiveCore)
transactions, err := input.GetTransactions(startNum, commonArgs.EndNum, limit, env, commonArgs.UseCaptiveCore)
if err != nil {
cmdLogger.Fatalf("could not read transactions in [%d, %d] (limit=%d): %v", startNum, endNum, limit, err)
cmdLogger.Fatalf("could not read transactions in [%d, %d] (limit=%d): %v", startNum, commonArgs.EndNum, limit, err)
}

outFile := mustOutFile(path)
Expand All @@ -39,7 +39,7 @@ var effectsCmd = &cobra.Command{
}

for _, transformed := range effects {
numBytes, err := exportEntry(transformed, outFile, extra)
numBytes, err := exportEntry(transformed, outFile, commonArgs.Extra)
if err != nil {
cmdLogger.LogError(err)
numFailures += 1
Expand Down
29 changes: 19 additions & 10 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ confirmed by the Stellar network.
If no data type flags are set, then by default all of them are exported. If any are set, it is assumed that the others should not
be exported.`,
Run: func(cmd *cobra.Command, args []string) {
endNum, strictExport, isTest, isFuture, extra, useCaptiveCore, datastoreUrl := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
env := utils.GetEnvironmentDetails(isTest, isFuture, datastoreUrl)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs.IsTest, commonArgs.IsFuture, commonArgs.DatastorePath)

_, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
Expand Down Expand Up @@ -62,28 +62,28 @@ be exported.`,
}
}

if configPath == "" && endNum == 0 {
if configPath == "" && commonArgs.EndNum == 0 {
cmdLogger.Fatal("stellar-core needs a config file path when exporting ledgers continuously (endNum = 0)")
}

ctx := context.Background()
backend, err := utils.CreateLedgerBackend(ctx, useCaptiveCore, env)
backend, err := utils.CreateLedgerBackend(ctx, commonArgs.UseCaptiveCore, env)
if err != nil {
cmdLogger.Fatal("error creating a cloud storage backend: ", err)
}

err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(startNum, endNum))
err = backend.PrepareRange(ctx, ledgerbackend.BoundedRange(startNum, commonArgs.EndNum))
if err != nil {
cmdLogger.Fatal("error preparing ledger range for cloud storage backend: ", err)
}

if endNum == 0 {
endNum = math.MaxInt32
if commonArgs.EndNum == 0 {
commonArgs.EndNum = math.MaxInt32
}

changeChan := make(chan input.ChangeBatch)
closeChan := make(chan int)
go input.StreamChanges(&backend, startNum, endNum, batchSize, changeChan, closeChan, env, cmdLogger)
go input.StreamChanges(&backend, startNum, commonArgs.EndNum, batchSize, changeChan, closeChan, env, cmdLogger)

for {
select {
Expand Down Expand Up @@ -252,7 +252,16 @@ be exported.`,
}
}

err := exportTransformedData(batch.BatchStart, batch.BatchEnd, outputFolder, transformedOutputs, cloudCredentials, cloudStorageBucket, cloudProvider, extra)
err := exportTransformedData(
batch.BatchStart,
batch.BatchEnd,
outputFolder,
transformedOutputs,
cloudCredentials,
cloudStorageBucket,
cloudProvider,
commonArgs.Extra,
)
if err != nil {
cmdLogger.LogError(err)
continue
Expand Down
Loading

0 comments on commit fdc8857

Please sign in to comment.