Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/ingest: Ingest Session #1456

Merged
merged 18 commits into from
Jul 5, 2019
2 changes: 1 addition & 1 deletion exp/ingest/adapters/history_archive_adapter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion exp/ingest/adapters/history_archive_adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"fmt"
Expand Down
5 changes: 2 additions & 3 deletions exp/ingest/adapters/ledger_backend_adapter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"errors"
Expand Down Expand Up @@ -28,8 +28,7 @@ func (lba *LedgerBackendAdapter) GetLedger(sequence uint32) (io.LedgerReadCloser
return nil, errors.New(noBackendErr)
}

dblrc := io.MakeLedgerReadCloser(sequence, lba.Backend)
return dblrc, nil
return io.MakeLedgerReadCloser(sequence, lba.Backend)
}

// Close shuts down the provided backend.
Expand Down
2 changes: 1 addition & 1 deletion exp/ingest/adapters/ledger_backend_adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ingestadapters
package adapters

import (
"testing"
Expand Down
5 changes: 1 addition & 4 deletions exp/ingest/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ func useAdapter() {
s := lrc.GetSequence()
fmt.Println("lrc sequence:", s)

h, err := lrc.GetHeader()
if err != nil {
log.Fatal(err)
}
h := lrc.GetHeader()
fmt.Println("lrc header:", h)

for {
Expand Down
31 changes: 16 additions & 15 deletions exp/ingest/io/ledger_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ import (
"github.com/stellar/go/xdr"
)

// LedgerTransaction represents the data for a single transaction within a ledger.
type LedgerTransaction struct {
Index uint32
Envelope xdr.TransactionEnvelope
Result xdr.TransactionResultPair
Meta xdr.TransactionMeta
FeeChanges xdr.LedgerEntryChanges
}

// DBLedgerReadCloser is a database-backed implementation of the io.LedgerReadCloser interface.
type DBLedgerReadCloser struct {
sequence uint32
Expand All @@ -33,11 +24,19 @@ type DBLedgerReadCloser struct {
var _ LedgerReadCloser = (*DBLedgerReadCloser)(nil)

// MakeLedgerReadCloser is a factory method for LedgerReadCloser.
func MakeLedgerReadCloser(sequence uint32, backend ledgerbackend.LedgerBackend) *DBLedgerReadCloser {
return &DBLedgerReadCloser{
func MakeLedgerReadCloser(sequence uint32, backend ledgerbackend.LedgerBackend) (*DBLedgerReadCloser, error) {
reader := &DBLedgerReadCloser{
sequence: sequence,
backend: backend,
}

var err error
reader.initOnce.Do(func() { err = reader.init() })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to #1433 . because reader is created a few lines above. it seems strange reader initialize it using initOnce.Do() . fixing this might be outside the scope of this PR though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'll decide on #1433 next week and change all the instances in another PR.

if err != nil {
return nil, err
}

return reader, nil
}

// GetSequence returns the sequence number of the ledger data stored by this object.
Expand All @@ -46,13 +45,15 @@ func (dblrc *DBLedgerReadCloser) GetSequence() uint32 {
}

// GetHeader returns the XDR Header data associated with the stored ledger.
func (dblrc *DBLedgerReadCloser) GetHeader() (xdr.LedgerHeaderHistoryEntry, error) {
func (dblrc *DBLedgerReadCloser) GetHeader() xdr.LedgerHeaderHistoryEntry {
var err error
dblrc.initOnce.Do(func() { err = dblrc.init() })
if err != nil {
return xdr.LedgerHeaderHistoryEntry{}, err
// TODO, object should be initialized in constructor.
// Not returning error here, makes this much simpler.
panic(err)
}
return dblrc.header, nil
return dblrc.header
}

// Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there
Expand Down Expand Up @@ -92,7 +93,7 @@ func (dblrc *DBLedgerReadCloser) init() error {
return errors.Wrap(err, "error reading ledger from backend")
}
if !exists {
return errors.Wrap(err, "ledger was not found")
return ErrNotFound
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting note: it was actually always returning nil before that change. err in line 95 was always nil and when nil is passed to errors.Wrap it returns nil as well. I wonder if this is checked by staticcheck because I found very similar instance of this bug earlier this week (#1443 (comment)). If not, would be great to add a rule that catches this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wow, that's nasty. Very interesting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it here if you're interested: dominikh/go-tools#529


dblrc.header = ledgerCloseMeta.LedgerHeader
Expand Down
100 changes: 100 additions & 0 deletions exp/ingest/io/ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io

import (
"github.com/stellar/go/xdr"
)

// Change is a developer friendly representation of LedgerEntryChanges.
// It also provides some helper functions to quickly check if a given
// change has occured in an entry.
//
// If an entry is created: Pre is nil and Post is not nil.
// If an entry is updated: Pre is not nil and Post is not nil.
// If an entry is removed: Pre is not nil and Post is nil.
type Change struct {
Type xdr.LedgerEntryType
Pre *xdr.LedgerEntryData
Post *xdr.LedgerEntryData
}

func (c *Change) AccountSignersChanged() bool {
if c.Type != xdr.LedgerEntryTypeAccount {
panic("This should not be called on changes other than Account changes")
}

// Signers must be removed before merging an account and it's
// impossible to add signers during a creation of a new account.
if c.Pre == nil || c.Post == nil {
return false
}

if len(c.Pre.MustAccount().Signers) != len(c.Post.MustAccount().Signers) {
return true
}

signers := map[string]uint32{} // signer => weight

for _, signer := range c.Pre.MustAccount().Signers {
signers[signer.Key.Address()] = uint32(signer.Weight)
}

for _, signer := range c.Post.MustAccount().Signers {
weight, exist := signers[signer.Key.Address()]
if !exist {
return false
}

if weight != uint32(signer.Weight) {
return false
}
}

// TODO should it also change on master key weight changes?

return false
}

// GetChanges returns a developer friendly representation of LedgerEntryChanges.
// Currently it results operations related LedgerEntryChanges only.
// TODO this should include TransactionMetaV1.txChanges and fee changes too!
func (t *LedgerTransaction) GetChanges() []Change {
changes := []Change{}

for _, operationMeta := range t.Meta.OperationsMeta() {
ledgerEntryChanges := operationMeta.Changes
for i := 0; i < len(ledgerEntryChanges); i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you use a range for loop here?

for i, entryChange := range ledgerEntryChanges {

entryChange := ledgerEntryChanges[i]

switch entryChange.Type {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
created := entryChange.MustCreated()
changes = append(changes, Change{
Type: created.Data.Type,
Pre: nil,
Post: &created.Data,
})
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
state := ledgerEntryChanges[i-1].MustState()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with the operations meta data format. Could it be possible that ledgerEntryChanges[0] has type xdr.LedgerEntryChangeTypeLedgerEntryUpdated or xdr.LedgerEntryChangeTypeLedgerEntryRemoved ? if that is a possibility then ledgerEntryChanges[i-1] would crash

Copy link
Contributor Author

@bartekn bartekn Jul 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has specific format and I checked it in stellar-core. The algorithm is:

  1. If there is an existing entry:
    1. Insert STATE.
    2. Insert UPDATED or REMOVED.
  2. Otherwise insert CREATED.

However, I will confirm it with the core team to be sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does that mean LedgerEntryChanges is always constructed such that every UPDATED and REMOVED entry is always preceded with a STATE which represents the state prior to the update / removal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it results from the algorithm above. However, I will confirm it with the core team next week.

updated := entryChange.MustUpdated()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state.Data,
Post: &updated.Data,
})
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
state := ledgerEntryChanges[i-1].MustState()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state.Data,
Post: nil,
})
case xdr.LedgerEntryChangeTypeLedgerEntryState:
continue
default:
panic("Invalid LedgerEntryChangeType")
}
}
}

return changes
}
21 changes: 19 additions & 2 deletions exp/ingest/io/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io

import (
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

var ErrNotFound = errors.New("not found")

// StateReadCloser interface placeholder
type StateReadCloser interface {
GetSequence() uint32
Expand Down Expand Up @@ -32,21 +35,35 @@ type StateWriteCloser interface {
// LedgerReadCloser provides convenient, streaming access to the transactions within a ledger.
type LedgerReadCloser interface {
GetSequence() uint32
GetHeader() (xdr.LedgerHeaderHistoryEntry, error)
GetHeader() xdr.LedgerHeaderHistoryEntry
// Read should return the next transaction. If there are no more
// transactions it should return `io.EOF` error.
Read() (LedgerTransaction, error)
// Close should be called when reading is finished. This is especially
// helpful when there are still some entries available so the reader can stop
// helpful when there are still some transactions available so reader can stop
// streaming them.
Close() error
}

// LedgerWriteCloser provides convenient, streaming access to the transactions within a ledger.
type LedgerWriteCloser interface {
// Write is used to pass a transaction to the next processor. It can return
// `io.ErrClosedPipe` when the pipe between processors has been closed meaning
// that next processor does not need more data. In such situation the current
// processor can terminate as sending more transactions to a `LedgerWriteCloser`
// does not make sense (will not be read).
Write(LedgerTransaction) error
// Close should be called when reading is finished. This is especially
// helpful when there are still some transactions available so the reader can stop
// streaming them.
Close() error
}

// LedgerTransaction represents the data for a single transaction within a ledger.
type LedgerTransaction struct {
Index uint32
Envelope xdr.TransactionEnvelope
Result xdr.TransactionResultPair
Meta xdr.TransactionMeta
FeeChanges xdr.LedgerEntryChanges
}
4 changes: 2 additions & 2 deletions exp/ingest/io/mock_ledger_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func (m *MockLedgerReadCloser) GetSequence() uint32 {
return args.Get(0).(uint32)
}

func (m *MockLedgerReadCloser) GetHeader() (xdr.LedgerHeaderHistoryEntry, error) {
func (m *MockLedgerReadCloser) GetHeader() xdr.LedgerHeaderHistoryEntry {
args := m.Called()
return args.Get(0).(xdr.LedgerHeaderHistoryEntry), nil
return args.Get(0).(xdr.LedgerHeaderHistoryEntry)
}

func (m *MockLedgerReadCloser) Read() (LedgerTransaction, error) {
Expand Down
Loading