-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathio.go
125 lines (105 loc) · 3.46 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package store
import (
"bufio"
"io"
"github.com/pkg/errors"
)
const (
// readerBufferSize is the size in bytes (32 KiB) handed to bufio.NewReaderSize
// when readByteRanges wraps its source in a buffered reader, i.e. whenever the
// contiguous fast path does not apply.
readerBufferSize = 32 * 1024
)
// byteRange describes one span of bytes: where it starts and how many bytes it covers.
type byteRange struct {
	offset int // Offset of the first byte of the range.
	length int // Number of bytes in the range.
}

// byteRanges is a list of byte ranges. It is expected to be sorted by offset
// and to contain no overlapping entries.
type byteRanges []byteRange

// size sums the length of every range in the list.
func (r byteRanges) size() int {
	var total int
	for i := range r {
		total += r[i].length
	}
	return total
}

// areContiguous reports whether every range starts exactly where the previous
// one ends, i.e. there are no gaps between consecutive ranges. A list with
// fewer than two ranges is always considered contiguous.
func (r byteRanges) areContiguous() bool {
	for i := 1; i < len(r); i++ {
		prev := r[i-1]
		if prev.offset+prev.length != r[i].offset {
			return false
		}
	}
	return true
}
// readByteRanges copies the requested byteRanges out of src into dst and returns
// the filled slice. Offsets are relative to the start of src (offset 0 is the
// first byte src yields); the ranges must be sorted by offset and must not
// overlap.
//
// dst must have a capacity of at least byteRanges.size() bytes, otherwise
// io.ErrShortBuffer is returned. When no ranges are given the result is nil, nil.
//
// A short read is tolerated in two situations, both reported by io.ReadFull as
// io.ErrUnexpectedEOF (some, but not all, of the wanted bytes were read): on the
// contiguous fast path, and on the last range of the general path. In those cases
// the returned slice still has the full expected length; bytes past the point
// where src ended are simply whatever dst already held. Every other failure
// yields a nil slice and an error.
func readByteRanges(src io.Reader, dst []byte, byteRanges byteRanges) ([]byte, error) {
	if len(byteRanges) == 0 {
		return nil, nil
	}

	// The caller owns the buffer: refuse to proceed if it cannot hold everything.
	total := byteRanges.size()
	if cap(dst) < total {
		return nil, io.ErrShortBuffer
	}
	dst = dst[:total]

	// Fast path: the ranges form a single run starting at the first byte of src,
	// so one bulk read fills dst directly.
	if byteRanges[0].offset == 0 && byteRanges.areContiguous() {
		// The bucket store reading logic may ask for more than the object holds,
		// so hitting EOF after a partial read is treated as success.
		_, err := io.ReadFull(src, dst)
		if err == nil || err == io.ErrUnexpectedEOF {
			return dst, nil
		}
		return nil, err
	}

	// General path: alternate between skipping gaps and reading ranges. These are
	// many short operations, so a buffered reader is worth its extra allocation
	// and copy.
	buffered := bufio.NewReaderSize(src, readerBufferSize)
	last := len(byteRanges) - 1

	// srcPos is the offset in src just past the previous range (0 before the
	// first one); written is how much of dst has been filled so far.
	srcPos, written := 0, 0
	for i, rng := range byteRanges {
		// Skip everything between the end of the previous range and this one.
		if _, err := buffered.Discard(rng.offset - srcPos); err != nil {
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return nil, errors.Wrap(err, "discard bytes")
		}

		// The reader now sits on the first byte of the current range; read the
		// whole range into its slot in dst.
		end := written + rng.length
		n, err := io.ReadFull(buffered, dst[written:end])
		switch {
		case err == nil:
			written += n
		case err == io.ErrUnexpectedEOF && i == last:
			// Over-reading the final range is expected from the bucket store
			// reading logic, so a partial last range is accepted.
			return dst, nil
		case err == io.EOF:
			return nil, errors.Wrap(io.ErrUnexpectedEOF, "read byte range")
		default:
			return nil, errors.Wrap(err, "read byte range")
		}

		srcPos = rng.offset + rng.length
	}

	return dst, nil
}