diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index 9c5dbefacd..2538cf7268 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -152,9 +152,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { } defer stateReader.Close() - verifier := &verify.StateVerifier{ - StateReader: stateReader, - } + verifier := verify.NewStateVerifier(stateReader, nil) assetStats := processors.AssetStatSet{} total := int64(0) diff --git a/services/horizon/internal/ingest/verify/main.go b/services/horizon/internal/ingest/verify/main.go index 256709e27f..416bc7b9de 100644 --- a/services/horizon/internal/ingest/verify/main.go +++ b/services/horizon/internal/ingest/verify/main.go @@ -23,7 +23,7 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x // StateVerifier verifies if ledger entries provided by Add method are the same // as in the checkpoint ledger entries provided by CheckpointChangeReader. // The algorithm works in the following way: -// 0. Develop TransformFunction. It should remove all fields and objects not +// 0. Develop `transformFunction`. It should remove all fields and objects not // stored in your app. For example, if you only store accounts, all other // ledger entry types should be ignored (return ignore = true). // 1. In a loop, get entries from history archive by calling GetEntries() @@ -32,19 +32,28 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x // entries in your storage (to find if some extra entires exist in your // storage). // Functions will return StateError type if state is found to be incorrect. -// It's user responsibility to call `StateReader.Close()` when reading is done. +// It's user responsibility to call `stateReader.Close()` when reading is done. // Check Horizon for an example how to use this tool. type StateVerifier struct { - StateReader ingest.ChangeReader - // TransformFunction transforms (or ignores) ledger entries streamed from + stateReader ingest.ChangeReader + // transformFunction transforms (or ignores) ledger entries streamed from // checkpoint buckets to match the form added by `Write`. Read // TransformLedgerEntryFunction godoc for more information. - TransformFunction TransformLedgerEntryFunction + transformFunction TransformLedgerEntryFunction readEntries int readingDone bool currentEntries map[string]xdr.LedgerEntry + encodingBuffer *xdr.EncodingBuffer +} + +func NewStateVerifier(stateReader ingest.ChangeReader, tf TransformLedgerEntryFunction) *StateVerifier { + return &StateVerifier{ + stateReader: stateReader, + transformFunction: tf, + encodingBuffer: xdr.NewEncodingBuffer(), + } } // GetLedgerKeys returns up to `count` ledger keys from history buckets @@ -59,7 +68,7 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) { v.currentEntries = make(map[string]xdr.LedgerEntry) for count > 0 { - entryChange, err := v.StateReader.Read() + entryChange, err := v.stateReader.Read() if err != nil { if err == io.EOF { v.readingDone = true @@ -70,15 +79,15 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) { entry := *entryChange.Post - if v.TransformFunction != nil { - ignore, _ := v.TransformFunction(entry) + if v.transformFunction != nil { + ignore, _ := v.transformFunction(entry) if ignore { continue } } ledgerKey := entry.LedgerKey() - key, err := ledgerKey.MarshalBinary() + key, err := v.encodingBuffer.MarshalBinary(ledgerKey) if err != nil { return keys, errors.Wrap(err, "Error marshaling ledgerKey") } @@ -101,12 +110,13 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) { // Any `StateError` returned by this method indicates invalid state! func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { actualEntry := entry.Normalize() - actualEntryMarshaled, err := actualEntry.MarshalBinary() + actualEntryMarshaled, err := v.encodingBuffer.MarshalBinary(actualEntry) if err != nil { return errors.Wrap(err, "Error marshaling actualEntry") } - key, err := actualEntry.LedgerKey().MarshalBinary() + // safe, since we convert to string right away (causing a copy) + key, err := v.encodingBuffer.UnsafeMarshalBinary(actualEntry.LedgerKey()) if err != nil { return errors.Wrap(err, "Error marshaling ledgerKey") } @@ -122,25 +132,25 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { delete(v.currentEntries, string(key)) preTransformExpectedEntry := expectedEntry - preTransformExpectedEntryMarshaled, err := preTransformExpectedEntry.MarshalBinary() + preTransformExpectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&preTransformExpectedEntry) if err != nil { return errors.Wrap(err, "Error marshaling preTransformExpectedEntry") } - if v.TransformFunction != nil { + if v.transformFunction != nil { var ignore bool - ignore, expectedEntry = v.TransformFunction(expectedEntry) + ignore, expectedEntry = v.transformFunction(expectedEntry) // Extra check: if entry was ignored in GetEntries, it shouldn't be // ignored here. if ignore { return errors.Errorf( - "Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.", + "Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.", base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled), ) } } - expectedEntryMarshaled, err := expectedEntry.MarshalBinary() + expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry) if err != nil { return errors.Wrap(err, "Error marshaling expectedEntry") } @@ -195,7 +205,7 @@ func (v *StateVerifier) checkUnreadEntries() error { } // Ignore error as StateError below is more important - entryString, _ := xdr.MarshalBase64(entry) + entryString, _ := v.encodingBuffer.MarshalBase64(&entry) return ingest.NewStateError(errors.Errorf( "Entries (%d) not found locally, example: %s", len(v.currentEntries), diff --git a/services/horizon/internal/ingest/verify/main_test.go b/services/horizon/internal/ingest/verify/main_test.go index 1b91e39f8b..6f8b2947ed 100644 --- a/services/horizon/internal/ingest/verify/main_test.go +++ b/services/horizon/internal/ingest/verify/main_test.go @@ -36,9 +36,7 @@ type StateVerifierTestSuite struct { func (s *StateVerifierTestSuite) SetupTest() { s.mockStateReader = &ingest.MockChangeReader{} - s.verifier = &StateVerifier{ - StateReader: s.mockStateReader, - } + s.verifier = NewStateVerifier(s.mockStateReader, nil) } func (s *StateVerifierTestSuite) TearDownTest() { @@ -103,7 +101,7 @@ func (s *StateVerifierTestSuite) TestTransformFunction() { s.mockStateReader.On("Read").Return(ingest.Change{}, io.EOF).Once() - s.verifier.TransformFunction = + s.verifier.transformFunction = func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) { // Leave Account ID only for accounts, ignore the rest switch entry.Data.Type { @@ -190,7 +188,7 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() { Post: &accountEntry, }, nil).Once() - s.verifier.TransformFunction = + s.verifier.transformFunction = func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) { return false, xdr.LedgerEntry{} } @@ -199,8 +197,8 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() { s.Assert().NoError(err) s.Assert().Len(keys, 1) - // Check the behaviour of TransformFunction to code path to test. - s.verifier.TransformFunction = + // Check the behaviour of transformFunction to code path to test. + s.verifier.transformFunction = func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) { return true, xdr.LedgerEntry{} } @@ -208,7 +206,7 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() { entryBase64, err := xdr.MarshalBase64(accountEntry) s.Assert().NoError(err) errorMsg := fmt.Sprintf( - "Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.", + "Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.", entryBase64, ) err = s.verifier.Write(accountEntry) diff --git a/xdr/main.go b/xdr/main.go index cd08e82676..9f45e24f37 100644 --- a/xdr/main.go +++ b/xdr/main.go @@ -167,6 +167,16 @@ func (e *EncodingBuffer) UnsafeMarshalHex(encodable XDREncodable) ([]byte, error return e.otherEncodersBuf, nil } +func (e *EncodingBuffer) MarshalBinary(encodable XDREncodable) ([]byte, error) { + xdrEncoded, err := e.UnsafeMarshalBinary(encodable) + if err != nil { + return nil, err + } + ret := make([]byte, len(xdrEncoded)) + copy(ret, xdrEncoded) + return ret, nil +} + func (e *EncodingBuffer) MarshalBase64(encodable XDREncodable) (string, error) { b, err := e.UnsafeMarshalBase64(encodable) if err != nil {