-
Notifications
You must be signed in to change notification settings - Fork 33
(VDB-699) Decouple Log extraction vs delegation to transformers #131
Changes from 11 commits
3693ed9
66a4e20
cb819fa
d496dad
63dabbb
1883a11
d76be49
666ea1c
222252f
5ac76ee
b9f3b9f
ce91b0d
3d6c973
c568fed
13d503b
3f9b034
2b798e0
4fa19be
f83e996
b96a1ee
3897b28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,7 +114,10 @@ func execute() { | |
var wg syn.WaitGroup | ||
if len(ethEventInitializers) > 0 { | ||
ew := watcher.NewEventWatcher(&db, blockChain) | ||
ew.AddTransformers(ethEventInitializers) | ||
err = ew.AddTransformers(ethEventInitializers) | ||
if err != nil { | ||
LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) | ||
} | ||
wg.Add(1) | ||
go watchEthEvents(&ew, &wg) | ||
} | ||
|
@@ -157,10 +160,13 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) { | |
} else { | ||
recheck = constants.HeaderMissing | ||
} | ||
ticker := time.NewTicker(pollingInterval) | ||
defer ticker.Stop() | ||
for range ticker.C { | ||
w.Execute(recheck) | ||
errs := make(chan error) | ||
go w.Execute(recheck, errs) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This doesn't technically need to be run concurrently (we could have …). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I get where you're coming from, but it does feel needlessly confusing that the following while-loop, select and error channel are actually just a single return value in disguise 🤔 It definitely makes stuff look more complicated than they really are. |
||
for { | ||
select { | ||
case err := <-errs: | ||
LogWithCommand.Fatalf("error executing event watcher: %s", err.Error()) | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
-- +goose Up | ||
CREATE TABLE full_sync_logs | ||
( | ||
id SERIAL PRIMARY KEY, | ||
block_number BIGINT, | ||
address VARCHAR(66), | ||
tx_hash VARCHAR(66), | ||
index BIGINT, | ||
topic0 VARCHAR(66), | ||
topic1 VARCHAR(66), | ||
topic2 VARCHAR(66), | ||
topic3 VARCHAR(66), | ||
data TEXT, | ||
CONSTRAINT full_sync_log_uc UNIQUE (block_number, index) | ||
); | ||
|
||
|
||
-- +goose Down | ||
DROP TABLE full_sync_logs; |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,23 @@ | ||
-- +goose Up | ||
ALTER TABLE logs | ||
DROP CONSTRAINT log_uc; | ||
ALTER TABLE full_sync_logs | ||
DROP CONSTRAINT full_sync_log_uc; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not a big deal, but do you think we could just remove this constraint from the previous migration file (db/migrations/00007_create_full_sync_logs_table.sql)? rather than adding it and then immediately removing it? |
||
ALTER TABLE logs | ||
ADD COLUMN receipt_id INT; | ||
ALTER TABLE full_sync_logs | ||
ADD COLUMN receipt_id INT; | ||
|
||
ALTER TABLE logs | ||
ADD CONSTRAINT receipts_fk | ||
FOREIGN KEY (receipt_id) | ||
REFERENCES full_sync_receipts (id) | ||
ON DELETE CASCADE; | ||
ALTER TABLE full_sync_logs | ||
ADD CONSTRAINT receipts_fk | ||
FOREIGN KEY (receipt_id) | ||
REFERENCES full_sync_receipts (id) | ||
ON DELETE CASCADE; | ||
|
||
|
||
-- +goose Down | ||
ALTER TABLE logs | ||
DROP CONSTRAINT receipts_fk; | ||
ALTER TABLE full_sync_logs | ||
DROP CONSTRAINT receipts_fk; | ||
|
||
ALTER TABLE logs | ||
DROP COLUMN receipt_id; | ||
ALTER TABLE full_sync_logs | ||
DROP COLUMN receipt_id; | ||
|
||
ALTER TABLE logs | ||
ADD CONSTRAINT log_uc UNIQUE (block_number, index); | ||
ALTER TABLE full_sync_logs | ||
ADD CONSTRAINT full_sync_log_uc UNIQUE (block_number, index); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
-- +goose Up | ||
CREATE TABLE public.headers ( | ||
id SERIAL PRIMARY KEY, | ||
hash VARCHAR(66), | ||
block_number BIGINT, | ||
raw JSONB, | ||
block_timestamp NUMERIC, | ||
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE, | ||
eth_node_fingerprint VARCHAR(128) | ||
CREATE TABLE public.headers | ||
( | ||
id SERIAL PRIMARY KEY, | ||
hash VARCHAR(66), | ||
block_number BIGINT, | ||
raw JSONB, | ||
block_timestamp NUMERIC, | ||
check_count INTEGER NOT NULL DEFAULT 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We increment it after we've called There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rmulhol What do we use the count for? Why not a bool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To enable rechecking headers with a cap on how many times we recheck |
||
eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE, | ||
eth_node_fingerprint VARCHAR(128) | ||
); | ||
|
||
-- Index is removed when table is | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
-- +goose Up | ||
-- SQL in this section is executed when the migration is applied. | ||
CREATE TABLE header_sync_logs | ||
( | ||
id SERIAL PRIMARY KEY, | ||
header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, | ||
address INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE, | ||
topics BYTEA[], | ||
data BYTEA, | ||
block_number BIGINT, | ||
block_hash VARCHAR(66), | ||
tx_hash VARCHAR(66), | ||
tx_index INTEGER, | ||
log_index INTEGER, | ||
raw JSONB, | ||
transformed BOOL NOT NULL DEFAULT FALSE, | ||
UNIQUE (header_id, tx_index, log_index) | ||
elizabethengelman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
|
||
-- +goose Down | ||
-- SQL in this section is executed when the migration is rolled back. | ||
DROP TABLE header_sync_logs; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
-- +goose Up | ||
-- SQL in this section is executed when the migration is applied. | ||
CREATE TABLE public.checked_logs | ||
( | ||
id SERIAL PRIMARY KEY, | ||
contract_address VARCHAR(42), | ||
topic_zero VARCHAR(66) | ||
); | ||
|
||
-- +goose Down | ||
-- SQL in this section is executed when the migration is rolled back. | ||
DROP TABLE public.checked_logs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come we're doing this as a sleep in the
event_watcher
instead of as a ticker here? Is it because the storage_watcher
doesn't use the interval? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was that with the ticker we're kicking off a new process every 7 seconds regardless of what else is happening, which can lead to weird situations where two processes are dealing with the same missing header - if the first process gets 1,000,000 missing headers and deals with 500,000 of them in 7 seconds, both the original process and the new one will see the remaining 500,000 as missing.
The event watcher only starts a new process after it has completed a given segment, and sleeps if the last segment it processed was empty (so that we're not endlessly triggering new processes at the head of the chain). Hoping that this results in using resources more efficiently, but definitely welcome feedback!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, that makes sense. Not in scope for this PR, but I wonder if this is something that we could do in
watchEthContract
and watchEthStorage
(I don't know what I was talking about above saying that the storage_watcher
doesn't use a ticker 😳)?