diff --git a/v2/blockstore/readonly.go b/v2/blockstore/readonly.go index 4125cc1a..6ffb6b3b 100644 --- a/v2/blockstore/readonly.go +++ b/v2/blockstore/readonly.go @@ -52,14 +52,14 @@ type ReadOnly struct { // // There is no need to call ReadOnly.Close on instances returned by this function. func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) { - version, err := carv2.ReadVersion(internalio.NewOffsetReader(backing, 0)) + version, err := readVersion(backing) if err != nil { return nil, err } switch version { case 1: if idx == nil { - if idx, err = index.Generate(backing); err != nil { + if idx, err = generateIndex(backing); err != nil { return nil, err } } @@ -75,7 +75,7 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) { if err != nil { return nil, err } - } else if idx, err = index.Generate(backing); err != nil { + } else if idx, err = generateIndex(v2r.CarV1Reader()); err != nil { return nil, err } } @@ -85,6 +85,28 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) { } } +func readVersion(at io.ReaderAt) (uint64, error) { + var rr io.Reader + switch r := at.(type) { + case io.Reader: + rr = r + default: + rr = internalio.NewOffsetReadSeeker(r, 0) + } + return carv2.ReadVersion(rr) +} + +func generateIndex(at io.ReaderAt) (index.Index, error) { + var rs io.ReadSeeker + switch r := at.(type) { + case io.ReadSeeker: + rs = r + default: + rs = internalio.NewOffsetReadSeeker(r, 0) + } + return index.Generate(rs) +} + // OpenReadOnly opens a read-only blockstore from a CAR v2 file, generating an index if it does not exist. // If attachIndex is set to true and the index is not present in the given CAR v2 file, // then the generated index is written into the given path. @@ -120,7 +142,7 @@ func OpenReadOnly(path string, attachIndex bool) (*ReadOnly, error) { } func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) { - bcid, data, err := util.ReadNode(bufio.NewReader(internalio.NewOffsetReader(b.backing, idx))) + bcid, data, err := util.ReadNode(bufio.NewReader(internalio.NewOffsetReadSeeker(b.backing, idx))) return bcid, data, err } @@ -140,7 +162,7 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) { } else if err != nil { return false, err } - uar := internalio.NewOffsetReader(b.backing, int64(offset)) + uar := internalio.NewOffsetReadSeeker(b.backing, int64(offset)) _, err = varint.ReadUvarint(uar) if err != nil { return false, err @@ -181,11 +203,11 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) { if err != nil { return -1, err } - l, err := varint.ReadUvarint(internalio.NewOffsetReader(b.backing, int64(idx))) + l, err := varint.ReadUvarint(internalio.NewOffsetReadSeeker(b.backing, int64(idx))) if err != nil { return -1, blockstore.ErrNotFound } - _, c, err := cid.CidFromReader(internalio.NewOffsetReader(b.backing, int64(idx+l))) + _, c, err := cid.CidFromReader(internalio.NewOffsetReadSeeker(b.backing, int64(idx+l))) if err != nil { return 0, err } @@ -212,7 +234,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { b.mu.RLock() // TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car. - header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReader(b.backing, 0))) + header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReadSeeker(b.backing, 0))) if err != nil { return nil, fmt.Errorf("error reading car header: %w", err) } @@ -229,7 +251,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { defer close(ch) - rdr := internalio.NewOffsetReader(b.backing, int64(offset)) + rdr := internalio.NewOffsetReadSeeker(b.backing, int64(offset)) for { l, err := varint.ReadUvarint(rdr) if err != nil { @@ -240,7 +262,9 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { if err != nil { return // TODO: log this error } - rdr.SeekOffset(thisItemForNxt + int64(l)) + if _, err := rdr.Seek(thisItemForNxt+int64(l), io.SeekStart); err != nil { + return // TODO: log this error + } select { case ch <- c: @@ -259,7 +283,7 @@ func (b *ReadOnly) HashOnRead(bool) { // Roots returns the root CIDs of the backing CAR. func (b *ReadOnly) Roots() ([]cid.Cid, error) { - header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReader(b.backing, 0))) + header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReadSeeker(b.backing, 0))) if err != nil { return nil, fmt.Errorf("error reading car header: %w", err) } diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index cf10a775..68f8cfab 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -87,7 +87,7 @@ func NewReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, err opt(b) } b.carV1Writer = internalio.NewOffsetWriter(f, int64(b.header.CarV1Offset)) - carV1Reader := internalio.NewOffsetReader(f, int64(b.header.CarV1Offset)) + carV1Reader := internalio.NewOffsetReadSeeker(f, int64(b.header.CarV1Offset)) b.ReadOnly = ReadOnly{backing: carV1Reader, idx: idx} if _, err := f.WriteAt(carv2.Pragma, 0); err != nil { return nil, err diff --git a/v2/internal/io/offset_read_seeker.go b/v2/internal/io/offset_read_seeker.go new file mode 100644 index 00000000..c92c3154 --- /dev/null +++ b/v2/internal/io/offset_read_seeker.go @@ -0,0 +1,65 @@ +package io + +import "io" + +var ( + _ io.ReaderAt = (*OffsetReadSeeker)(nil) + _ io.ReadSeeker = (*OffsetReadSeeker)(nil) +) + +// OffsetReadSeeker implements Read, and ReadAt on a section +// of an underlying io.ReaderAt. +// The main difference between io.SectionReader and OffsetReadSeeker is that +// NewOffsetReadSeeker does not require the user to know the number of readable bytes. +// +// It also partially implements Seek, where the implementation panics if io.SeekEnd is passed. +// This is because, OffsetReadSeeker does not know the end of the file therefore cannot seek relative +// to it. +type OffsetReadSeeker struct { + r io.ReaderAt + base int64 + off int64 +} + +// NewOffsetReadSeeker returns an OffsetReadSeeker that reads from r +// starting offset offset off and stops with io.EOF when r reaches its end. +// The Seek function will panic if whence io.SeekEnd is passed. +func NewOffsetReadSeeker(r io.ReaderAt, off int64) *OffsetReadSeeker { + return &OffsetReadSeeker{r, off, off} +} + +func (o *OffsetReadSeeker) Read(p []byte) (n int, err error) { + n, err = o.r.ReadAt(p, o.off) + o.off += int64(n) + return +} + +func (o *OffsetReadSeeker) ReadAt(p []byte, off int64) (n int, err error) { + if off < 0 { + return 0, io.EOF + } + off += o.base + return o.r.ReadAt(p, off) +} + +func (o *OffsetReadSeeker) ReadByte() (byte, error) { + b := []byte{0} + _, err := o.Read(b) + return b[0], err +} + +func (o *OffsetReadSeeker) Offset() int64 { + return o.off +} + +func (o *OffsetReadSeeker) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + o.off = offset + case io.SeekCurrent: + o.off += offset + case io.SeekEnd: + panic("unsupported whence: SeekEnd") + } + return o.off, nil +} diff --git a/v2/internal/io/offset_reader.go b/v2/internal/io/offset_reader.go deleted file mode 100644 index 03f7e547..00000000 --- a/v2/internal/io/offset_reader.go +++ /dev/null @@ -1,49 +0,0 @@ -package io - -import "io" - -var _ io.ReaderAt = (*OffsetReader)(nil) - -// OffsetReader implements Read, and ReadAt on a section -// of an underlying io.ReaderAt. -// The main difference between io.SectionReader and OffsetReader is that -// NewOffsetReader does not require the user to know the number of readable bytes. -type OffsetReader struct { - r io.ReaderAt - base int64 - off int64 -} - -// NewOffsetReader returns an OffsetReader that reads from r -// starting offset offset off and stops with io.EOF when r reaches its end. -func NewOffsetReader(r io.ReaderAt, off int64) *OffsetReader { - return &OffsetReader{r, off, off} -} - -func (o *OffsetReader) Read(p []byte) (n int, err error) { - n, err = o.r.ReadAt(p, o.off) - o.off += int64(n) - return -} - -func (o *OffsetReader) ReadAt(p []byte, off int64) (n int, err error) { - if off < 0 { - return 0, io.EOF - } - off += o.base - return o.r.ReadAt(p, off) -} - -func (o *OffsetReader) ReadByte() (byte, error) { - b := []byte{0} - _, err := o.Read(b) - return b[0], err -} - -func (o *OffsetReader) Offset() int64 { - return o.off -} - -func (o *OffsetReader) SeekOffset(off int64) { - o.off = off -} diff --git a/v2/reader.go b/v2/reader.go index d8b82263..6cf684c7 100644 --- a/v2/reader.go +++ b/v2/reader.go @@ -54,7 +54,7 @@ func NewReader(r io.ReaderAt) (*Reader, error) { } func (r *Reader) requireVersion2() (err error) { - or := internalio.NewOffsetReader(r.r, 0) + or := internalio.NewOffsetReadSeeker(r.r, 0) version, err := ReadVersion(or) if err != nil { return @@ -100,7 +100,7 @@ func (r *Reader) CarV1Reader() SectionReader { // IndexReader provides an io.Reader containing the index of this CAR v2. func (r *Reader) IndexReader() io.Reader { - return internalio.NewOffsetReader(r.r, int64(r.Header.IndexOffset)) + return internalio.NewOffsetReadSeeker(r.r, int64(r.Header.IndexOffset)) } // Close closes the underlying reader if it was opened by NewReaderMmap.