This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

feat!: Multiple Chunk Upload Support #486

Merged: 19 commits, Dec 30, 2021
Changes from 2 commits
95 changes: 81 additions & 14 deletions src/index.ts
@@ -103,6 +103,16 @@ export interface UploadConfig {
*/
configPath?: string;

+  /**
+   * Upload in multiple chunks, with a separate request per chunk.
+   *
+   * Must be a multiple of 256 KiB (2^18 bytes); a chunk size of at least
+   * 8 MiB is recommended.
+   *
+   * @link https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
+   */
+  chunkSize?: number;
+
/**
* For each API request we send, you may specify custom request options that
* we'll add onto the request. The request options follow the gaxios API:
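For context, a minimal usage sketch of the new option, assuming the package's `upload` entry point as shown in its README; the bucket and file names are placeholders:

```ts
import * as fs from 'fs';
import {upload} from 'gcs-resumable-upload';

// 8 MiB: the recommended minimum, and a multiple of 256 KiB.
const chunkSize = 8 * 1024 * 1024;

fs.createReadStream('file.dat')
  .pipe(upload({bucket: 'my-bucket', file: 'file.dat', chunkSize}))
  .on('error', console.error)
  .on('finish', () => console.log('upload complete'));
```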
@@ -237,6 +247,7 @@ export class Upload extends Pumpify {
) => Promise<GaxiosResponse<T>> | GaxiosPromise<T>;
};
cacheKey: string;
+  chunkSize?: number;
customRequestOptions: GaxiosOptions;
generation?: number;
key?: string | Buffer;
@@ -296,6 +307,8 @@ export class Upload extends Pumpify {
if (typeof cfg.generation === 'number') {
cacheKeyElements.push(`${cfg.generation}`);
}
+    // TODO: add another element for chunk size? Any conflicts possible here?

this.cacheKey = cacheKeyElements.join('/');

this.customRequestOptions = cfg.customRequestOptions || {};
@@ -307,6 +320,7 @@
this.origin = cfg.origin;
this.params = cfg.params || {};
this.userProject = cfg.userProject;
+    this.chunkSize = cfg.chunkSize;

if (cfg.key) {
/**
@@ -482,6 +496,13 @@
const bufferStream = this.bufferStream || new PassThrough();
this.bufferStream = bufferStream;

+    const chunkSizingStream = new Transform({
+      // Buffer the data up to the highwater mark before passing along
+      // to subsequent streams. If no chunk size is available, this
+      // simply passes the data to the next stream.
+      readableHighWaterMark: this.chunkSize,
+    });
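This commit relies on `readableHighWaterMark` to accumulate roughly chunk-sized buffers. For comparison, an explicit re-chunking transform might look like the following sketch; it is hypothetical and not part of this PR:

```ts
import {Transform, TransformCallback} from 'stream';

// Hypothetical sketch: emit exactly chunkSize-byte pieces regardless of
// how the incoming data is framed.
class ChunkSizer extends Transform {
  private pending: Buffer = Buffer.alloc(0);

  constructor(private readonly chunkSize: number) {
    super();
  }

  _transform(chunk: Buffer, _enc: BufferEncoding, done: TransformCallback) {
    this.pending = Buffer.concat([this.pending, chunk]);
    // Push complete chunkSize-sized pieces; hold back the remainder.
    while (this.pending.length >= this.chunkSize) {
      this.push(this.pending.subarray(0, this.chunkSize));
      this.pending = this.pending.subarray(this.chunkSize);
    }
    done();
  }

  _flush(done: TransformCallback) {
    // The final piece of an upload may be smaller than chunkSize.
    if (this.pending.length > 0) {
      this.push(this.pending);
    }
    done();
  }
}
```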

    // The offset stream allows us to analyze each incoming
    // chunk against what the upstream API already
    // has stored for this upload.
@@ -506,47 +527,85 @@

// Process the API response to look for errors that came in
// the response body.
-    this.on('response', (resp: GaxiosResponse) => {
+    const responseHandler = (resp: GaxiosResponse) => {
       if (resp.data.error) {
         this.destroy(resp.data.error);
         return;
       }

-      if (resp.status < 200 || resp.status > 299) {
+      const shouldContinueWithNextMultiChunk =
+        this.chunkSize &&
+        resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE &&
+        resp.headers.range;
+
+      if (shouldContinueWithNextMultiChunk) {
+        // Use the upper value in this header to determine where to start the next chunk.
+        // We should not assume that the server received all bytes sent in the request.
+        // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
+        const range: string = resp.headers.range;
+        this.offset = Number(range.split('-')[1]) + 1;
+
+        // continue uploading next chunk
+        this.continueUploading();
+      } else if (resp.status < 200 || resp.status > 299) {
         const err: ApiError = {
           code: resp.status,
           name: 'Upload failed',
           message: 'Upload failed',
         };
         this.destroy(err);
         return;
-      }
-      if (resp && resp.data) {
-        resp.data.size = Number(resp.data.size);
-      }
-      this.emit('metadata', resp.data);
-      this.deleteConfig();
-
-      // Allow the stream to continue naturally so the user's
-      // "finish" event fires.
-      this.uncork();
-    });
+      } else {
+        if (resp && resp.data) {
+          resp.data.size = Number(resp.data.size);
+        }
+        this.emit('metadata', resp.data);
+        this.deleteConfig();
+
+        // Allow the stream to continue naturally so the user's
+        // "finish" event fires.
+        this.uncork();
+      }
+    };
+
+    // This should be 'once' as `startUploading` can be called again
+    // for multi chunk uploads.
+    this.once('response', responseHandler);
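To make the offset arithmetic in `responseHandler` concrete, a worked example with illustrative values:

```ts
// A 308 response with `Range: bytes=0-8388607` means the server has
// persisted bytes 0 through 8388607 (the first 8 MiB).
const range = 'bytes=0-8388607';
const nextOffset = Number(range.split('-')[1]) + 1; // 8388608
// The next chunk therefore starts at byte 8388608.
```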

-    this.setPipeline(bufferStream, offsetStream, delayStream);
+    this.setPipeline(
+      bufferStream,
+      chunkSizingStream,
+      offsetStream,
+      delayStream
+    );

this.pipe(requestStreamEmbeddedStream);

this.once('restart', () => {
// The upload is being re-attempted. Disconnect the request
// stream, so it won't receive more data.
this.unpipe(requestStreamEmbeddedStream);

+      // remove local response handler to avoid duplicating
+      // response handling
+      this.removeListener('response', responseHandler);
});

+    let contentRangeHeaderValue = '';
+
+    if (this.chunkSize) {
+      // TODO: determine ending byte for buffer
+      const endingByteIndex = 0;
+
+      contentRangeHeaderValue = `bytes ${this.offset}-${endingByteIndex}/${this.contentLength}`;
+    } else {
+      contentRangeHeaderValue = `bytes ${this.offset}-*/${this.contentLength}`;
+    }
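One plausible way to resolve the `endingByteIndex` TODO, sketched here as a hypothetical helper rather than this commit's code: the chunk spans `[offset, offset + chunkSize - 1]`, clamped to the last byte of the upload.

```ts
// Hypothetical helper; this commit still hard-codes endingByteIndex = 0.
function endingByte(offset: number, chunkSize: number, contentLength: number): number {
  return Math.min(offset + chunkSize - 1, contentLength - 1);
}

// endingByte(8388608, 8 * 1024 * 1024, 20 * 1024 * 1024) === 16777215,
// yielding: 'bytes 8388608-16777215/20971520'
```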

const reqOpts: GaxiosOptions = {
method: 'PUT',
url: this.uri,
headers: {
-        'Content-Range': 'bytes ' + this.offset + '-*/' + this.contentLength,
+        'Content-Range': contentRangeHeaderValue,
},
body: requestStreamEmbeddedStream,
};
@@ -606,6 +665,14 @@ export class Upload extends Pumpify {

this.numBytesWritten += length;

+    // If we're in multi chunk upload mode (`this.chunkSize` is set), we
+    // want to pause any upstream data from coming in until we make the
+    // next request. The preceding stream ensures this chunk is the appropriate
+    // size for the request, thus one 'onChunk' call per request.
+    if (this.chunkSize) {
+      this.emit('prefinish');
+    }

// only push data from the byte after the one we left off on
next(undefined, this.numBytesWritten > offset ? chunk : undefined);
}
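The `'prefinish'` emission above relies on the stream being corked while the next request is set up (see the test below, "should cork the stream on prefinish"). A minimal illustration of the cork/uncork mechanism, independent of this library:

```ts
import {PassThrough} from 'stream';

const stream = new PassThrough();
stream.cork(); // writes are now buffered in memory
stream.write('held back while corked');
// ...set up the next chunk's request here...
stream.uncork(); // buffered writes flush downstream
```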
14 changes: 12 additions & 2 deletions test/test.ts
@@ -510,7 +510,12 @@ describe('gcs-resumable-upload', () => {

it('should cork the stream on prefinish', done => {
up.cork = done;
-      up.setPipeline = (buffer: Stream, offset: Stream, delay: Stream) => {
+      up.setPipeline = (
+        buffer: Stream,
+        chunk: Stream,
+        offset: Stream,
+        delay: Stream
+      ) => {
setImmediate(() => {
delay.emit('prefinish');
});
@@ -521,7 +526,12 @@
});

it('should set the pipeline', done => {
-      up.setPipeline = (buffer: Stream, offset: Stream, delay: Stream) => {
+      up.setPipeline = (
+        buffer: Stream,
+        chunk: Stream,
+        offset: Stream,
+        delay: Stream
+      ) => {
assert.strictEqual(buffer, up.bufferStream);
assert.strictEqual(offset, up.offsetStream);
assert.strictEqual(isStream(delay), true);