Skip to content

Commit

Permalink
fix: isolate binding EventEmitter (#1937)
Browse files Browse the repository at this point in the history
Binding an EventEmitter a second time via another instances of ContextManager
shouldn't have side effects to the first ContextManager.

Use an unique symbol per ContextManager instance to isolate them.
  • Loading branch information
Flarna authored Feb 22, 2021
1 parent 38d1ee2 commit 9437d7e
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@
import { ContextManager, Context } from '@opentelemetry/context-base';
import { EventEmitter } from 'events';

const kOtListeners = Symbol('OtListeners');

type Func<T> = (...args: unknown[]) => T;

type PatchedEventEmitter = {
/**
* Store a map for each event of all original listener and their "patched"
* version so when the listener is removed by the user, we remove the
* corresponding "patched" function.
*/
[kOtListeners]?: { [name: string]: WeakMap<Func<void>, Func<void>> };
} & EventEmitter;
/**
* Store a map for each event of all original listeners and their "patched"
* version. So when a listener is removed by the user, the corresponding
* patched function will be also removed.
*/
interface PatchMap {
[name: string]: WeakMap<Func<void>, Func<void>>;
}

const ADD_LISTENER_METHODS = [
'addListener' as const,
Expand Down Expand Up @@ -66,7 +64,7 @@ export abstract class AbstractAsyncHooksContextManager

private _bindFunction<T extends Function>(target: T, context: Context): T {
const manager = this;
const contextWrapper = function (this: {}, ...args: unknown[]) {
const contextWrapper = function (this: never, ...args: unknown[]) {
return manager.with(context, () => target.apply(this, args));
};
Object.defineProperty(contextWrapper, 'length', {
Expand All @@ -87,16 +85,16 @@ export abstract class AbstractAsyncHooksContextManager
* By default, EventEmitter call their callback with their context, which we do
* not want, instead we will bind a specific context to all callbacks that
* go through it.
* @param target EventEmitter a instance of EventEmitter to patch
* @param ee EventEmitter an instance of EventEmitter to patch
* @param context the context we want to bind
*/
private _bindEventEmitter<T extends EventEmitter>(
target: T,
ee: T,
context: Context
): T {
const ee = (target as unknown) as PatchedEventEmitter;
if (ee[kOtListeners] !== undefined) return target;
ee[kOtListeners] = {};
const map = this._getPatchMap(ee);
if (map !== undefined) return ee;
this._createPatchMap(ee);

// patch methods that add a listener to propagate context
ADD_LISTENER_METHODS.forEach(methodName => {
Expand All @@ -117,7 +115,7 @@ export abstract class AbstractAsyncHooksContextManager
ee.removeAllListeners
);
}
return target;
return ee;
}

/**
Expand All @@ -126,9 +124,10 @@ export abstract class AbstractAsyncHooksContextManager
* @param ee EventEmitter instance
* @param original reference to the patched method
*/
private _patchRemoveListener(ee: PatchedEventEmitter, original: Function) {
return function (this: {}, event: string, listener: Func<void>) {
const events = ee[kOtListeners]?.[event];
private _patchRemoveListener(ee: EventEmitter, original: Function) {
const contextManager = this;
return function (this: never, event: string, listener: Func<void>) {
const events = contextManager._getPatchMap(ee)?.[event];
if (events === undefined) {
return original.call(this, event, listener);
}
Expand All @@ -143,13 +142,12 @@ export abstract class AbstractAsyncHooksContextManager
* @param ee EventEmitter instance
* @param original reference to the patched method
*/
private _patchRemoveAllListeners(
ee: PatchedEventEmitter,
original: Function
) {
return function (this: {}, event: string) {
if (ee[kOtListeners]?.[event] !== undefined) {
delete ee[kOtListeners]![event];
private _patchRemoveAllListeners(ee: EventEmitter, original: Function) {
const contextManager = this;
return function (this: never, event: string) {
const map = contextManager._getPatchMap(ee);
if (map?.[event] !== undefined) {
delete map[event];
}
return original.call(this, event);
};
Expand All @@ -163,22 +161,37 @@ export abstract class AbstractAsyncHooksContextManager
* @param [context] context to propagate when calling listeners
*/
private _patchAddListener(
ee: PatchedEventEmitter,
ee: EventEmitter,
original: Function,
context: Context
) {
const contextManager = this;
return function (this: {}, event: string, listener: Func<void>) {
if (ee[kOtListeners] === undefined) ee[kOtListeners] = {};
let listeners = ee[kOtListeners]![event];
return function (this: never, event: string, listener: Func<void>) {
let map = contextManager._getPatchMap(ee);
if (map === undefined) {
map = contextManager._createPatchMap(ee);
}
let listeners = map[event];
if (listeners === undefined) {
listeners = new WeakMap();
ee[kOtListeners]![event] = listeners;
map[event] = listeners;
}
const patchedListener = contextManager.bind(listener, context);
// store a weak reference of the user listener to ours
listeners.set(listener, patchedListener);
return original.call(this, event, patchedListener);
};
}

private _createPatchMap(ee: EventEmitter): PatchMap {
const map = {};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(ee as any)[this._kOtListeners] = map;
return map;
}
private _getPatchMap(ee: EventEmitter): PatchMap | undefined {
return (ee as never)[this._kOtListeners];
}

private readonly _kOtListeners = Symbol('OtListeners');
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class AsyncLocalStorageContextManager extends AbstractAsyncHooksContextMa
...args: A
): ReturnType<F> {
const cb = thisArg == null ? fn : fn.bind(thisArg);
return this._asyncLocalStorage.run(context, cb as any, ...args);
return this._asyncLocalStorage.run(context, cb as never, ...args);
}

enable(): this {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ for (const contextManagerClass of [
| AsyncHooksContextManager
| AsyncLocalStorageContextManager;
const key1 = createContextKey('test key 1');
let otherContextManager:
| AsyncHooksContextManager
| AsyncLocalStorageContextManager;

before(function () {
if (
Expand All @@ -49,6 +52,7 @@ for (const contextManagerClass of [

afterEach(() => {
contextManager.disable();
otherContextManager?.disable();
});

describe('.enable()', () => {
Expand Down Expand Up @@ -274,6 +278,22 @@ for (const contextManagerClass of [
countDown();
}, time2);
});

it('should not influence other instances', () => {
otherContextManager = new contextManagerClass();
otherContextManager.enable();

const context = ROOT_CONTEXT.setValue(key1, 2);
const otherContext = ROOT_CONTEXT.setValue(key1, 3);
contextManager.with(context, () => {
assert.strictEqual(contextManager.active(), context);
assert.strictEqual(otherContextManager.active(), ROOT_CONTEXT);
otherContextManager.with(otherContext, () => {
assert.strictEqual(contextManager.active(), context);
assert.strictEqual(otherContextManager.active(), otherContext);
});
});
});
});

describe('.bind(function)', () => {
Expand Down Expand Up @@ -335,6 +355,22 @@ for (const contextManagerClass of [
}, context);
fn();
});

it('should not influence other instances', () => {
otherContextManager = new contextManagerClass();
otherContextManager.enable();

const context = ROOT_CONTEXT.setValue(key1, 2);
const otherContext = ROOT_CONTEXT.setValue(key1, 3);
const fn = otherContextManager.bind(
contextManager.bind(() => {
assert.strictEqual(contextManager.active(), context);
assert.strictEqual(otherContextManager.active(), otherContext);
}, context),
otherContext
);
fn();
});
});

describe('.bind(event-emitter)', () => {
Expand All @@ -352,31 +388,31 @@ for (const contextManagerClass of [
it('should return current context and removeListener (when enabled)', done => {
const ee = new EventEmitter();
const context = ROOT_CONTEXT.setValue(key1, 1);
const patchedEe = contextManager.bind(ee, context);
const patchedEE = contextManager.bind(ee, context);
const handler = () => {
assert.deepStrictEqual(contextManager.active(), context);
patchedEe.removeListener('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 0);
patchedEE.removeListener('test', handler);
assert.strictEqual(patchedEE.listeners('test').length, 0);
return done();
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
patchedEE.on('test', handler);
assert.strictEqual(patchedEE.listeners('test').length, 1);
patchedEE.emit('test');
});

it('should return current context and removeAllListener (when enabled)', done => {
const ee = new EventEmitter();
const context = ROOT_CONTEXT.setValue(key1, 1);
const patchedEe = contextManager.bind(ee, context);
const patchedEE = contextManager.bind(ee, context);
const handler = () => {
assert.deepStrictEqual(contextManager.active(), context);
patchedEe.removeAllListeners('test');
assert.strictEqual(patchedEe.listeners('test').length, 0);
patchedEE.removeAllListeners('test');
assert.strictEqual(patchedEE.listeners('test').length, 0);
return done();
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
patchedEE.on('test', handler);
assert.strictEqual(patchedEE.listeners('test').length, 1);
patchedEE.emit('test');
});

/**
Expand All @@ -387,34 +423,54 @@ for (const contextManagerClass of [
contextManager.disable();
const ee = new EventEmitter();
const context = ROOT_CONTEXT.setValue(key1, 1);
const patchedEe = contextManager.bind(ee, context);
const patchedEE = contextManager.bind(ee, context);
const handler = () => {
assert.deepStrictEqual(contextManager.active(), context);
patchedEe.removeListener('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 0);
patchedEE.removeListener('test', handler);
assert.strictEqual(patchedEE.listeners('test').length, 0);
return done();
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
patchedEE.on('test', handler);
assert.strictEqual(patchedEE.listeners('test').length, 1);
patchedEE.emit('test');
});

it('should not return current context with async op', done => {
const ee = new EventEmitter();
const context = ROOT_CONTEXT.setValue(key1, 1);
const patchedEe = contextManager.bind(ee, context);
const patchedEE = contextManager.bind(ee, context);
const handler = () => {
assert.deepStrictEqual(contextManager.active(), context);
setImmediate(() => {
assert.deepStrictEqual(contextManager.active(), context);
patchedEe.removeAllListeners('test');
assert.strictEqual(patchedEe.listeners('test').length, 0);
patchedEE.removeAllListeners('test');
assert.strictEqual(patchedEE.listeners('test').length, 0);
return done();
});
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
patchedEE.on('test', handler);
assert.strictEqual(patchedEE.listeners('test').length, 1);
patchedEE.emit('test');
});

it('should not influence other instances', () => {
const ee = new EventEmitter();
otherContextManager = new contextManagerClass();
otherContextManager.enable();

const context = ROOT_CONTEXT.setValue(key1, 2);
const otherContext = ROOT_CONTEXT.setValue(key1, 3);
const patchedEE = otherContextManager.bind(
contextManager.bind(ee, context),
otherContext
);
const handler = () => {
assert.strictEqual(contextManager.active(), context);
assert.strictEqual(otherContextManager.active(), otherContext);
};

patchedEE.on('test', handler);
patchedEE.emit('test');
});
});
});
Expand Down

0 comments on commit 9437d7e

Please sign in to comment.