Skip to content

Commit

Permalink
Accommodate index.Generate taking io.ReadSeeker
Browse files Browse the repository at this point in the history
Accommodate to the `index.Generate` signature change, done in #136.
Instead of implementing a new reader that converts `io.ReaderAt` to
`io.ReadSeeker`, make `internalio.OffsetReader` partially implement
`Seek`. The "partial" refers to inability to satisfy `Seek` calls with
`io.SeekEnd` whence since the point of `internalio.OffsetReader` is that
it need not know the total size of a file, i.e. it cannot know its end.
Instead, it panics if `Seek` is called with whence `io.SeekEnd`.

None of this matters much since it is all placed under internal APIs.
  • Loading branch information
masih authored and mvdan committed Jul 16, 2021
1 parent 4e863b2 commit c0023ed
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 63 deletions.
46 changes: 35 additions & 11 deletions v2/blockstore/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ type ReadOnly struct {
//
// There is no need to call ReadOnly.Close on instances returned by this function.
func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) {
version, err := carv2.ReadVersion(internalio.NewOffsetReader(backing, 0))
version, err := readVersion(backing)
if err != nil {
return nil, err
}
switch version {
case 1:
if idx == nil {
if idx, err = index.Generate(backing); err != nil {
if idx, err = generateIndex(backing); err != nil {
return nil, err
}
}
Expand All @@ -75,7 +75,7 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) {
if err != nil {
return nil, err
}
} else if idx, err = index.Generate(backing); err != nil {
} else if idx, err = generateIndex(v2r.CarV1Reader()); err != nil {
return nil, err
}
}
Expand All @@ -85,6 +85,28 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) {
}
}

func readVersion(at io.ReaderAt) (uint64, error) {
var rr io.Reader
switch r := at.(type) {
case io.Reader:
rr = r
default:
rr = internalio.NewOffsetReadSeeker(r, 0)
}
return carv2.ReadVersion(rr)
}

// generateIndex builds an index for at by delegating to index.Generate.
// When at already implements io.ReadSeeker it is handed over as is; otherwise it is
// wrapped in an OffsetReadSeeker created at offset zero to provide Read and Seek.
func generateIndex(at io.ReaderAt) (index.Index, error) {
	if seekable, ok := at.(io.ReadSeeker); ok {
		return index.Generate(seekable)
	}
	return index.Generate(internalio.NewOffsetReadSeeker(at, 0))
}

// OpenReadOnly opens a read-only blockstore from a CAR v2 file, generating an index if it does not exist.
// If attachIndex is set to true and the index is not present in the given CAR v2 file,
// then the generated index is written into the given path.
Expand Down Expand Up @@ -120,7 +142,7 @@ func OpenReadOnly(path string, attachIndex bool) (*ReadOnly, error) {
}

func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
bcid, data, err := util.ReadNode(bufio.NewReader(internalio.NewOffsetReader(b.backing, idx)))
bcid, data, err := util.ReadNode(bufio.NewReader(internalio.NewOffsetReadSeeker(b.backing, idx)))
return bcid, data, err
}

Expand All @@ -140,7 +162,7 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
} else if err != nil {
return false, err
}
uar := internalio.NewOffsetReader(b.backing, int64(offset))
uar := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
_, err = varint.ReadUvarint(uar)
if err != nil {
return false, err
Expand Down Expand Up @@ -181,11 +203,11 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
if err != nil {
return -1, err
}
l, err := varint.ReadUvarint(internalio.NewOffsetReader(b.backing, int64(idx)))
l, err := varint.ReadUvarint(internalio.NewOffsetReadSeeker(b.backing, int64(idx)))
if err != nil {
return -1, blockstore.ErrNotFound
}
_, c, err := cid.CidFromReader(internalio.NewOffsetReader(b.backing, int64(idx+l)))
_, c, err := cid.CidFromReader(internalio.NewOffsetReadSeeker(b.backing, int64(idx+l)))
if err != nil {
return 0, err
}
Expand All @@ -212,7 +234,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
b.mu.RLock()

// TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car.
header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReader(b.backing, 0)))
header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReadSeeker(b.backing, 0)))
if err != nil {
return nil, fmt.Errorf("error reading car header: %w", err)
}
Expand All @@ -229,7 +251,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

defer close(ch)

rdr := internalio.NewOffsetReader(b.backing, int64(offset))
rdr := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
for {
l, err := varint.ReadUvarint(rdr)
if err != nil {
Expand All @@ -240,7 +262,9 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if err != nil {
return // TODO: log this error
}
rdr.SeekOffset(thisItemForNxt + int64(l))
if _, err := rdr.Seek(thisItemForNxt+int64(l), io.SeekStart); err != nil {
return // TODO: log this error
}

select {
case ch <- c:
Expand All @@ -259,7 +283,7 @@ func (b *ReadOnly) HashOnRead(bool) {

// Roots returns the root CIDs of the backing CAR.
func (b *ReadOnly) Roots() ([]cid.Cid, error) {
header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReader(b.backing, 0)))
header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReadSeeker(b.backing, 0)))
if err != nil {
return nil, fmt.Errorf("error reading car header: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, err
opt(b)
}
b.carV1Writer = internalio.NewOffsetWriter(f, int64(b.header.CarV1Offset))
carV1Reader := internalio.NewOffsetReader(f, int64(b.header.CarV1Offset))
carV1Reader := internalio.NewOffsetReadSeeker(f, int64(b.header.CarV1Offset))
b.ReadOnly = ReadOnly{backing: carV1Reader, idx: idx}
if _, err := f.WriteAt(carv2.Pragma, 0); err != nil {
return nil, err
Expand Down
65 changes: 65 additions & 0 deletions v2/internal/io/offset_read_seeker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io

import (
	"errors"
	"io"
)

var (
_ io.ReaderAt = (*OffsetReadSeeker)(nil)
_ io.ReadSeeker = (*OffsetReadSeeker)(nil)
)

// OffsetReadSeeker implements Read, ReadAt, ReadByte and Seek on a section
// of an underlying io.ReaderAt.
// The main difference between io.SectionReader and OffsetReadSeeker is that
// NewOffsetReadSeeker does not require the user to know the number of readable bytes.
//
// Two coordinate systems are in use:
//   - ReadAt offsets are relative to the base offset passed to NewOffsetReadSeeker.
//   - Read, ReadByte, Seek and Offset work on the sequential read position, which is
//     an absolute offset into the underlying io.ReaderAt.
//
// Seek is only partially implemented: it panics if io.SeekEnd is passed.
// This is because OffsetReadSeeker does not know the end of the underlying data and
// therefore cannot seek relative to it.
type OffsetReadSeeker struct {
	r    io.ReaderAt
	base int64 // offset into r that ReadAt treats as position zero
	off  int64 // absolute offset into r at which the next sequential read starts
}

// NewOffsetReadSeeker returns an OffsetReadSeeker that reads from r
// starting at offset off. Errors reported by r, such as io.EOF at its end,
// are passed through to the caller.
// The Seek function will panic if whence io.SeekEnd is passed.
func NewOffsetReadSeeker(r io.ReaderAt, off int64) *OffsetReadSeeker {
	return &OffsetReadSeeker{r: r, base: off, off: off}
}

// Read reads up to len(p) bytes from the current sequential position and advances
// that position by the number of bytes read. As permitted by io.Reader, it may
// return n > 0 together with a non-nil error.
func (o *OffsetReadSeeker) Read(p []byte) (n int, err error) {
	n, err = o.r.ReadAt(p, o.off)
	o.off += int64(n)
	return n, err
}

// ReadAt reads len(p) bytes starting at off, where off is relative to the base
// offset given at construction. It does not touch the sequential read position.
// A negative off yields io.EOF, mirroring io.SectionReader.
func (o *OffsetReadSeeker) ReadAt(p []byte, off int64) (n int, err error) {
	if off < 0 {
		return 0, io.EOF
	}
	return o.r.ReadAt(p, o.base+off)
}

// ReadByte reads a single byte from the current sequential position.
//
// It follows the io.ByteReader contract: a non-nil error means no byte was
// consumed. The underlying io.ReaderAt is allowed to return the final byte
// together with io.EOF; in that case the byte is returned with a nil error and
// the error is left to surface on the next call.
func (o *OffsetReadSeeker) ReadByte() (byte, error) {
	var b [1]byte
	n, err := o.Read(b[:])
	if n == 1 {
		return b[0], nil
	}
	if err == nil {
		// The underlying reader broke the io.ReaderAt contract by returning
		// no data and no error; never report a phantom zero byte as success.
		err = io.ErrNoProgress
	}
	return 0, err
}

// Offset returns the current sequential read position as an absolute offset
// into the underlying io.ReaderAt.
func (o *OffsetReadSeeker) Offset() int64 {
	return o.off
}

// Seek sets the sequential read position used by Read and ReadByte and returns
// the new position.
//
// With io.SeekStart the given offset is interpreted in the same coordinates as
// Offset, i.e. as an absolute offset into the underlying io.ReaderAt. With
// io.SeekCurrent it is relative to the current position. io.SeekEnd is not
// supported and panics, since the end of the underlying data is unknown.
//
// An unknown whence or a resulting negative position returns an error and
// leaves the current position untouched.
func (o *OffsetReadSeeker) Seek(offset int64, whence int) (int64, error) {
	var next int64
	switch whence {
	case io.SeekStart:
		next = offset
	case io.SeekCurrent:
		next = o.off + offset
	case io.SeekEnd:
		panic("unsupported whence: SeekEnd")
	default:
		return 0, errors.New("OffsetReadSeeker.Seek: invalid whence")
	}
	if next < 0 {
		return 0, errors.New("OffsetReadSeeker.Seek: negative position")
	}
	o.off = next
	return o.off, nil
}
49 changes: 0 additions & 49 deletions v2/internal/io/offset_reader.go

This file was deleted.

4 changes: 2 additions & 2 deletions v2/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewReader(r io.ReaderAt) (*Reader, error) {
}

func (r *Reader) requireVersion2() (err error) {
or := internalio.NewOffsetReader(r.r, 0)
or := internalio.NewOffsetReadSeeker(r.r, 0)
version, err := ReadVersion(or)
if err != nil {
return
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *Reader) CarV1Reader() SectionReader {

// IndexReader provides an io.Reader containing the index of this CAR v2.
func (r *Reader) IndexReader() io.Reader {
return internalio.NewOffsetReader(r.r, int64(r.Header.IndexOffset))
return internalio.NewOffsetReadSeeker(r.r, int64(r.Header.IndexOffset))
}

// Close closes the underlying reader if it was opened by NewReaderMmap.
Expand Down

0 comments on commit c0023ed

Please sign in to comment.