Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

VDB-696 Update transformer execute #101

Merged
merged 6 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions libraries/shared/factories/event/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
package event

import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)

type Repository interface {
Create(headerID int64, models []interface{}) error
MarkHeaderChecked(headerID int64) error
MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This transformer specific MissingHeaders method was never being used. Instead this generic MissingHeaders method is being called the watcher and then the headers are passed into Transformer.Execute.

RecheckHeaders(startingBlockNumber, endingBlockNUmber int64) ([]core.Header, error)
SetDB(db *postgres.DB)
}
3 changes: 1 addition & 2 deletions libraries/shared/factories/event/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
log "github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
Expand All @@ -37,7 +36,7 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Event
return transformer
}

func (transformer Transformer) Execute(logs []types.Log, header core.Header, recheckHeaders constants.TransformerExecution) error {
func (transformer Transformer) Execute(logs []types.Log, header core.Header) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This recheckHeaders argument is not being used in this Execute method.

transformerName := transformer.Config.TransformerName
config := transformer.Config

Expand Down
21 changes: 10 additions & 11 deletions libraries/shared/factories/event/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/factories/event"
"github.com/vulcanize/vulcanizedb/libraries/shared/mocks"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
Expand Down Expand Up @@ -60,14 +59,14 @@ var _ = Describe("Transformer", func() {
})

It("marks header checked if no logs returned", func() {
err := t.Execute([]types.Log{}, headerOne, constants.HeaderMissing)
err := t.Execute([]types.Log{}, headerOne)

Expect(err).NotTo(HaveOccurred())
repository.AssertMarkHeaderCheckedCalledWith(headerOne.Id)
})

It("doesn't attempt to convert or persist an empty collection when there are no logs", func() {
err := t.Execute([]types.Log{}, headerOne, constants.HeaderMissing)
err := t.Execute([]types.Log{}, headerOne)

Expect(err).NotTo(HaveOccurred())
Expect(converter.ToEntitiesCalledCounter).To(Equal(0))
Expand All @@ -76,7 +75,7 @@ var _ = Describe("Transformer", func() {
})

It("does not call repository.MarkCheckedHeader when there are logs", func() {
err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).NotTo(HaveOccurred())
repository.AssertMarkHeaderCheckedNotCalled()
Expand All @@ -85,14 +84,14 @@ var _ = Describe("Transformer", func() {
It("returns error if marking header checked returns err", func() {
repository.SetMarkHeaderCheckedError(fakes.FakeError)

err := t.Execute([]types.Log{}, headerOne, constants.HeaderMissing)
err := t.Execute([]types.Log{}, headerOne)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})

It("converts an eth log to an entity", func() {
err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).NotTo(HaveOccurred())
Expect(converter.ContractAbi).To(Equal(config.ContractAbi))
Expand All @@ -102,7 +101,7 @@ var _ = Describe("Transformer", func() {
It("returns an error if converter fails", func() {
converter.ToEntitiesError = fakes.FakeError

err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expand All @@ -111,7 +110,7 @@ var _ = Describe("Transformer", func() {
It("converts an entity to a model", func() {
converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}}

err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).NotTo(HaveOccurred())
Expect(converter.EntitiesToConvert[0]).To(Equal(test_data.GenericEntity{}))
Expand All @@ -121,7 +120,7 @@ var _ = Describe("Transformer", func() {
converter.EntitiesToReturn = []interface{}{test_data.GenericEntity{}}
converter.ToModelsError = fakes.FakeError

err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expand All @@ -130,7 +129,7 @@ var _ = Describe("Transformer", func() {
It("persists the record", func() {
converter.ModelsToReturn = []interface{}{test_data.GenericModel{}}

err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedHeaderID).To(Equal(headerOne.Id))
Expand All @@ -139,7 +138,7 @@ var _ = Describe("Transformer", func() {

It("returns error if persisting the record fails", func() {
repository.SetCreateError(fakes.FakeError)
err := t.Execute(logs, headerOne, constants.HeaderMissing)
err := t.Execute(logs, headerOne)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expand Down
3 changes: 1 addition & 2 deletions libraries/shared/mocks/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package mocks
import (
"github.com/ethereum/go-ethereum/core/types"

"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
shared_t "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
Expand All @@ -33,7 +32,7 @@ type MockTransformer struct {
config shared_t.EventTransformerConfig
}

func (mh *MockTransformer) Execute(logs []types.Log, header core.Header, recheckHeaders constants.TransformerExecution) error {
func (mh *MockTransformer) Execute(logs []types.Log, header core.Header) error {
if mh.ExecuteError != nil {
return mh.ExecuteError
}
Expand Down
69 changes: 27 additions & 42 deletions libraries/shared/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,6 @@ func MissingHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.D
return result, err
}

func RecheckHeaders(startingBlockNumber, endingBlockNumber int64, db *postgres.DB, checkedHeadersColumn string) ([]core.Header, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this method wasn't being called any longer.

Also, something else I hadn't realized previously was that in order to allow for rechecking headers, we need to pass in a CLI argument --recheck-headers when running composeAndExecute. 🌠

var result []core.Header
var query string
var err error

if endingBlockNumber == -1 {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE ` + checkedHeadersColumn + ` between 1 and ` + constants.RecheckHeaderCap + `
AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2`
err = db.Select(&result, query, startingBlockNumber, db.Node.ID)
} else {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE ` + checkedHeadersColumn + ` between 1 and ` + constants.RecheckHeaderCap + `
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3`
err = db.Select(&result, query, startingBlockNumber, endingBlockNumber, db.Node.ID)
}

return result, err
}

func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
// Query returns `[]driver.Value`, nullable polymorphic interface
var queryResult []driver.Value
Expand Down Expand Up @@ -121,37 +96,47 @@ func GetCheckedColumnNames(db *postgres.DB) ([]string, error) {
return columnNames, nil
}

// Builds a SQL string that checks if any column value is 0, given the column names.
// Builds a SQL string that checks if any column should be checked/rechecked.
// Defaults to FALSE when no columns are provided.
// Ex: ["columnA", "columnB"] => "NOT (columnA!=0 AND columnB!=0)"
// [] => "FALSE"
func CreateNotCheckedSQL(boolColumns []string, recheckHeaders constants.TransformerExecution) string {

var result bytes.Buffer

func CreateHeaderCheckedPredicateSQL(boolColumns []string, recheckHeaders constants.TransformerExecution) string {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored this method into two smaller ones & changed the name as I was trying to understand how it was working. If folks don't find this refactoring helpful, I'm happy to undo it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense this way 💯

if len(boolColumns) == 0 {
return "FALSE"
}

result.WriteString("NOT (")
if recheckHeaders {
return createHeaderCheckedPredicateSQLForRecheckedHeaders(boolColumns)
} else {
return createHeaderCheckedPredicateSQLForMissingHeaders(boolColumns)
}
}

func createHeaderCheckedPredicateSQLForMissingHeaders(boolColumns []string) string {
var result bytes.Buffer
result.WriteString(" (")

// Loop excluding last column name
for _, column := range boolColumns[:len(boolColumns)-1] {

if recheckHeaders {
result.WriteString(fmt.Sprintf("%v>=%s AND ", column, constants.RecheckHeaderCap))
} else {
result.WriteString(fmt.Sprintf("%v!=0 AND ", column))
}
result.WriteString(fmt.Sprintf("%v=0 OR ", column))
}

// No trailing "OR" for the last column name
if recheckHeaders {
result.WriteString(fmt.Sprintf("%v>=%s)", boolColumns[len(boolColumns)-1], constants.RecheckHeaderCap))
} else {
result.WriteString(fmt.Sprintf("%v!=0)", boolColumns[len(boolColumns)-1]))
result.WriteString(fmt.Sprintf("%v=0)", boolColumns[len(boolColumns)-1]))

return result.String()
}

func createHeaderCheckedPredicateSQLForRecheckedHeaders(boolColumns []string) string {
var result bytes.Buffer
result.WriteString(" (")

// Loop excluding last column name
for _, column := range boolColumns[:len(boolColumns)-1] {
result.WriteString(fmt.Sprintf("%v<%s OR ", column, constants.RecheckHeaderCap))
}

// No trailing "OR" for the last column name
result.WriteString(fmt.Sprintf("%v<%s)", boolColumns[len(boolColumns)-1], constants.RecheckHeaderCap))

return result.String()
}
Loading