Skip to content
This repository has been archived by the owner on Sep 11, 2020. It is now read-only.

Commit

Permalink
plumbing: packfile/scanner, readability/performance improvements, zli…
Browse files Browse the repository at this point in the history
…b pooling

Signed-off-by: Arran Walker <[email protected]>
  • Loading branch information
saracen committed Apr 22, 2019
1 parent e5268e9 commit 9d4279f
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 105 deletions.
10 changes: 10 additions & 0 deletions plumbing/format/packfile/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package packfile

import (
"bytes"
"compress/zlib"
"io"
"sync"

Expand Down Expand Up @@ -66,3 +67,12 @@ var bufPool = sync.Pool{
return bytes.NewBuffer(nil)
},
}

var (
	// zlibInitBytes is a complete zlib stream with an empty payload: the
	// two-byte header, one final stored block of length zero, and the Adler-32
	// checksum of no data. It exists only so that a reader can be constructed
	// before any real input is available.
	zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}

	// zlibReaderPool recycles zlib readers. Values taken from the pool are
	// expected to be re-targeted through zlib.Resetter before use.
	zlibReaderPool = sync.Pool{
		New: func() interface{} {
			// The error is discarded on purpose: the input is the constant,
			// well-formed stream above, so header parsing cannot fail.
			seed := bytes.NewReader(zlibInitBytes)
			zr, _ := zlib.NewReader(seed)
			return zr
		},
	}
)
189 changes: 84 additions & 105 deletions plumbing/format/packfile/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type ObjectHeader struct {
}

type Scanner struct {
r reader
zr readerResetter
r *scannerReader
crc hash.Hash32

// pendingObject is used to detect if an object has been read, or still
Expand All @@ -56,19 +55,27 @@ type Scanner struct {
// NewScanner returns a new Scanner based on a reader, if the given reader
// implements io.ReadSeeker the Scanner will be also Seekable
func NewScanner(r io.Reader) *Scanner {
seeker, ok := r.(io.ReadSeeker)
if !ok {
seeker = &trackableReader{Reader: r}
}
_, ok := r.(io.ReadSeeker)

crc := crc32.NewIEEE()
return &Scanner{
r: newTeeReader(newByteReadSeeker(seeker), crc),
r: newScannerReader(r, crc),
crc: crc,
IsSeekable: ok,
}
}

// Reset points the Scanner at a new reader so that it can be reused. The
// internal reader and the CRC hasher are reset, the parsed header state
// (pending object, version and object count) is cleared, and IsSeekable is
// recomputed from whether r implements io.ReadSeeker.
func (s *Scanner) Reset(r io.Reader) {
	_, seekable := r.(io.ReadSeeker)

	s.r.Reset(r)
	s.crc.Reset()

	s.IsSeekable = seekable
	s.pendingObject, s.version, s.objects = nil, 0, 0
}

// Header reads the whole packfile header (signature, version and object count).
// It returns the version and the object count and performs checks on the
// validity of the signature and the version fields.
Expand Down Expand Up @@ -182,8 +189,7 @@ func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) {
// nextObjectHeader returns the ObjectHeader for the next object in the reader
// without the Offset field
func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) {
defer s.Flush()

s.r.Flush()
s.crc.Reset()

h := &ObjectHeader{}
Expand Down Expand Up @@ -304,35 +310,29 @@ func (s *Scanner) readLength(first byte) (int64, error) {
// NextObject writes the content of the next object into the writer, returns
// the number of bytes written, the CRC32 of the content and an error, if any
func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err error) {
defer s.crc.Reset()

s.pendingObject = nil
written, err = s.copyObject(w)
s.Flush()

s.r.Flush()
crc32 = s.crc.Sum32()
s.crc.Reset()

return
}

// copyObject reads a non-deltified object from its zlib stream in an object
// entry in the packfile and writes it to w.
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {
if s.zr == nil {
var zr io.ReadCloser
zr, err = zlib.NewReader(s.r)
if err != nil {
return 0, fmt.Errorf("zlib initialization error: %s", err)
}
zr := zlibReaderPool.Get().(io.ReadCloser)
defer zlibReaderPool.Put(zr)

s.zr = zr.(readerResetter)
} else {
if err = s.zr.Reset(s.r, nil); err != nil {
return 0, fmt.Errorf("zlib reset error: %s", err)
}
if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
return 0, fmt.Errorf("zlib reset error: %s", err)
}

defer ioutil.CheckClose(s.zr, &err)
defer ioutil.CheckClose(zr, &err)
buf := byteSlicePool.Get().([]byte)
n, err = io.CopyBuffer(w, s.zr, buf)
n, err = io.CopyBuffer(w, zr, buf)
byteSlicePool.Put(buf)
return
}
Expand Down Expand Up @@ -378,110 +378,89 @@ func (s *Scanner) Close() error {
return err
}

// Flush finishes writing the buffer to crc hasher in case we are using
// a teeReader. Otherwise it is a no-op.
// Flush is a no-op (deprecated)
func (s *Scanner) Flush() error {
tee, ok := s.r.(*teeReader)
if ok {
return tee.Flush()
}
return nil
}

type trackableReader struct {
count int64
io.Reader
// scannerReader has the following characteristics:
// - Provides an io.ReadSeeker impl for bufio.Reader, when the underlying
// reader supports it.
// - Keeps track of the current read position, for when the underlying reader
// isn't an io.ReadSeeker, but we still want to know the current offset.
// - Writes to the hash writer what it reads, with the aid of a smaller buffer.
// The buffer helps avoid a performance penalty for performing small writes
// to the crc32 hash writer.
type scannerReader struct {
// reader is the underlying source most recently supplied through Reset.
reader io.Reader
// crc receives a copy of every byte that is read (after buffering in wbuf).
crc io.Writer
// rbuf is the buffered reader layered over reader; all reads go through it.
rbuf *bufio.Reader
// wbuf is the small buffered writer in front of crc; Flush drains it.
wbuf *bufio.Writer
// offset is the current read position, advanced by every successful read.
offset int64
}

// Read reads up to len(p) bytes into p.
func (r *trackableReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.count += int64(n)

return
}

// Seek only supports io.SeekCurrent, any other operation fails
func (r *trackableReader) Seek(offset int64, whence int) (int64, error) {
if whence != io.SeekCurrent {
return -1, ErrSeekNotSupported
func newScannerReader(r io.Reader, h io.Writer) *scannerReader {
sr := &scannerReader{
rbuf: bufio.NewReader(nil),
wbuf: bufio.NewWriterSize(nil, 64),
crc: h,
}
sr.Reset(r)

return r.count, nil
return sr
}

func newByteReadSeeker(r io.ReadSeeker) *bufferedSeeker {
return &bufferedSeeker{
r: r,
Reader: *bufio.NewReader(r),
}
}
func (r *scannerReader) Reset(reader io.Reader) {
r.reader = reader
r.rbuf.Reset(r.reader)
r.wbuf.Reset(r.crc)

type bufferedSeeker struct {
r io.ReadSeeker
bufio.Reader
}

func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) {
if whence == io.SeekCurrent && offset == 0 {
current, err := r.r.Seek(offset, whence)
if err != nil {
return current, err
}

return current - int64(r.Buffered()), nil
r.offset = 0
if seeker, ok := r.reader.(io.ReadSeeker); ok {
r.offset, _ = seeker.Seek(0, io.SeekCurrent)
}

defer r.Reader.Reset(r.r)
return r.r.Seek(offset, whence)
}

type readerResetter interface {
io.ReadCloser
zlib.Resetter
}
func (r *scannerReader) Read(p []byte) (n int, err error) {
n, err = r.rbuf.Read(p)

type reader interface {
io.Reader
io.ByteReader
io.Seeker
r.offset += int64(n)
if _, err := r.wbuf.Write(p[:n]); err != nil {
return n, err
}
return
}

type teeReader struct {
reader
w hash.Hash32
bufWriter *bufio.Writer
// ReadByte reads a single byte from the buffered reader. On success the
// tracked offset advances by one and the byte is queued for the crc writer;
// any error from queuing it is returned alongside the byte. On a read error
// neither the offset nor the crc buffer is touched.
func (r *scannerReader) ReadByte() (b byte, err error) {
	b, err = r.rbuf.ReadByte()
	if err != nil {
		return b, err
	}

	r.offset++
	err = r.wbuf.WriteByte(b)
	return b, err
}

func newTeeReader(r reader, h hash.Hash32) *teeReader {
return &teeReader{
reader: r,
w: h,
bufWriter: bufio.NewWriter(h),
}
// Flush writes any bytes still held in the small write buffer to the crc
// writer and returns the error, if any, reported by that flush.
func (r *scannerReader) Flush() error {
return r.wbuf.Flush()
}

func (r *teeReader) Read(p []byte) (n int, err error) {
r.Flush()
// Seek seeks to a location. If the underlying reader is not an io.ReadSeeker,
// then only whence=io.SeekCurrent is supported, any other operation fails.
func (r *scannerReader) Seek(offset int64, whence int) (int64, error) {
var err error

n, err = r.reader.Read(p)
if n > 0 {
if n, err := r.w.Write(p[:n]); err != nil {
return n, err
if seeker, ok := r.reader.(io.ReadSeeker); !ok {
if whence != io.SeekCurrent || offset != 0 {
return -1, ErrSeekNotSupported
}
} else {
if whence == io.SeekCurrent && offset == 0 {
return r.offset, nil
}
}
return
}

func (r *teeReader) ReadByte() (b byte, err error) {
b, err = r.reader.ReadByte()
if err == nil {
return b, r.bufWriter.WriteByte(b)
r.offset, err = seeker.Seek(offset, whence)
r.rbuf.Reset(r.reader)
}

return
}

func (r *teeReader) Flush() (err error) {
return r.bufWriter.Flush()
return r.offset, err
}
49 changes: 49 additions & 0 deletions plumbing/format/packfile/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,55 @@ func (s *ScannerSuite) TestSeekObjectHeaderNonSeekable(c *C) {
c.Assert(err, Equals, ErrSeekNotSupported)
}

// TestReaderReset checks that Scanner.Reset clears the parsed header state and
// re-derives the read offset from the reader it is handed.
func (s *ScannerSuite) TestReaderReset(c *C) {
	r := fixtures.Basic().One().Packfile()
	p := NewScanner(r)

	version, objects, err := p.Header()
	// Check the error before the values: it was previously overwritten by the
	// next call without ever being inspected.
	c.Assert(err, IsNil)
	c.Assert(version, Equals, VersionSupported)
	c.Assert(objects, Equals, uint32(31))

	h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
	c.Assert(err, IsNil)
	c.Assert(h, DeepEquals, &expectedHeadersOFS[0])

	// Resetting with the same seekable reader clears the header state but
	// picks up the reader's current position, which is past the object header
	// that was just read.
	p.Reset(r)
	c.Assert(p.pendingObject, IsNil)
	c.Assert(p.version, Equals, uint32(0))
	c.Assert(p.objects, Equals, uint32(0))
	c.Assert(p.r.reader, Equals, r)
	c.Assert(p.r.offset > expectedHeadersOFS[0].Offset, Equals, true)

	// A fresh reader starts from offset zero.
	p.Reset(bytes.NewReader(nil))
	c.Assert(p.r.offset, Equals, int64(0))
}

// TestReaderResetSeeks verifies that seeking keeps working after a Reset with
// a seekable reader, and is refused after a Reset with a reader that cannot
// seek.
func (s *ScannerSuite) TestReaderResetSeeks(c *C) {
	pack := fixtures.Basic().One().Packfile()

	// Freshly constructed over a seekable reader.
	scanner := NewScanner(pack)
	c.Assert(scanner.IsSeekable, Equals, true)

	header, err := scanner.SeekObjectHeader(expectedHeadersOFS[0].Offset)
	c.Assert(err, IsNil)
	c.Assert(header, DeepEquals, &expectedHeadersOFS[0])

	// Reset onto the same seekable reader: seeking must still succeed.
	scanner.Reset(pack)
	c.Assert(scanner.IsSeekable, Equals, true)

	header, err = scanner.SeekObjectHeader(expectedHeadersOFS[1].Offset)
	c.Assert(err, IsNil)
	c.Assert(header, DeepEquals, &expectedHeadersOFS[1])

	// Reset onto a reader wrapped in io.MultiReader, which exposes only Read.
	nonSeekable := io.MultiReader(fixtures.Basic().ByTag("ref-delta").One().Packfile())
	scanner.Reset(nonSeekable)
	c.Assert(scanner.IsSeekable, Equals, false)

	_, err = scanner.SeekObjectHeader(expectedHeadersOFS[4].Offset)
	c.Assert(err, Equals, ErrSeekNotSupported)
}

var expectedHeadersOFS = []ObjectHeader{
{Type: plumbing.CommitObject, Offset: 12, Length: 254},
{Type: plumbing.OFSDeltaObject, Offset: 186, Length: 93, OffsetReference: 12},
Expand Down

0 comments on commit 9d4279f

Please sign in to comment.