Skip to content
This repository has been archived by the owner on Dec 16, 2020. It is now read-only.

Commit

Permalink
feat: destroy stream on error
Browse files Browse the repository at this point in the history
  • Loading branch information
trs committed Jun 19, 2019
1 parent fec8ab3 commit 426f758
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 32 deletions.
23 changes: 2 additions & 21 deletions src/methods/chunk.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,9 @@
import { Transform } from "stream";
import { TransformTyped, TransformTypedOptions } from "../types";
import { chunkMap } from "./chunkMap";

export function chunk<R>(
size: number,
options: TransformTypedOptions<R, R[]> = {}
): TransformTyped<R, R[]> {
let chunk: R[] = [];
return new Transform({
objectMode: true,
...options,
transform(item, _encoding, callback) {
chunk.push(item);
if (chunk.length >= size) {
this.push(chunk);
chunk = [];
}
callback();
},
flush(callback) {
if (chunk.length > 0) {
this.push(chunk);
chunk = [];
}
callback();
}
});
return chunkMap(size, (chunk) => chunk, options);
}
32 changes: 21 additions & 11 deletions src/methods/chunkMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,31 @@ export function chunkMap<T, R>(
objectMode: true,
...options,
async transform(item, _encoding, callback) {
chunk.push(item);
if (chunk.length >= size) {
const result = await method(chunk, index++);
this.push(result);
chunk = [];
try {
chunk.push(item);
if (chunk.length >= size) {
const result = await method(chunk, index++);
this.push(result);
chunk = [];
}
callback();
} catch (err) {
callback(err);
this.destroy();
}
callback();
},
async flush(callback) {
if (chunk.length > 0) {
const result = await method(chunk, index++);
this.push(result);
chunk = [];
try {
if (chunk.length > 0) {
const result = await method(chunk, index++);
this.push(result);
chunk = [];
}
callback();
} catch (err) {
callback(err);
this.destroy();
}
callback();
}
});
}
1 change: 1 addition & 0 deletions src/methods/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export function filter<T>(
callback(undefined, take ? chunk : undefined);
} catch (err) {
callback(err);
this.destroy();
}
}
});
Expand Down
1 change: 1 addition & 0 deletions src/methods/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export function map<T, R>(
callback();
} catch (err) {
callback(err);
this.destroy();
}
}
});
Expand Down
1 change: 1 addition & 0 deletions src/methods/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export function reduce<T, R>(
callback();
} catch (err) {
callback(err);
this.destroy();
}
},
flush(callback) {
Expand Down
28 changes: 28 additions & 0 deletions src/methods/transform.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,32 @@ describe('transform', () => {
done();
});
});

it('handles errors by closing stream', (done) => {
const check = jest.fn();

from([1, 2, 3])
.pipe(transform(() => {
throw new Error('test')
}))
.on('error', check)
.on('close', () => {
expect(check.mock.calls.length).toEqual(1);
done();
});
});

it('handles errors by closing stream', (done) => {
const check = jest.fn();

from([1, 2, 3])
.pipe(transform(async () => {
throw new Error('test')
}))
.on('error', check)
.on('close', () => {
expect(check.mock.calls.length).toEqual(1);
done();
});
});
});
2 changes: 2 additions & 0 deletions src/methods/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export function transform<In, Out>(
callback();
} catch (err) {
callback(err);
this.destroy();
}
},
async flush(callback) {
Expand All @@ -50,6 +51,7 @@ export function transform<In, Out>(
callback();
} catch (err) {
callback(err);
this.destroy();
}
}
});
Expand Down

0 comments on commit 426f758

Please sign in to comment.