Skip to content
This repository has been archived by the owner on Sep 11, 2020. It is now read-only.

plumbing: packfile/scanner, readability/performance improvements, zlib pooling #1124

Merged
merged 1 commit into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions plumbing/format/packfile/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package packfile

import (
"bytes"
"compress/zlib"
"io"
"sync"

Expand Down Expand Up @@ -66,3 +67,12 @@ var bufPool = sync.Pool{
return bytes.NewBuffer(nil)
},
}

// zlibInitBytes is a minimal valid zlib stream (empty payload). It exists
// only so the pool's New function can create a reader without a real stream;
// users Reset the pooled reader against the actual data before reading.
var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}

// zlibReaderPool reuses zlib readers across objects to avoid the allocation
// cost of zlib.NewReader per object. Get a reader, Reset it onto the target
// stream, and Put it back when done.
var zlibReaderPool = sync.Pool{
	New: func() interface{} {
		r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes))
		return r
	},
}
189 changes: 84 additions & 105 deletions plumbing/format/packfile/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type ObjectHeader struct {
}

type Scanner struct {
r reader
zr readerResetter
r *scannerReader
crc hash.Hash32

// pendingObject is used to detect if an object has been read, or still
Expand All @@ -56,19 +55,27 @@ type Scanner struct {
// NewScanner returns a new Scanner based on a reader. If the given reader
// implements io.ReadSeeker, the Scanner will also be seekable.
func NewScanner(r io.Reader) *Scanner {
	// Only probe for seekability here; the scannerReader wraps either kind.
	_, ok := r.(io.ReadSeeker)

	crc := crc32.NewIEEE()
	return &Scanner{
		r:          newScannerReader(r, crc),
		crc:        crc,
		IsSeekable: ok,
	}
}

func (s *Scanner) Reset(r io.Reader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a test that reuses the scanner with Reset. Other than that LGTM! Thanks!

Copy link
Contributor Author

@saracen saracen Apr 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good shout! I noticed a problem whilst figuring out how/what to test. Even when the reader is seekable, I'm returning the r.offset when whence == io.SeekCurrent && offset == 0, which incorrectly assumes that the reader provided to start with was set at the beginning of the file.

I'll fix that and add a test shortly!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added two tests. TestReaderReset checks basic usage and that fields are correctly reset. TestReaderResetSeeks checks that the seek behavior is correct.

_, ok := r.(io.ReadSeeker)

s.r.Reset(r)
s.crc.Reset()
s.IsSeekable = ok
s.pendingObject = nil
s.version = 0
s.objects = 0
}

// Header reads the whole packfile header (signature, version and object count).
// It returns the version and the object count and performs checks on the
// validity of the signature and the version fields.
Expand Down Expand Up @@ -182,8 +189,7 @@ func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) {
// nextObjectHeader returns the ObjectHeader for the next object in the reader
// without the Offset field
func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) {
defer s.Flush()

s.r.Flush()
s.crc.Reset()

h := &ObjectHeader{}
Expand Down Expand Up @@ -304,35 +310,29 @@ func (s *Scanner) readLength(first byte) (int64, error) {
// NextObject writes the content of the next object into the writer w, and
// returns the number of bytes written, the CRC32 of the content and an
// error, if any.
func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err error) {
	s.pendingObject = nil
	written, err = s.copyObject(w)

	// Flush bytes still buffered for the crc hasher before taking the sum.
	s.r.Flush()
	crc32 = s.crc.Sum32()
	s.crc.Reset()

	return
}

// ReadRegularObject reads and write a non-deltified object
// from it zlib stream in an object entry in the packfile.
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {
if s.zr == nil {
var zr io.ReadCloser
zr, err = zlib.NewReader(s.r)
if err != nil {
return 0, fmt.Errorf("zlib initialization error: %s", err)
}
zr := zlibReaderPool.Get().(io.ReadCloser)
defer zlibReaderPool.Put(zr)

s.zr = zr.(readerResetter)
} else {
if err = s.zr.Reset(s.r, nil); err != nil {
return 0, fmt.Errorf("zlib reset error: %s", err)
}
if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
return 0, fmt.Errorf("zlib reset error: %s", err)
}

defer ioutil.CheckClose(s.zr, &err)
defer ioutil.CheckClose(zr, &err)
buf := byteSlicePool.Get().([]byte)
n, err = io.CopyBuffer(w, s.zr, buf)
n, err = io.CopyBuffer(w, zr, buf)
byteSlicePool.Put(buf)
return
}
Expand Down Expand Up @@ -378,110 +378,89 @@ func (s *Scanner) Close() error {
return err
}

// Flush is a no-op.
//
// Deprecated: flushing now happens internally in the scannerReader; this
// method is kept only for backward compatibility.
func (s *Scanner) Flush() error {
	return nil
}

type trackableReader struct {
count int64
io.Reader
// scannerReader has the following characteristics:
// - Provides an io.ReadSeeker impl for bufio.Reader, when the underlying
// reader supports it.
// - Keeps track of the current read position, for when the underlying reader
// isn't an io.ReadSeeker, but we still want to know the current offset.
// - Writes to the hash writer what it reads, with the aid of a smaller buffer.
// The buffer helps avoid a performance penalty for performing small writes
// to the crc32 hash writer.
type scannerReader struct {
	reader io.Reader     // underlying source; may or may not be an io.ReadSeeker
	crc    io.Writer     // crc32 hasher; every byte read is mirrored here via wbuf
	rbuf   *bufio.Reader // buffered reads from reader
	wbuf   *bufio.Writer // small buffer batching tiny writes to crc
	offset int64         // current read position in the underlying stream
}

// Read reads up to len(p) bytes into p.
func (r *trackableReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.count += int64(n)

return
}

// Seek only supports io.SeekCurrent, any other operation fails
func (r *trackableReader) Seek(offset int64, whence int) (int64, error) {
if whence != io.SeekCurrent {
return -1, ErrSeekNotSupported
// newScannerReader returns a scannerReader over r that mirrors every byte
// read into h (the crc hasher), batching those writes through a small
// 64-byte buffer to avoid many tiny writes to the hasher.
func newScannerReader(r io.Reader, h io.Writer) *scannerReader {
	sr := &scannerReader{
		rbuf: bufio.NewReader(nil),
		wbuf: bufio.NewWriterSize(nil, 64),
		crc:  h,
	}
	sr.Reset(r)

	return sr
}

func newByteReadSeeker(r io.ReadSeeker) *bufferedSeeker {
return &bufferedSeeker{
r: r,
Reader: *bufio.NewReader(r),
}
}
func (r *scannerReader) Reset(reader io.Reader) {
r.reader = reader
r.rbuf.Reset(r.reader)
r.wbuf.Reset(r.crc)

type bufferedSeeker struct {
r io.ReadSeeker
bufio.Reader
}

func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent && offset == 0 {
current, err := r.r.Seek(offset, whence)
if err != nil {
return current, err
}

return current - int64(r.Buffered()), nil
r.offset = 0
if seeker, ok := r.reader.(io.ReadSeeker); ok {
r.offset, _ = seeker.Seek(0, io.SeekCurrent)
}

defer r.Reader.Reset(r.r)
return r.r.Seek(offset, whence)
}

type readerResetter interface {
io.ReadCloser
zlib.Resetter
}
func (r *scannerReader) Read(p []byte) (n int, err error) {
n, err = r.rbuf.Read(p)

type reader interface {
io.Reader
io.ByteReader
io.Seeker
r.offset += int64(n)
if _, err := r.wbuf.Write(p[:n]); err != nil {
return n, err
}
return
}

type teeReader struct {
reader
w hash.Hash32
bufWriter *bufio.Writer
// ReadByte reads one byte, mirrors it into the crc write buffer, and
// advances the tracked offset on success.
func (r *scannerReader) ReadByte() (b byte, err error) {
	if b, err = r.rbuf.ReadByte(); err != nil {
		return
	}

	r.offset++
	return b, r.wbuf.WriteByte(b)
}

func newTeeReader(r reader, h hash.Hash32) *teeReader {
return &teeReader{
reader: r,
w: h,
bufWriter: bufio.NewWriter(h),
}
// Flush writes any bytes still pending in the write buffer through to the
// crc hasher.
func (r *scannerReader) Flush() error {
	return r.wbuf.Flush()
}

func (r *teeReader) Read(p []byte) (n int, err error) {
r.Flush()
// Seek seeks to a location. If the underlying reader is not an io.ReadSeeker,
// then only whence=io.SeekCurrent is supported, any other operation fails.
func (r *scannerReader) Seek(offset int64, whence int) (int64, error) {
var err error

n, err = r.reader.Read(p)
if n > 0 {
if n, err := r.w.Write(p[:n]); err != nil {
return n, err
if seeker, ok := r.reader.(io.ReadSeeker); !ok {
if whence != io.SeekCurrent || offset != 0 {
return -1, ErrSeekNotSupported
}
} else {
if whence == io.SeekCurrent && offset == 0 {
return r.offset, nil
}
}
return
}

func (r *teeReader) ReadByte() (b byte, err error) {
b, err = r.reader.ReadByte()
if err == nil {
return b, r.bufWriter.WriteByte(b)
r.offset, err = seeker.Seek(offset, whence)
r.rbuf.Reset(r.reader)
}

return
}

func (r *teeReader) Flush() (err error) {
return r.bufWriter.Flush()
return r.offset, err
}
49 changes: 49 additions & 0 deletions plumbing/format/packfile/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,55 @@ func (s *ScannerSuite) TestSeekObjectHeaderNonSeekable(c *C) {
c.Assert(err, Equals, ErrSeekNotSupported)
}

// TestReaderReset checks basic usage of Scanner.Reset: all scanner state is
// cleared, the underlying reader is swapped, and the offset reflects the new
// reader's position.
func (s *ScannerSuite) TestReaderReset(c *C) {
	r := fixtures.Basic().One().Packfile()
	p := NewScanner(r)

	version, objects, err := p.Header()
	c.Assert(err, IsNil) // Header's error was previously unchecked
	c.Assert(version, Equals, VersionSupported)
	c.Assert(objects, Equals, uint32(31))

	h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
	c.Assert(err, IsNil)
	c.Assert(h, DeepEquals, &expectedHeadersOFS[0])

	// Resetting with the same (already-advanced) reader keeps its position.
	p.Reset(r)
	c.Assert(p.pendingObject, IsNil)
	c.Assert(p.version, Equals, uint32(0))
	c.Assert(p.objects, Equals, uint32(0))
	c.Assert(p.r.reader, Equals, r)
	c.Assert(p.r.offset > expectedHeadersOFS[0].Offset, Equals, true)

	// Resetting with a fresh reader starts the offset at zero.
	p.Reset(bytes.NewReader(nil))
	c.Assert(p.r.offset, Equals, int64(0))
}

// TestReaderResetSeeks verifies that Reset re-detects seekability from the
// new reader: a seekable reader keeps SeekObjectHeader working, while a
// non-seekable one makes it fail with ErrSeekNotSupported.
func (s *ScannerSuite) TestReaderResetSeeks(c *C) {
	r := fixtures.Basic().One().Packfile()

	// seekable
	p := NewScanner(r)
	c.Assert(p.IsSeekable, Equals, true)
	h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
	c.Assert(err, IsNil)
	c.Assert(h, DeepEquals, &expectedHeadersOFS[0])

	// reset with seekable
	p.Reset(r)
	c.Assert(p.IsSeekable, Equals, true)
	h, err = p.SeekObjectHeader(expectedHeadersOFS[1].Offset)
	c.Assert(err, IsNil)
	c.Assert(h, DeepEquals, &expectedHeadersOFS[1])

	// reset with non-seekable (io.MultiReader hides the underlying Seeker)
	f := fixtures.Basic().ByTag("ref-delta").One()
	p.Reset(io.MultiReader(f.Packfile()))
	c.Assert(p.IsSeekable, Equals, false)

	_, err = p.SeekObjectHeader(expectedHeadersOFS[4].Offset)
	c.Assert(err, Equals, ErrSeekNotSupported)
}

var expectedHeadersOFS = []ObjectHeader{
{Type: plumbing.CommitObject, Offset: 12, Length: 254},
{Type: plumbing.OFSDeltaObject, Offset: 186, Length: 93, OffsetReference: 12},
Expand Down