Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/ingest: Ingest Session #1456

Merged
merged 18 commits into from
Jul 5, 2019
Merged

exp/ingest: Ingest Session #1456

merged 18 commits into from
Jul 5, 2019

Conversation

bartekn
Copy link
Contributor

@bartekn bartekn commented Jun 26, 2019

PR Checklist

PR Structure

  • This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • [ x This PR avoids mixing refactoring changes with feature changes (split into two PRs
    otherwise).
  • This PR's title starts with name of package that is most changed in the PR, ex.
    services/friendbot

Thoroughness

  • This PR adds tests for the most critical parts of the new functionality or fixes.
  • I've updated any docs (developer docs, .md
    files, etc... affected by this change). Take a look in the docs folder for a given service,
    like this one.

Release planning

  • I've updated the relevant CHANGELOG (here for Horizon) if
    needed with deprecations, added features, breaking changes, and DB schema changes.
  • I've decided if this PR requires a new major/minor version according to
    semver, or if it's mainly a patch change. The PR is targeted at the next
    release branch if it's not a patch change.

Summary

This PR adds two Session implementations (LiveSession and SingleLedgerSession) and a simple horizon-demo tool.

Goal and scope

The goal of this PR is to implement the last missing component of exp/ingest that connects all the existing packages together: Session. Session is connecting to history archives and/or ledger backend and passes data to one or two pipelines (state pipeline and ledger pipeline).

Session supports one of the use cases developers can interact with Stellar ledger. For example: LiveSession initializes the state and then follows the new ledgers and processes transactions (it's running indefinitely). On the contrary SingleLedgerSession processes the state of a single ledger and terminates. More sessions will be added in a future (ex. RangeSession that processes data between two ledgers).

It also contains a simple demo app called horizon-demo (go run ./exp/tools/horizon-demo) that's using LiveSession internally. horizon-demo is reading data from history archives and ledger backend and 1) updates accounts for signers, 2) inserts transactions to a database and 3) updates in-memory orderbook graph.

Close #1310.

Summary of changes

  • Added Session implementations and horizon-demo app.
  • Added Reset() method to pipeline processors. It's required to reset internal state of processors when a pipeline is used again (ex. when new ledger is closed). I'm still not sure if this is the best approach - read in the next section.
  • Updated support/pipeline.Pipeline to be reusable (previously it was a single use only object) and added Shutdown() method.
  • Added a few common processors to ingest/processors (CSVPrinter is an interesting example because it's possible to use it in both: state and ledger pipelines - it implements both interfaces).
  • Updated support/pipeline with pre- and post-processing hooks that are useful for opening and committing database transactions but also applying changes to other structures (like orderbook graph). Maybe it's a solution for Find a way for reporting status of pipeline and session #1459?

Known limitations & issues

  • I'm still not sure if the current approach to resetting pipeline is good. It's elegant but I'm afraid that developers may implement Reset() incorrectly when creating a new processor. The alternative is to create something like a PipelineFactory that would create a completely new pipeline with default struct values.
  • adapters package will likely be removed. It doesn't really give any value and acts as an unnecessary wrapper around readers.
  • No docs and tests. Will be improved in another PR.

What shouldn't be reviewed

Please ignore changes to adapters package. It's clear to me right now that we will remove and refactor it (#1405).

@bartekn bartekn marked this pull request as ready for review June 26, 2019 19:06
@@ -92,7 +91,7 @@ func (dblrc *DBLedgerReadCloser) init() error {
return errors.Wrap(err, "error reading ledger from backend")
}
if !exists {
return errors.Wrap(err, "ledger was not found")
return ErrNotFound
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting note: it was actually always returning nil before that change. err in line 95 was always nil and when nil is passed to errors.Wrap it returns nil as well. I wonder if this is checked by staticcheck because I found very similar instance of this bug earlier this week (#1443 (comment)). If not, would be great to add a rule that catches this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wow, that's nasty. Very interesting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it here if you're interested: dominikh/go-tools#529

sequence: sequence,
backend: backend,
}

var err error
reader.initOnce.Do(func() { err = reader.init() })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to #1433 . because reader is created a few lines above. it seems strange reader initialize it using initOnce.Do() . fixing this might be outside the scope of this PR though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'll decide on #1433 next week and change all the instances in another PR.

"github.com/stellar/go/support/errors"
)

var _ Session = &LiveSession{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to ensure interface implementation. It ensures that LiveSession implements all methods of Session interface. Helpful if new methods are added to the interface (compilation will fail).

return errors.Wrap(err, "Error getting the latest ledger sequence")
}

fmt.Printf("Initializing state for ledger=%d\n", s.currentLedger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when is it appropriate to use the log package vs fmt.Printf ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right this should be removed. Created this issue: #1459


historyAdapter := adapters.MakeHistoryArchiveAdapter(s.Archive)

s.currentLedger, err = historyAdapter.GetLatestLedgerSequence()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be a local variable instead of a member variable of LiveSession?

continue
}

return errors.Wrap(err, "Error getting ledger")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like once you run into an error the session is kind of destroyed. there's no way to retry from the last successfully processed ledger, right? I think there might be cases where we encounter transient errors and it would be nice to support different retry strategies.

I don't know if this issue should be addressed in this PR but just wanted to leave a note

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue here: #1473.

return nil
}

func (s *LiveSession) SetStatePipeline(p *pipeline.StatePipeline) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like the state pipeline and ledger pipeline should be constructor / struct parameters since we cannot change the pipelines once the session has started running

Archive historyarchive.ArchiveInterface
LedgerBackend ledgerbackend.LedgerBackend

// mutex is used to make sure queries across many stores are persistent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these 2 lines can be deleted

Archive *historyarchive.Archive
LedgerSequence uint32

statePipeline *pipeline.StatePipeline
}

type Session interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this interface necessary? I don't think I've seen it used anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not used internally but packages that are using ingest can make use of it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Golang maintainers have the following opinion on defining interfaces:

"Do not define interfaces before they are used: without a realistic example of usage, it is too difficult to see whether an interface is even necessary, let alone what methods it ought to contain."

From https://github.com/golang/go/wiki/CodeReviewComments#interfaces

Also, the fact that SingleLedgerSession does not support Resume(ledgerSequence uint32) error is another reason which makes me think the interface is not necessary.

That being said, I will defer to your judgement on whether it makes sense to keep the interface

return nil
}

func (s *SingleLedgerSession) SetStatePipeline(p *pipeline.StatePipeline) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think statePipeline should be a struct / constructor parameter for the same reasons I mentioned in the LiveSession comment

Copy link
Member

@ire-and-curses ire-and-curses left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just minor doc comments. It looks good to me.

Regarding Reset: I agree it seems it could be implemented incorrectly if a developer is not considering the reuse scenario for the pipeline. Maybe stick with this for now since it's done, and we can think about it as we finish the demos? I quite like the idea of the factory since it guarantees a clean set-up on each pass.

@@ -92,7 +91,7 @@ func (dblrc *DBLedgerReadCloser) init() error {
return errors.Wrap(err, "error reading ledger from backend")
}
if !exists {
return errors.Wrap(err, "ledger was not found")
return ErrNotFound
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wow, that's nasty. Very interesting

"github.com/stellar/go/xdr"
)

var ErrNotFound = errors.New("Not found")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should lowercase not found

}

// LiveSession initializes the ledger state using `Archive` and `StatePipeline`,
// then starts processing ledger data using `ledgerbackend`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment might be clearer as:

// LiveSession initializes the ledger state using `Archive` and `statePipeline`,
// then starts processing ledger data using `LedgerBackend` and `ledgerPipeline`.

LedgerBackend ledgerbackend.LedgerBackend

// mutex is used to make sure queries across many stores are persistent
// mutex sync.RWMutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused?

currentLedger uint32
}

// SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline`
// SingleLedgerSession initializes the ledger state using `Archive` and `statePipeline`

Archive *historyarchive.Archive
LedgerSequence uint32

statePipeline *pipeline.StatePipeline
}

type Session interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstring for this interface would be helpful

exp/ingest/processors/doc.go Outdated Show resolved Hide resolved
return os.Create(p.Filename)
}

func (p *CSVPrinter) ProcessState(ctx context.Context, store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine for now but we should add docstrings before general release

@bartekn
Copy link
Contributor Author

bartekn commented Jul 3, 2019

@ire-and-curses @tamirms you can take a look again. Added a few improvements and horizon-demo is fully functional now.


for _, operationMeta := range t.Meta.OperationsMeta() {
ledgerEntryChanges := operationMeta.Changes
for i := 0; i < len(ledgerEntryChanges); i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you use a range for loop here?

for i, entryChange := range ledgerEntryChanges {

// Session user to determine what was the last ledger processed by a
// Session as it's stateless (or if Run() should be called first).
Resume(ledgerSequence uint32) error
GetLatestProcessedLedger() uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing doc string here

wg.Add(2)

go func() {
err = orderBookGraph.Apply()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why apply the updates here instead of applying the updates at the end of ProcessState() and ProcessLedger() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to make sure that state (db state and graph state) is consistent (at the same ledger). If we were applying and committing in the pipeline it's possible that for a short time data in both stores (db and memory) would represent state of 2 different ledgers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of this logic should be encapsulated in DatabaseProcessor and OrderbookProcessor. DatabaseProcessor could define Begin() and Commit() functions which are then called in the pre / post processing hooks. Similarly, OrderbookProcessor could define an Appy() function which would be called in the post processing hook.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I agree but I think the problem is that we have two DatabaseProcessor instances: one for inserting transactions, the other for updating signers. Then which one we use to call Begin() and Commit(). We can call Begin() only once, it would be confusing if it's called on a single processor. That's why I decided to call it on Database directly.

Post: &created.Data,
})
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
state := ledgerEntryChanges[i-1].MustState()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with the operations meta data format. Could it be possible that ledgerEntryChanges[0] has type xdr.LedgerEntryChangeTypeLedgerEntryUpdated or xdr.LedgerEntryChangeTypeLedgerEntryRemoved ? if that is a possibility then ledgerEntryChanges[i-1] would crash

Copy link
Contributor Author

@bartekn bartekn Jul 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has specific format and I checked it in stellar-core. The algorithm is:

  1. If there is an existing entry:
    1. Insert STATE.
    2. Insert UPDATED or REMOVED.
  2. Otherwise insert CREATED.

However, I will confirm it with the core team to be sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does that mean LedgerEntryChanges is always constructed such that every UPDATED and REMOVED entry is always preceded with a STATE which represents the state prior to the update / removal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it results from the algorithm above. However, I will confirm it with the core team next week.

}

func (p *OrderbookProcessor) IsConcurrent() bool {
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this processor should be concurrent because it is important that the offers be added to the graph in order. I think if the processor is concurrent then that means ledger and state entries could be processed out of order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really important that offers must be added in order? For initial processing it doesn't matter. For ledger/transactions processing I think it also doesn't matter because graph is locked for reading during updates. Can you elaborate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if someone creates an offer and then later decides to remove the offer? or if someone creates an offer and then the offer is updated because the offer is partially consumed? it's important that those events are processed in order because if you try to remove an offer which doesn't exist the code will panic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, you're totally right!

// in structs that embed Pipeline.
type PipelineInterface interface {
SetRoot(rootProcessor *PipelineNode)
AddPreProcessingHook(hook func(context.Context) error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could Reset() be implemented as a pre-processing hook? how are those two operations different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean internal reset() function is always added to preProcessingHooks when pipeline is created? I think we can do it, but when we decide on #1433. If someone creates new pipeline not via constructor, then the reset hook won't be added.

@bartekn
Copy link
Contributor Author

bartekn commented Jul 5, 2019

Added some updates connected to feedback. PTAL!

@tamirms
Copy link
Contributor

tamirms commented Jul 5, 2019

@bartekn it looks good! the only remaining comments I have is that the mutex in orderBookBatchedUpdates is no longer necessary because the orderbook processor is not concurrent. Also, I think it would be better if updates could only be applied in one way. My suggestion is to either

  • get rid of the AddOffer, RemoveOffer, and Apply methods on OrderBookGraph. Instead, you can create the batch and apply it in the pipeline hooks similar to what you do with the database transaction for the DatabaseProcessor instances

Or

  • get rid of the BatchedUpdates interface entirely so that you cannot create multiple batches. The only way to apply updates would be to call the AddOffer, RemoveOffer, and Apply methods on OrderBookGraph.

}

// removeOffer will queue an operation to remove the given offer from the order book
func (tx *orderBookBatchedUpdates) RemoveOffer(offerID xdr.Int64) *orderBookBatchedUpdates {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be removeOffer

@bartekn bartekn merged commit 8cd5bd9 into master Jul 5, 2019
@bartekn bartekn deleted the ingest-session branch July 5, 2019 16:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ingestion system and session
3 participants