Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor access to meta archive #4488

Merged
merged 7 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions exp/lighthorizon/archive/ingest_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -48,18 +50,17 @@ func NewIngestArchive(config ArchiveConfig) (Archive, error) {
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: config.NetworkPassphrase,
S3Region: region,
storage.ConnectOptions{
Context: context.Background(),
S3Region: region,
},
)
if err != nil {
return nil, err
}

if needsCache {
cache, err := historyarchive.MakeFsCacheBackend(source,
cache, err := storage.MakeOnDiskCache(source,
config.CacheDir, uint(config.CacheSize))

if err != nil { // warn but continue w/o cache
Expand All @@ -73,7 +74,9 @@ func NewIngestArchive(config ArchiveConfig) (Archive, error) {
}
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
metaArchive := metaarchive.NewMetaArchive(source)

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive)
return ingestArchive{ledgerBackend}, nil
}

Expand Down
30 changes: 16 additions & 14 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -44,18 +46,18 @@ func BuildIndices(
// with the filesystem directly.
source, err := historyarchive.ConnectBackend(
sourceUrl,
historyarchive.ConnectOptions{
Context: ctx,
NetworkPassphrase: networkPassphrase,
S3Region: "us-east-1",
storage.ConnectOptions{
Context: ctx,
S3Region: "us-east-1",
},
)
if err != nil {
return nil, err
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()
metaArchive := metaarchive.NewMetaArchive(source)

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive)

if ledgerRange.High == 0 {
var backendErr error
Expand All @@ -82,7 +84,7 @@ func BuildIndices(
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan historyarchive.Range, parallel)

indexBuilder := NewIndexBuilder(indexStore, ledgerBackend, networkPassphrase)
indexBuilder := NewIndexBuilder(indexStore, *metaArchive, networkPassphrase)
for _, part := range modules {
switch part {
case "transactions":
Expand Down Expand Up @@ -170,7 +172,7 @@ type Module func(
// IndexBuilder contains everything needed to build indices from ledger ranges.
type IndexBuilder struct {
store Store
ledgerBackend ledgerbackend.LedgerBackend
metaArchive metaarchive.MetaArchive
networkPassphrase string

lastBuiltLedgerWriteLock sync.Mutex
Expand All @@ -181,12 +183,12 @@ type IndexBuilder struct {

func NewIndexBuilder(
indexStore Store,
backend ledgerbackend.LedgerBackend,
metaArchive metaarchive.MetaArchive,
networkPassphrase string,
) *IndexBuilder {
return &IndexBuilder{
store: indexStore,
ledgerBackend: backend,
metaArchive: metaArchive,
networkPassphrase: networkPassphrase,
}
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func (builder *IndexBuilder) RunModules(
// portion.
func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error {
for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ {
ledger, err := builder.ledgerBackend.GetLedger(ctx, ledgerSeq)
ledger, err := builder.metaArchive.GetLedger(ctx, ledgerSeq)
if err != nil {
if !os.IsNotExist(err) {
log.Errorf("error getting ledger %d: %v", ledgerSeq, err)
Expand All @@ -228,7 +230,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
builder.networkPassphrase, ledger)
builder.networkPassphrase, *ledger.V0)
if err != nil {
return err
}
Expand All @@ -241,7 +243,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
return err
}

if err := builder.RunModules(ledger, tx); err != nil {
if err := builder.RunModules(*ledger.V0, tx); err != nil {
return err
}
}
Expand All @@ -255,7 +257,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}

func (builder *IndexBuilder) Watch(ctx context.Context) error {
latestLedger, err := builder.ledgerBackend.GetLatestLedgerSequence(ctx)
latestLedger, err := builder.metaArchive.GetLatestLedgerSequence(ctx)
if err != nil {
log.Errorf("Failed to retrieve latest ledger: %v", err)
return err
Expand Down
14 changes: 9 additions & 5 deletions exp/lighthorizon/index/cmd/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/toid"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -183,14 +185,16 @@ func IndexLedgerRange(
ctx := context.Background()
backend, err := historyarchive.ConnectBackend(
txmetaSource,
historyarchive.ConnectOptions{
Context: ctx,
NetworkPassphrase: network.TestNetworkPassphrase,
S3Region: "us-east-1",
storage.ConnectOptions{
Context: ctx,
S3Region: "us-east-1",
},
)
require.NoError(t, err)
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(backend)

metaArchive := metaarchive.NewMetaArchive(backend)

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(metaArchive)
defer ledgerBackend.Close()

participation := make(map[string][]uint32)
Expand Down
12 changes: 6 additions & 6 deletions exp/services/ledgerexporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -57,9 +58,8 @@ func main() {

target, err := historyarchive.ConnectBackend(
*targetUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: params.NetworkPassphrase,
storage.ConnectOptions{
Context: context.Background(),
},
)
logFatalIf(err, "Could not connect to target")
Expand Down Expand Up @@ -119,7 +119,7 @@ func main() {

// readLatestLedger determines the latest ledger in the given backend (at the
// /latest path), defaulting to Ledger #2 if one doesn't exist
func readLatestLedger(backend historyarchive.ArchiveBackend) uint32 {
func readLatestLedger(backend storage.Storage) uint32 {
r, err := backend.GetFile("latest")
if os.IsNotExist(err) {
return 2
Expand All @@ -140,7 +140,7 @@ func readLatestLedger(backend historyarchive.ArchiveBackend) uint32 {
// writeLedger stores the given LedgerCloseMeta instance as a raw binary at the
// /ledgers/<seqNum> path. If an error is returned, it may be transient so you
// should attempt to retry.
func writeLedger(backend historyarchive.ArchiveBackend, ledger xdr.LedgerCloseMeta) error {
func writeLedger(backend storage.Storage, ledger xdr.LedgerCloseMeta) error {
toSerialize := xdr.SerializedLedgerCloseMeta{
V: 0,
V0: &ledger,
Expand All @@ -153,7 +153,7 @@ func writeLedger(backend historyarchive.ArchiveBackend, ledger xdr.LedgerCloseMe
)
}

func writeLatestLedger(backend historyarchive.ArchiveBackend, ledger uint32) error {
func writeLatestLedger(backend storage.Storage, ledger uint32) error {
return backend.PutFile(
"latest",
io.NopCloser(
Expand Down
4 changes: 2 additions & 2 deletions exp/tools/dump-ledger-state/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,13 @@ func archive(testnet bool) (*historyarchive.Archive, error) {
if testnet {
return historyarchive.Connect(
"https://history.stellar.org/prd/core-testnet/core_testnet_001",
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}

return historyarchive.Connect(
fmt.Sprintf("https://history.stellar.org/prd/core-live/core_live_001/"),
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}

Expand Down
4 changes: 2 additions & 2 deletions exp/tools/dump-orderbook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func archive(testnet bool) (*historyarchive.Archive, error) {
if testnet {
return historyarchive.Connect(
"https://history.stellar.org/prd/core-testnet/core_testnet_001",
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}

return historyarchive.Connect(
fmt.Sprintf("https://history.stellar.org/prd/core-live/core_live_001/"),
historyarchive.ConnectOptions{},
historyarchive.ArchiveOptions{},
)
}
71 changes: 15 additions & 56 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand All @@ -37,22 +38,16 @@ type CommandOptions struct {
SkipOptional bool
}

type ConnectOptions struct {
Context context.Context
type ArchiveOptions struct {
// NetworkPassphrase defines the expected network of history archive. It is
// checked when getting HAS. If network passphrase does not match, error is
// returned.
NetworkPassphrase string
S3Region string
S3Endpoint string
UnsignedRequests bool
GCSEndpoint string
// CheckpointFrequency is the number of ledgers between checkpoints
// if unset, DefaultCheckpointFrequency will be used
CheckpointFrequency uint32

// Wrap the archivebackend after connection. For example, to add a caching or introspection layer.
Wrap func(ArchiveBackend) (ArchiveBackend, error)
storage.ConnectOptions
}

type Ledger struct {
Expand All @@ -61,16 +56,6 @@ type Ledger struct {
TransactionResult xdr.TransactionHistoryResultEntry
}

type ArchiveBackend interface {
Exists(path string) (bool, error)
Size(path string) (int64, error)
GetFile(path string) (io.ReadCloser, error)
PutFile(path string, in io.ReadCloser) error
ListFiles(path string) (chan string, chan error)
CanListFiles() bool
Close() error
}

type ArchiveInterface interface {
GetPathHAS(path string) (HistoryArchiveState, error)
PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error
Expand Down Expand Up @@ -117,7 +102,7 @@ type Archive struct {

checkpointManager CheckpointManager

backend ArchiveBackend
backend storage.Storage
}

func (arch *Archive) GetCheckpointManager() CheckpointManager {
Expand Down Expand Up @@ -381,7 +366,7 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
return NewXdrGzStream(rdr)
}

func Connect(u string, opts ConnectOptions) (*Archive, error) {
func Connect(u string, opts ArchiveOptions) (*Archive, error) {
arch := Archive{
networkPassphrase: opts.NetworkPassphrase,
checkpointFiles: make(map[string](map[uint32]bool)),
Expand All @@ -399,16 +384,16 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
arch.checkpointFiles[cat] = make(map[uint32]bool)
}

if opts.Context == nil {
opts.Context = context.Background()
if opts.ConnectOptions.Context == nil {
opts.ConnectOptions.Context = context.Background()
}

var err error
arch.backend, err = ConnectBackend(u, opts)
arch.backend, err = ConnectBackend(u, opts.ConnectOptions)
return &arch, err
}

func ConnectBackend(u string, opts ConnectOptions) (ArchiveBackend, error) {
func ConnectBackend(u string, opts storage.ConnectOptions) (storage.Storage, error) {
if u == "" {
return nil, errors.New("URL is empty")
}
Expand All @@ -418,43 +403,17 @@ func ConnectBackend(u string, opts ConnectOptions) (ArchiveBackend, error) {
return nil, err
}

if opts.Context == nil {
opts.Context = context.Background()
var backend storage.Storage
if parsed.Scheme == "mock" {
backend = makeMockBackend()
} else {
backend, err = storage.ConnectBackend(u, opts)
}

pth := parsed.Path
var backend ArchiveBackend
switch parsed.Scheme {
case "s3":
// Inside s3, all paths start _without_ the leading /
pth = strings.TrimPrefix(pth, "/")
backend, err = makeS3Backend(parsed.Host, pth, opts)

case "gcs":
// Inside gcs, all paths start _without_ the leading /
pth = strings.TrimPrefix(pth, "/")
backend, err = makeGCSBackend(parsed.Host, pth, opts)

case "file":
pth = path.Join(parsed.Host, pth)
backend = makeFsBackend(pth, opts)

case "http", "https":
backend = makeHttpBackend(parsed, opts)

case "mock":
backend = makeMockBackend(opts)

default:
err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}
if err == nil && opts.Wrap != nil {
backend, err = opts.Wrap(backend)
}
return backend, err
}

func MustConnect(u string, opts ConnectOptions) *Archive {
func MustConnect(u string, opts ArchiveOptions) *Archive {
arch, err := Connect(u, opts)
if err != nil {
log.Fatal(err)
Expand Down
Loading