(VDB-699) Decouple Log extraction vs delegation to transformers #131
Conversation
cmd/execute.go (Outdated)

for range ticker.C {
	w.Execute(recheck)
errs := make(chan error)
go w.Execute(recheck, errs)

This doesn't technically need to be run concurrently (we could have `Execute` return an error), but doing so makes testing easier, since the function spins up an infinite loop on the happy path (there's never a reason to stop extracting and delegating, only to `Sleep` for a bit if you haven't found anything new).

I get where you're coming from, but it does feel needlessly confusing that the following while-loop, select, and error channel are actually just a single return value in disguise 🤔 It definitely makes things look more complicated than they really are.
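For readers less familiar with the trade-off being debated here, a minimal sketch of the two signatures (the `watcher` type and error text are hypothetical stand-ins, not the PR's actual code):

```go
package main

import (
	"errors"
	"fmt"
)

// watcher is a stand-in for the PR's event watcher; names are illustrative only.
type watcher struct{}

// Execute reports failures on a channel, which lets a test observe errors
// even though the happy path would loop forever.
func (w watcher) Execute(errs chan<- error) {
	for {
		// ... extract and delegate logs ...
		errs <- errors.New("fetch failed") // surface the failure to the caller
		return                             // bail for the sake of the example
	}
}

// ExecuteSync is the alternative raised in review: the loop just returns
// its first error, so callers read one value instead of draining a channel.
func (w watcher) ExecuteSync() error {
	for {
		// ... extract and delegate logs ...
		return errors.New("fetch failed")
	}
}

func main() {
	w := watcher{}

	// Channel style: run concurrently and wait for an error.
	errs := make(chan error)
	go w.Execute(errs)
	fmt.Println("channel style:", <-errs)

	// Direct style: a single return value, no goroutine required.
	fmt.Println("direct style:", w.ExecuteSync())
}
```

The channel version buys testability of an infinite loop at the cost of the select/channel plumbing the reviewer is objecting to.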
libraries/shared/logs/extractor.go
Outdated
}

func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error {
	hasBeenChecked, hasBeenCheckedErr := extractor.CheckedLogsRepository.HaveLogsBeenChecked(config.ContractAddresses, config.Topic)

We could probably short circuit this in the case where no headers have been marked checked (there's no reason to check the logs repository for the address/topic or update the `check_count` on `public.headers` if every header row has a `check_count` of `0`), but you would still want to persist the transformer's addresses + topics in `checked_logs`, and I couldn't think of a great way to do that without making this code pretty ugly. Felt like running this function all the way through `n` times, where `n` is the number of transformers, wasn't a cost worthy of the optimization, but open to other folks' thoughts.
insertCheckedHeaderQuery = `UPDATE public.headers SET check_count = (SELECT check_count WHERE id = $1) + 1 WHERE id = $1`
)

type CheckedHeadersRepository struct {

I could make an argument for including all of these functions on the `headers_repository` now that they're only dealing with a column on that table, but figured this was enough of a dedicated use case that we shouldn't bloat that namespace. Open to reconsidering though.

Could it make future refactorisation easier to have it all in one self-contained place? Or do we have no dependencies between these two packages now?

Hoping this makes it easier to refactor by keeping all the `check_count` stuff separate and consolidated (e.g. can delete or change the table without touching other namespaces) 👍
@@ -103,3 +103,15 @@ func GetFakeUncle(hash, reward string) core.Uncle {
		Timestamp: strconv.FormatInt(fakeTimestamp, 10),
	}
}

func RandomString(length int) string {

This is duplicated from `libraries/shared`, but I opted not to consolidate just yet, since I think you could make an argument for treating `libraries` and `pkg` as basically separate repos... 🤔
}

func MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error {
func MarkContractWatcherHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error {
	_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)

Note: not deleting the `checked_headers` migration because it's still used by the `contract_watcher`. Hoping we can apply a similar pattern there in the future and remove this table once and for all ✂️ 🎉

Are `MissingHeaders` and `GetCheckedColumnNames` not being used in the `contract_watcher`?

The `contract_watcher` has its own header repository that supplies missing headers 👍 May be worth consolidating the different header repositories at some point though 🤔
- Enable creating new table for logs used in event watching based on header sync rather than header
  - enables executing transformers without full header lookup
  - enables decoupling event extraction/persistence from transformation
  - modifies event transformer, converter, and log chunker to accept payload that includes internal log database ID with log data
  - remove alias for transformer pkg as shared_t
  - remove unused mock watcher repository
- Limit missing headers results set to 100 so that extraction doesn't excessively block delegation
  - wrap checked headers functions in repository struct
  - move storage repository to factory, to correspond with event repository path
  - remove unused files
  - reformat sql
  - remove line breaks in imports

Co-Authored-By: Edvard Hübinette <[email protected]>

- Extract and delegate logs synchronously after initial goroutine fired
- If a header was marked as checked before a transformer was added to the watcher, mark all headers since the new transformer's starting block number as unchecked.

Force-pushed from 37de974 to 5a87217

- Don't need to maintain it on public.checked_headers if we're not adding additional columns to that table

Force-pushed from 5a87217 to 5ac76ee
I don't really have constructive feedback but I appreciate your tests! I'll check on this when other people leave feedback because I'm sure I'll learn from it.
@@ -14,14 +14,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package repository
package storage

💯

🔥
libraries/shared/logs/delegator.go
Outdated
func (delegator *LogDelegator) DelegateLogs() (error, bool) {
	if len(delegator.Transformers) < 1 {
		return ErrNoTransformers, noLogsFound
this reads nicely 👍
	"math/rand"
)

var _ = Describe("Log extractor", func() {

These tests are well written and helpful for understanding how the `LogExtractor` works ⭐️
@@ -39,7 +39,7 @@ import (

type TransferLog struct {
	Id int64 `db:"id"`
	VulvanizeLogId int64 `db:"vulcanize_log_id"`

👀
😂
endingBlockNumber = startingBlockNumber + 2
outOfRangeBlockNumber = endingBlockNumber + 1

blockNumbers = []int64{startingBlockNumber, middleBlockNumber, endingBlockNumber, outOfRangeBlockNumber}
I appreciate the names here
headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)

Expect(err).NotTo(HaveOccurred())
// doesn't include outOfRangeBlockNumber

Is there a way to compose the `Not` and `ContainElement` matchers on the headers to emphasize that `headers` does not include `outOfRangeBlockNumber`, instead of adding a comment?
Will give it a try!
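For what it's worth, Gomega can express this directly as `Expect(headers).NotTo(ContainElement(outOfRangeHeader))`. The same check sketched in plain, dependency-free Go (the `header` type and its field are assumed stand-ins, not the repo's actual struct):

```go
package main

import "fmt"

// header is a hypothetical stand-in for the repository's header type.
type header struct {
	BlockNumber int64
}

// containsBlockNumber mirrors what a ContainElement matcher checks:
// is any element of the slice equal to the wanted value?
func containsBlockNumber(headers []header, blockNumber int64) bool {
	for _, h := range headers {
		if h.BlockNumber == blockNumber {
			return true
		}
	}
	return false
}

func main() {
	headers := []header{{BlockNumber: 1}, {BlockNumber: 2}, {BlockNumber: 3}}
	outOfRangeBlockNumber := int64(4)

	// The assertion replaces the "// doesn't include outOfRangeBlockNumber" comment.
	if containsBlockNumber(headers, outOfRangeBlockNumber) {
		panic("headers should not include the out-of-range block number")
	}
	fmt.Println("headers exclude the out-of-range block number")
}
```

An assertion like this fails loudly if the behavior regresses, whereas a comment can silently go stale.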
type Converter interface {
	ToEntities(contractAbi string, ethLog []types.Log) ([]interface{}, error)
	ToEntities(contractAbi string, ethLog []core.HeaderSyncLog) ([]interface{}, error)

Should we add a story to update all of the converters since this type is changing?

I put up a PR applying these changes 👍
I put up a PR applying these changes 👍
Looking good! Just have a few questions to make sure I understand how this is working. :)
ALTER TABLE logs
	DROP CONSTRAINT log_uc;
ALTER TABLE full_sync_logs
	DROP CONSTRAINT full_sync_log_uc;

Not a big deal, but do you think we could just remove this constraint from the previous migration file (db/migrations/00007_create_full_sync_logs_table.sql), rather than adding it and then immediately removing it?
@@ -6,28 +6,44 @@ require (
	github.com/allegro/bigcache v1.2.1 // indirect
	github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect
	github.com/dave/jennifer v1.3.0
	github.com/deckarep/golang-set v1.7.1 // indirect

This may be a silly question, but do you know why all of these indirect dependencies were added?
Curious as well
Not 100% sure, but I'm assuming it's that dependencies of our dependencies have changed
	"github.com/jmoiron/sqlx"
	log "github.com/sirupsen/logrus"

	"github.com/sirupsen/logrus"
👍
@@ -158,7 +158,7 @@ var _ = Describe("Repository", func() {
	Expect(err).ToNot(HaveOccurred())
	expectedLog := test_helpers.TransferLog{
		Id: 1,
		VulvanizeLogId: vulcanizeLogId,
		VulcanizeLogId: vulcanizeLogId,
💯
@@ -157,10 +160,13 @@ func watchEthEvents(w *watcher.EventWatcher, wg *sync.WaitGroup) {
	} else {
		recheck = constants.HeaderMissing
	}
	ticker := time.NewTicker(pollingInterval)

How come we're doing this as a sleep in the `event_watcher` instead of as a ticker here? Is it because the `storage_watcher` doesn't use the interval?
My thinking was that with the ticker we're kicking off a new process every 7 seconds regardless of what else is happening, which can lead to weird situations where two processes are dealing with the same missing header - if the first process gets 1,000,000 missing headers and deals with 500,000 of them in 7 seconds, both the original process and the new one will see the remaining 500,000 as missing.
The event watcher only starts a new process after it has completed a given segment, and sleeps if the last segment it processed was empty (so that we're not endlessly triggering new processes at the head of the chain). Hoping that this results in using resources more efficiently, but definitely welcome feedback!
Gotcha, that makes sense. Not in scope for this PR, but I wonder if this is something that we could do in `watchEthContract` and `watchEthStorage` (I don't know what I was talking about above saying that the `storage_watcher` doesn't use a ticker 😳)?
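The difference being discussed can be sketched like so (interval, function names, and round counts are illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

const pollingInterval = 10 * time.Millisecond // illustrative; the real interval is longer

// tickerLoop kicks off work every interval regardless of whether the
// previous pass has finished, which is how two overlapping passes can
// both see the same headers as unchecked.
func tickerLoop(execute func() (found bool), rounds int) {
	ticker := time.NewTicker(pollingInterval)
	defer ticker.Stop()
	for i := 0; i < rounds; i++ {
		<-ticker.C
		go execute() // may overlap with a still-running pass
	}
}

// completionLoop starts the next pass only after the previous one returns,
// and sleeps only when the last pass found nothing new.
func completionLoop(execute func() (found bool), rounds int) {
	for i := 0; i < rounds; i++ {
		if found := execute(); !found {
			time.Sleep(pollingInterval) // back off at the head of the chain
		}
	}
}

func main() {
	passes := 0
	work := func() bool {
		passes++
		return passes < 3 // pretend new headers stop appearing after a few passes
	}
	completionLoop(work, 5)
	fmt.Println("completion-driven passes:", passes)
}
```

The completion-driven shape guarantees at most one pass is in flight, trading a little latency at the chain head for no duplicated work.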
libraries/shared/logs/extractor.go
Outdated
type ILogExtractor interface {
	AddTransformerConfig(config transformer.EventTransformerConfig) error
	ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool)

Not a big deal, but I think that elsewhere in the code we're returning the `error` as the second argument rather than the first.

Yeah, I suppose I was thinking this is sort of a weird case because the bool functions similarly to `ok` in `result, ok := mapping[key]`. Happy to switch up the order if you think that'd make it more clear though.

Yeah, I too think it makes sense to swap.
Cool, can do
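To spell out the two conventions in play (names here are illustrative, not the extractor's real signature, and a later commit removes the bool entirely):

```go
package main

import "fmt"

// lookup shows the comma-ok idiom that motivated the original ordering:
// the bool rides second, but it isn't an error.
func lookup(m map[string]int, key string) (int, bool) {
	result, ok := m[key]
	return result, ok
}

// extractLogs shows the swapped ordering agreed on in review:
// error last, matching the usual Go convention for fallible calls.
func extractLogs() (missingHeadersFound bool, err error) {
	// ... fetch and persist logs ...
	return false, nil
}

func main() {
	found, err := extractLogs()
	if err != nil {
		panic(err)
	}
	fmt.Println("missing headers found:", found)

	m := map[string]int{"a": 1}
	if v, ok := lookup(m, "a"); ok {
		fmt.Println("a =", v)
	}
}
```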
libraries/shared/logs/extractor.go
Outdated
var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor")

const (
	missingHeadersFound = true

I'm not 100% sure about this, but do you think it would make sense to name these constants `uncheckedHeadersFound` and `noUncheckedHeadersFound` instead of calling them missing headers? I'm wondering if that overlaps with the concept of headers that aren't yet in the db (vs headers that are in the db but need to be checked for applicable logs).
	}
}

func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error {

For some reason, I'm having trouble understanding this method. I think that it is checking to see if logs with the given contract addresses + topic combinations have been checked - if so, return nil and move on. If not:

1. mark the headers unchecked so that we'll fetch logs for them in `ExtractLogs`, and
2. mark the logs as checked

I think 2 is the part that is confusing me. It seems like we're marking logs as checked before we've actually fetched the logs and checked them. I wonder if this method should be split up into `updateCheckedHeaders`, which does just 1, and then another method `updateCheckedLogs` that does just 2 and is called after `ExtractLogs` is run.

I also wonder if calling this in `ExtractLogs` would work instead of calling it in `AddTransformers`?

It's also possible that my brain is tired because it's the end of the day. So I'll take another look tomorrow morning. :)

Yeah, this is definitely a little complicated - would welcome ideas about ways to simplify. The basic idea is that we can add new transformers after the system has already been running for a while, and it will automatically recheck previously-checked headers that are relevant to the new transformer.

Your understanding of the function is correct, and my thinking with marking logs checked here was that (1) it only happens after we've marked headers unchecked, so there's no risk of the logs being marked as checked preventing us from syncing those logs, and (2) marking logs as checked requires iterating through the address+topic0 combinations, which we're already doing in `AddTransformers` but not `ExtractLogs`.

Definitely happy to refactor/modify the approach - this was just my first pass at enabling new transformers to sync against already-checked headers with minimal additional code. What do you think?
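As I read this exchange, the described flow could be sketched roughly like this (the repository types, method names, and sentinel fields are paraphrased from the discussion, not the actual VulcanizeDB code):

```go
package main

import "fmt"

// checkedLogsRepo is a paraphrased collaborator tracking which
// address+topic0 combos have already been included in fetches.
type checkedLogsRepo struct {
	seen map[string]bool
}

func (r *checkedLogsRepo) HaveLogsBeenChecked(address, topic string) bool {
	return r.seen[address+topic]
}

func (r *checkedLogsRepo) MarkLogsChecked(address, topic string) {
	r.seen[address+topic] = true
}

// headersRepo stands in for the headers table; uncheckedSince records
// the block from which headers were re-opened for rechecking.
type headersRepo struct {
	uncheckedSince int64
}

func (r *headersRepo) MarkHeadersUnchecked(startingBlock int64) {
	r.uncheckedSince = startingBlock
}

// updateCheckedHeaders mirrors the described behavior: if a new transformer's
// address+topic0 combo hasn't been fetched before, re-open old headers for
// rechecking, then record the combo so we don't re-open them again.
func updateCheckedHeaders(logs *checkedLogsRepo, headers *headersRepo, address, topic string, startingBlock int64) {
	if logs.HaveLogsBeenChecked(address, topic) {
		return // combo already covered by previous fetches
	}
	headers.MarkHeadersUnchecked(startingBlock) // step 1: recheck old headers
	logs.MarkLogsChecked(address, topic)        // step 2: remember the combo
}

func main() {
	logs := &checkedLogsRepo{seen: map[string]bool{}}
	headers := &headersRepo{uncheckedSince: -1}

	updateCheckedHeaders(logs, headers, "0xabc", "topic0", 100)
	fmt.Println("headers unchecked since:", headers.uncheckedSince)

	// A second call with the same combo is a no-op.
	headers.uncheckedSince = -1
	updateCheckedHeaders(logs, headers, "0xabc", "topic0", 100)
	fmt.Println("headers unchecked since:", headers.uncheckedSince)
}
```

Seen this way, "marking logs checked" really means "recording that this combo is now part of every future fetch", which is why it's safe to do before any logs are fetched.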
I think I understand - so the main motivation for marking logs as checked here in `updateCheckedHeaders` is the performance gain of not having to iterate through the address + topic0 combos again? That makes sense to me.

I think I was mainly confused about marking logs as checked before they were unchecked, but maybe a comment explaining why we're doing this could help? Also, what do you think about renaming `updateCheckedHeaders` to `updateCheckedHeadersAndLogs` or something?

Also, marking both `headers` and `logs` as checked, and the fact that we have a `header_sync_log` table and a `checked_logs` table, tripped me up a bit as well. I don't know if I have any ideas on how this may be simplified, but just thought I'd throw it out there as a potential hang up for folks.

Thinking I may change `checked_logs` to `watched_logs` or something similar - really the purpose of the table is just to keep track of what logs we were fetching when we marked a header as checked for logs.

I don't mean to get stuck on the `watched_logs` table; I think I'm having a hard time creating a mental model with two tables (in this process) that are both called `..._logs` and that kind of have similar data in them, and am trying to understand. So, this is totally not merge blocking 😊.

Is there any reason why we can't re-use the `header_sync_log` table for this purpose? It has the topics and address, so we could potentially add a column for `checked` or `watched`?

🤔 I'm not sure...

The `header_sync_log` table has actual log events (likely multiple rows for each given address+topic0). The `watched_logs` table has one row for each address+topic0 that we're submitting to `GetLogsWithCustomQuery`.

I don't think we'd need to add a column to `header_sync_log` if we wanted to avoid adding a separate table - we could potentially `SELECT DISTINCT address, topics[0] FROM header_sync_logs` to get an idea of what logs we've checked for in the past, but I think that would miss scenarios where we had fetched for a given log but had not received any results. Also thinking that query could maybe get a bit expensive after we've fetched a lot of logs - but maybe that's worth it to avoid adding this table?

What do you think? Does the purpose of the code - trying to uncheck previously checked headers if log fetching calls did not include a new address+topic0 combination - make sense/seem like something we should approach differently?

I see what you're saying - with querying the `header_sync_log` table, we'd miss address+topic0 combos that were checked but where nothing was found. 👍
}

func (repository HeaderSyncLogRepository) GetUntransformedHeaderSyncLogs() ([]core.HeaderSyncLog, error) {
	rows, queryErr := repository.db.Queryx(`SELECT * FROM public.header_sync_logs WHERE transformed = false`)

Since this method is checking for header_sync_logs where `transformed = false`, where are we setting transformed to true?

That's a responsibility of the event transformer repository's `Create` function in this setup.

oh gotcha!

@rmulhol Any reason not to move that responsibility up to the watcher in case of a non-error execution of the transformer on that log? :>

Guess I just wanted to keep the field switching as close as possible to the place where the truth changed, but I'd be down to move it up if we prioritize not having plugins write back to tables in the `public` schema. Only concern would be that it maybe prevents us from achieving max async (i.e. right now the delegator could theoretically ignore transformer errors, just send logs and move on - that's not true if it's responsible for flipping fields based on errors in transformer execution).
@rmulhol Makes sense. Still kinda feels like we could reduce transformer complexity further here though, if we assume they throw errors on failure (which they do)
- Use assertions instead of comments to document expectations
- Also randomize the fake timestamp in test data
- Missing == not in DB
- Unchecked == logs haven't been fetched
- We're logging that a given log has been included in any fetch calls for checked headers, rather than that we have already checked for that log

I think I'm finally understanding how this works. 😳 And just to confirm, this approach assumes that an address+topic0 combination will always be transformed in the same way, is that correct? For example, if a new transformer is added with an already watched address+topic0, the new transformer will need to rely on the transformed logs from a previous transformer?

Yeah, that's true - the first transformer to be delegated a log will mark the log as transformed, so it will no longer be delegated to other transformers in the future. If we wanted to support multiple transformers for a given log, I think the easiest path would be for the subsequent transformer to target the data created by the first transformer (and/or the logs with the

Quick thought before I dive into the actual changes: for footnote 3, a contributing factor to this fetched-but-never-transformed set could be that the available functionality in Geth doesn't allow filtering on both address and topic simultaneously, but simply applies the latter after the former. This means that we can get a log with a topic we care about, but from a contract we track but do not care about that topic specifically.
Sorry for being totally slow with this review! 🙇♀️
block_number BIGINT,
raw JSONB,
block_timestamp NUMERIC,
check_count INTEGER NOT NULL DEFAULT 0,

How is this used?

We increment it after we've called `FetchLogs` for a given header. Basically it's replacing the checked headers table.

@rmulhol What do we use the count for? Why not a bool?
To enable rechecking headers with a cap on how many times we recheck
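To make the count-vs-bool point concrete: an integer column lets the recheck cap live in the query itself, which a bool can't express. A sketch in plain Go (the SQL in the comment and the cap value are illustrative, loosely modeled on the `UncheckedHeaders` call seen earlier):

```go
package main

import "fmt"

// header mimics a row of public.headers with its check_count column.
type header struct {
	blockNumber int64
	checkCount  int
}

// uncheckedHeaders mirrors a query along the lines of
//   SELECT * FROM public.headers
//   WHERE check_count < $1 AND block_number BETWEEN $2 AND $3
// A bool column could only express "checked at least once"; the count lets
// callers cap how many times a header gets rechecked.
func uncheckedHeaders(headers []header, start, end int64, checkCountCap int) []header {
	var result []header
	for _, h := range headers {
		if h.blockNumber >= start && h.blockNumber <= end && h.checkCount < checkCountCap {
			result = append(result, h)
		}
	}
	return result
}

func main() {
	headers := []header{
		{blockNumber: 1, checkCount: 0},
		{blockNumber: 2, checkCount: 3},
		{blockNumber: 3, checkCount: 1},
	}
	// Recheck headers at most 3 times: block 2 has hit the cap and is skipped.
	for _, h := range uncheckedHeaders(headers, 1, 3, 3) {
		fmt.Println("recheck block", h.blockNumber)
	}
}
```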
// Goes through an array of logs, associating relevant logs (matching addresses and topic) with transformers
func (chunker *LogChunker) ChunkLogs(logs []types.Log) map[string][]types.Log {
	chunks := map[string][]types.Log{}
// Goes through a slice of logs, associating relevant logs (matching addresses and topic) with transformers
💯 🔪
if getTopicZeroExistsErr != nil {
	return false, getTopicZeroExistsErr
}
return topicZeroExists, nil

Thinking it might be an alternative to create a migration with a helper function that checks this in the DB - that would reduce this to a single call.

Like, on one hand let's not move application logic into the db, but otoh these types of chained queries are getting old.
Let me take a whirl at making it a single query - think we might be able to do that with the existing setup
	return insertErr
	}
}
return tx.Commit()
This solution is really nice! 🐈
BlockHash: common.HexToHash(rawLog.BlockHash),
Index: rawLog.LogIndex,
// TODO: revisit if not cascade deleting logs when header removed
// currently, fetched logs are cascade deleted if removed

What was this referring to again? 🤔

`types.Log` includes a `Removed` field that's set to true if you're subscribed to logs over the pubsub rpc and a log is removed by a reorg. When we're recreating a `types.Log` out of a log in the DB, we're always saying `Removed` is `false`, because we're assuming we're only holding onto logs that are not removed. So we need to reconsider our `Removed` assignment if we start holding onto removed logs.
}

func (repository HeaderSyncLogRepository) insertLog(headerID int64, log types.Log, tx *sqlx.Tx) error {
	topics := buildTopics(log)

How come we need this? Are the topics just jumbled into the same byte array? No separator even?

The topics column is an array of byte arrays. Felt nicer than having separate columns for each topic (especially given that many logs don't fill all four columns), but I'm open to reconsidering (especially if we need to do something like filter by topic0).
Yah, makes sense
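A minimal sketch of what a `buildTopics`-style helper does - flattening a log's up-to-four fixed-size topics into a slice of byte slices that maps onto a `bytea[]` column (the `ethLog` type here is a simplified stand-in, not go-ethereum's `types.Log`, whose topics are `common.Hash` values):

```go
package main

import "fmt"

// ethLog is a simplified stand-in for a fetched log; each topic is a
// fixed 32-byte word, and a log carries between zero and four of them.
type ethLog struct {
	Topics [][32]byte
}

// buildTopics collects the topics into a slice of byte slices, which is the
// shape a bytea[] column wants. Each element keeps its own backing array.
func buildTopics(log ethLog) [][]byte {
	var topics [][]byte
	for _, topic := range log.Topics {
		t := topic // copy before slicing so each element is independent
		topics = append(topics, t[:])
	}
	return topics
}

func main() {
	log := ethLog{Topics: [][32]byte{{0x01}, {0x02}}}
	topics := buildTopics(log)
	fmt.Println("topics stored:", len(topics))
	// topic0 stays addressable later (topics[0]) if we ever need to filter on it.
	fmt.Println("topic0 first byte:", topics[0][0])
}
```

Keeping the array ordered means topic0 is still recoverable by index, which matters if a topic0 filter is ever added.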
Yo another question, does this implement/make possible removing all event metadata fields from the individual tables? (

Yes 😎
- Replaces bool and moots question of error/bool ordering
- Also make event watcher execution synchronous
- Prevent extract/delegate from spinning when other side is being simulated

Totally dig 4fa19be! 🙆‍♀️ This is looking solid to me now! 🏆
This PR breaks down the `event_watcher` into two fundamental operations that run concurrently:

(1) Extracting desired logs
(2) Delegating logs to transformers for further processing

This has several benefits:

(1) Enables further work to support extracting logs without writing transformers
(2) Reduces complexity around tracking checked headers - instead of indicating whether a header has been checked for each individual log, we can track whether it has been checked for all watched logs. This was not possible with the previous approach, because marking a header checked required both extraction and delegation to succeed
(3) Potentially improves performance, since extraction no longer blocks delegation

A previous iteration of this PR was held up because we didn't have a good way to deal with adding new transformers on the fly - if you've already marked a header as checked when we looked up logs for the previous set of transformers, how do you get logs from that header for a new transformer? This PR addresses that issue by tracking all contract addresses + event log topics that we've included when extracting logs. If a transformer with a new combination of address+topics is added to the set, we mark all headers since that transformer's starting block number as unchecked.[1][2]

During the delegation step, logs are passed to transformers if they have not been marked as `transformed`. Transformers are expected to mark logs `transformed` after they have been persisted as domain objects.[3]

There are a number of formatting changes that I'd like to apply (and occasionally did apply in between commits)[4], but I've attempted to purge many of them from this PR to minimize the diff. Happy to add any changes that folks request here and follow up with further PRs for additional formatting improvements.

[1] We don't necessarily mark headers unchecked in the case of a new combination, because it's possible that combination was already included in previous fetches. This is because `eth_getLogs` takes a collection of addresses and a collection of topics. If the new transformer's address and topic were already included in the collection as a result of two other transformers, we've already fetched those logs.

[2] Further work should potentially explore how we can go back and fetch logs only for the new combination of address+topic, if doing so would yield a performance benefit over fetching all logs. However, the behavior in this PR matches the existing implementation, which looks up all logs for a header if any log has not been checked.

[3] One thing to note here is that we will likely accumulate a number of logs that have not been and will never be marked as `transformed`, for the same reason that we don't always mark headers unchecked in the case of a new combination - we are likely to extract logs that match an address and topic in our set but are not actually used by any current transformer.

[4] e.g. formatting the migrations, removing the `mock_` prefix that is used inconsistently for test fakes, removing redundant initializers, etc.