From f4062880525e4fba6b31dda706f77fdf74527433 Mon Sep 17 00:00:00 2001 From: Mikael Finstad Date: Thu, 2 Sep 2021 14:04:41 +0700 Subject: [PATCH] Rewrite Uploader to use fs-capacitor #3098 This allows for upload to start almost immediately without having to first download the file. And it allows for uploading bigger files, because transloadit assembly will not timeout, as it will get upload progress events all the time. No longer need for illusive progress. Also fix eslint warnings and simplify logic Still TODO: TUS pause/resume has a bug: https://github.com/tus/tus-js-client/issues/275 --- package-lock.json | 95 ++-- packages/@uppy/companion/package.json | 1 + .../@uppy/companion/src/server/Uploader.js | 432 +++++++++--------- .../companion/src/server/controllers/get.js | 6 +- .../companion/src/server/controllers/url.js | 6 +- .../companion/test/__tests__/uploader.js | 7 +- 6 files changed, 272 insertions(+), 275 deletions(-) diff --git a/package-lock.json b/package-lock.json index 03f3949b0f..682765ba74 100644 --- a/package-lock.json +++ b/package-lock.json @@ -34916,6 +34916,14 @@ "safe-buffer": "~5.1.0" } }, + "node_modules/fs-capacitor": { + "version": "7.0.1", + "resolved": "https://registry.npmjs.org/fs-capacitor/-/fs-capacitor-7.0.1.tgz", + "integrity": "sha512-YjxAAorsFA/pK3PTLuYJO+FlZ7wvGTIwGPbpGzVAJ+DUp6uof0zZjG6dcYsrGX864BMeUCj9R6lmkYZ5uY5lWQ==", + "engines": { + "node": ">=12" + } + }, "node_modules/fs-constants": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", @@ -78484,7 +78492,7 @@ } }, "packages/@uppy/angular": { - "version": "0.2.0", + "version": "0.2.1", "dependencies": { "@angular/animations": "~12.1.0", "@angular/common": "~12.1.0", @@ -78572,7 +78580,7 @@ "integrity": "sha512-N82ooyxVNm6h1riLCoyS9e3fuJ3AMG2zIZs2Gd1ATcSFjSA23Q0fzjjZeh0jbJvWVDZ0cJT8yaNNaaXHzueNjg==" }, "packages/@uppy/aws-s3": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -78588,7 +78596,7 @@ } }, "packages/@uppy/aws-s3-multipart": { - "version": "2.0.0", + "version": "2.0.2", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -78614,7 +78622,7 @@ } }, "packages/@uppy/box": { - "version": "1.0.0", + "version": "1.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -78627,7 +78635,7 @@ } }, "packages/@uppy/companion": { - "version": "3.0.0", + "version": "3.0.1", "license": "ISC", "dependencies": { "@purest/providers": "1.0.1", @@ -78645,6 +78653,7 @@ "express-prom-bundle": "6.3.0", "express-request-id": "1.4.1", "express-session": "1.17.1", + "fs-capacitor": "^7.0.1", "grant": "4.7.0", "helmet": "^4.6.0", "ip-address": "6.2.0", @@ -78942,7 +78951,7 @@ } }, "packages/@uppy/core": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@transloadit/prettier-bytes": "0.0.7", @@ -78967,7 +78976,7 @@ } }, "packages/@uppy/dashboard": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@transloadit/prettier-bytes": "0.0.7", @@ -78984,7 +78993,7 @@ "preact": "^10.5.13" }, "devDependencies": { - "@uppy/google-drive": "2.0.0", + "@uppy/google-drive": "2.0.1", "@uppy/status-bar": "*", "resize-observer-polyfill": "^1.5.0" }, @@ -79004,7 +79013,7 @@ } }, "packages/@uppy/drag-drop": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79015,7 +79024,7 @@ } }, "packages/@uppy/drop-target": { - "version": "1.0.0", + "version": "1.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils" @@ -79025,7 +79034,7 @@ } }, "packages/@uppy/dropbox": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79038,7 +79047,7 @@ } }, "packages/@uppy/facebook": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79051,7 +79060,7 @@ } }, "packages/@uppy/file-input": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79062,7 +79071,7 @@ } }, "packages/@uppy/form": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79073,7 +79082,7 @@ } }, "packages/@uppy/golden-retriever": { - "version": "2.0.0", + "version": "2.0.2", "license": "MIT", "dependencies": { "@transloadit/prettier-bytes": "0.0.7", @@ -79085,7 +79094,7 @@ } }, "packages/@uppy/google-drive": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79098,7 +79107,7 @@ } }, "packages/@uppy/image-editor": { - "version": "1.0.0", + "version": "1.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79110,7 +79119,7 @@ } }, "packages/@uppy/informer": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79121,7 +79130,7 @@ } }, "packages/@uppy/instagram": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79138,7 +79147,7 @@ "license": "MIT" }, "packages/@uppy/onedrive": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79151,7 +79160,7 @@ } }, "packages/@uppy/progress-bar": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79162,7 +79171,7 @@ } }, "packages/@uppy/provider-views": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79174,7 +79183,7 @@ } }, "packages/@uppy/react": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/dashboard": "file:../dashboard", @@ -79198,7 +79207,7 @@ } }, "packages/@uppy/react-native": { - "version": "0.2.0", + "version": "0.2.1", "license": "MIT", "dependencies": { "@uppy/instagram": "file:../instagram", @@ -79214,14 +79223,14 @@ } }, "packages/@uppy/redux-dev-tools": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "peerDependencies": { "@uppy/core": "^2.0.0" } }, "packages/@uppy/robodog": { - "version": "2.0.0", + "version": "2.0.2", "license": "MIT", "dependencies": { "@uppy/core": "file:../core", @@ -79247,7 +79256,7 @@ } }, "packages/@uppy/screen-capture": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79258,7 +79267,7 @@ } }, "packages/@uppy/status-bar": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@transloadit/prettier-bytes": "0.0.7", @@ -79316,7 +79325,7 @@ } }, "packages/@uppy/svelte": { - "version": "1.0.0", + "version": "1.0.1", "dependencies": { "@uppy/dashboard": "file:../dashboard", "@uppy/drag-drop": "file:../drag-drop", @@ -79358,7 +79367,7 @@ } }, "packages/@uppy/thumbnail-generator": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79372,7 +79381,7 @@ } }, "packages/@uppy/transloadit": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79390,7 +79399,7 @@ } }, "packages/@uppy/tus": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79402,7 +79411,7 @@ } }, "packages/@uppy/unsplash": { - "version": "1.0.0", + "version": "1.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79415,7 +79424,7 @@ } }, "packages/@uppy/url": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79434,7 +79443,7 @@ } }, "packages/@uppy/vue": { - "version": "0.3.0", + "version": "0.3.1", "dependencies": { "@uppy/dashboard": "file:../dashboard", "@uppy/drag-drop": "file:../drag-drop", @@ -79451,7 +79460,7 @@ } }, "packages/@uppy/webcam": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/utils": "file:../utils", @@ -79462,7 +79471,7 @@ } }, "packages/@uppy/xhr-upload": { - "version": "2.0.0", + "version": "2.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79488,7 +79497,7 @@ } }, "packages/@uppy/zoom": { - "version": "1.0.0", + "version": "1.0.1", "license": "MIT", "dependencies": { "@uppy/companion-client": "file:../companion-client", @@ -79501,7 +79510,7 @@ } }, "packages/uppy": { - "version": "2.0.0", + "version": "2.0.2", "license": "MIT", "dependencies": { "@uppy/aws-s3": "file:../@uppy/aws-s3", @@ -92768,6 +92777,7 @@ "express-prom-bundle": "6.3.0", "express-request-id": "1.4.1", "express-session": "1.17.1", + "fs-capacitor": "*", "grant": "4.7.0", "helmet": "^4.6.0", "ip-address": "6.2.0", @@ -93021,7 +93031,7 @@ "version": "file:packages/@uppy/dashboard", "requires": { "@transloadit/prettier-bytes": "0.0.7", - "@uppy/google-drive": "2.0.0", + "@uppy/google-drive": "2.0.1", "@uppy/informer": "file:../informer", "@uppy/provider-views": "file:../provider-views", "@uppy/status-bar": "*", @@ -109269,6 +109279,11 @@ "from2": "^2.0.3" } }, + "fs-capacitor": { + "version": "7.0.1", + "resolved": "https://registry.npmjs.org/fs-capacitor/-/fs-capacitor-7.0.1.tgz", + "integrity": "sha512-YjxAAorsFA/pK3PTLuYJO+FlZ7wvGTIwGPbpGzVAJ+DUp6uof0zZjG6dcYsrGX864BMeUCj9R6lmkYZ5uY5lWQ==" + }, "fs-constants": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", diff --git a/packages/@uppy/companion/package.json b/packages/@uppy/companion/package.json index 829e797f5f..351cede41c 100644 --- a/packages/@uppy/companion/package.json +++ b/packages/@uppy/companion/package.json @@ -45,6 +45,7 @@ "express-prom-bundle": "6.3.0", "express-request-id": "1.4.1", "express-session": "1.17.1", + "fs-capacitor": "^7.0.1", "grant": "4.7.0", "helmet": "^4.6.0", "ip-address": "6.2.0", diff --git a/packages/@uppy/companion/src/server/Uploader.js b/packages/@uppy/companion/src/server/Uploader.js index a5a37fc9b1..cf310f2b81 100644 --- a/packages/@uppy/companion/src/server/Uploader.js +++ b/packages/@uppy/companion/src/server/Uploader.js @@ -1,10 +1,10 @@ -const fs = require('fs') -const path = require('path') const tus = require('tus-js-client') const uuid = require('uuid') const isObject = require('isobject') +// @ts-ignore const validator = require('validator') const request = require('request') + /** @type {any} */ // @ts-ignore - typescript resolves this this to a hoisted version of // serialize-error that ships with a declaration file, we are using a version @@ -26,6 +26,8 @@ const PROTOCOLS = Object.freeze({ tus: 'tus', }) +class AbortError extends Error {} + class Uploader { /** * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart) @@ -34,19 +36,19 @@ class Uploader { * * @typedef {object} UploaderOptions * @property {string} endpoint - * @property {string=} uploadUrl + * @property {string} uploadUrl * @property {string} protocol * @property {number} size - * @property {string=} fieldname + * @property {string} fieldname * @property {string} pathPrefix - * @property {any=} s3 + * @property {any} s3 * @property {any} metadata * @property {any} companionOptions - * @property {any=} storage - * @property {any=} headers - * @property {string=} httpMethod - * @property {boolean=} useFormData - * @property {number=} chunkSize + * @property {any} storage + * @property {any} headers + * @property {string} httpMethod + * @property {boolean} useFormData + * @property {number} chunkSize * * @param {UploaderOptions} options */ @@ -58,16 +60,14 @@ class Uploader { this.options = options this.token = uuid.v4() - this.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}` this.options.metadata = this.options.metadata || {} this.options.fieldname = this.options.fieldname || DEFAULT_FIELD_NAME this.uploadFileName = this.options.metadata.name ? this.options.metadata.name.substring(0, MAX_FILENAME_LENGTH) - : path.basename(this.path) - this.streamsEnded = false + : `${Uploader.FILE_NAME_PREFIX}-${this.token}` this.uploadStopped = false - this.writeStream = fs.createWriteStream(this.path, { mode: 0o666 }) // no executable files - .on('error', (err) => logger.error(`${err}`, 'uploader.write.error', this.shortToken)) + this.bytesWritten = 0 + /** @type {number} */ this.emittedProgress = 0 this.storage = options.storage @@ -75,6 +75,7 @@ class Uploader { if (this.options.protocol === PROTOCOLS.tus) { emitter().on(`pause:${this.token}`, () => { + logger.debug('Received from client: pause', 'uploader', this.shortToken) this._paused = true if (this.tus) { this.tus.abort() @@ -82,23 +83,72 @@ class Uploader { }) emitter().on(`resume:${this.token}`, () => { + logger.debug('Received from client: cancel', 'uploader', this.shortToken) this._paused = false if (this.tus) { this.tus.start() } }) + } - emitter().on(`cancel:${this.token}`, () => { - this._paused = true - if (this.tus) { - const shouldTerminate = !!this.tus.url - this.tus.abort(shouldTerminate).catch(() => {}) - } - this.cleanUp() - }) + emitter().on(`cancel:${this.token}`, () => { + logger.debug('Received from client: cancel', 'uploader', this.shortToken) + this._paused = true + if (this.tus) { + const shouldTerminate = !!this.tus.url + this.tus.abort(shouldTerminate).catch(() => {}) + } + this.capacitor.destroy(new AbortError()) + }) + } + + async _uploadByProtocol (readStream) { + // @todo a default protocol should not be set. We should ensure that the user specifies their protocol. + const protocol = this.options.protocol || PROTOCOLS.multipart + + switch (protocol) { + case PROTOCOLS.multipart: + return this.uploadMultipart(readStream) + case PROTOCOLS.s3Multipart: + return this.uploadS3Multipart(readStream) + case PROTOCOLS.tus: + return this.uploadTus(readStream) + default: + throw new Error('Invalid protocol') + } + } + + async _startUpload (readStream) { + try { + const { url, extraData } = await Promise.race([ + this._uploadByProtocol(readStream), + // If we don't handle stream errors, we get unhandled error in node. + new Promise((resolve, reject) => this.capacitor.on('error', reject)), + new Promise((resolve, reject) => readStream.on('error', reject)), + ]) + this.emitSuccess(url, extraData) + } catch (err) { + if (!(err instanceof AbortError)) { + // console.log(err) + logger.error(err, 'uploader.error', this.shortToken) + this.emitError(err, err.extraData) + } + } finally { + this.cleanUp() } } + async initCapacitor () { + if (this.capacitor) throw new Error('Already initialized capacitor') + // Because it's an ESM so we cannot require + const { WriteStream } = await import('fs-capacitor') + + this.capacitor = new WriteStream({ tmpdir: () => this.options.pathPrefix }) + const readStream = this.capacitor.createReadStream() + + this._startUpload(readStream) + } + /** * returns a substring of the token. Used as traceId for logging * we avoid using the entire token because this is meant to be a short term @@ -136,13 +186,6 @@ class Uploader { } } - /** - * the number of bytes written into the streams - */ - get bytesWritten () { - return this.writeStream.bytesWritten - } - /** * Validate the options passed down to the uplaoder * @@ -236,21 +279,17 @@ class Uploader { return Uploader.shortenToken(this.token) } - /** - * - * @param {Function} callback - */ - onSocketReady (callback) { - emitter().once(`connection:${this.token}`, () => callback()) + async awaitReady () { logger.debug('waiting for connection', 'uploader.socket.wait', this.shortToken) + await new Promise((resolve) => emitter().once(`connection:${this.token}`, resolve)) + await this.initCapacitor() } cleanUp () { - fs.unlink(this.path, (err) => { - if (err) { - logger.error(`cleanup failed for: ${this.path} err: ${err}`, 'uploader.cleanup.error') - } - }) + if (this.uploadStopped) return + logger.debug('cleanup', this.shortToken) + this.capacitor.destroy() + emitter().removeAllListeners(`pause:${this.token}`) emitter().removeAllListeners(`resume:${this.token}`) emitter().removeAllListeners(`cancel:${this.token}`) @@ -264,59 +303,28 @@ class Uploader { */ handleChunk (err, chunk) { if (this.uploadStopped) { + logger.debug('Received chunk after upload stopped', 'uploader.download', this.shortToken) return } if (err) { logger.error(err, 'uploader.download.error', this.shortToken) - this.emitError(err) - this.cleanUp() + this.capacitor.destroy(err) return } - // @todo a default protocol should not be set. We should ensure that the user specifies their protocol. - const protocol = this.options.protocol || PROTOCOLS.multipart - - // The download has completed; close the file and start an upload if necessary. if (chunk === null) { - this.writeStream.on('finish', () => { - this.streamsEnded = true - switch (protocol) { - case PROTOCOLS.multipart: - if (this.options.endpoint) { - this.uploadMultipart() - } - break - case PROTOCOLS.s3Multipart: - if (!this.s3Upload) { - this.uploadS3Multipart() - } else { - logger.warn('handleChunk() called multiple times', 'uploader.s3.duplicate', this.shortToken) - } - break - case PROTOCOLS.tus: - if (!this.tus) { - this.uploadTus() - } else { - logger.warn('handleChunk() called multiple times', 'uploader.tus.duplicate', this.shortToken) - } - break - } - }) - - return this.endStreams() + // Finished! End the write stream so that the read streams will also finish. + this.capacitor.end() + return } - this.writeStream.write(chunk, () => { + this.capacitor.write(chunk, () => { + this.bytesWritten += chunk.length logger.debug(`${this.bytesWritten} bytes`, 'uploader.download.progress', this.shortToken) - return this.emitIllusiveProgress() }) } - endStreams () { - this.writeStream.end() - } - getResponse () { if (this._errRespMessage) { return { body: { message: this._errRespMessage }, status: 400 } @@ -333,46 +341,15 @@ class Uploader { this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state)) } - /** - * This method emits upload progress but also creates an "upload progress" illusion - * for the waiting period while only download is happening. Hence, it combines both - * download and upload into an upload progress. - * - * @see emitProgress - * @param {number=} bytesUploaded the bytes actually Uploaded so far - */ - emitIllusiveProgress (bytesUploaded = 0) { - if (this._paused) { - return - } - - let bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size - if (!this.streamsEnded) { - bytesTotal = Math.max(bytesTotal, this.bytesWritten) - } - // for a 10MB file, 10MB of download will account for 5MB upload progress - // and 10MB of actual upload will account for the other 5MB upload progress. - const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2) - - logger.debug( - `${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`, - 'uploader.illusive.progress', - this.shortToken - ) - this.emitProgress(illusiveBytesUploaded, bytesTotal) - } - /** * * @param {number} bytesUploaded - * @param {number | null} bytesTotal + * @param {number | null} bytesTotalIn */ - emitProgress (bytesUploaded, bytesTotal) { - bytesTotal = bytesTotal || this.options.size - if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) { - bytesTotal = this.bytesWritten - } - const percentage = (bytesUploaded / bytesTotal * 100) + onProgress (bytesUploaded, bytesTotalIn) { + const bytesTotal = bytesTotalIn || this.options.size + + const percentage = Math.min(Math.max(0, ((bytesUploaded / bytesTotal) * 100)), 100) const formatPercentage = percentage.toFixed(2) logger.debug( `${bytesUploaded} ${bytesTotal} ${formatPercentage}%`, @@ -380,6 +357,10 @@ class Uploader { this.shortToken ) + if (this._paused || this.uploadStopped) { + return + } + const dataToEmit = { action: 'progress', payload: { progress: formatPercentage, bytesUploaded, bytesTotal }, @@ -411,7 +392,7 @@ class Uploader { /** * * @param {Error} err - * @param {object=} extraData + * @param {object} extraData */ emitError (err, extraData = {}) { const serializedErr = serializeError(err) @@ -428,151 +409,145 @@ class Uploader { /** * start the tus upload */ - uploadTus () { - const file = fs.createReadStream(this.path) + async uploadTus (stream) { const uploader = this - this.tus = new tus.Upload(file, { - endpoint: this.options.endpoint, - uploadUrl: this.options.uploadUrl, - uploadLengthDeferred: false, - retryDelays: [0, 1000, 3000, 5000], - uploadSize: this.bytesWritten, - chunkSize: this.options.chunkSize || Infinity, - headers: headerSanitize(this.options.headers), - addRequestId: true, - metadata: { - // file name and type as required by the tusd tus server - // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646 - filename: this.uploadFileName, - filetype: this.options.metadata.type, - ...this.options.metadata, - }, - /** - * - * @param {Error} error - */ - onError (error) { - logger.error(error, 'uploader.tus.error') - // deleting tus originalRequest field because it uses the same http-agent - // as companion, and this agent may contain sensitive request details (e.g headers) - // previously made to providers. Deleting the field would prevent it from getting leaked - // to the frontend etc. - // @ts-ignore - delete error.originalRequest + return new Promise((resolve, reject) => { + this.tus = new tus.Upload(stream, { + endpoint: this.options.endpoint, + uploadUrl: this.options.uploadUrl, + uploadLengthDeferred: false, + retryDelays: [0, 1000, 3000, 5000], + uploadSize: this.options.size, + chunkSize: this.options.chunkSize || 50e6, + headers: headerSanitize(this.options.headers), + addRequestId: true, + metadata: { + // file name and type as required by the tusd tus server + // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646 + filename: this.uploadFileName, + filetype: this.options.metadata.type, + ...this.options.metadata, + }, + /** + * + * @param {Error} error + */ + onError (error) { + logger.error(error, 'uploader.tus.error') + // deleting tus originalRequest field because it uses the same http-agent + // as companion, and this agent may contain sensitive request details (e.g headers) + // previously made to providers. Deleting the field would prevent it from getting leaked + // to the frontend etc. + // @ts-ignore + delete error.originalRequest + // @ts-ignore + delete error.originalResponse + reject(error) + }, + /** + * + * @param {number} bytesUploaded + * @param {number} bytesTotal + */ // @ts-ignore - delete error.originalResponse - uploader.emitError(error) - }, - /** - * - * @param {number} bytesUploaded - * @param {number} bytesTotal - */ - onProgress (bytesUploaded, bytesTotal) { // eslint-disable-line no-unused-vars - uploader.emitIllusiveProgress(bytesUploaded) - }, - onSuccess () { - uploader.emitSuccess(uploader.tus.url) - uploader.cleanUp() - }, - }) + onProgress (bytesUploaded, bytesTotal) { // eslint-disable-line no-unused-vars + uploader.onProgress(bytesUploaded, bytesTotal) + }, + onSuccess () { + resolve({ url: uploader.tus.url }) + }, + }) - if (!this._paused) { - this.tus.start() - } + if (!this._paused) { + this.tus.start() + } + }) } - uploadMultipart () { - const file = fs.createReadStream(this.path) + async uploadMultipart (stream) { + if (!this.options.endpoint) { + throw new Error('No multipart endpoint set') + } // upload progress let bytesUploaded = 0 - file.on('data', (data) => { + stream.on('data', (data) => { bytesUploaded += data.length - this.emitIllusiveProgress(bytesUploaded) + this.onProgress(bytesUploaded, undefined) }) const httpMethod = (this.options.httpMethod || '').toLowerCase() === 'put' ? 'put' : 'post' const headers = headerSanitize(this.options.headers) const reqOptions = { url: this.options.endpoint, headers, encoding: null } - const httpRequest = request[httpMethod] + const runRequest = request[httpMethod] + if (this.options.useFormData) { reqOptions.formData = { - ...this.options.metadata, [this.options.fieldname]: { - value: file, + value: stream, options: { filename: this.uploadFileName, contentType: this.options.metadata.type, + knownLength: this.options.size, }, }, } - - httpRequest(reqOptions, (error, response, body) => { - this._onMultipartComplete(error, response, body, bytesUploaded) - }) } else { - reqOptions.headers['content-length'] = this.bytesWritten - reqOptions.body = file - httpRequest(reqOptions, (error, response, body) => { - this._onMultipartComplete(error, response, body, bytesUploaded) - }) + reqOptions.headers['content-length'] = this.options.size + reqOptions.body = stream } - } - _onMultipartComplete (error, response, body, bytesUploaded) { - if (error) { - logger.error(error, 'upload.multipart.error') - this.emitError(error) - return - } - const { headers } = response + const { response, body } = await new Promise((resolve, reject) => { + runRequest(reqOptions, (error, response2, body2) => { + if (error) { + logger.error(error, 'upload.multipart.error') + reject(error) + return + } + + resolve({ response: response2, body: body2 }) + }) + }) + // remove browser forbidden headers - delete headers['set-cookie'] - delete headers['set-cookie2'] + delete response.headers['set-cookie'] + delete response.headers['set-cookie2'] const respObj = { responseText: body.toString(), status: response.statusCode, statusText: response.statusMessage, - headers, + headers: response.headers, } if (response.statusCode >= 400) { logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error') - this.emitError(new Error(response.statusMessage), respObj) - } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) { - const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}` + const err = new Error(response.statusMessage) + // @ts-ignore + err.extraData = respObj + throw err + } + + if (bytesUploaded !== this.options.size) { + const errMsg = `uploaded only ${bytesUploaded} of ${this.options.size} with status: ${response.statusCode}` logger.error(errMsg, 'upload.multipart.mismatch.error') - this.emitError(new Error(errMsg)) - } else { - this.emitSuccess(null, { response: respObj, bytesUploaded }) + throw new Error(errMsg) } - this.cleanUp() + return { url: null, extraData: { response: respObj, bytesUploaded } } } /** * Upload the file to S3 using a Multipart upload. */ - uploadS3Multipart () { - const file = fs.createReadStream(this.path) - - return this._uploadS3MultipartStream(file) - } - - /** - * Upload a stream to S3. - */ - _uploadS3MultipartStream (stream) { + async uploadS3Multipart (stream) { if (!this.options.s3) { - this.emitError(new Error('The S3 client is not configured on this companion instance.')) - return + throw new Error('The S3 client is not configured on this companion instance.') } - const filename = this.options.metadata.name || path.basename(this.path) + const filename = this.uploadFileName const { client, options } = this.options.s3 const upload = client.upload({ @@ -584,28 +559,29 @@ class Uploader { Body: stream, }) - this.s3Upload = upload - upload.on('httpUploadProgress', ({ loaded, total }) => { - this.emitProgress(loaded, total) + this.onProgress(loaded, total) }) - upload.send((error, data) => { - this.s3Upload = null - if (error) { - this.emitError(error) - } else { - const url = data && data.Location ? data.Location : null - this.emitSuccess(url, { - response: { - responseText: JSON.stringify(data), - headers: { - 'content-type': 'application/json', + return new Promise((resolve, reject) => { + upload.send((error, data) => { + if (error) { + reject(error) + return + } + + resolve({ + url: data && data.Location ? data.Location : null, + extraData: { + response: { + responseText: JSON.stringify(data), + headers: { + 'content-type': 'application/json', + }, }, }, }) - } - this.cleanUp() + }) }) } } diff --git a/packages/@uppy/companion/src/server/controllers/get.js b/packages/@uppy/companion/src/server/controllers/get.js index 41dbb24c66..f383f3e347 100644 --- a/packages/@uppy/companion/src/server/controllers/get.js +++ b/packages/@uppy/companion/src/server/controllers/get.js @@ -34,11 +34,11 @@ function get (req, res, next) { // wait till the client has connected to the socket, before starting // the download, so that the client can receive all download/upload progress. logger.debug('Waiting for socket connection before beginning remote download.', null, req.id) - // waiting for socketReady. - uploader.onSocketReady(() => { + uploader.awaitReady().then(() => { logger.debug('Socket connection received. Starting remote download.', null, req.id) provider.download({ id, token, query: req.query }, uploader.handleChunk.bind(uploader)) - }) + }).catch((err2) => logger.error(err2, req.id)) + const response = uploader.getResponse() res.status(response.status).json(response.body) }) diff --git a/packages/@uppy/companion/src/server/controllers/url.js b/packages/@uppy/companion/src/server/controllers/url.js index b5c33ee315..44db6b7831 100644 --- a/packages/@uppy/companion/src/server/controllers/url.js +++ b/packages/@uppy/companion/src/server/controllers/url.js @@ -53,6 +53,8 @@ const downloadURL = (url, onDataChunk, blockLocalIPs, traceId) => { agentClass: getProtectedHttpAgent((new URL(url)).protocol, blockLocalIPs), } + // return onDataChunk(new Error('test error')) + request(opts) .on('response', (resp) => { if (resp.statusCode >= 300) { @@ -123,10 +125,10 @@ const get = async (req, res) => { } logger.debug('Waiting for socket connection before beginning remote download.', null, req.id) - uploader.onSocketReady(() => { + uploader.awaitReady().then(() => { logger.debug('Socket connection received. Starting remote download.', null, req.id) downloadURL(req.body.url, uploader.handleChunk.bind(uploader), !debug, req.id) - }) + }).catch((err2) => logger.error(err2, req.id)) const response = uploader.getResponse() diff --git a/packages/@uppy/companion/test/__tests__/uploader.js b/packages/@uppy/companion/test/__tests__/uploader.js index 364b0c4b92..b496627b1b 100644 --- a/packages/@uppy/companion/test/__tests__/uploader.js +++ b/packages/@uppy/companion/test/__tests__/uploader.js @@ -9,6 +9,9 @@ const standalone = require('../../src/standalone') describe('uploader with tus protocol', () => { test('upload functions with tus protocol', () => { + if (true) { + throw new Error('TODO this test hangs') + } const fileContent = Buffer.from('Some file content') const { companionOptions } = standalone() const opts = { @@ -26,13 +29,13 @@ describe('uploader with tus protocol', () => { return new Promise((resolve) => { // validate that the test is resolved on socket connection - uploader.onSocketReady(() => { + uploader.awaitReady().then(() => { const fileInfo = fs.statSync(uploader.path) expect(fileInfo.isFile()).toBe(true) expect(fileInfo.size).toBe(0) uploader.handleChunk(null, fileContent) uploader.handleChunk(null, null) - }) + }).catch((err2) => console.error(err2)) let progressReceived = 0 // emulate socket connection