diff --git a/libraries/shared/repository/repository.go b/libraries/shared/repository/repository.go deleted file mode 100644 index 155e27aed..000000000 --- a/libraries/shared/repository/repository.go +++ /dev/null @@ -1,27 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository - -import "github.com/jmoiron/sqlx" - -func MarkContractWatcherHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, checkedHeadersColumn string) error { - _, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+checkedHeadersColumn+`) - VALUES ($1, $2) - ON CONFLICT (header_id) DO - UPDATE SET `+checkedHeadersColumn+` = checked_headers.`+checkedHeadersColumn+` + 1`, headerID, 1) - return err -} diff --git a/libraries/shared/repository/repository_test.go b/libraries/shared/repository/repository_test.go deleted file mode 100644 index 72a613af0..000000000 --- a/libraries/shared/repository/repository_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package repository_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/repository" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" - "github.com/vulcanize/vulcanizedb/pkg/fakes" - "github.com/vulcanize/vulcanizedb/test_config" -) - -var _ = Describe("", func() { - Describe("MarkContractWatcherHeaderCheckedInTransaction", func() { - var ( - checkedHeadersColumn string - db *postgres.DB - ) - - BeforeEach(func() { - db = test_config.NewTestDB(test_config.NewTestNode()) - test_config.CleanTestDB(db) - checkedHeadersColumn = "test_column_checked" - _, migrateErr := db.Exec(`ALTER TABLE public.checked_headers - ADD COLUMN ` + checkedHeadersColumn + ` integer`) - Expect(migrateErr).NotTo(HaveOccurred()) - }) - - AfterEach(func() { - _, cleanupMigrateErr := db.Exec(`ALTER TABLE public.checked_headers DROP COLUMN ` + checkedHeadersColumn) - Expect(cleanupMigrateErr).NotTo(HaveOccurred()) - }) - - It("marks passed header as checked within a passed transaction", func() { - headerRepository := repositories.NewHeaderRepository(db) - headerID, headerErr := headerRepository.CreateOrUpdateHeader(fakes.FakeHeader) - Expect(headerErr).NotTo(HaveOccurred()) - tx, txErr := db.Beginx() - Expect(txErr).NotTo(HaveOccurred()) - - err := repository.MarkContractWatcherHeaderCheckedInTransaction(headerID, tx, checkedHeadersColumn) - Expect(err).NotTo(HaveOccurred()) - commitErr := tx.Commit() - Expect(commitErr).NotTo(HaveOccurred()) - var checkedCount int - fetchErr := db.Get(&checkedCount, `SELECT COUNT(*) FROM public.checked_headers WHERE header_id = $1`, headerID) - Expect(fetchErr).NotTo(HaveOccurred()) - Expect(checkedCount).To(Equal(1)) - }) - }) -}) diff --git a/pkg/contract_watcher/header/transformer/transformer.go b/pkg/contract_watcher/header/transformer/transformer.go index b91d98c86..fd7c88b67 100644 --- a/pkg/contract_watcher/header/transformer/transformer.go +++ b/pkg/contract_watcher/header/transformer/transformer.go @@ -128,7 +128,7 @@ func (tr *Transformer) Init() error { firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock() if retrieveErr != nil { if retrieveErr == sql.ErrNoRows { - logrus.Error(retrieveErr) + logrus.Error(fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())) firstBlock = 0 } else { return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error()) @@ -252,12 +252,6 @@ func (tr *Transformer) Execute() error { continue } - // Sort logs by the contract they belong to - // Start by adding every contract addr to the map - // So that if we don't have any logs for it, it is caught and the header is still marked checked for its events - for _, addr := range tr.contractAddresses { - sortedLogs[addr] = nil - } for _, log := range allLogs { addr := strings.ToLower(log.Address.Hex()) sortedLogs[addr] = append(sortedLogs[addr], log) @@ -267,14 +261,6 @@ func (tr *Transformer) Execute() error { for conAddr, logs := range sortedLogs { con := tr.Contracts[conAddr] if len(logs) < 1 { - eventIds := make([]string, 0) - for eventName := range con.Events { - eventIds = append(eventIds, strings.ToLower(eventName+"_"+con.Address)) - } - markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds) - if markCheckedErr != nil { - return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) - } logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber) continue } @@ -290,16 +276,10 @@ func (tr *Transformer) Execute() error { for eventName, logs := range convertedLogs { // If logs for this event are empty, mark them checked at this header and continue if len(logs) < 1 { - eventID := strings.ToLower(eventName + "_" + con.Address) - markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventID) - if markCheckedErr != nil { - return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) - } logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber) continue } // If logs aren't empty, persist them - // Header is marked checked in the transactions persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name) if persistErr != nil { return fmt.Errorf("error persisting logs: %s", persistErr.Error()) @@ -307,6 +287,11 @@ func (tr *Transformer) Execute() error { } } + markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds) + if markCheckedErr != nil { + return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) + } + // Poll contracts at this block height pollingErr := tr.methodPolling(header, tr.sortedMethodIds) if pollingErr != nil { diff --git a/pkg/contract_watcher/shared/repository/event_repository.go b/pkg/contract_watcher/shared/repository/event_repository.go index d6d3c601f..ab5756dff 100644 --- a/pkg/contract_watcher/shared/repository/event_repository.go +++ b/pkg/contract_watcher/shared/repository/event_repository.go @@ -24,7 +24,6 @@ import ( "github.com/hashicorp/golang-lru" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/libraries/shared/repository" "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) @@ -144,17 +143,6 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type } } - // Mark header as checked for this eventId - eventID := strings.ToLower(eventInfo.Name + "_" + contractAddr) - markCheckedErr := repository.MarkContractWatcherHeaderCheckedInTransaction(logs[0].ID, tx, eventID) // This assumes all logs are from same block - if markCheckedErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - logrus.Warnf("error rolling back transaction while marking header checked: %s", rollbackErr.Error()) - } - return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error()) - } - return tx.Commit() } diff --git a/pkg/contract_watcher/shared/repository/event_repository_test.go b/pkg/contract_watcher/shared/repository/event_repository_test.go index 40f1a783c..bc61d7760 100644 --- a/pkg/contract_watcher/shared/repository/event_repository_test.go +++ b/pkg/contract_watcher/shared/repository/event_repository_test.go @@ -355,11 +355,6 @@ var _ = Describe("Repository", func() { Expect(count).To(Equal(2)) }) - It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() { - err = dataStore.PersistLogs(logs, event, con.Address, con.Name) - Expect(err).To(HaveOccurred()) - }) - It("Fails with empty log", func() { err = dataStore.PersistLogs([]types.Log{}, event, con.Address, con.Name) Expect(err).To(HaveOccurred())