Skip to content

Commit

Permalink
Optimize captive-core pipe decoding
Browse files Browse the repository at this point in the history
Also, remove `UnmarshalFramed` which wasn't used
  • Loading branch information
2opremio committed Nov 17, 2021
1 parent 28238ea commit 7bd6215
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 27 deletions.
16 changes: 10 additions & 6 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/pkg/errors"
xdr3 "github.com/stellar/go-xdr/xdr3"

"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -63,16 +64,19 @@ type metaResult struct {
// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory
// exhaustion when network closes a series a large ledgers.
type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
r *bufio.Reader
c chan metaResult
decoder *xdr3.Decoder
}

// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
// when stellar-core terminates.
func newBufferedLedgerMetaReader(reader io.Reader) *bufferedLedgerMetaReader {
r := bufio.NewReaderSize(reader, metaPipeBufferSize)
return &bufferedLedgerMetaReader{
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(reader, metaPipeBufferSize),
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: r,
decoder: xdr3.NewDecoder(r),
}
}

Expand All @@ -82,7 +86,7 @@ func newBufferedLedgerMetaReader(reader io.Reader) *bufferedLedgerMetaReader {
// * The next ledger available in the buffer exceeds the meta pipe buffer size.
// In such case the method will block until LedgerCloseMeta buffer is empty.
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) {
frameLength, err := xdr.ReadFrameLength(b.r)
frameLength, err := xdr.ReadFrameLength(b.decoder)
if err != nil {
return nil, errors.Wrap(err, "error reading frame length")
}
Expand All @@ -93,7 +97,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
}

var xlcm xdr.LedgerCloseMeta
_, err = xdr.Unmarshal(b.r, &xlcm)
_, err = xlcm.DecodeFrom(b.decoder)
if err != nil {
return nil, errors.Wrap(err, "unmarshalling framed LedgerCloseMeta")
}
Expand Down
23 changes: 2 additions & 21 deletions xdr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,8 @@ func MarshalFramed(w io.Writer, v interface{}) error {
}

// ReadFrameLength returns a length of a framed XDR object.
func ReadFrameLength(r io.Reader) (uint32, error) {
var frameLen uint32
n, e := Unmarshal(r, &frameLen)
func ReadFrameLength(d *xdr.Decoder) (uint32, error) {
frameLen, n, e := d.DecodeUint()
if e != nil {
return 0, errors.Wrap(e, "unmarshalling XDR frame header")
}
Expand All @@ -275,24 +274,6 @@ func ReadFrameLength(r io.Reader) (uint32, error) {
return frameLen, nil
}

// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte
// big-endian length header that has the high bit set, followed by that length worth of
// XDR data. Decoding this involves just a little more work than xdr.Unmarshal.
func UnmarshalFramed(r io.Reader, v interface{}) (int, error) {
frameLen, err := ReadFrameLength(r)
if err != nil {
return 0, errors.Wrap(err, "unmarshalling XDR frame header")
}
m, err := xdr.Unmarshal(r, v)
if err != nil {
return 0, errors.Wrap(err, "unmarshalling framed XDR")
}
if int64(m) != int64(frameLen) {
return 0, errors.New("bad length of XDR frame body")
}
return m + 4 /* frame size: uint32 */, nil
}

type countWriter struct {
Count int
}
Expand Down

0 comments on commit 7bd6215

Please sign in to comment.