Skip to content

Commit

Permalink
refactores buffer flushing to be a single clean stream
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Jul 20, 2020
1 parent 3f56067 commit a2d7943
Showing 1 changed file with 45 additions and 30 deletions.
75 changes: 45 additions & 30 deletions x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
*/

import { keyBy, map } from 'lodash';
import { Subject, race, from, Subscription } from 'rxjs';
import { bufferWhen, filter, bufferCount, bufferTime, first } from 'rxjs/operators';
import { Subject, race, from } from 'rxjs';
import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators';
import { either, Result, asOk, asErr, Ok, Err } from './result_type';

export interface BufferOptions {
Expand Down Expand Up @@ -36,6 +36,9 @@ export type BulkOperation<Input, ErrorOutput, Output = Input> = (
entities: Input[]
) => Promise<Array<OperationResult<Input, ErrorOutput, Output>>>;

const DONT_FLUSH = false;
const FLUSH = true;

export function createBuffer<Input extends Entity, ErrorOutput, Output extends Entity = Input>(
bulkOperation: BulkOperation<Input, ErrorOutput, Output>,
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {}
Expand Down Expand Up @@ -74,36 +77,42 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
});
});

let flushSubscription: Subscription | null = null;
let countInBuffer = 0;
const flushAndResetCounter = () => {
countInBuffer = 0;
flushBuffer.next();
};
storeUpdateBuffer
.pipe(
// complete once the buffer has either filled to `bufferMaxOperations` or
// a `bufferMaxDuration` has passed. Default to `bufferMaxDuration` being the
// current event loop tick rather than a fixed duration
flatMap(() => {
return ++countInBuffer === 1
? race([
// the race is started in response to the first operation into the buffer
// so we flush once the remaining operations come in (which is `bufferMaxOperations - 1`)
storeUpdateBuffer.pipe(bufferCount(bufferMaxOperations - 1)),
bufferMaxDuration
? // if theres a max duration, flush buffer based on that
from(resolveIn(bufferMaxDuration))
: // ensure we flush by the end of the "current" event loop tick
from(resolveImmediate()),
]).pipe(first(), mapTo(FLUSH))
: from([DONT_FLUSH]);
}),
filter((shouldFlush) => shouldFlush)
)
.subscribe({
next: flushAndResetCounter,
// As this stream is just trying to decide when to flush
// there's no data to lose, so in the case that an error
// is thrown, lets just flush
error: flushAndResetCounter,
});

return async function (entity: Input) {
return new Promise((resolve, reject) => {
if (!flushSubscription) {
// complete once the buffer has either filled to `bufferMaxOperations` or
// a `bufferMaxDuration` has passed. Default to `bufferMaxDuration` being the
// current event loop tick rather than a fixed duration
flushSubscription = race([
storeUpdateBuffer.pipe(bufferCount(bufferMaxOperations)),
bufferMaxDuration
? // if theres a max duration, buffer based on that
storeUpdateBuffer.pipe(bufferTime(bufferMaxDuration))
: // ensure we flush by the end of the "current" event loop tick
from(resolveImmediate()),
])
.pipe(first())
.subscribe({
next: () => flushBuffer.next(),
error: () => {
// if any error takes place in the buffe rtiming just flush & reset
flushBuffer.next();
flushSubscription = null;
},
complete: () => {
// reset
flushSubscription = null;
},
});
}

storeUpdateBuffer.next({ entity, onSuccess: resolve, onFailure: reject });
});
};
Expand All @@ -112,3 +121,9 @@ export function createBuffer<Input extends Entity, ErrorOutput, Output extends E
function resolveImmediate() {
return new Promise(setImmediate);
}

function resolveIn(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}

0 comments on commit a2d7943

Please sign in to comment.