-
Notifications
You must be signed in to change notification settings - Fork 52
/
bytestream.go
216 lines (200 loc) · 6.35 KB
/
bytestream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package client
import (
"bytes"
"context"
"fmt"
"io"
"os"
log "github.com/golang/glog"
bspb "google.golang.org/genproto/googleapis/bytestream"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
)
// WriteBytes uploads a byte slice.
//
// The blob is wrapped in an upload entry, split into chunks of at most
// c.ChunkMaxSize bytes, and streamed to the server; the upload is finalized
// (FinishWrite) and starts at remote offset 0.
func (c *Client) WriteBytes(ctx context.Context, name string, data []byte) error {
	entry := uploadinfo.EntryFromBlob(data)
	chunk, err := chunker.New(entry, false, int(c.ChunkMaxSize))
	if err != nil {
		return err
	}
	if _, err := c.writeChunked(ctx, name, chunk, false, 0); err != nil {
		return err
	}
	return nil
}
// WriteBytesAtRemoteOffset uploads a byte slice with a given resource name to the CAS
// at an arbitrary offset but retries still resend from the initial Offset. As of now(2023-02-08),
// ByteStream.WriteRequest.FinishWrite and an arbitrary offset are supported for uploads with LogStream
// resource name. If doNotFinalize is set to true, ByteStream.WriteRequest.FinishWrite will be set to false.
//
// Returns the number of bytes written by the successful attempt; on error the
// count is reported as zero.
func (c *Client) WriteBytesAtRemoteOffset(ctx context.Context, name string, data []byte, doNotFinalize bool, initialOffset int64) (int64, error) {
	entry := uploadinfo.EntryFromBlob(data)
	chunk, err := chunker.New(entry, false, int(c.ChunkMaxSize))
	if err != nil {
		return 0, fmt.Errorf("failed to create a chunk: %w", err)
	}
	n, err := c.writeChunked(ctx, name, chunk, doNotFinalize, initialOffset)
	if err != nil {
		return 0, err
	}
	return n, nil
}
// withCtx makes the niladic function f behaves like one that accepts a ctx.
func withCtx(ctx context.Context, f func() error) error {
errChan := make(chan error, 1)
go func() {
errChan <- f()
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
return err
}
}
// writeChunked uploads chunked data with a given resource name to the CAS.
//
// Data is streamed one WriteRequest per chunk at remote offset
// chunk.Offset+initialOffset. Unless doNotFinalize is set, the last request
// carries FinishWrite. The whole stream is retried as a unit via c.Retrier;
// each attempt resets the chunker and restarts from initialOffset. Returns
// the number of data bytes sent by the last attempt.
func (c *Client) writeChunked(ctx context.Context, name string, ch *chunker.Chunker, doNotFinalize bool, initialOffset int64) (int64, error) {
	var totalBytes int64
	closure := func() error {
		// Retry by starting the stream from the beginning.
		if err := ch.Reset(); err != nil {
			return fmt.Errorf("failed to Reset: %w", err)
		}
		// Discard any count accumulated by a previous failed attempt.
		totalBytes = int64(0)
		// TODO(olaola): implement resumable uploads. initialOffset passed in allows to
		// start writing data at an arbitrary offset, but retries still restart from initialOffset.
		// Cancel the stream's context when this attempt returns, so a
		// half-finished stream does not linger.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		stream, err := c.Write(ctx)
		if err != nil {
			return err
		}
		for ch.HasNext() {
			req := &bspb.WriteRequest{ResourceName: name}
			chunk, err := ch.Next()
			if err != nil {
				return err
			}
			req.WriteOffset = chunk.Offset + initialOffset
			req.Data = chunk.Data
			// Only the final request of the stream may finalize the write.
			if !ch.HasNext() && !doNotFinalize {
				req.FinishWrite = true
			}
			err = c.CallWithTimeout(ctx, "Write", func(ctx context.Context) error {
				return withCtx(ctx, func() error {
					return stream.Send(req)
				})
			})
			// Per gRPC streaming semantics, Send returns io.EOF when the
			// server has already terminated the stream; the actual status is
			// surfaced by CloseAndRecv below, so stop sending and fall through.
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			totalBytes += int64(len(req.Data))
		}
		// Half-close the stream and collect the server's final status.
		if err := c.CallWithTimeout(ctx, "Write", func(ctx context.Context) error {
			return withCtx(ctx, func() error {
				_, err := stream.CloseAndRecv()
				return err
			})
		}); err != nil {
			return err
		}
		return nil
	}
	err := c.Retrier.Do(ctx, closure)
	return totalBytes, err
}
// ReadBytes fetches a resource's contents into a byte slice.
//
// ReadBytes panics with ErrTooLarge if an attempt is made to read a resource with contents too
// large to fit into a byte array.
func (c *Client) ReadBytes(ctx context.Context, name string) ([]byte, error) {
	var buf bytes.Buffer
	_, err := c.readStreamedRetried(ctx, name, 0, 0, &buf)
	return buf.Bytes(), err
}
// ReadResourceTo writes a resource's contents to a Writer.
//
// It streams the full resource (no offset, no limit) with retries, returning
// the number of bytes written to w.
func (c *Client) ReadResourceTo(ctx context.Context, name string, w io.Writer) (int64, error) {
	n, err := c.readStreamedRetried(ctx, name, 0, 0, w)
	return n, err
}
// ReadResourceToFile fetches a resource's contents, saving it into a file.
//
// The provided resource name must be a child resource of this client's instance,
// e.g. '/blobs/abc-123/45' (NOT 'projects/foo/bar/baz').
//
// The number of bytes read is returned.
func (c *Client) ReadResourceToFile(ctx context.Context, name, fpath string) (int64, error) {
	// Expand the child resource name to a full resource name first.
	res, err := c.ResourceName(name)
	if err != nil {
		return 0, err
	}
	return c.readToFile(ctx, res, fpath)
}
// readToFile streams the named resource into the file at fpath, creating or
// truncating it with the client's regular-file mode. It returns the number of
// bytes read.
//
// Fix: the original deferred f.Close() and discarded its error. For a file
// that was just written, a failing Close can indicate data loss, so it is now
// propagated when no earlier error occurred.
func (c *Client) readToFile(ctx context.Context, name string, fpath string) (n int64, err error) {
	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, c.RegularMode)
	if err != nil {
		return 0, err
	}
	defer func() {
		// Report a Close failure only if the read itself succeeded, so the
		// primary error is never masked.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	return c.readStreamedRetried(ctx, name, 0, 0, f)
}
// readStreamed reads from a bytestream and copies the result to the provided Writer, starting
// offset bytes into the stream and reading at most limit bytes (or no limit if limit==0). The
// offset must be non-negative, and an error may be returned if the offset is past the end of the
// stream. The limit must be non-negative, although offset+limit may exceed the length of the
// stream.
//
// The returned count is the total number of bytes successfully written to w in
// this call, including on error. Callers (readStreamedRetried) rely on it to
// resume from the correct offset without re-writing delivered bytes.
//
// Fix: the original returned 0 on a mid-stream Recv failure even after some
// chunks had been written to w, so a retry would re-fetch from the stale
// offset and duplicate output; the write-failure paths likewise reported only
// the final partial write instead of the running total.
func (c *Client) readStreamed(ctx context.Context, name string, offset, limit int64, w io.Writer) (int64, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	stream, err := c.Read(ctx, &bspb.ReadRequest{
		ResourceName: name,
		ReadOffset:   offset,
		ReadLimit:    limit,
	})
	if err != nil {
		return 0, err
	}
	var n int64
	for {
		var resp *bspb.ReadResponse
		err := c.CallWithTimeout(ctx, "Read", func(ctx context.Context) error {
			return withCtx(ctx, func() error {
				r, err := stream.Recv()
				resp = r
				return err
			})
		})
		// io.EOF from Recv marks the normal end of the stream.
		if err == io.EOF {
			break
		}
		if err != nil {
			// Return the bytes already written so a retried call resumes past
			// them instead of duplicating them in w.
			return n, err
		}
		log.V(3).Infof("Read: resource:%s offset:%d len(data):%d", name, offset, len(resp.Data))
		nm, err := w.Write(resp.Data)
		if err != nil {
			// Wrapping the error to ensure it may never get retried.
			return n + int64(nm), fmt.Errorf("failed to write to output stream: %v", err)
		}
		sz := len(resp.Data)
		if nm != sz {
			return n + int64(nm), fmt.Errorf("received %d bytes but could only write %d", sz, nm)
		}
		n += int64(sz)
		if limit > 0 {
			limit -= int64(sz)
			if limit <= 0 {
				break
			}
		}
	}
	return n, nil
}
// readStreamedRetried wraps readStreamed with the client's retrier: each retry
// resumes at offset plus the bytes already delivered, so w never receives the
// same byte twice. It returns the cumulative number of bytes written to w.
//
// Fix: the original `return n, c.Retrier.Do(...)` read n in the same return
// statement whose call mutates n via the closure; the Go spec leaves the
// evaluation order of a non-call operand relative to a call unspecified, so
// the count could legally be captured before the retries ran. Sequencing the
// call first makes the intended order explicit.
func (c *Client) readStreamedRetried(ctx context.Context, name string, offset, limit int64, w io.Writer) (int64, error) {
	var n int64
	closure := func() error {
		m, err := c.readStreamed(ctx, name, offset+n, limit, w)
		n += m
		return err
	}
	err := c.Retrier.Do(ctx, closure)
	return n, err
}