Skip to content

Commit

Permalink
Revert "[kbn/optimizer] remove disconnect listener (#67161)"
Browse files Browse the repository at this point in the history
This reverts commit 90fc521.
  • Loading branch information
spalger committed May 22, 2020
1 parent fd7a9a7 commit 556ae52
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
12 changes: 12 additions & 0 deletions packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ async function waitForTick() {
}

describe('emits and completes when parent exists because:', () => {
test('"disconnect" event', async () => {
const mockProc = new MockProcess();
const promise = record(observeParentOffline(mockProc, workerMsgs));
mockProc.emit('disconnect');
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnect event)'",
"complete",
]
`);
});

test('process.connected is false', async () => {
const mockProc = new MockProcess({
connected: false,
Expand Down
41 changes: 24 additions & 17 deletions packages/kbn-optimizer/src/worker/observe_parent_offline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,35 @@ export interface Process extends EventEmitter {
* call errored with an 'ERR_IPC_CHANNEL_CLOSED' exception
*/
export function observeParentOffline(process: Process, workerMsgs: WorkerMsgs) {
return sleep(5000).pipe(
mergeMap(() => {
if (!process.connected || !process.send) {
return Rx.of('parent offline (disconnected)');
}
return Rx.race(
Rx.fromEvent(process, 'disconnect').pipe(
take(1),
map(() => 'parent offline (disconnect event)')
),

process.send(workerMsgs.ping());
sleep(5000).pipe(
mergeMap(() => {
if (!process.connected || !process.send) {
return Rx.of('parent offline (disconnected)');
}

const pong$ = Rx.fromEvent<[any]>(process, 'message').pipe(
first(([msg]) => isParentPong(msg)),
map(() => {
throw new Error('parent still online');
})
);
process.send(workerMsgs.ping());

// give the parent some time to respond, if the ping
// wins the race the parent is considered online
const timeout$ = sleep(5000).pipe(map(() => 'parent offline (ping timeout)'));
const pong$ = Rx.fromEvent<[any]>(process, 'message').pipe(
first(([msg]) => isParentPong(msg)),
map(() => {
throw new Error('parent still online');
})
);

return Rx.race(pong$, timeout$);
}),
// give the parent some time to respond, if the ping
// wins the race the parent is considered online
const timeout$ = sleep(5000).pipe(map(() => 'parent offline (ping timeout)'));

return Rx.race(pong$, timeout$);
})
)
).pipe(
/**
* resubscribe to the source observable (triggering the timer,
* ping, wait for response) if the source observable does not
Expand Down

0 comments on commit 556ae52

Please sign in to comment.