Skip to content

Commit

Permalink
fix(gcs-resumable-upload): Stop Duplicate Response Handlers on Retries
Browse files Browse the repository at this point in the history
  • Loading branch information
d-goog committed Jan 26, 2022
1 parent a6b78f5 commit 15825ff
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
18 changes: 8 additions & 10 deletions src/gcs-resumable-upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -767,12 +767,6 @@ export class Upload extends Pumpify {
},
});

// This should be 'once' as `startUploading` can be called again for
// multi chunk uploads and each request would have its own response.
this.once('response', resp => {
responseReceived = true;
this.responseHandler(resp);
});
let headers: GaxiosOptions['headers'] = {};

// If using multiple chunk upload, set appropriate header
Expand All @@ -797,7 +791,11 @@ export class Upload extends Pumpify {
};

try {
await this.makeRequestStream(reqOpts);
const resp = await this.makeRequestStream(reqOpts);
if (resp) {
responseReceived = true;
this.responseHandler(resp);
}
} catch (err) {
const e = err as Error;
this.destroy(e);
Expand Down Expand Up @@ -989,7 +987,7 @@ export class Upload extends Pumpify {
return res;
}

private async makeRequestStream(reqOpts: GaxiosOptions): GaxiosPromise {
private async makeRequestStream(reqOpts: GaxiosOptions) {
const controller = new AbortController();
const errorCallback = () => controller.abort();
this.once('error', errorCallback);
Expand All @@ -1008,10 +1006,10 @@ export class Upload extends Pumpify {
reqOpts
);
const res = await this.authClient.request(combinedReqOpts);
this.onResponse(res);
const successfulRequest = this.onResponse(res);
this.removeListener('error', errorCallback);

return res;
return successfulRequest ? res : null;
}

private restart() {
Expand Down
45 changes: 26 additions & 19 deletions test/gcs-resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import * as mockery from 'mockery';
import * as nock from 'nock';
import * as path from 'path';
import * as sinon from 'sinon';
import {PassThrough, Readable} from 'stream';
import {Readable} from 'stream';

import {
ApiError,
Expand Down Expand Up @@ -906,7 +906,7 @@ describe('gcs-resumable-upload', () => {

describe('#startUploading', () => {
beforeEach(() => {
up.makeRequestStream = async () => new PassThrough();
up.makeRequestStream = async () => null;
up.upstreamChunkBuffer = Buffer.alloc(16);
});

Expand Down Expand Up @@ -978,14 +978,6 @@ describe('gcs-resumable-upload', () => {
up.startUploading();
});

it("should setup a 'response' listener", async () => {
assert.equal(up.eventNames().includes('response'), false);

await up.startUploading();

assert.equal(up.eventNames().includes('response'), true);
});

it('should destroy the stream if the request failed', done => {
const error = new Error('Error.');
up.on('error', (e: Error) => {
Expand Down Expand Up @@ -1697,14 +1689,27 @@ describe('gcs-resumable-upload', () => {
up.makeRequestStream(REQ_OPTS);
});

it('should return the response', async () => {
const response = {};
it('should return the response if successful', async () => {
const response = {some: 'response'};
up.authClient = {
request: async () => response,
};
up.onResponse = () => true;

const stream = await up.makeRequestStream(REQ_OPTS);
assert.strictEqual(stream, response);
});

it('should return `null` if the response is unsuccessful', async () => {
const response = {some: 'response'};
up.authClient = {
request: async () => response,
};
up.onResponse = () => false;

const stream = await up.makeRequestStream(REQ_OPTS);
assert.strictEqual(stream, null);
});
});

describe('#restart', () => {
Expand Down Expand Up @@ -2261,7 +2266,7 @@ describe('gcs-resumable-upload', () => {
let dataReceived = 0;
let chunkWritesInRequest = 0;

await new Promise(resolve => {
const res = await new Promise(resolve => {
opts.body.on('data', (data: Buffer) => {
dataReceived += data.byteLength;
overallDataReceived += data.byteLength;
Expand All @@ -2271,14 +2276,16 @@ describe('gcs-resumable-upload', () => {
opts.body.on('end', () => {
requests.push({dataReceived, opts, chunkWritesInRequest});

up.emit('response', {
resolve({
status: 200,
data: {},
});

resolve(null);
});
});

return res;
};

up.on('error', done);
Expand Down Expand Up @@ -2400,7 +2407,7 @@ describe('gcs-resumable-upload', () => {
let dataReceived = 0;
let chunkWritesInRequest = 0;

await new Promise(resolve => {
const res = await new Promise(resolve => {
opts.body.on('data', (data: Buffer) => {
dataReceived += data.byteLength;
overallDataReceived += data.byteLength;
Expand All @@ -2415,23 +2422,23 @@ describe('gcs-resumable-upload', () => {
? overallDataReceived - 1
: 0;

up.emit('response', {
resolve({
status: RESUMABLE_INCOMPLETE_STATUS_CODE,
headers: {
range: `bytes=0-${lastByteReceived}`,
},
data: {},
});
} else {
up.emit('response', {
resolve({
status: 200,
data: {},
});
}

resolve(null);
});
});

return res;
};

up.on('error', done);
Expand Down

0 comments on commit 15825ff

Please sign in to comment.