Skip to content

Commit

Permalink
Refactor access to meta archive (#4488)
Browse files Browse the repository at this point in the history
Refactor `historyarchive` and `ledgerbackend` to allow better access to the new meta archives:
* Created `metaarchive` package that connects to the new meta archives (and
  allows accessing `xdr.SerializedLedgerCloseMeta`).
* Extracted `ArchiveBackend` to the new `support/storage` package as it contains
  only storage related methods. New package is used in both `historyarchive` and
  `metaarchive`.
  • Loading branch information
bartekn authored Aug 1, 2022
1 parent 3808eaf commit 1ac18f3
Show file tree
Hide file tree
Showing 27 changed files with 444 additions and 337 deletions.
15 changes: 9 additions & 6 deletions exp/lighthorizon/archive/ingest_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -48,18 +50,17 @@ func NewIngestArchive(config ArchiveConfig) (Archive, error) {
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: config.NetworkPassphrase,
S3Region: region,
storage.ConnectOptions{
Context: context.Background(),
S3Region: region,
},
)
if err != nil {
return nil, err
}

if needsCache {
cache, err := historyarchive.MakeFsCacheBackend(source,
cache, err := storage.MakeOnDiskCache(source,
config.CacheDir, uint(config.CacheSize))

if err != nil { // warn but continue w/o cache
Expand All @@ -73,7 +74,9 @@ func NewIngestArchive(config ArchiveConfig) (Archive, error) {
}
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
metaArchive := metaarchive.NewMetaArchive(source)

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive)
return ingestArchive{ledgerBackend}, nil
}

Expand Down
30 changes: 16 additions & 14 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -44,18 +46,18 @@ func BuildIndices(
// with the filesystem directly.
source, err := historyarchive.ConnectBackend(
sourceUrl,
historyarchive.ConnectOptions{
Context: ctx,
NetworkPassphrase: networkPassphrase,
S3Region: "us-east-1",
storage.ConnectOptions{
Context: ctx,
S3Region: "us-east-1",
},
)
if err != nil {
return nil, err
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()
metaArchive := metaarchive.NewMetaArchive(source)

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive)

if ledgerRange.High == 0 {
var backendErr error
Expand All @@ -82,7 +84,7 @@ func BuildIndices(
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan historyarchive.Range, parallel)

indexBuilder := NewIndexBuilder(indexStore, ledgerBackend, networkPassphrase)
indexBuilder := NewIndexBuilder(indexStore, *metaArchive, networkPassphrase)
for _, part := range modules {
switch part {
case "transactions":
Expand Down Expand Up @@ -172,7 +174,7 @@ type Module func(
// IndexBuilder contains everything needed to build indices from ledger ranges.
type IndexBuilder struct {
store Store
ledgerBackend ledgerbackend.LedgerBackend
metaArchive metaarchive.MetaArchive
networkPassphrase string

lastBuiltLedgerWriteLock sync.Mutex
Expand All @@ -183,12 +185,12 @@ type IndexBuilder struct {

func NewIndexBuilder(
indexStore Store,
backend ledgerbackend.LedgerBackend,
metaArchive metaarchive.MetaArchive,
networkPassphrase string,
) *IndexBuilder {
return &IndexBuilder{
store: indexStore,
ledgerBackend: backend,
metaArchive: metaArchive,
networkPassphrase: networkPassphrase,
}
}
Expand Down Expand Up @@ -221,7 +223,7 @@ func (builder *IndexBuilder) RunModules(
// portion.
func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error {
for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ {
ledger, err := builder.ledgerBackend.GetLedger(ctx, ledgerSeq)
ledger, err := builder.metaArchive.GetLedger(ctx, ledgerSeq)
if err != nil {
if !os.IsNotExist(err) {
log.Errorf("error getting ledger %d: %v", ledgerSeq, err)
Expand All @@ -230,7 +232,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
builder.networkPassphrase, ledger)
builder.networkPassphrase, *ledger.V0)
if err != nil {
return err
}
Expand All @@ -243,7 +245,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
return err
}

if err := builder.RunModules(ledger, tx); err != nil {
if err := builder.RunModules(*ledger.V0, tx); err != nil {
return err
}
}
Expand All @@ -257,7 +259,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}

func (builder *IndexBuilder) Watch(ctx context.Context) error {
latestLedger, err := builder.ledgerBackend.GetLatestLedgerSequence(ctx)
latestLedger, err := builder.metaArchive.GetLatestLedgerSequence(ctx)
if err != nil {
log.Errorf("Failed to retrieve latest ledger: %v", err)
return err
Expand Down
14 changes: 9 additions & 5 deletions exp/lighthorizon/index/cmd/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/toid"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -183,14 +185,16 @@ func IndexLedgerRange(
ctx := context.Background()
backend, err := historyarchive.ConnectBackend(
txmetaSource,
historyarchive.ConnectOptions{
Context: ctx,
NetworkPassphrase: network.TestNetworkPassphrase,
S3Region: "us-east-1",
storage.ConnectOptions{
Context: ctx,
S3Region: "us-east-1",
},
)
require.NoError(t, err)
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(backend)

metaArchive := metaarchive.NewMetaArchive(backend)

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive)
defer ledgerBackend.Close()

participation := make(map[string][]uint32)
Expand Down
12 changes: 6 additions & 6 deletions exp/services/ledgerexporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -57,9 +58,8 @@ func main() {

target, err := historyarchive.ConnectBackend(
*targetUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: params.NetworkPassphrase,
storage.ConnectOptions{
Context: context.Background(),
},
)
logFatalIf(err, "Could not connect to target")
Expand Down Expand Up @@ -119,7 +119,7 @@ func main() {

// readLatestLedger determines the latest ledger in the given backend (at the
// /latest path), defaulting to Ledger #2 if one doesn't exist
func readLatestLedger(backend historyarchive.ArchiveBackend) uint32 {
func readLatestLedger(backend storage.Storage) uint32 {
r, err := backend.GetFile("latest")
if os.IsNotExist(err) {
return 2
Expand All @@ -140,7 +140,7 @@ func readLatestLedger(backend historyarchive.ArchiveBackend) uint32 {
// writeLedger stores the given LedgerCloseMeta instance as a raw binary at the
// /ledgers/<seqNum> path. If an error is returned, it may be transient so you
// should attempt to retry.
func writeLedger(backend historyarchive.ArchiveBackend, ledger xdr.LedgerCloseMeta) error {
func writeLedger(backend storage.Storage, ledger xdr.LedgerCloseMeta) error {
toSerialize := xdr.SerializedLedgerCloseMeta{
V: 0,
V0: &ledger,
Expand All @@ -153,7 +153,7 @@ func writeLedger(backend historyarchive.ArchiveBackend, ledger xdr.LedgerCloseMe
)
}

func writeLatestLedger(backend historyarchive.ArchiveBackend, ledger uint32) error {
func writeLatestLedger(backend storage.Storage, ledger uint32) error {
return backend.PutFile(
"latest",
io.NopCloser(
Expand Down
4 changes: 2 additions & 2 deletions exp/tools/dump-ledger-state/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,13 @@ func archive(testnet bool) (*historyarchive.Archive, error) {
if testnet {
return historyarchive.Connect(
"https://history.stellar.org/prd/core-testnet/core_testnet_001",
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}

return historyarchive.Connect(
fmt.Sprintf("https://history.stellar.org/prd/core-live/core_live_001/"),
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}

Expand Down
4 changes: 2 additions & 2 deletions exp/tools/dump-orderbook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func archive(testnet bool) (*historyarchive.Archive, error) {
if testnet {
return historyarchive.Connect(
"https://history.stellar.org/prd/core-testnet/core_testnet_001",
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}

return historyarchive.Connect(
fmt.Sprintf("https://history.stellar.org/prd/core-live/core_live_001/"),
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}
71 changes: 15 additions & 56 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand All @@ -37,22 +38,16 @@ type CommandOptions struct {
SkipOptional bool
}

type ConnectOptions struct {
Context context.Context
type ArchiveOptions struct {
// NetworkPassphrase defines the expected network of history archive. It is
// checked when getting HAS. If network passphrase does not match, error is
// returned.
NetworkPassphrase string
S3Region string
S3Endpoint string
UnsignedRequests bool
GCSEndpoint string
// CheckpointFrequency is the number of ledgers between checkpoints
// if unset, DefaultCheckpointFrequency will be used
CheckpointFrequency uint32

// Wrap the archivebackend after connection. For example, to add a caching or introspection layer.
Wrap func(ArchiveBackend) (ArchiveBackend, error)
storage.ConnectOptions
}

type Ledger struct {
Expand All @@ -61,16 +56,6 @@ type Ledger struct {
TransactionResult xdr.TransactionHistoryResultEntry
}

type ArchiveBackend interface {
Exists(path string) (bool, error)
Size(path string) (int64, error)
GetFile(path string) (io.ReadCloser, error)
PutFile(path string, in io.ReadCloser) error
ListFiles(path string) (chan string, chan error)
CanListFiles() bool
Close() error
}

type ArchiveInterface interface {
GetPathHAS(path string) (HistoryArchiveState, error)
PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error
Expand Down Expand Up @@ -117,7 +102,7 @@ type Archive struct {

checkpointManager CheckpointManager

backend ArchiveBackend
backend storage.Storage
}

func (arch *Archive) GetCheckpointManager() CheckpointManager {
Expand Down Expand Up @@ -381,7 +366,7 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
return NewXdrGzStream(rdr)
}

func Connect(u string, opts ConnectOptions) (*Archive, error) {
func Connect(u string, opts ArchiveOptions) (*Archive, error) {
arch := Archive{
networkPassphrase: opts.NetworkPassphrase,
checkpointFiles: make(map[string](map[uint32]bool)),
Expand All @@ -399,16 +384,16 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
arch.checkpointFiles[cat] = make(map[uint32]bool)
}

if opts.Context == nil {
opts.Context = context.Background()
if opts.ConnectOptions.Context == nil {
opts.ConnectOptions.Context = context.Background()
}

var err error
arch.backend, err = ConnectBackend(u, opts)
arch.backend, err = ConnectBackend(u, opts.ConnectOptions)
return &arch, err
}

func ConnectBackend(u string, opts ConnectOptions) (ArchiveBackend, error) {
func ConnectBackend(u string, opts storage.ConnectOptions) (storage.Storage, error) {
if u == "" {
return nil, errors.New("URL is empty")
}
Expand All @@ -418,43 +403,17 @@ func ConnectBackend(u string, opts ConnectOptions) (ArchiveBackend, error) {
return nil, err
}

if opts.Context == nil {
opts.Context = context.Background()
var backend storage.Storage
if parsed.Scheme == "mock" {
backend = makeMockBackend()
} else {
backend, err = storage.ConnectBackend(u, opts)
}

pth := parsed.Path
var backend ArchiveBackend
switch parsed.Scheme {
case "s3":
// Inside s3, all paths start _without_ the leading /
pth = strings.TrimPrefix(pth, "/")
backend, err = makeS3Backend(parsed.Host, pth, opts)

case "gcs":
// Inside gcs, all paths start _without_ the leading /
pth = strings.TrimPrefix(pth, "/")
backend, err = makeGCSBackend(parsed.Host, pth, opts)

case "file":
pth = path.Join(parsed.Host, pth)
backend = makeFsBackend(pth, opts)

case "http", "https":
backend = makeHttpBackend(parsed, opts)

case "mock":
backend = makeMockBackend(opts)

default:
err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}
if err == nil && opts.Wrap != nil {
backend, err = opts.Wrap(backend)
}
return backend, err
}

func MustConnect(u string, opts ConnectOptions) *Archive {
func MustConnect(u string, opts ArchiveOptions) *Archive {
arch, err := Connect(u, opts)
if err != nil {
log.Fatal(err)
Expand Down
Loading

0 comments on commit 1ac18f3

Please sign in to comment.