Skip to content

Commit

Permalink
exp/ingest: Ingest Session (#1456)
Browse files Browse the repository at this point in the history
This commit adds two `Session` implementations (`LiveSession` and
`SingleLedgerSession`) and a simple `horizon-demo` tool.

The goal of this commit is to implement the last missing component of
`exp/ingest` that connects all the existing packages together:
`Session`. `Session` is connecting to history archives and/or ledger
backend and passes data to one or two pipelines (state pipeline and
ledger pipeline).

`Session` supports one of the use cases developers can interact with
Stellar ledger. For example: `LiveSession` initializes the state and
then follows the new ledgers and processes transactions (it's running
indefinitely). On the contrary `SingleLedgerSession` processes the state
of a single ledger and terminates. More sessions will be added in a
future (ex. `RangeSession` that processes data between two ledgers).

It also contains a simple demo app called `horizon-demo` (`go run
./exp/tools/horizon-demo`) that's using `LiveSession` internally.
`horizon-demo` is reading data from history archives and ledger backend
and 1) updates accounts for signers, 2) inserts transactions to a
database and 3) updates in-memory orderbook graph.
  • Loading branch information
bartekn authored Jul 5, 2019
1 parent a13280e commit 8cd5bd9
Show file tree
Hide file tree
Showing 43 changed files with 2,102 additions and 496 deletions.
2 changes: 1 addition & 1 deletion exp/ingest/adapters/history_archive_adapter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion exp/ingest/adapters/history_archive_adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"fmt"
Expand Down
5 changes: 2 additions & 3 deletions exp/ingest/adapters/ledger_backend_adapter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"errors"
Expand Down Expand Up @@ -28,8 +28,7 @@ func (lba *LedgerBackendAdapter) GetLedger(sequence uint32) (io.LedgerReadCloser
return nil, errors.New(noBackendErr)
}

dblrc := io.MakeLedgerReadCloser(sequence, lba.Backend)
return dblrc, nil
return io.MakeLedgerReadCloser(sequence, lba.Backend)
}

// Close shuts down the provided backend.
Expand Down
2 changes: 1 addition & 1 deletion exp/ingest/adapters/ledger_backend_adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"testing"
Expand Down
5 changes: 1 addition & 4 deletions exp/ingest/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ func useAdapter() {
s := lrc.GetSequence()
fmt.Println("lrc sequence:", s)

h, err := lrc.GetHeader()
if err != nil {
log.Fatal(err)
}
h := lrc.GetHeader()
fmt.Println("lrc header:", h)

for {
Expand Down
31 changes: 16 additions & 15 deletions exp/ingest/io/ledger_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ import (
"github.com/stellar/go/xdr"
)

// LedgerTransaction represents the data for a single transaction within a ledger.
type LedgerTransaction struct {
Index uint32
Envelope xdr.TransactionEnvelope
Result xdr.TransactionResultPair
Meta xdr.TransactionMeta
FeeChanges xdr.LedgerEntryChanges
}

// DBLedgerReadCloser is a database-backed implementation of the io.LedgerReadCloser interface.
type DBLedgerReadCloser struct {
sequence uint32
Expand All @@ -33,11 +24,19 @@ type DBLedgerReadCloser struct {
var _ LedgerReadCloser = (*DBLedgerReadCloser)(nil)

// MakeLedgerReadCloser is a factory method for LedgerReadCloser.
func MakeLedgerReadCloser(sequence uint32, backend ledgerbackend.LedgerBackend) *DBLedgerReadCloser {
return &DBLedgerReadCloser{
func MakeLedgerReadCloser(sequence uint32, backend ledgerbackend.LedgerBackend) (*DBLedgerReadCloser, error) {
reader := &DBLedgerReadCloser{
sequence: sequence,
backend: backend,
}

var err error
reader.initOnce.Do(func() { err = reader.init() })
if err != nil {
return nil, err
}

return reader, nil
}

// GetSequence returns the sequence number of the ledger data stored by this object.
Expand All @@ -46,13 +45,15 @@ func (dblrc *DBLedgerReadCloser) GetSequence() uint32 {
}

// GetHeader returns the XDR Header data associated with the stored ledger.
func (dblrc *DBLedgerReadCloser) GetHeader() (xdr.LedgerHeaderHistoryEntry, error) {
func (dblrc *DBLedgerReadCloser) GetHeader() xdr.LedgerHeaderHistoryEntry {
var err error
dblrc.initOnce.Do(func() { err = dblrc.init() })
if err != nil {
return xdr.LedgerHeaderHistoryEntry{}, err
// TODO, object should be initialized in constructor.
// Not returning error here, makes this much simpler.
panic(err)
}
return dblrc.header, nil
return dblrc.header
}

// Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there
Expand Down Expand Up @@ -92,7 +93,7 @@ func (dblrc *DBLedgerReadCloser) init() error {
return errors.Wrap(err, "error reading ledger from backend")
}
if !exists {
return errors.Wrap(err, "ledger was not found")
return ErrNotFound
}

dblrc.header = ledgerCloseMeta.LedgerHeader
Expand Down
128 changes: 128 additions & 0 deletions exp/ingest/io/ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io

import (
"github.com/stellar/go/xdr"
)

// Change is a developer friendly representation of LedgerEntryChanges.
// It also provides some helper functions to quickly check if a given
// change has occured in an entry.
//
// If an entry is created: Pre is nil and Post is not nil.
// If an entry is updated: Pre is not nil and Post is not nil.
// If an entry is removed: Pre is not nil and Post is nil.
type Change struct {
Type xdr.LedgerEntryType
Pre *xdr.LedgerEntryData
Post *xdr.LedgerEntryData
}

func (c *Change) AccountSignersChanged() bool {
if c.Type != xdr.LedgerEntryTypeAccount {
panic("This should not be called on changes other than Account changes")
}

// Signers must be removed before merging an account and it's
// impossible to add signers during a creation of a new account.
if c.Pre == nil || c.Post == nil {
return false
}

if len(c.Pre.MustAccount().Signers) != len(c.Post.MustAccount().Signers) {
return true
}

signers := map[string]uint32{} // signer => weight

for _, signer := range c.Pre.MustAccount().Signers {
signers[signer.Key.Address()] = uint32(signer.Weight)
}

for _, signer := range c.Post.MustAccount().Signers {
weight, exist := signers[signer.Key.Address()]
if !exist {
return false
}

if weight != uint32(signer.Weight) {
return false
}
}

// TODO should it also change on master key weight changes?

return false
}

// GetChanges returns a developer friendly representation of LedgerEntryChanges.
// It contains fee changes, transaction changes and operation changes in that
// order.
func (t *LedgerTransaction) GetChanges() []Change {
// Fee meta
changes := getChangesFromLedgerEntryChanges(t.FeeChanges)

// Transaction meta
v1Meta, ok := t.Meta.GetV1()
if ok {
txChanges := getChangesFromLedgerEntryChanges(v1Meta.TxChanges)
changes = append(changes, txChanges...)
}

// Operation meta
for _, operationMeta := range t.Meta.OperationsMeta() {
ledgerEntryChanges := operationMeta.Changes
opChanges := getChangesFromLedgerEntryChanges(ledgerEntryChanges)

changes = append(changes, opChanges...)
}

return changes
}

// getChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change.
// Each `update` and `removed` is preceded with `state` and `create` changes
// are alone, without `state`. The transformation we're doing is to move each
// change (state/update, state/removed or create) to an array of pre/post pairs.
// Then:
// - for create, pre is null and post is a new entry,
// - for update, pre is previous state and post is the current state,
// - for removed, pre is previous state and post is null.
//
// stellar-core source:
// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582
func getChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change {
changes := []Change{}

for i, entryChange := range ledgerEntryChanges {
switch entryChange.Type {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
created := entryChange.MustCreated()
changes = append(changes, Change{
Type: created.Data.Type,
Pre: nil,
Post: &created.Data,
})
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
state := ledgerEntryChanges[i-1].MustState()
updated := entryChange.MustUpdated()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state.Data,
Post: &updated.Data,
})
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
state := ledgerEntryChanges[i-1].MustState()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state.Data,
Post: nil,
})
case xdr.LedgerEntryChangeTypeLedgerEntryState:
continue
default:
panic("Invalid LedgerEntryChangeType")
}
}

return changes
}
21 changes: 19 additions & 2 deletions exp/ingest/io/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io

import (
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

var ErrNotFound = errors.New("not found")

// StateReadCloser interface placeholder
type StateReadCloser interface {
GetSequence() uint32
Expand Down Expand Up @@ -32,21 +35,35 @@ type StateWriteCloser interface {
// LedgerReadCloser provides convenient, streaming access to the transactions within a ledger.
type LedgerReadCloser interface {
GetSequence() uint32
GetHeader() (xdr.LedgerHeaderHistoryEntry, error)
GetHeader() xdr.LedgerHeaderHistoryEntry
// Read should return the next transaction. If there are no more
// transactions it should return `io.EOF` error.
Read() (LedgerTransaction, error)
// Close should be called when reading is finished. This is especially
// helpful when there are still some entries available so the reader can stop
// helpful when there are still some transactions available so reader can stop
// streaming them.
Close() error
}

// LedgerWriteCloser provides convenient, streaming access to the transactions within a ledger.
type LedgerWriteCloser interface {
// Write is used to pass a transaction to the next processor. It can return
// `io.ErrClosedPipe` when the pipe between processors has been closed meaning
// that next processor does not need more data. In such situation the current
// processor can terminate as sending more transactions to a `LedgerWriteCloser`
// does not make sense (will not be read).
Write(LedgerTransaction) error
// Close should be called when reading is finished. This is especially
// helpful when there are still some transactions available so the reader can stop
// streaming them.
Close() error
}

// LedgerTransaction represents the data for a single transaction within a ledger.
type LedgerTransaction struct {
Index uint32
Envelope xdr.TransactionEnvelope
Result xdr.TransactionResultPair
Meta xdr.TransactionMeta
FeeChanges xdr.LedgerEntryChanges
}
4 changes: 2 additions & 2 deletions exp/ingest/io/mock_ledger_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func (m *MockLedgerReadCloser) GetSequence() uint32 {
return args.Get(0).(uint32)
}

func (m *MockLedgerReadCloser) GetHeader() (xdr.LedgerHeaderHistoryEntry, error) {
func (m *MockLedgerReadCloser) GetHeader() xdr.LedgerHeaderHistoryEntry {
args := m.Called()
return args.Get(0).(xdr.LedgerHeaderHistoryEntry), nil
return args.Get(0).(xdr.LedgerHeaderHistoryEntry)
}

func (m *MockLedgerReadCloser) Read() (LedgerTransaction, error) {
Expand Down
Loading

0 comments on commit 8cd5bd9

Please sign in to comment.