Skip to content

Commit

Permalink
Move streams utils to the core (elastic#76397)
Browse files Browse the repository at this point in the history
* move utils/streams to the KP

* allow imports from src/core/server/utils

* ts-ify

* import streams from KP

* remove unnecessary ts-expect-error

* fix kbn-es-archiver build

* lost export

* copy array in createListStream

* remove streams from legacy utils

Co-authored-by: spalger <[email protected]>
  • Loading branch information
mshustov and spalger committed Sep 3, 2020
1 parent d405da9 commit 8c738bf
Show file tree
Hide file tree
Showing 51 changed files with 85 additions and 136 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ module.exports = {
'!src/core/server/mocks{,.ts}',
'!src/core/server/types{,.ts}',
'!src/core/server/test_utils{,.ts}',
'!src/core/server/utils', // ts alias
'!src/core/server/utils/**/*',
// for absolute imports until fixed in
// https://github.com/elastic/kibana/issues/36096
'!src/core/server/*.test.mocks{,.ts}',
Expand Down
2 changes: 1 addition & 1 deletion src/cli_keystore/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Logger } from '../cli_plugin/lib/logger';
import { confirm, question } from '../legacy/server/utils';
import { createPromiseFromStreams, createConcatStream } from '../legacy/utils';
import { createPromiseFromStreams, createConcatStream } from '../core/server/utils';

/**
* @param {Keystore} keystore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import { exportSavedObjectsToStream } from './get_sorted_objects_for_export';
import { savedObjectsClientMock } from '../service/saved_objects_client.mock';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';

async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import Boom from 'boom';
import { createListStream } from '../../../../legacy/utils/streams';
import { createListStream } from '../../utils/streams';
import { SavedObjectsClientContract, SavedObject } from '../types';
import { fetchNestedDependencies } from './inject_nested_depdendencies';
import { sortObjects } from './sort_objects';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
createFilterStream,
createMapStream,
createPromiseFromStreams,
} from '../../../../legacy/utils/streams';
} from '../../utils/streams';
import { SavedObject } from '../types';
import { createLimitStream } from './create_limit_stream';
import { SavedObjectsImportError } from './types';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
createConcatStream,
createListStream,
createPromiseFromStreams,
} from '../../../../legacy/utils/streams';
} from '../../utils/streams';
import { createLimitStream } from './create_limit_stream';

describe('createLimitStream()', () => {
Expand Down
6 changes: 1 addition & 5 deletions src/core/server/saved_objects/routes/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

import { schema } from '@kbn/config-schema';
import stringify from 'json-stable-stringify';
import {
createPromiseFromStreams,
createMapStream,
createConcatStream,
} from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createMapStream, createConcatStream } from '../../utils/streams';
import { IRouter } from '../../http';
import { SavedObjectConfig } from '../saved_objects_config';
import { exportSavedObjectsToStream } from '../export';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jest.mock('../../export', () => ({
}));

import * as exportMock from '../../export';
import { createListStream } from '../../../../../legacy/utils/streams';
import { createListStream } from '../../../utils/streams';
import supertest from 'supertest';
import { UnwrapPromise } from '@kbn/utility-types';
import { SavedObjectConfig } from '../../saved_objects_config';
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/saved_objects/routes/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createSavedObjectsStreamFromNdJson, validateTypes, validateObjects } from './utils';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';

async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);
Expand Down
6 changes: 1 addition & 5 deletions src/core/server/saved_objects/routes/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

import { Readable } from 'stream';
import { SavedObject, SavedObjectsExportResultDetails } from 'src/core/server';
import {
createSplitStream,
createMapStream,
createFilterStream,
} from '../../../../legacy/utils/streams';
import { createSplitStream, createMapStream, createFilterStream } from '../../utils/streams';

export function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) {
return ndJsonStream
Expand Down
1 change: 1 addition & 0 deletions src/core/server/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
export * from './crypto';
export * from './from_root';
export * from './package_json';
export * from './streams';
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { createListStream, createPromiseFromStreams, createConcatStream } from './';
import { createListStream, createPromiseFromStreams, createConcatStream } from './index';

describe('concatStream', () => {
test('accepts an initial value', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ import { createReduceStream } from './reduce_stream';
* items will concat with
* @return {Transform}
*/
export function createConcatStream(initial) {
export function createConcatStream<T>(initial?: T) {
return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { PassThrough } from 'stream';
import { Readable, PassThrough, TransformOptions } from 'stream';

/**
* Write the data and errors from a list of stream providers
Expand All @@ -29,7 +29,10 @@ import { PassThrough } from 'stream';
* @param {PassThroughOptions} options options passed to the PassThrough constructor
* @return {WritableStream} combined stream
*/
export function concatStreamProviders(sourceProviders, options = {}) {
export function concatStreamProviders(
sourceProviders: Array<() => Readable>,
options?: TransformOptions
) {
const destination = new PassThrough(options);
const queue = sourceProviders.slice();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
createFilterStream,
createListStream,
createPromiseFromStreams,
} from './';
} from './index';

describe('createFilterStream()', () => {
test('calls the function with each item in the source stream', async () => {
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
createListStream,
createIntersperseStream,
createConcatStream,
} from './';
} from './index';

describe('intersperseStream', () => {
test('places the intersperse value between each provided value', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { Transform } from 'stream';
* @param {String|Buffer} intersperseChunk
* @return {Transform}
*/
export function createIntersperseStream(intersperseChunk) {
export function createIntersperseStream(intersperseChunk: string | Buffer) {
let first = true;

return new Transform({
Expand All @@ -55,7 +55,7 @@ export function createIntersperseStream(intersperseChunk) {
}

this.push(chunk);
callback(null);
callback();
} catch (err) {
callback(err);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { createListStream } from './';
import { createListStream } from './index';

describe('listStream', () => {
test('provides the values in the initial list', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import { Readable } from 'stream';
* @param {Array<any>} items - the list of items to provide
* @return {Readable}
*/
export function createListStream(items = []) {
const queue = [].concat(items);
export function createListStream<T = any>(items: T | T[] = []) {
const queue = Array.isArray(items) ? [...items] : [items];

return new Readable({
objectMode: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('createMapStream()', () => {
test('send the return value from the mapper on the output stream', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream((n) => n * 100),
createMapStream((n: number) => n * 100),
createConcatStream([]),
]);

Expand All @@ -49,7 +49,7 @@ describe('createMapStream()', () => {
test('supports async mappers', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream(async (n, i) => {
createMapStream(async (n: number, i: number) => {
await delay(n);
return n * i;
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Transform } from 'stream';

export function createMapStream(fn) {
export function createMapStream<T>(fn: (value: T, i: number) => void) {
let i = 0;

return new Transform({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Readable, Writable, Duplex, Transform } from 'stream';

import { createListStream, createPromiseFromStreams, createReduceStream } from './';
import { createListStream, createPromiseFromStreams, createReduceStream } from './index';

describe('promiseFromStreams', () => {
test('pipes together an array of streams', async () => {
Expand Down Expand Up @@ -76,14 +76,13 @@ describe('promiseFromStreams', () => {
test('waits for writing and resolves to final value', async () => {
let written = '';

const duplexReadQueue = [];
const duplexReadQueue: Array<Promise<unknown>> = [];
const duplexItemsToPush = ['foo', 'bar', null];
const result = await createPromiseFromStreams([
createListStream(['a', 'b', 'c']),
new Duplex({
async read() {
const result = await duplexReadQueue.shift();
this.push(result);
this.push(await duplexReadQueue.shift());
},

write(chunk, enc, cb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@
* @return {Promise<any>}
*/

import { pipeline, Writable } from 'stream';
import { pipeline, Writable, Readable } from 'stream';

export async function createPromiseFromStreams(streams) {
let finalChunk;
function isReadable(stream: Readable | Writable): stream is Readable {
return 'read' in stream && typeof stream.read === 'function';
}

export async function createPromiseFromStreams<T>(streams: [Readable, ...Writable[]]): Promise<T> {
let finalChunk: any;
const last = streams[streams.length - 1];
if (typeof last.read !== 'function' && streams.length === 1) {
if (!isReadable(last) && streams.length === 1) {
// For a nicer error than what stream.pipeline throws
throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
}
if (typeof last.read === 'function') {
if (isReadable(last)) {
// We are pushing a writable stream to capture the last chunk
streams.push(
new Writable({
Expand All @@ -57,7 +61,9 @@ export async function createPromiseFromStreams(streams) {
})
);
}

return new Promise((resolve, reject) => {
// @ts-expect-error 'pipeline' doesn't support variable length of arguments
pipeline(...streams, (err) => {
if (err) return reject(err);
resolve(finalChunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
import { createReduceStream, createPromiseFromStreams, createListStream } from './index';

import { createReduceStream, createPromiseFromStreams, createListStream } from './';

const promiseFromEvent = (name, emitter) =>
const promiseFromEvent = (name: string, emitter: Transform) =>
new Promise((resolve) => emitter.on(name, () => resolve(name)));

describe('reduceStream', () => {
Expand Down Expand Up @@ -47,7 +47,10 @@ describe('reduceStream', () => {
});

test('emits an error if an iteration fails', async () => {
const reduce = createReduceStream((acc, i) => expect(i).toBe(1), 0);
const reduce = createReduceStream((acc, i) => {
expect(i).toBe(1);
return acc;
}, 0);
const errorEvent = promiseFromEvent('error', reduce);

reduce.write(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import { Transform } from 'stream';
* initial value.
* @return {Transform}
*/
export function createReduceStream(reducer, initial) {
export function createReduceStream<T>(
reducer: (value: any, chunk: T, enc: string) => T,
initial?: T
) {
let i = -1;
let value = initial;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Writable, Readable } from 'stream';

import {
createReplaceStream,
createConcatStream,
createPromiseFromStreams,
createListStream,
createMapStream,
} from './';
} from './index';

async function concatToString(streams) {
async function concatToString(streams: [Readable, ...Writable[]]) {
return await createPromiseFromStreams([
...streams,
createMapStream((buff) => buff.toString('utf8')),
createMapStream((buff: Buffer) => buff.toString('utf8')),
createConcatStream(''),
]);
}

describe('replaceStream', () => {
test('produces buffers when it receives buffers', async () => {
const chunks = await createPromiseFromStreams([
const chunks = await createPromiseFromStreams<Buffer[]>([
createListStream([Buffer.from('foo'), Buffer.from('bar')]),
createReplaceStream('o', '0'),
createConcatStream([]),
Expand All @@ -47,7 +48,7 @@ describe('replaceStream', () => {
});

test('produces buffers when it receives strings', async () => {
const chunks = await createPromiseFromStreams([
const chunks = await createPromiseFromStreams<string[]>([
createListStream(['foo', 'bar']),
createReplaceStream('o', '0'),
createConcatStream([]),
Expand All @@ -59,6 +60,7 @@ describe('replaceStream', () => {
});

test('expects toReplace to be a string', () => {
// @ts-expect-error
expect(() => createReplaceStream(Buffer.from('foo'))).toThrowError(/be a string/);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Transform } from 'stream';

export function createReplaceStream(toReplace, replacement) {
export function createReplaceStream(toReplace: string, replacement: string | Buffer) {
if (typeof toReplace !== 'string') {
throw new TypeError('toReplace must be a string');
}
Expand Down Expand Up @@ -78,6 +78,7 @@ export function createReplaceStream(toReplace, replacement) {
this.push(buffer);
}

// @ts-expect-error
buffer = null;
callback();
},
Expand Down
Loading

0 comments on commit 8c738bf

Please sign in to comment.