-
Notifications
You must be signed in to change notification settings - Fork 503
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
exp/ingest: Ingest Session #1456
Conversation
@@ -92,7 +91,7 @@ func (dblrc *DBLedgerReadCloser) init() error { | |||
return errors.Wrap(err, "error reading ledger from backend") | |||
} | |||
if !exists { | |||
return errors.Wrap(err, "ledger was not found") | |||
return ErrNotFound | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting note: it was actually always returning nil
before that change. err
in line 95 was always nil
and when nil
is passed to errors.Wrap
it returns nil
as well. I wonder if this is checked by staticcheck
because I found very similar instance of this bug earlier this week (#1443 (comment)). If not, would be great to add a rule that catches this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wow, that's nasty. Very interesting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added it here if you're interested: dominikh/go-tools#529
sequence: sequence, | ||
backend: backend, | ||
} | ||
|
||
var err error | ||
reader.initOnce.Do(func() { err = reader.init() }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
related to #1433 . because reader
is created a few lines above. it seems strange reader
initialize it using initOnce.Do()
. fixing this might be outside the scope of this PR though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we'll decide on #1433 next week and change all the instances in another PR.
"github.com/stellar/go/support/errors" | ||
) | ||
|
||
var _ Session = &LiveSession{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's to ensure interface implementation. It ensures that LiveSession
implements all methods of Session
interface. Helpful if new methods are added to the interface (compilation will fail).
exp/ingest/live_session.go
Outdated
return errors.Wrap(err, "Error getting the latest ledger sequence") | ||
} | ||
|
||
fmt.Printf("Initializing state for ledger=%d\n", s.currentLedger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when is it appropriate to use the log package vs fmt.Printf
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right this should be removed. Created this issue: #1459
exp/ingest/live_session.go
Outdated
|
||
historyAdapter := adapters.MakeHistoryArchiveAdapter(s.Archive) | ||
|
||
s.currentLedger, err = historyAdapter.GetLatestLedgerSequence() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be a local variable instead of a member variable of LiveSession
?
continue | ||
} | ||
|
||
return errors.Wrap(err, "Error getting ledger") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems like once you run into an error the session is kind of destroyed. there's no way to retry from the last successfully processed ledger, right? I think there might be cases where we encounter transient errors and it would be nice to support different retry strategies.
I don't know if this issue should be addressed in this PR but just wanted to leave a note
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created an issue here: #1473.
exp/ingest/live_session.go
Outdated
return nil | ||
} | ||
|
||
func (s *LiveSession) SetStatePipeline(p *pipeline.StatePipeline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems like the state pipeline and ledger pipeline should be constructor / struct parameters since we cannot change the pipelines once the session has started running
exp/ingest/main.go
Outdated
Archive historyarchive.ArchiveInterface | ||
LedgerBackend ledgerbackend.LedgerBackend | ||
|
||
// mutex is used to make sure queries across many stores are persistent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these 2 lines can be deleted
Archive *historyarchive.Archive | ||
LedgerSequence uint32 | ||
|
||
statePipeline *pipeline.StatePipeline | ||
} | ||
|
||
type Session interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this interface necessary? I don't think I've seen it used anywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not used internally but packages that are using ingest
can make use of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Golang maintainers have the following opinion on defining interfaces:
"Do not define interfaces before they are used: without a realistic example of usage, it is too difficult to see whether an interface is even necessary, let alone what methods it ought to contain."
From https://github.com/golang/go/wiki/CodeReviewComments#interfaces
Also, the fact that SingleLedgerSession
does not support Resume(ledgerSequence uint32) error
is another reason which makes me think the interface is not necessary.
That being said, I will defer to your judgement on whether it makes sense to keep the interface
exp/ingest/single_ledger_session.go
Outdated
return nil | ||
} | ||
|
||
func (s *SingleLedgerSession) SetStatePipeline(p *pipeline.StatePipeline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think statePipeline
should be a struct / constructor parameter for the same reasons I mentioned in the LiveSession comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just minor doc comments. It looks good to me.
Regarding Reset
: I agree it seems it could be implemented incorrectly if a developer is not considering the reuse scenario for the pipeline. Maybe stick with this for now since it's done, and we can think about it as we finish the demos? I quite like the idea of the factory since it guarantees a clean set-up on each pass.
@@ -92,7 +91,7 @@ func (dblrc *DBLedgerReadCloser) init() error { | |||
return errors.Wrap(err, "error reading ledger from backend") | |||
} | |||
if !exists { | |||
return errors.Wrap(err, "ledger was not found") | |||
return ErrNotFound | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wow, that's nasty. Very interesting
exp/ingest/io/main.go
Outdated
"github.com/stellar/go/xdr" | ||
) | ||
|
||
var ErrNotFound = errors.New("Not found") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should lowercase not found
exp/ingest/main.go
Outdated
} | ||
|
||
// LiveSession initializes the ledger state using `Archive` and `StatePipeline`, | ||
// then starts processing ledger data using `ledgerbackend`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment might be clearer as:
// LiveSession initializes the ledger state using `Archive` and `statePipeline`,
// then starts processing ledger data using `LedgerBackend` and `ledgerPipeline`.
exp/ingest/main.go
Outdated
LedgerBackend ledgerbackend.LedgerBackend | ||
|
||
// mutex is used to make sure queries across many stores are persistent | ||
// mutex sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
currentLedger uint32 | ||
} | ||
|
||
// SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// SingleLedgerSession initializes the ledger state using `Archive` and `StatePipeline` | |
// SingleLedgerSession initializes the ledger state using `Archive` and `statePipeline` |
Archive *historyarchive.Archive | ||
LedgerSequence uint32 | ||
|
||
statePipeline *pipeline.StatePipeline | ||
} | ||
|
||
type Session interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docstring for this interface would be helpful
return os.Create(p.Filename) | ||
} | ||
|
||
func (p *CSVPrinter) ProcessState(ctx context.Context, store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine for now but we should add docstrings before general release
Co-Authored-By: Eric Saunders <[email protected]>
@ire-and-curses @tamirms you can take a look again. Added a few improvements and |
exp/ingest/io/ledger_transaction.go
Outdated
|
||
for _, operationMeta := range t.Meta.OperationsMeta() { | ||
ledgerEntryChanges := operationMeta.Changes | ||
for i := 0; i < len(ledgerEntryChanges); i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you use a range for loop here?
for i, entryChange := range ledgerEntryChanges {
// Session user to determine what was the last ledger processed by a | ||
// Session as it's stateless (or if Run() should be called first). | ||
Resume(ledgerSequence uint32) error | ||
GetLatestProcessedLedger() uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing doc string here
wg.Add(2) | ||
|
||
go func() { | ||
err = orderBookGraph.Apply() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why apply the updates here instead of applying the updates at the end of ProcessState()
and ProcessLedger()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's to make sure that state (db state and graph state) is consistent (at the same ledger). If we were applying and committing in the pipeline it's possible that for a short time data in both stores (db and memory) would represent state of 2 different ledgers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a lot of this logic should be encapsulated in DatabaseProcessor
and OrderbookProcessor
. DatabaseProcessor
could define Begin()
and Commit()
functions which are then called in the pre / post processing hooks. Similarly, OrderbookProcessor
could define an Appy()
function which would be called in the post processing hook.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I agree but I think the problem is that we have two DatabaseProcessor
instances: one for inserting transactions, the other for updating signers. Then which one we use to call Begin()
and Commit()
. We can call Begin()
only once, it would be confusing if it's called on a single processor. That's why I decided to call it on Database
directly.
exp/ingest/io/ledger_transaction.go
Outdated
Post: &created.Data, | ||
}) | ||
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: | ||
state := ledgerEntryChanges[i-1].MustState() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not familiar with the operations meta data format. Could it be possible that ledgerEntryChanges[0]
has type xdr.LedgerEntryChangeTypeLedgerEntryUpdated
or xdr.LedgerEntryChangeTypeLedgerEntryRemoved
? if that is a possibility then ledgerEntryChanges[i-1]
would crash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has specific format and I checked it in stellar-core. The algorithm is:
- If there is an existing entry:
- Insert
STATE
. - Insert
UPDATED
orREMOVED
.
- Insert
- Otherwise insert
CREATED
.
However, I will confirm it with the core team to be sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does that mean LedgerEntryChanges
is always constructed such that every UPDATED
and REMOVED
entry is always preceded with a STATE
which represents the state prior to the update / removal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it results from the algorithm above. However, I will confirm it with the core team next week.
} | ||
|
||
func (p *OrderbookProcessor) IsConcurrent() bool { | ||
return true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this processor should be concurrent because it is important that the offers be added to the graph in order. I think if the processor is concurrent then that means ledger and state entries could be processed out of order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really important that offers must be added in order? For initial processing it doesn't matter. For ledger/transactions processing I think it also doesn't matter because graph is locked for reading during updates. Can you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if someone creates an offer and then later decides to remove the offer? or if someone creates an offer and then the offer is updated because the offer is partially consumed? it's important that those events are processed in order because if you try to remove an offer which doesn't exist the code will panic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, you're totally right!
// in structs that embed Pipeline. | ||
type PipelineInterface interface { | ||
SetRoot(rootProcessor *PipelineNode) | ||
AddPreProcessingHook(hook func(context.Context) error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could Reset()
be implemented as a pre-processing hook? how are those two operations different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean internal reset()
function is always added to preProcessingHooks
when pipeline is created? I think we can do it, but when we decide on #1433. If someone creates new pipeline not via constructor, then the reset hook won't be added.
Added some updates connected to feedback. PTAL! |
@bartekn it looks good! the only remaining comments I have is that the mutex in
Or
|
exp/orderbook/batch.go
Outdated
} | ||
|
||
// removeOffer will queue an operation to remove the given offer from the order book | ||
func (tx *orderBookBatchedUpdates) RemoveOffer(offerID xdr.Int64) *orderBookBatchedUpdates { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be removeOffer
PR Checklist
PR Structure
otherwise).
services/friendbot
Thoroughness
.md
files, etc... affected by this change). Take a look in the
docs
folder for a given service,like this one.
Release planning
needed with deprecations, added features, breaking changes, and DB schema changes.
semver, or if it's mainly a patch change. The PR is targeted at the next
release branch if it's not a patch change.
Summary
This PR adds two
Session
implementations (LiveSession
andSingleLedgerSession
) and a simplehorizon-demo
tool.Goal and scope
The goal of this PR is to implement the last missing component of
exp/ingest
that connects all the existing packages together:Session
.Session
is connecting to history archives and/or ledger backend and passes data to one or two pipelines (state pipeline and ledger pipeline).Session
supports one of the use cases developers can interact with Stellar ledger. For example:LiveSession
initializes the state and then follows the new ledgers and processes transactions (it's running indefinitely). On the contrarySingleLedgerSession
processes the state of a single ledger and terminates. More sessions will be added in a future (ex.RangeSession
that processes data between two ledgers).It also contains a simple demo app called
horizon-demo
(go run ./exp/tools/horizon-demo
) that's usingLiveSession
internally.horizon-demo
is reading data from history archives and ledger backend and 1) updates accounts for signers, 2) inserts transactions to a database and 3) updates in-memory orderbook graph.Close #1310.
Summary of changes
Session
implementations andhorizon-demo
app.Reset()
method to pipeline processors. It's required to reset internal state of processors when a pipeline is used again (ex. when new ledger is closed). I'm still not sure if this is the best approach - read in the next section.support/pipeline.Pipeline
to be reusable (previously it was a single use only object) and addedShutdown()
method.ingest/processors
(CSVPrinter
is an interesting example because it's possible to use it in both: state and ledger pipelines - it implements both interfaces).support/pipeline
with pre- and post-processing hooks that are useful for opening and committing database transactions but also applying changes to other structures (like orderbook graph). Maybe it's a solution for Find a way for reporting status of pipeline and session #1459?Known limitations & issues
Reset()
incorrectly when creating a new processor. The alternative is to create something like aPipelineFactory
that would create a completely new pipeline with defaultstruct
values.adapters
package will likely be removed. It doesn't really give any value and acts as an unnecessary wrapper around readers.What shouldn't be reviewed
Please ignore changes to
adapters
package. It's clear to me right now that we will remove and refactor it (#1405).