This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

(VDB-699) Decouple Log extraction vs delegation to transformers #131

Merged · 21 commits · Sep 20, 2019
5 changes: 4 additions & 1 deletion cmd/composeAndExecute.go
@@ -170,7 +170,10 @@ func composeAndExecute() {
 	var wg syn.WaitGroup
 	if len(ethEventInitializers) > 0 {
 		ew := watcher.NewEventWatcher(&db, blockChain)
-		ew.AddTransformers(ethEventInitializers)
+		err := ew.AddTransformers(ethEventInitializers)
+		if err != nil {
+			LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error())
+		}
 		wg.Add(1)
 		go watchEthEvents(&ew, &wg)
 	}
16 changes: 11 additions & 5 deletions cmd/execute.go
@@ -114,7 +114,10 @@ func execute() {
 	var wg syn.WaitGroup
 	if len(ethEventInitializers) > 0 {
 		ew := watcher.NewEventWatcher(&db, blockChain)
-		ew.AddTransformers(ethEventInitializers)
+		err = ew.AddTransformers(ethEventInitializers)
+		if err != nil {
+			LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error())
+		}
 		wg.Add(1)
 		go watchEthEvents(&ew, &wg)
 	}
@@ -157,10 +160,13 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
 	} else {
 		recheck = constants.HeaderMissing
 	}
-	ticker := time.NewTicker(pollingInterval)
Contributor: How come we're doing this as a sleep in the event_watcher instead of as a ticker here? Is it because the storage_watcher doesn't use the interval?

Contributor (Author): My thinking was that with the ticker we're kicking off a new process every 7 seconds regardless of what else is happening, which can lead to weird situations where two processes are dealing with the same missing header - if the first process gets 1,000,000 missing headers and deals with 500,000 of them in 7 seconds, both the original process and the new one will see the remaining 500,000 as missing.

The event watcher only starts a new process after it has completed a given segment, and sleeps if the last segment it processed was empty (so that we're not endlessly triggering new processes at the head of the chain). Hoping that this results in using resources more efficiently, but definitely welcome feedback!
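
A minimal sketch of that sleep-when-empty loop, with the Execute signature inferred from this diff; extractAndDelegate and the recheck type are illustrative assumptions, not the PR's actual code:

func (w *EventWatcher) Execute(recheck constants.TransformerExecution, errs chan error) {
	for {
		logsFound, err := w.extractAndDelegate(recheck) // hypothetical helper: extract new logs, then delegate to transformers
		if err != nil {
			errs <- err
			return
		}
		if !logsFound {
			time.Sleep(pollingInterval) // last segment was empty: back off instead of spinning at the head of the chain
		}
	}
}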

Contributor: Gotcha, that makes sense. Not in scope for this PR, but I wonder if this is something that we could do in watchEthContract and watchEthStorage (I don't know what I was talking about above saying that the storage_watcher doesn't use a ticker 😳)?

-	defer ticker.Stop()
-	for range ticker.C {
-		w.Execute(recheck)
+	errs := make(chan error)
+	go w.Execute(recheck, errs)
Contributor (Author): This doesn't technically need to be run concurrently (we could have Execute return an error), but doing so makes testing easier, since the function spins up an infinite loop on the happy path (there's never a reason to stop extracting and delegating, only to sleep for a bit if you haven't found anything new).

Contributor: I get where you're coming from, but it does feel needlessly confusing that the following for-loop, select, and error channel are really just a single return value in disguise 🤔 It definitely makes things look more complicated than they are.

+	for {
+		select {
+		case err := <-errs:
+			LogWithCommand.Fatalf("error executing event watcher: %s", err.Error())
+		}
+	}
 }
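
The channel-and-select shape above is equivalent to the plain blocking call the reviewer describes; a sketch of that alternative (not code from this PR):

if err := w.Execute(recheck); err != nil {
	LogWithCommand.Fatalf("error executing event watcher: %s", err.Error())
}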

19 changes: 19 additions & 0 deletions db/migrations/00007_create_full_sync_logs_table.sql
@@ -0,0 +1,19 @@
-- +goose Up
CREATE TABLE full_sync_logs
(
id SERIAL PRIMARY KEY,
block_number BIGINT,
address VARCHAR(66),
tx_hash VARCHAR(66),
index BIGINT,
topic0 VARCHAR(66),
topic1 VARCHAR(66),
topic2 VARCHAR(66),
topic3 VARCHAR(66),
data TEXT,
CONSTRAINT full_sync_log_uc UNIQUE (block_number, index)
);


-- +goose Down
DROP TABLE full_sync_logs;
19 changes: 0 additions & 19 deletions db/migrations/00007_create_logs_table.sql

This file was deleted.

30 changes: 15 additions & 15 deletions db/migrations/00016_add_receipts_fk_to_logs.sql
@@ -1,23 +1,23 @@
 -- +goose Up
-ALTER TABLE logs
-    DROP CONSTRAINT log_uc;
+ALTER TABLE full_sync_logs
+    DROP CONSTRAINT full_sync_log_uc;

Contributor: Not a big deal, but do you think we could just remove this constraint from the previous migration file (db/migrations/00007_create_full_sync_logs_table.sql), rather than adding it and then immediately removing it?

-ALTER TABLE logs
-    ADD COLUMN receipt_id INT;
+ALTER TABLE full_sync_logs
+    ADD COLUMN receipt_id INT;

-ALTER TABLE logs
-    ADD CONSTRAINT receipts_fk
-        FOREIGN KEY (receipt_id)
-            REFERENCES full_sync_receipts (id)
-            ON DELETE CASCADE;
+ALTER TABLE full_sync_logs
+    ADD CONSTRAINT receipts_fk
+        FOREIGN KEY (receipt_id)
+            REFERENCES full_sync_receipts (id)
+            ON DELETE CASCADE;


 -- +goose Down
-ALTER TABLE logs
-    DROP CONSTRAINT receipts_fk;
+ALTER TABLE full_sync_logs
+    DROP CONSTRAINT receipts_fk;

-ALTER TABLE logs
-    DROP COLUMN receipt_id;
+ALTER TABLE full_sync_logs
+    DROP COLUMN receipt_id;

-ALTER TABLE logs
-    ADD CONSTRAINT log_uc UNIQUE (block_number, index);
+ALTER TABLE full_sync_logs
+    ADD CONSTRAINT full_sync_log_uc UNIQUE (block_number, index);
50 changes: 24 additions & 26 deletions db/migrations/00018_create_watched_event_logs.sql
@@ -1,33 +1,31 @@
 -- +goose Up
 CREATE VIEW block_stats AS
-SELECT
-    max(block_number) AS max_block,
-    min(block_number) AS min_block
-FROM logs;
+SELECT max(block_number) AS max_block,
+       min(block_number) AS min_block
+FROM full_sync_logs;

 CREATE VIEW watched_event_logs AS
-SELECT
-    log_filters.name,
-    logs.id,
-    block_number,
-    logs.address,
-    tx_hash,
-    index,
-    logs.topic0,
-    logs.topic1,
-    logs.topic2,
-    logs.topic3,
-    data,
-    receipt_id
-FROM log_filters
-         CROSS JOIN block_stats
-         JOIN logs ON logs.address = log_filters.address
-    AND logs.block_number >= coalesce(log_filters.from_block, block_stats.min_block)
-    AND logs.block_number <= coalesce(log_filters.to_block, block_stats.max_block)
-WHERE (log_filters.topic0 = logs.topic0 OR log_filters.topic0 ISNULL)
-  AND (log_filters.topic1 = logs.topic1 OR log_filters.topic1 ISNULL)
-  AND (log_filters.topic2 = logs.topic2 OR log_filters.topic2 ISNULL)
-  AND (log_filters.topic3 = logs.topic3 OR log_filters.topic3 ISNULL);
+SELECT log_filters.name,
+       full_sync_logs.id,
+       block_number,
+       full_sync_logs.address,
+       tx_hash,
+       index,
+       full_sync_logs.topic0,
+       full_sync_logs.topic1,
+       full_sync_logs.topic2,
+       full_sync_logs.topic3,
+       data,
+       receipt_id
+FROM log_filters
+         CROSS JOIN block_stats
+         JOIN full_sync_logs ON full_sync_logs.address = log_filters.address
+    AND full_sync_logs.block_number >= coalesce(log_filters.from_block, block_stats.min_block)
+    AND full_sync_logs.block_number <= coalesce(log_filters.to_block, block_stats.max_block)
+WHERE (log_filters.topic0 = full_sync_logs.topic0 OR log_filters.topic0 ISNULL)
+  AND (log_filters.topic1 = full_sync_logs.topic1 OR log_filters.topic1 ISNULL)
+  AND (log_filters.topic2 = full_sync_logs.topic2 OR log_filters.topic2 ISNULL)
+  AND (log_filters.topic3 = full_sync_logs.topic3 OR log_filters.topic3 ISNULL);

 -- +goose Down
 DROP VIEW watched_event_logs;
18 changes: 10 additions & 8 deletions db/migrations/00023_create_headers_table.sql
@@ -1,12 +1,14 @@
 -- +goose Up
-CREATE TABLE public.headers (
-    id SERIAL PRIMARY KEY,
-    hash VARCHAR(66),
-    block_number BIGINT,
-    raw JSONB,
-    block_timestamp NUMERIC,
-    eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
-    eth_node_fingerprint VARCHAR(128)
+CREATE TABLE public.headers
+(
+    id SERIAL PRIMARY KEY,
+    hash VARCHAR(66),
+    block_number BIGINT,
+    raw JSONB,
+    block_timestamp NUMERIC,
+    check_count INTEGER NOT NULL DEFAULT 0,
Contributor: How is this used?

Contributor (Author): We increment it after we've called FetchLogs for a given header. Basically, it's replacing the checked headers table.

Contributor: @rmulhol What do we use the count for? Why not a bool?

Contributor (Author): To enable rechecking headers, with a cap on how many times we recheck (see the sketch after this hunk).

+    eth_node_id INTEGER NOT NULL REFERENCES eth_nodes (id) ON DELETE CASCADE,
+    eth_node_fingerprint VARCHAR(128)
 );

-- Index is removed when table is
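
To make check_count concrete: per the thread above, a header stays eligible for (re)checking until its count hits a cap. A hypothetical sqlx-based query along those lines (the function name and cap are illustrative, not from this PR):

// uncheckedHeaderIDs returns ids of headers whose logs have been
// fetched fewer than recheckCap times.
func uncheckedHeaderIDs(db *sqlx.DB, recheckCap int) ([]int64, error) {
	var ids []int64
	err := db.Select(&ids,
		`SELECT id FROM public.headers WHERE check_count < $1 ORDER BY block_number`,
		recheckCap)
	return ids, err
}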
22 changes: 22 additions & 0 deletions db/migrations/00029_create_header_sync_logs_table.sql
@@ -0,0 +1,22 @@
-- +goose Up
-- SQL in this section is executed when the migration is applied.
CREATE TABLE header_sync_logs
(
id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
address INTEGER NOT NULL REFERENCES addresses (id) ON DELETE CASCADE,
topics BYTEA[],
data BYTEA,
block_number BIGINT,
block_hash VARCHAR(66),
tx_hash VARCHAR(66),
tx_index INTEGER,
log_index INTEGER,
raw JSONB,
transformed BOOL NOT NULL DEFAULT FALSE,
UNIQUE (header_id, tx_index, log_index)
);

-- +goose Down
-- SQL in this section is executed when the migration is rolled back.
DROP TABLE header_sync_logs;
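
The transformed flag is what decouples extraction from delegation: the extractor persists rows, and the delegator can independently pick up rows that haven't yet been handed to a transformer. A hypothetical sketch of that hand-off (names are illustrative, not this PR's repository code):

// untransformedLogIDs returns persisted logs not yet delegated to a transformer.
func untransformedLogIDs(db *sqlx.DB) ([]int64, error) {
	var ids []int64
	err := db.Select(&ids, `SELECT id FROM header_sync_logs WHERE transformed = false`)
	return ids, err
}

// markTransformed records that a log has been delegated.
func markTransformed(db *sqlx.DB, logID int64) error {
	_, err := db.Exec(`UPDATE header_sync_logs SET transformed = true WHERE id = $1`, logID)
	return err
}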
12 changes: 12 additions & 0 deletions db/migrations/00030_create_checked_logs_table.sql
@@ -0,0 +1,12 @@
-- +goose Up
-- SQL in this section is executed when the migration is applied.
CREATE TABLE public.checked_logs
(
id SERIAL PRIMARY KEY,
contract_address VARCHAR(42),
topic_zero VARCHAR(66)
);

-- +goose Down
-- SQL in this section is executed when the migration is rolled back.
DROP TABLE public.checked_logs;