From 0a3c6988cd69bc2f73c6a0db6755fb7699c4186b Mon Sep 17 00:00:00 2001 From: Emma Zhu Date: Thu, 1 Feb 2024 10:49:28 +0800 Subject: [PATCH] Fix issue of 'too many handles' error when downloading a large file --- src/common/persistence/FSExtentStore.ts | 29 +++----- src/common/persistence/FileLazyReadStream.ts | 72 ++++++++++++++++++++ 2 files changed, 83 insertions(+), 18 deletions(-) create mode 100644 src/common/persistence/FileLazyReadStream.ts diff --git a/src/common/persistence/FSExtentStore.ts b/src/common/persistence/FSExtentStore.ts index 199a8012d..36a4d7170 100644 --- a/src/common/persistence/FSExtentStore.ts +++ b/src/common/persistence/FSExtentStore.ts @@ -1,6 +1,5 @@ import { close, - createReadStream, createWriteStream, fdatasync, mkdir, @@ -30,6 +29,7 @@ import IExtentStore, { } from "./IExtentStore"; import IOperationQueue from "./IOperationQueue"; import OperationQueue from "./OperationQueue"; +import FileLazyReadStream from "./FileLazyReadStream"; const statAsync = promisify(stat); const mkdirAsync = promisify(mkdir); @@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore { const op = () => new Promise((resolve, reject) => { this.logger.verbose( - `FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${ - extentChunk.id - } path:${path} offset:${extentChunk.offset} count:${ - extentChunk.count + `FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id + } path:${path} offset:${extentChunk.offset} count:${extentChunk.count } end:${extentChunk.offset + extentChunk.count - 1}`, contextId ); - const stream = createReadStream(path, { - start: extentChunk.offset, - end: extentChunk.offset + extentChunk.count - 1 - }).on("close", () => { - this.logger.verbose( - `FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${ - extentChunk.id - } path:${path} offset:${extentChunk.offset} count:${ - extentChunk.count - } end:${extentChunk.offset + extentChunk.count - 1}`, - contextId - ); - }); + const stream = new FileLazyReadStream( + path, + extentChunk.offset, + extentChunk.offset + extentChunk.count - 1, + this.logger, + persistencyId, + extentChunk.id, + contextId); resolve(stream); }); diff --git a/src/common/persistence/FileLazyReadStream.ts b/src/common/persistence/FileLazyReadStream.ts new file mode 100644 index 000000000..dbe57853a --- /dev/null +++ b/src/common/persistence/FileLazyReadStream.ts @@ -0,0 +1,72 @@ +import { ReadStream, createReadStream } from "fs"; +import { Readable } from "stream"; +import ILogger from "../ILogger"; + + +export default class FileLazyReadStream extends Readable { + private extentStream: ReadStream | undefined; + constructor( + private readonly extentPath: string, + private readonly start: number, + private readonly end: number, + private readonly logger: ILogger, + private readonly persistencyId: string, + private readonly extentId: string, + private readonly contextId?: string) { + super(); + } + + public _read(): void { + if (this.extentStream === undefined) { + this.extentStream = createReadStream(this.extentPath, { + start: this.start, + end: this.end + }).on("close", () => { + this.logger.verbose( + `FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId + } path:${this.extentPath} offset:${this.start} end:${this.end}`, + this.contextId + ); + }); + this.setSourceEventHandlers(); + } + this.extentStream?.resume(); + } + + private setSourceEventHandlers() { + this.extentStream?.on("data", this.sourceDataHandler); + this.extentStream?.on("end", this.sourceErrorOrEndHandler); + this.extentStream?.on("error", this.sourceErrorOrEndHandler); + } + + private removeSourceEventHandlers() { + this.extentStream?.removeListener("data", this.sourceDataHandler); + this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler); + this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler); + } + + private sourceDataHandler = (data: Buffer) => { + if (!this.push(data)) { + this.extentStream?.pause(); + } + } + + private sourceErrorOrEndHandler = (err?: Error) => { + if (err && err.name === "AbortError") { + this.destroy(err); + return; + } + + this.removeSourceEventHandlers(); + this.push(null); + this.destroy(err); + } + + _destroy(error: Error | null, callback: (error?: Error) => void): void { + // remove listener from source and release source + //this.removeSourceEventHandlers(); + (this.extentStream as Readable).destroy(); + + callback(error === null ? undefined : error); + } +} \ No newline at end of file