Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Use encoding buffer in state verifier #4069

Merged
merged 1 commit into from
Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
}
defer stateReader.Close()

verifier := &verify.StateVerifier{
StateReader: stateReader,
}
verifier := verify.NewStateVerifier(stateReader, nil)

assetStats := processors.AssetStatSet{}
total := int64(0)
Expand Down
44 changes: 27 additions & 17 deletions services/horizon/internal/ingest/verify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// StateVerifier verifies if ledger entries provided by Add method are the same
// as in the checkpoint ledger entries provided by CheckpointChangeReader.
// The algorithm works in the following way:
// 0. Develop TransformFunction. It should remove all fields and objects not
// 0. Develop `transformFunction`. It should remove all fields and objects not
// stored in your app. For example, if you only store accounts, all other
// ledger entry types should be ignored (return ignore = true).
// 1. In a loop, get entries from history archive by calling GetEntries()
Expand All @@ -32,19 +32,28 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// entries in your storage (to find if some extra entires exist in your
// storage).
// Functions will return StateError type if state is found to be incorrect.
// It's user responsibility to call `StateReader.Close()` when reading is done.
// It's user responsibility to call `stateReader.Close()` when reading is done.
// Check Horizon for an example how to use this tool.
type StateVerifier struct {
StateReader ingest.ChangeReader
// TransformFunction transforms (or ignores) ledger entries streamed from
stateReader ingest.ChangeReader
// transformFunction transforms (or ignores) ledger entries streamed from
// checkpoint buckets to match the form added by `Write`. Read
// TransformLedgerEntryFunction godoc for more information.
TransformFunction TransformLedgerEntryFunction
transformFunction TransformLedgerEntryFunction

readEntries int
readingDone bool

currentEntries map[string]xdr.LedgerEntry
encodingBuffer *xdr.EncodingBuffer
}

func NewStateVerifier(stateReader ingest.ChangeReader, tf TransformLedgerEntryFunction) *StateVerifier {
return &StateVerifier{
stateReader: stateReader,
transformFunction: tf,
encodingBuffer: xdr.NewEncodingBuffer(),
}
}

// GetLedgerKeys returns up to `count` ledger keys from history buckets
Expand All @@ -59,7 +68,7 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
v.currentEntries = make(map[string]xdr.LedgerEntry)

for count > 0 {
entryChange, err := v.StateReader.Read()
entryChange, err := v.stateReader.Read()
if err != nil {
if err == io.EOF {
v.readingDone = true
Expand All @@ -70,15 +79,15 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {

entry := *entryChange.Post

if v.TransformFunction != nil {
ignore, _ := v.TransformFunction(entry)
if v.transformFunction != nil {
ignore, _ := v.transformFunction(entry)
if ignore {
continue
}
}

ledgerKey := entry.LedgerKey()
key, err := ledgerKey.MarshalBinary()
key, err := v.encodingBuffer.MarshalBinary(ledgerKey)
if err != nil {
return keys, errors.Wrap(err, "Error marshaling ledgerKey")
}
Expand All @@ -101,12 +110,13 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
// Any `StateError` returned by this method indicates invalid state!
func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
actualEntry := entry.Normalize()
actualEntryMarshaled, err := actualEntry.MarshalBinary()
actualEntryMarshaled, err := v.encodingBuffer.MarshalBinary(actualEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling actualEntry")
}

key, err := actualEntry.LedgerKey().MarshalBinary()
// safe, since we convert to string right away (causing a copy)
key, err := v.encodingBuffer.UnsafeMarshalBinary(actualEntry.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error marshaling ledgerKey")
}
Expand All @@ -122,25 +132,25 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
delete(v.currentEntries, string(key))

preTransformExpectedEntry := expectedEntry
preTransformExpectedEntryMarshaled, err := preTransformExpectedEntry.MarshalBinary()
preTransformExpectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&preTransformExpectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling preTransformExpectedEntry")
}

if v.TransformFunction != nil {
if v.transformFunction != nil {
var ignore bool
ignore, expectedEntry = v.TransformFunction(expectedEntry)
ignore, expectedEntry = v.transformFunction(expectedEntry)
// Extra check: if entry was ignored in GetEntries, it shouldn't be
// ignored here.
if ignore {
return errors.Errorf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
)
}
}

expectedEntryMarshaled, err := expectedEntry.MarshalBinary()
expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling expectedEntry")
}
Expand Down Expand Up @@ -195,7 +205,7 @@ func (v *StateVerifier) checkUnreadEntries() error {
}

// Ignore error as StateError below is more important
entryString, _ := xdr.MarshalBase64(entry)
entryString, _ := v.encodingBuffer.MarshalBase64(&entry)
return ingest.NewStateError(errors.Errorf(
"Entries (%d) not found locally, example: %s",
len(v.currentEntries),
Expand Down
14 changes: 6 additions & 8 deletions services/horizon/internal/ingest/verify/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ type StateVerifierTestSuite struct {

func (s *StateVerifierTestSuite) SetupTest() {
s.mockStateReader = &ingest.MockChangeReader{}
s.verifier = &StateVerifier{
StateReader: s.mockStateReader,
}
s.verifier = NewStateVerifier(s.mockStateReader, nil)
}

func (s *StateVerifierTestSuite) TearDownTest() {
Expand Down Expand Up @@ -103,7 +101,7 @@ func (s *StateVerifierTestSuite) TestTransformFunction() {

s.mockStateReader.On("Read").Return(ingest.Change{}, io.EOF).Once()

s.verifier.TransformFunction =
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
// Leave Account ID only for accounts, ignore the rest
switch entry.Data.Type {
Expand Down Expand Up @@ -190,7 +188,7 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() {
Post: &accountEntry,
}, nil).Once()

s.verifier.TransformFunction =
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
return false, xdr.LedgerEntry{}
}
Expand All @@ -199,16 +197,16 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() {
s.Assert().NoError(err)
s.Assert().Len(keys, 1)

// Check the behaviour of TransformFunction to code path to test.
s.verifier.TransformFunction =
// Check the behaviour of transformFunction to code path to test.
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
return true, xdr.LedgerEntry{}
}

entryBase64, err := xdr.MarshalBase64(accountEntry)
s.Assert().NoError(err)
errorMsg := fmt.Sprintf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
entryBase64,
)
err = s.verifier.Write(accountEntry)
Expand Down
10 changes: 10 additions & 0 deletions xdr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ func (e *EncodingBuffer) UnsafeMarshalHex(encodable XDREncodable) ([]byte, error
return e.otherEncodersBuf, nil
}

func (e *EncodingBuffer) MarshalBinary(encodable XDREncodable) ([]byte, error) {
xdrEncoded, err := e.UnsafeMarshalBinary(encodable)
if err != nil {
return nil, err
}
ret := make([]byte, len(xdrEncoded))
copy(ret, xdrEncoded)
return ret, nil
}

func (e *EncodingBuffer) MarshalBase64(encodable XDREncodable) (string, error) {
b, err := e.UnsafeMarshalBase64(encodable)
if err != nil {
Expand Down