This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

(VDB-699) Decouple Log extraction vs delegation to transformers #131

Merged: 21 commits into staging from vdb-699-logs-table, Sep 20, 2019

Conversation

rmulhol
Contributor

@rmulhol commented Aug 27, 2019

This PR breaks down the event_watcher into two fundamental operations that run concurrently:
(1) Extracting desired logs
(2) Delegating logs to transformers for further processing

This has several benefits:
(1) Enables further work to support extracting logs without writing transformers
(2) Reduces complexity around tracking checked headers - instead of indicating whether a header has been checked for each individual log, we can track whether it has been checked for all watched logs. This was not possible with the previous approach because marking a header checked required both extraction and delegation to succeed
(3) Potentially improves performance since extraction no longer blocks delegation

A previous iteration of this PR was held up because we didn't have a good way to deal with adding new transformers on the fly: if you've already marked a header as checked when looking up logs for the previous set of transformers, how do you get logs from that header for a new transformer? This PR addresses that issue by tracking all contract addresses + event log topics that we've included when extracting logs. If a transformer with a new combination of address+topics is added to the set, we mark all headers since that transformer's starting block number as unchecked.[1][2]

During the delegation step, logs are passed to transformers if they have not been marked as transformed. Transformers are expected to mark logs transformed after they have been persisted as domain objects.[3]
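The delegation contract described above can be sketched in miniature. This is a hypothetical, in-memory stand-in (the `log` struct, `delegate` function, and marking-inline shortcut are illustrative, not the PR's actual types):

```go
package main

import "fmt"

// Hypothetical sketch of the delegation step: only logs not yet marked
// transformed are handed to a transformer; failed logs stay untransformed
// so they are retried on a later pass.
type log struct {
	id          int64
	transformed bool
}

func delegate(logs []*log, transform func(*log) error) int {
	delegated := 0
	for _, l := range logs {
		if l.transformed {
			continue // already handled in an earlier pass
		}
		if err := transform(l); err != nil {
			continue // leave untransformed so it is retried next pass
		}
		l.transformed = true // in the real flow, the transformer marks this after persisting
		delegated++
	}
	return delegated
}

func main() {
	logs := []*log{{id: 1}, {id: 2, transformed: true}, {id: 3}}
	fmt.Println(delegate(logs, func(*log) error { return nil }))
}
```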

There are a number of formatting changes that I'd like to apply (and occasionally did apply in between commits),[4] but I've attempted to purge many of them from this PR to minimize the diff. Happy to add any changes that folks request here, and to follow up with further PRs for additional formatting improvements.

[1] We don't necessarily mark headers unchecked in the case of a new combination, because it's possible that combination was already included in previous fetches. This is because eth_getLogs takes a collection of addresses and a collection of topics. If the new transformer's address and topic were already included in the collection as a result of two other transformers, we've already fetched those logs.
[2] Further work should potentially explore how we can go back and fetch logs only for the new combination of address+topic, if doing so would yield a performance benefit over fetching all logs. However, the behavior in this PR matches the existing implementation, which looks up all logs for a header if any log has not been checked.
[3] One thing to note here is that we will likely accumulate a number of logs that have not been and will never be marked as transformed, for the same reason that we don't always mark headers unchecked in the case of a new combination: we are likely to extract logs that match an address and topic in our set but are not actually used by any current transformer.
[4] e.g. formatting the migrations, removing the mock_ prefix that is used inconsistently for test fakes, removing redundant initializers, etc.

cmd/execute.go Outdated
for range ticker.C {
w.Execute(recheck)
errs := make(chan error)
go w.Execute(recheck, errs)
Contributor Author

This doesn't technically need to be run concurrently (we could have Execute return an error), but doing so makes testing easier since the function is spinning up an infinite loop on the happy path (there's never a reason to stop extracting and delegating, only to Sleep for a bit if you haven't found anything new).
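A stripped-down sketch of the pattern being described (the `watcher` type, error text, and `failAt` knob are hypothetical; the real `Execute` also takes a recheck argument, and the loop here only terminates so the example does):

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for the event watcher: Execute loops forever on
// the happy path, so failures surface on an error channel rather than a
// return value, letting the caller simply block on <-errs.
type watcher struct {
	failAt int // pass number at which this fake watcher fails
}

func (w *watcher) Execute(errs chan error) {
	for i := 0; ; i++ {
		if i == w.failAt {
			errs <- errors.New("extraction failed")
			return // a real watcher would only exit on failure
		}
		// extract and delegate logs here, sleeping when nothing new is found
	}
}

func main() {
	errs := make(chan error)
	w := &watcher{failAt: 3}
	go w.Execute(errs)
	fmt.Println(<-errs)
}
```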

Contributor

I get where you're coming from, but it does feel needlessly confusing that the following while-loop, select, and error channel are actually just a single return value in disguise 🤔 It definitely makes stuff look more complicated than it really is.

}

func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error {
hasBeenChecked, hasBeenCheckedErr := extractor.CheckedLogsRepository.HaveLogsBeenChecked(config.ContractAddresses, config.Topic)
Contributor Author
@rmulhol Aug 27, 2019

We could probably short-circuit this in the case where no headers have been marked checked (there's no reason to check the logs repository for the address/topic, or to update the check_count on public.headers, if every header row has a check_count of 0). But you would still want to persist the transformer's addresses + topics in checked_logs, and I couldn't think of a great way to do that without making this code pretty ugly. Felt like running this function all the way through n times, where n is the number of transformers, wasn't a cost worthy of the optimization, but open to other folks' thoughts.

insertCheckedHeaderQuery = `UPDATE public.headers SET check_count = (SELECT check_count WHERE id = $1) + 1 WHERE id = $1`
)

type CheckedHeadersRepository struct {
Contributor Author

I could make an argument for including all of these functions on the headers_repository now that they're only dealing with a column on that table, but figured this was enough of a dedicated use case that we shouldn't bloat that namespace. Open to reconsidering though

Contributor

Could it make future refactoring easier to have it all in one self-contained place? Or do we have no dependencies between these two packages now?

Contributor Author

Hoping that keeping all the check_count stuff separate and consolidated makes it easier to refactor (e.g. we can delete or change the table without touching other namespaces) 👍

@@ -103,3 +103,15 @@ func GetFakeUncle(hash, reward string) core.Uncle {
Timestamp: strconv.FormatInt(fakeTimestamp, 10),
}
}

func RandomString(length int) string {
Contributor Author

This is duplicated from libraries/shared, but I opted not to consolidate just yet since I think you could make an argument for treating libraries and pkg as basically separate repos... 🤔

}

func MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error {
func MarkContractWatcherHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`)
Contributor Author

Note: not deleting the checked_headers migration because it's still used by the contract_watcher. Hoping we can apply a similar pattern there in the future and remove this table once and for all ✂️ 🎉

Contributor

are MissingHeaders and GetCheckedColumnNames not being used in the contract_watcher?

Contributor Author

The contract_watcher has its own header repository that supplies missing headers 👍

May be worth consolidating the different header repositories at some point though 🤔

rmulhol and others added 9 commits August 28, 2019 09:11
- Enable creating new table for logs used in event watching based on
  header sync
- rather than header
- enables executing transformers without full header lookup
- enables decoupling event extraction/persistence from transformation
- modifies event transformer, converter, and log chunker to accept
  payload that includes internal log database ID with log data
- remove alias for transformer pkg as shared_t
- remove unused mock watcher repository
- limit missing headers results set to 100 so that extraction doesn't
  excessively block delegation
- wrap checked headers functions in repository struct
- move storage repository to factory, to correspond with event
  repository path
- remove unused files
- reformat sql
- remove line breaks in imports
Co-Authored-By: Edvard Hübinette <[email protected]>
- extract and delegate logs synchronously after initial goroutine fired
- If a header was marked as checked before a transformer was added to
  the watcher, mark all headers since the new transformer's starting
  block number as unchecked.
@rmulhol force-pushed the vdb-699-logs-table branch from 37de974 to 5a87217 on August 28, 2019 14:43
- Don't need to maintain it on public.checked_headers if we're not
  adding additional columns to that table
@rmulhol force-pushed the vdb-699-logs-table branch from 5a87217 to 5ac76ee on August 28, 2019 14:50
@aaizuss left a comment

I don't really have constructive feedback but I appreciate your tests! I'll check on this when other people leave feedback because I'm sure I'll learn from it.

@@ -14,14 +14,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package repository
package storage

💯

Contributor

🔥


func (delegator *LogDelegator) DelegateLogs() (error, bool) {
if len(delegator.Transformers) < 1 {
return ErrNoTransformers, noLogsFound

this reads nicely 👍

"math/rand"
)

var _ = Describe("Log extractor", func() {

These tests are well written and helpful for understanding how the LogExtractor works ⭐️

@@ -39,7 +39,7 @@ import (

type TransferLog struct {
Id int64 `db:"id"`
VulvanizeLogId int64 `db:"vulcanize_log_id"`

👀

Contributor

😂

endingBlockNumber = startingBlockNumber + 2
outOfRangeBlockNumber = endingBlockNumber + 1

blockNumbers = []int64{startingBlockNumber, middleBlockNumber, endingBlockNumber, outOfRangeBlockNumber}

I appreciate the names here

headers, err := repo.UncheckedHeaders(startingBlockNumber, endingBlockNumber, uncheckedCheckCount)

Expect(err).NotTo(HaveOccurred())
// doesn't include outOfRangeBlockNumber

Is there a way to compose the Not and ContainElement matchers on the headers to emphasize that `headers` does not include `outOfRangeBlockNumber`, instead of adding a comment?

Contributor Author

Will give it a try!


type Converter interface {
ToEntities(contractAbi string, ethLog []types.Log) ([]interface{}, error)
ToEntities(contractAbi string, ethLog []core.HeaderSyncLog) ([]interface{}, error)

Should we add a story to update all of the converters since this type is changing?

Contributor Author
@rmulhol Aug 30, 2019

I put up a PR applying these changes 👍

Contributor
@elizabethengelman left a comment

Looking good! Just have a few questions to make sure I understand how this is working. :)

ALTER TABLE logs
DROP CONSTRAINT log_uc;
ALTER TABLE full_sync_logs
DROP CONSTRAINT full_sync_log_uc;

Contributor

Not a big deal, but do you think we could just remove this constraint from the previous migration file (db/migrations/00007_create_full_sync_logs_table.sql), rather than adding it and then immediately removing it?

@@ -6,28 +6,44 @@ require (
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect
github.com/dave/jennifer v1.3.0
github.com/deckarep/golang-set v1.7.1 // indirect
Contributor

this may be a silly question, but do you know why all of these indirect dependencies were added?

Contributor

Curious as well

Contributor Author

Not 100% sure, but I'm assuming it's that dependencies of our dependencies have changed

"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"

"github.com/sirupsen/logrus"
Contributor

👍

@@ -158,7 +158,7 @@ var _ = Describe("Repository", func() {
Expect(err).ToNot(HaveOccurred())
expectedLog := test_helpers.TransferLog{
Id: 1,
VulvanizeLogId: vulcanizeLogId,
VulcanizeLogId: vulcanizeLogId,
Contributor

💯

@@ -157,10 +160,13 @@ func watchEthEvents(w *watcher.EventWatcher, wg *sync.WaitGroup) {
} else {
recheck = constants.HeaderMissing
}
ticker := time.NewTicker(pollingInterval)
Contributor

How come we're doing this as a sleep in the event_watcher instead of as a ticker here? Is it because the storage_watcher doesn't use the interval?

Contributor Author

My thinking was that with the ticker we're kicking off a new process every 7 seconds regardless of what else is happening, which can lead to weird situations where two processes are dealing with the same missing header - if the first process gets 1,000,000 missing headers and deals with 500,000 of them in 7 seconds, both the original process and the new one will see the remaining 500,000 as missing.

The event watcher only starts a new process after it has completed a given segment, and sleeps if the last segment it processed was empty (so that we're not endlessly triggering new processes at the head of the chain). Hoping that this results in using resources more efficiently, but definitely welcome feedback!
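The scheduling difference described above can be sketched as follows. This is a hypothetical illustration (the `run` helper and `pollAndProcess` callback are invented names, and the loop is bounded only so the sketch terminates): new work starts only after the previous pass completes, and the loop sleeps only when a pass finds nothing, so two passes can never race over the same headers.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical sketch of the sleep-when-idle loop: pollAndProcess stands
// in for one extract-or-delegate pass and reports whether it found work.
func run(pollAndProcess func() bool, interval time.Duration, maxPasses int) int {
	processed := 0
	for i := 0; i < maxPasses; i++ { // bounded so the sketch terminates
		if pollAndProcess() {
			processed++
			continue // found work: go again immediately, no overlapping passes
		}
		time.Sleep(interval) // idle at the head of the chain: back off instead of spinning
	}
	return processed
}

func main() {
	work := []bool{true, true, false, true}
	i := 0
	poll := func() bool {
		found := work[i%len(work)]
		i++
		return found
	}
	fmt.Println(run(poll, time.Millisecond, 4))
}
```

A ticker-based loop, by contrast, would fire a new pass on every tick regardless of whether the previous pass had finished.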

Contributor

Gotcha, that makes sense. Not in scope for this PR, but I wonder if this is something that we could do in watchEthContract and watchEthStorage (I don't know what I was talking about above saying that the storage_watcher doesn't use a ticker 😳)?


type ILogExtractor interface {
AddTransformerConfig(config transformer.EventTransformerConfig) error
ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool)
Contributor

Not a big deal, but I think that elsewhere in the code we're returning the error as the second argument rather than the first.

Contributor Author

Yeah, I suppose I was thinking this is sort of a weird case because the bool functions similarly to ok in result, ok := mapping[key]. Happy to switch up the order if you think that'd make it more clear though
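For reference, the "comma ok" idiom being alluded to, which is what the `(error, bool)` ordering was mimicking (illustrative values only):

```go
package main

import "fmt"

// The second return value of a map lookup reports whether the key was
// present; the bool returned by ExtractLogs plays an analogous role.
func main() {
	headers := map[int64]string{1: "0xabc"}
	hash, ok := headers[1]
	fmt.Println(hash, ok) // present key: value plus true
	_, ok = headers[2]
	fmt.Println(ok) // absent key: zero value plus false
}
```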

Contributor

Yeah I too think it makes sense to swap

Contributor Author

Cool, can do

var ErrNoWatchedAddresses = errors.New("no watched addresses configured in the log extractor")

const (
missingHeadersFound = true
Contributor

I'm not 100% about this - but do you think it would make sense to name these constants uncheckedHeadersFound and noUncheckedHeadersFound instead of calling them missing headers? I'm wondering if that overlaps with the concept of headers that aren't yet in the db (vs headers that are in the db but need to be checked for applicable logs).

}
}

func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error {
Contributor

For some reason, I'm having trouble understanding this method. I think that it is:

  • checking to see if logs with the given contract addresses + topic combinations have been checked
  • if so, return nil and move on. If not:
  1. mark the headers unchecked so that we'll fetch logs for them in ExtractLogs, and
  2. mark the logs as checked

I think 2 is the part that is confusing me. It seems like we're marking logs as checked before we've actually fetched the logs and checked them. I wonder if this method should be split into updateCheckedHeaders, which does just 1, and another method, updateCheckedLogs, which does just 2 and is called after ExtractLogs is run.

I also wonder if calling this in ExtractLogs would work instead of calling it in AddTransformers?

It's also possible that my brain is tired because it's the end of the day. So I'll take another look tomorrow morning. :)

Contributor Author

Yeah this is definitely a little complicated - would welcome ideas about ways to simplify. The basic idea is that we can add new transformers after the system has already been running for awhile and it will automatically recheck previously-checked headers that are relevant to the new transformer.

Your understanding of the function is correct. My thinking with marking logs checked here was that (1) it only happens after we've marked headers unchecked, so there's no risk that marking the log as checked prevents us from syncing those logs, and (2) marking logs as checked requires iterating through the address+topic0 combinations, which we're already doing in AddTransformers but not ExtractLogs.

Definitely happy to refactor/modify the approach - this was just my first pass at enabling new transformers to sync against already-checked headers with minimal additional code. What do you think?
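The flow being debated can be sketched in memory. Everything here is hypothetical shorthand for the real repositories (a `watched` set standing in for checked_logs, a `checkCount` map standing in for public.headers), but it shows the ordering that matters: headers are marked unchecked before the new combination is recorded as watched.

```go
package main

import "fmt"

// Hypothetical sketch of updateCheckedHeaders: when a transformer brings a
// new address+topic0 combination, reset check_count for headers from its
// starting block, then record the combination as watched.
type state struct {
	watched    map[string]bool // address+topic0 combinations already fetched
	checkCount map[int64]int   // per-header check_count, keyed by block number
}

func (s *state) addTransformer(addrTopic string, startingBlock int64) {
	if s.watched[addrTopic] {
		return // combination already covered by earlier fetches: nothing to redo
	}
	for block := range s.checkCount {
		if block >= startingBlock {
			s.checkCount[block] = 0 // mark unchecked so logs get re-fetched
		}
	}
	s.watched[addrTopic] = true // safe: the headers were just marked unchecked
}

func main() {
	s := &state{
		watched:    map[string]bool{},
		checkCount: map[int64]int{100: 1, 200: 1, 300: 1},
	}
	s.addTransformer("0xcontract|topic0", 200)
	fmt.Println(s.checkCount[100], s.checkCount[200], s.checkCount[300])
}
```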

Contributor

I think I understand - so the main motivation for marking logs as checked here in updateCheckedHeaders is for a performance gain in not having to iterate through the address + topic0 combos? That makes sense to me.

I think I was mainly confused about marking logs as checked before they were unchecked, but maybe a comment explaining why we're doing this could help? Also what do you think about renaming updateCheckedHeaders to updateCheckedHeadersAndLogs or something?

Contributor

Also, the fact that we're marking both headers and logs as checked, and that we have a header_sync_log table and a checked_logs table, tripped me up a bit as well. I don't know if I have any ideas on how this may be simplified, but just thought I'd throw it out there as a potential hang-up for folks.

Contributor Author

Thinking I may change checked_logs to watched_logs or something similar - really the purpose of the table is just to keep track of what logs we were fetching when we marked a header as checked for logs

Contributor

I don't mean to get stuck on the watched_logs table, I think I'm having a hard time creating a mental model with two tables (in this process) that are both called ..._logs and that kind of have similar data in them and am trying to understand. So, this is totally not merge blocking 😊.

Is there any reason why we can't re-use the header_sync_log table for this purpose? It has the topics and address, so we could potentially add a column for checked or watched?

Contributor Author

🤔 I'm not sure...

The header_sync_log table has actual log events (likely multiple rows for each given address+topic0). The watched_logs table has one row for each address+topic0 that we're submitting to GetLogsWithCustomQuery.

I don't think we'd need to add a column to header_sync_log if we wanted to avoid adding a separate table - we could potentially SELECT DISTINCT address, topics[0] FROM header_sync_logs to get an idea of what logs we've checked for in the past, but I think that would miss scenarios where we had fetched for a given log but had not received any results. Also thinking that query could maybe get a bit expensive after we've fetched a lot of logs - but maybe that's worth it to avoid adding this table?

What do you think? Does the purpose of the code - trying to uncheck previously checked headers if log fetching calls did not include a new address+topic0 combination - make sense/seem like something we should approach differently?

Contributor

I see what you're saying - with querying the header_sync_log table we'd miss address+topic0 combos that were checked, but nothing was found. 👍

}

func (repository HeaderSyncLogRepository) GetUntransformedHeaderSyncLogs() ([]core.HeaderSyncLog, error) {
rows, queryErr := repository.db.Queryx(`SELECT * FROM public.header_sync_logs WHERE transformed = false`)
Contributor

since this method is checking for header_sync_logs where transformed = false, where are we setting transformed to true?

Contributor Author

That's a responsibility of the event transformer repository's Create function in this setup

Contributor

oh gotcha!

Contributor

@rmulhol Any reason not to move that responsibility up to the watcher in case of a non-error execution of the transformer on that log? :>

Contributor Author

Guess I just wanted to keep the field switching as close as possible to the place where the truth changed, but I'd be down to move it up if we prioritize not having plugins write back to tables in the public schema. Only concern would be that it maybe prevents us from achieving max async (i.e. right now the delegator could theoretically ignore transformer errors, just send logs and move on; that's not true if it's responsible for flipping fields based on errors in transformer execution).

Contributor

@rmulhol Makes sense. Still kinda feels like we could reduce transformer complexity further here though, if we assume they throw errors on failure (which they do)

- Use assertions instead of comments to document expectations
- Also randomize the fake timestamp in test data
- Missing == not in DB
- Unchecked == logs haven't been fetched
- We're logging that a given log has been included in any fetch calls
  for checked headers, rather than that we have already checked for
  that log
@elizabethengelman
Contributor

I think I'm finally understanding how this works. 😳 And just to confirm, this approach assumes that an address+topic0 combination will always be transformed in the same way, is that correct?

For example, if a new transformer is added with an already watched address+topic0, the new transformer will need to rely on the transformed logs from a previous transformer?

@rmulhol
Contributor Author

rmulhol commented Sep 12, 2019

I think I'm finally understanding how this works. 😳 And just to confirm, this approach assumes that an address+topic0 combination will always be transformed in the same way, is that correct?

For example, if a new transformer is added with an already watched address+topic0, the new transformer will need to rely on the transformed logs from a previous transformer?

Yeah that's true - the first transformer to be delegated a log will mark the log as transformed, so it will no longer be delegated to other transformers in the future. If we wanted to support multiple transformers for a given log, I think the easiest path would be for the subsequent transformer to target the data created by the first transformer (and/or the logs with the log_ids present on the first transformer's table).

@m0ar
Contributor

m0ar commented Sep 16, 2019

Quick thought before I dive into the actual changes: for footnote 3, a contributing factor to this fetched-but-never-transformed set could be that the available functionality in Geth doesn't allow filtering on both address and topic simultaneously, but simply applies the topic filter after the address filter.

This means that we can get a log with a topic we care about from a contract we track, even though we don't care about that topic on that contract specifically.

Contributor
@m0ar left a comment

Sorry for being totally slow with this review! 🙇‍♀️


block_number BIGINT,
raw JSONB,
block_timestamp NUMERIC,
check_count INTEGER NOT NULL DEFAULT 0,
Contributor

How is this used?

Contributor Author

We increment it after we've called FetchLogs for a given header. Basically it's replacing the checked headers table

Contributor

@rmulhol What do we use the count for? Why not a bool?

Contributor Author

To enable rechecking headers with a cap on how many times we recheck
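A small sketch of why an integer works where a bool wouldn't (the `eligibleForRecheck` helper and the cap value are hypothetical, not the PR's actual query, which lives in SQL):

```go
package main

import "fmt"

// Hypothetical sketch: a header remains eligible for rechecking until its
// check_count reaches a configured cap, after which we stop re-fetching
// logs for it. A bool could only express checked/unchecked, not "checked
// n times so far".
func eligibleForRecheck(checkCounts map[int64]int, recheckCap int) []int64 {
	var eligible []int64
	for block, count := range checkCounts {
		if count < recheckCap {
			eligible = append(eligible, block)
		}
	}
	return eligible
}

func main() {
	counts := map[int64]int{100: 0, 200: 3, 300: 5}
	// with a cap of 5, blocks 100 and 200 can still be rechecked
	fmt.Println(len(eligibleForRecheck(counts, 5)))
}
```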


// Goes through an array of logs, associating relevant logs (matching addresses and topic) with transformers
func (chunker *LogChunker) ChunkLogs(logs []types.Log) map[string][]types.Log {
chunks := map[string][]types.Log{}
// Goes through a slice of logs, associating relevant logs (matching addresses and topic) with transformers
Contributor

💯 🔪

if getTopicZeroExistsErr != nil {
return false, getTopicZeroExistsErr
}
return topicZeroExists, nil
Contributor

Thinking it might be an alternative to create a migration with a helper function that checks this in the DB; that would reduce this to a single call.

Contributor

Like, on one hand let's not move application logic into the db, but otoh these types of chained queries are getting old

Contributor Author

Let me take a whirl at making it a single query - think we might be able to do that with the existing setup

return insertErr
}
}
return tx.Commit()
Contributor

This solution is really nice! 🐈


BlockHash: common.HexToHash(rawLog.BlockHash),
Index: rawLog.LogIndex,
// TODO: revisit if not cascade deleting logs when header removed
// currently, fetched logs are cascade deleted if removed
Contributor

What was this referring to again? 🤔

Contributor Author

types.Log includes a Removed field that's set to true if you're subscribed to logs over the pubsub rpc and a log is removed by a reorg. When we're recreating a types.Log out of a log in the DB, we're always saying Removed is false because we're assuming we're only holding onto logs that are not removed. So we need to reconsider our Removed assignment if we start holding onto removed logs

}

func (repository HeaderSyncLogRepository) insertLog(headerID int64, log types.Log, tx *sqlx.Tx) error {
topics := buildTopics(log)
Contributor

How come we need this? Are the topics just jumbled into the same byte array? No separator even?

Contributor Author

The topics column is an array of byte arrays. Felt nicer than having separate columns for each topic (especially given that many logs don't fill all four columns), but I'm open to reconsidering (especially if we need to do something like filter by topic0)

Contributor

Yah, makes sense

@m0ar
Contributor

m0ar commented Sep 17, 2019

Yo another question, does this implement/make possible removing all event metadata fields from the individual tables? (tx_idx, log_idx, raw_log)

@rmulhol
Contributor Author

rmulhol commented Sep 17, 2019

Yo another question, does this implement/make possible removing all event metadata fields from the individual tables? (tx_idx, log_idx, raw_log)

Yes 😎

- Replaces bool and moots question of error/bool ordering
- Also make event watcher execution synchronous
- Prevent extract/delegate from spinning when other side is being
  simulated
@m0ar
Contributor

m0ar commented Sep 19, 2019

Totally dig 4fa19be! 🙆‍♀️

This is looking solid to me now! 🏆

@rmulhol merged commit 2b151c2 into staging on Sep 20, 2019
@rmulhol deleted the vdb-699-logs-table branch on September 20, 2019 15:52