From a8153adf2f2acd77c4520e7e0ee4df5042875260 Mon Sep 17 00:00:00 2001 From: Balazs Kreith Date: Sun, 10 Mar 2024 10:00:57 +0200 Subject: [PATCH 1/5] 3.6.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 86311cc..f5c96cc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@observertc/client-monitor-js", - "version": "3.5.0", + "version": "3.6.0", "description": "ObserveRTC Client Integration Javascript Library", "main": "lib/index.js", "types": "lib/index.d.ts", From bef3871d37d723ce87109898036ce5c09d19c34a Mon Sep 17 00:00:00 2001 From: Balazs Kreith Date: Wed, 20 Mar 2024 16:22:17 +0200 Subject: [PATCH 2/5] change detectors --- src/ClientMonitor.ts | 72 +++--- src/Detectors.ts | 134 ----------- src/detectors/AudioDesyncDetector.ts | 133 +++++----- src/detectors/CongestionDetector.ts | 308 +++++++----------------- src/detectors/CpuPerformanceDetector.ts | 2 +- src/index.ts | 5 +- src/utils/TypedEmitter.ts | 1 + 7 files changed, 197 insertions(+), 458 deletions(-) delete mode 100644 src/Detectors.ts diff --git a/src/ClientMonitor.ts b/src/ClientMonitor.ts index 248f678..f0c9cb8 100644 --- a/src/ClientMonitor.ts +++ b/src/ClientMonitor.ts @@ -8,10 +8,8 @@ import { Sampler } from './Sampler'; import { createAdapterMiddlewares } from './collectors/Adapter'; import * as validators from './utils/validators'; import { PeerConnectionEntry, TrackStats } from './entries/StatsEntryInterfaces'; -import { createDetectors } from './Detectors'; -import { AudioDesyncDetectorConfig } from './detectors/AudioDesyncDetector'; -import { CpuPerformanceDetectorConfig } from './detectors/CpuPerformanceDetector'; -import { CongestionDetectorConfig } from './detectors/CongestionDetector'; +import { AudioDesyncDetector, AudioDesyncDetectorConfig } from './detectors/AudioDesyncDetector'; +import { CongestionDetector } from './detectors/CongestionDetector'; const logger = createLogger('ClientMonitor'); @@ -48,10 +46,6 @@ export interface ClientMonitorEvents { elapsedSinceLastSampleInMs: number, clientSample: ClientSample, }, - - 'congestion-alert': AlertState, - 'audio-desync-alert': AlertState, - 'cpu-performance-alert': AlertState, } export class ClientMonitor extends TypedEventEmitter { @@ -61,9 +55,7 @@ export class ClientMonitor extends TypedEventEmitter { public readonly collectors = createCollectors({ storage: this.storage, }); - private readonly _detectors = createDetectors({ - clientMonitor: this, - }); + private readonly _detectors = new Map void, once: (e: 'close', l: () => void) => void }>(); private readonly _sampler = new Sampler(this.storage); private _timer?: ReturnType; @@ -79,6 +71,8 @@ export class ClientMonitor extends TypedEventEmitter { private _config: ClientMonitorConfig ) { super(); + this.setMaxListeners(Infinity); + this.meta = new ClientMetaData(); this._sampler.addBrowser(this.meta.browser); this._sampler.addEngine(this.meta.engine); @@ -242,43 +236,45 @@ export class ClientMonitor extends TypedEventEmitter { this._setupTimer(); } - public get audioDesyncDetector() { - return this._detectors.audioDesyncDetector; + public createCongestionDetector(): CongestionDetector { + const exxistingDetector = this._detectors.get(CongestionDetector.name); + + if (exxistingDetector) return exxistingDetector as CongestionDetector; + + const detector = new CongestionDetector(); + const onUpdate = () => detector.update(this.storage.peerConnections()); + + detector.once('close', () => { + this.off('stats-collected', onUpdate); + this._detectors.delete(CongestionDetector.name); + }); + this.on('stats-collected', onUpdate); + this._detectors.set(CongestionDetector.name, detector); + + return detector; } - public addAudioDesyncDetector(config?: AudioDesyncDetectorConfig) { - this._detectors.addAudioDesyncDetector({ + public createAudioDesyncDetector(config?: AudioDesyncDetectorConfig): AudioDesyncDetector { + const exxistingDetector = this._detectors.get(AudioDesyncDetector.name); + + if (exxistingDetector) return exxistingDetector as AudioDesyncDetector; + + const detector = new AudioDesyncDetector({ fractionalCorrectionAlertOnThreshold: config?.fractionalCorrectionAlertOnThreshold ?? 0.1, fractionalCorrectionAlertOffThreshold: config?.fractionalCorrectionAlertOffThreshold ?? 0.05, }); - } - - public get cpuPerformanceDetector() { - return this._detectors.cpuPerformanceDetector; - } + const onUpdate = () => detector.update(this.storage.inboundRtps()); - public addCpuPerformanceDetector(config?: CpuPerformanceDetectorConfig) { - this._detectors.addCpuPerformanceDetector({ - droppedIncomingFramesFractionAlertOff: config?.droppedIncomingFramesFractionAlertOff ?? 0.1, - droppedIncomingFramesFractionAlertOn: config?.droppedIncomingFramesFractionAlertOn ?? 0.2, + detector.once('close', () => { + this.off('stats-collected', onUpdate); + this._detectors.delete(AudioDesyncDetector.name); }); - } + this.on('stats-collected', onUpdate); + this._detectors.set(AudioDesyncDetector.name, detector); - public get congestionDetector() { - return this._detectors.congestionDetector; + return detector; } - public addCongestionDetector(config?: CongestionDetectorConfig) { - this._detectors.addCongestionDetector({ - deviationFoldThreshold: config?.deviationFoldThreshold ?? 3, - measurementsWindowInMs: config?.measurementsWindowInMs ?? 10000, - minConsecutiveTickThreshold: 3, - minDurationThresholdInMs: 3000, - minMeasurementsLengthInMs: 5000, - minRTTDeviationThresholdInMs: 100, - fractionLossThreshold: 0.2, - }); - } public getTrackStats(trackId: string): TrackStats | undefined { return this.storage.getTrack(trackId); diff --git a/src/Detectors.ts b/src/Detectors.ts deleted file mode 100644 index 3f976ab..0000000 --- a/src/Detectors.ts +++ /dev/null @@ -1,134 +0,0 @@ -import { ClientMonitor } from "./ClientMonitor"; -import { AudioDesyncDetector, AudioDesyncDetectorConfig, createAudioDesyncDetector } from "./detectors/AudioDesyncDetector"; -import { CongestionDetectorConfig, createCongestionDetector } from "./detectors/CongestionDetector"; -import { CpuPerformanceDetectorConfig, createCpuPerformanceDetector } from "./detectors/CpuPerformanceDetector"; -import { StatsStorage } from "./entries/StatsStorage"; -import { Middleware } from "./utils/Processor"; -import { createLogger } from "./utils/logger"; - -const logger = createLogger(`Detectors`); - -export type DetectorsContext = { - clientMonitor: ClientMonitor, -}; - -export interface Detector { - id: string; - readonly alert: 'on' | 'off'; - update(): Promise; -} - -export type Detectors = ReturnType; - -export function createDetectors(context: DetectorsContext) { - const { - clientMonitor - } = context; - const savedDetectors = new Map, - }>(); - let audioDesyncDetector: AudioDesyncDetector | undefined; - let cpuPerformanceDetector: Detector | undefined; - let congestionDetector: Detector | undefined; - - function add(detector: Detector) { - if (savedDetectors.has(detector.id)) { - logger.warn(`Detector ${detector.id} already exists.`); - return; - } - const middleware: Middleware = (storage, next) => { - detector.update().then(() => { - return next(storage); - }).catch((error) => { - logger.error(`Detector ${detector.id} update failed: ${error}, the detector will be removed.`); - remove(detector.id); - next(storage); - }); - } - savedDetectors.set(detector.id, { - detector, - middleware, - }); - clientMonitor.storage.processor.addMiddleware(middleware); - } - - function addAudioDesyncDetector(config: AudioDesyncDetectorConfig) { - if (audioDesyncDetector) return audioDesyncDetector; - audioDesyncDetector = createAudioDesyncDetector({ - ...config, - clientMonitor, - }); - add(audioDesyncDetector); - return audioDesyncDetector; - } - - function addCongestionDetector(config: CongestionDetectorConfig) { - if (congestionDetector) return congestionDetector; - congestionDetector = createCongestionDetector({ - ...config, - clientMonitor, - }) - add(congestionDetector); - return congestionDetector; - } - - function addCpuPerformanceDetector(config: CpuPerformanceDetectorConfig) { - if (cpuPerformanceDetector) return cpuPerformanceDetector; - cpuPerformanceDetector = createCpuPerformanceDetector({ - ...config, - clientMonitor, - }); - add(cpuPerformanceDetector); - return cpuPerformanceDetector; - } - - function get(detectorId: string) { - const savedDetector = savedDetectors.get(detectorId); - if (!savedDetector) return; - return savedDetector.detector; - } - - function remove(detectorId: string) { - const savedDetector = savedDetectors.get(detectorId); - if (!savedDetector) return; - savedDetectors.delete(detectorId); - clientMonitor.storage.processor.removeMiddleware(savedDetector.middleware); - if (savedDetector.detector.id === audioDesyncDetector?.id) { - audioDesyncDetector = undefined; - } else if (savedDetector.detector.id === cpuPerformanceDetector?.id) { - cpuPerformanceDetector = undefined; - } else if (savedDetector.detector.id === congestionDetector?.id) { - congestionDetector = undefined; - } - } - - function clear() { - for (const detectorId of Array.from(savedDetectors.keys())) { - remove(detectorId); - } - savedDetectors.clear(); - } - - return { - get, - add, - addAudioDesyncDetector, - addCongestionDetector, - addCpuPerformanceDetector, - remove, - clear, - [Symbol.iterator]: () => { - return Array.from(savedDetectors.values()).map((savedDetector) => savedDetector.detector).values(); - }, - get audioDesyncDetector() { - return audioDesyncDetector; - }, - get cpuPerformanceDetector() { - return cpuPerformanceDetector; - }, - get congestionDetector() { - return congestionDetector; - }, - } -} diff --git a/src/detectors/AudioDesyncDetector.ts b/src/detectors/AudioDesyncDetector.ts index bed19cd..725d521 100644 --- a/src/detectors/AudioDesyncDetector.ts +++ b/src/detectors/AudioDesyncDetector.ts @@ -1,4 +1,6 @@ +import EventEmitter from "events"; import { AlertState, ClientMonitor } from "../ClientMonitor"; +import { InboundRtpEntry } from "../entries/StatsEntryInterfaces"; /** * Configuration object for the AudioDesyncDetector function. @@ -22,93 +24,94 @@ export type AudioDesyncDetectorConfig = { fractionalCorrectionAlertOffThreshold: number; }; -export type AudioDesyncDetectorContext = AudioDesyncDetectorConfig & { - clientMonitor: ClientMonitor, +export type AudioDesyncDetectorEvents = { + desync: [string], + sync: [string], + close: [], } -export type AudioDesyncDetector = ReturnType; +type AudioSyncTrace = { + correctedSamples: number, + prevCorrectedSamples: number, + desync: boolean, + visited: boolean, + inboundRtp: InboundRtpEntry, +} -/** - * Creates an audio desynchronization detector process. - * @param emitter The event emitter used to emit audio desynchronization events. - * @param config The configuration for the audio desynchronization detector. - * @returns The evaluator process function. - */ -export function createAudioDesyncDetector( - context: AudioDesyncDetectorContext -) { - type AudioSyncTrace = { - correctedSamples: number, - prevCorrectedSamples: number, - visited: boolean, +export declare interface AudioDesyncDetector { + on(event: K, listener: (...events: AudioDesyncDetectorEvents[K]) => void): this; + off(event: K, listener: (...events: AudioDesyncDetectorEvents[K]) => void): this; + once(event: K, listener: (...events: AudioDesyncDetectorEvents[K]) => void): this; + emit(event: K, ...events: AudioDesyncDetectorEvents[K]): boolean; +} + +export class AudioDesyncDetector extends EventEmitter { + private _closed = false; + private readonly _states = new Map(); + + public constructor( + private readonly config: AudioDesyncDetectorConfig + ) { + super(); + this.setMaxListeners(Infinity); + } - const { - clientMonitor - } = context; - const audioSyncTraces = new Map(); - const id = 'audio-desync-detector'; - let alertState: AlertState = 'off'; - const desyncedTrackIds = new Set(); - async function update() { - for (const inboundRtp of clientMonitor.storage.inboundRtps()) { + + public close() { + if (this._closed) return; + this._closed = true; + + this._states.clear(); + this.emit('close'); + } + + public update(inboundRtps: IterableIterator) { + for (const inboundRtp of inboundRtps) { const stats = inboundRtp.stats; + const ssrc = stats.ssrc; if (stats.kind !== 'audio' || inboundRtp.getTrackId() === undefined) { continue; } - const trackId = inboundRtp.getTrackId(); - if (!trackId) { - continue; - } - let trace = audioSyncTraces.get(trackId); - if (!trace) { - trace = { + + let state = this._states.get(ssrc); + if (!state) { + state = { correctedSamples: 0, prevCorrectedSamples: 0, visited: false, + desync: false, + inboundRtp, }; - audioSyncTraces.set(trackId, trace); + this._states.set(ssrc, state); } - trace.visited = true; - trace.correctedSamples = (stats.insertedSamplesForDeceleration ?? 0) + (stats.removedSamplesForAcceleration ?? 0); - const dCorrectedSamples = trace.correctedSamples - trace.prevCorrectedSamples; + state.visited = true; + state.correctedSamples = (stats.insertedSamplesForDeceleration ?? 0) + (stats.removedSamplesForAcceleration ?? 0); + const dCorrectedSamples = state.correctedSamples - state.prevCorrectedSamples; if (dCorrectedSamples < 1 || (inboundRtp.receivedSamples ?? 0) < 1) { continue; } + const wasDesync = state.desync; const fractionalCorrection = dCorrectedSamples / (dCorrectedSamples + (inboundRtp.receivedSamples ?? 0)); - if (desyncedTrackIds.has(trackId)) { - // it is on for this track - if (context.fractionalCorrectionAlertOffThreshold < fractionalCorrection) { - desyncedTrackIds.add(trackId) - } - } else if (context.fractionalCorrectionAlertOnThreshold < fractionalCorrection) { - desyncedTrackIds.add(trackId) + if (wasDesync) { + state.desync = this.config.fractionalCorrectionAlertOffThreshold < fractionalCorrection; + } else { + state.desync = this.config.fractionalCorrectionAlertOnThreshold < fractionalCorrection; } - - } - for (const [trackId, trace] of Array.from(audioSyncTraces.entries())) { - if (trace.visited === false) { - audioSyncTraces.delete(trackId); - continue; + if (wasDesync !== state.desync) { + const trackId = inboundRtp.getTrackId(); + trackId && this.emit(state.desync ? 'desync' : 'sync', trackId); } - trace.prevCorrectedSamples = trace.correctedSamples; - trace.visited = false; } - const prevAlertState = alertState; - alertState = 0 < desyncedTrackIds.size ? 'on' : 'off'; - if (prevAlertState !== alertState) { - clientMonitor.emit('audio-desync-alert', alertState); - } - } - return { - id, - update, - desyncedTrackIds, - get alert() { - return alertState; - }, + for (const state of this._states.values()) { + if (state.visited === false) { + this._states.delete(state.inboundRtp.stats.ssrc); + } + state.prevCorrectedSamples = state.correctedSamples; + state.visited = false; + } } -} \ No newline at end of file +} diff --git a/src/detectors/CongestionDetector.ts b/src/detectors/CongestionDetector.ts index 1a41f5d..8724702 100644 --- a/src/detectors/CongestionDetector.ts +++ b/src/detectors/CongestionDetector.ts @@ -1,240 +1,114 @@ -import { AlertState, ClientMonitor, ClientMonitorEvents } from "../ClientMonitor"; -import { PeerConnectionEntry } from "../entries/StatsEntryInterfaces"; - -/** - * Configuration object for the CongestionDetector function. - */ -export type CongestionDetectorConfig = { - /** - * The minimum deviation threshold for Round-Trip Time (RTT) in milliseconds. - * A higher value indicates a higher threshold for detecting congestion based on RTT deviation. - */ - minRTTDeviationThresholdInMs: number; - - /** - * The minimum duration threshold in milliseconds. - * If congestion is detected, this is the minimum duration before a reevaluation takes place. - */ - minDurationThresholdInMs: number; - - /** - * The deviation fold threshold. - * This value is used as a multiplier with the standard deviation to compare against the deviation in RTT. - */ - deviationFoldThreshold: number; - - /** - * The fraction loss threshold for packet loss. - * If the fraction of packets lost is greater than this threshold, it is considered as congestion. - */ - fractionLossThreshold?: number; - - /** - * The window for measuring the RTT in milliseconds. - */ - measurementsWindowInMs: number; - - /** - * The minimum length of measurements in milliseconds. - * This determines the minimum duration for which measurements should be taken before considering them for congestion detection. - */ - minMeasurementsLengthInMs: number; - - /** - * The minimum number of consecutive ticks required to consider a connection as congested. - * A tick represents a deviation above the deviation fold threshold. - */ - minConsecutiveTickThreshold: number; +import EventEmitter from "events"; +import { ClientMonitor } from "../ClientMonitor"; +import { IceCandidatePairEntry, PeerConnectionEntry } from "../entries/StatsEntryInterfaces"; + +type PeerConnectionState = { + congested: boolean; + outgoingBitrateBeforeCongestion?: number; + outgoingBitrateAfterCongestion?: number; + incomingBitrateBeforeCongestion?: number; + incomingBitrateAfterCongestion?: number; +} +export type CongestionDetectorEvents = { + congestion: [PeerConnectionState[]]; + close: []; } -export function createCongestionDetector(config: CongestionDetectorConfig & { - clientMonitor: ClientMonitor -}) { - const { - clientMonitor, - } = config; - - type PeerConnectionState = { - measurements: { added: number, RttInMs: number }[], - sum: number, - sumSquares: number, - congested?: number, - visited: boolean, - ticks: number, - } - let alertState: AlertState = 'off'; - const congestedPeerConnectionIds = new Set(); - const peerConnectionStates = new Map(); - const isCongested = (now: number, state: PeerConnectionState, peerConnection: PeerConnectionEntry): boolean => { - const { - deltaInboundPacketsLost: totalInboundPacketsLost = 0, - deltaInboundPacketsReceived: totalInboundPacketsReceived = 0, - avgRttInS = -1, - deltaOutboundPacketsSent: totalOutboundPacketsSent = 0, - deltaOutboundPacketsLost: totalOutboundPacketsLost = 0, - } = peerConnection; - - if (state.congested !== undefined && (now - state.congested) < config.minDurationThresholdInMs) { - return true; - } +export declare interface CongestionDetector { + on(event: K, listener: (...events: CongestionDetectorEvents[K]) => void): this; + off(event: K, listener: (...events: CongestionDetectorEvents[K]) => void): this; + once(event: K, listener: (...events: CongestionDetectorEvents[K]) => void): this; + emit(event: K, ...events: CongestionDetectorEvents[K]): boolean; +} - let inbFL = -1; - let outbFL = -1; +export class CongestionDetector extends EventEmitter { + private _closed = false; + private _states = new Map(); - if (0 <= totalInboundPacketsLost && 0 < totalInboundPacketsReceived) { - inbFL = totalInboundPacketsLost / (totalInboundPacketsReceived + totalInboundPacketsLost); - } - if (0 <= totalOutboundPacketsLost && 0 < totalOutboundPacketsSent) { - outbFL = totalOutboundPacketsLost / (totalOutboundPacketsSent + totalOutboundPacketsLost); - } - const maxFL = Math.max(inbFL, outbFL); - if (config.fractionLossThreshold !== undefined && config.fractionLossThreshold < maxFL) { - return true; - } - - if (avgRttInS < 0 || config.measurementsWindowInMs < 1) { - return false; - } - const value = avgRttInS * 1000; - - state.sum += value; - state.sumSquares += value * value; - state.measurements.push({ - added: now, - RttInMs: value, - }); - for (let check = true; check && 0 < state.measurements.length; ) { - const elapsedInMs = now - state.measurements[0].added; - if (elapsedInMs < config.measurementsWindowInMs) { - check = false; - continue; - } - const removedValue = state.measurements.shift()!.RttInMs; - state.sum -= removedValue; - state.sumSquares -= removedValue * removedValue; - } - if (state.measurements.length < 1 || (now - state.measurements[0].added) < config.minMeasurementsLengthInMs) { - return false; - } + public constructor( + ) { + super(); + this.setMaxListeners(Infinity); - const mean = state.sum / state.measurements.length; - const variance = (state.sumSquares / state.measurements.length) - (mean * mean); - const stdDev = Math.sqrt(variance); - const deviation = Math.abs(value - mean); - if (deviation < config.minRTTDeviationThresholdInMs) { - return false; - } - if (config.deviationFoldThreshold * stdDev < deviation) { - ++state.ticks; - } else { - state.ticks = 0; - } - return Math.max(0, config.minConsecutiveTickThreshold) < state.ticks; } - let highestSeenSendingBitrate = 0 - let highestSeenReceivingBitrate = 0 - let highestSeenAvailableOutgoingBitrate = 0; - let highestSeenAvailableIncomingBitrate = 0; - - async function update() { - const { storage } = clientMonitor; - const now = Date.now(); - const congestedPeerConnectionIds = new Set(); - const trackIds: string[] = []; - for (const peerConnection of storage.peerConnections()) { - const { - peerConnectionId - } = peerConnection; - let state = peerConnectionStates.get(peerConnectionId); + + public update(peerConnections: IterableIterator) { + const visitedPeerConnectionIds = new Set(); + let gotCongested = false; + + for (const peerConnection of peerConnections) { + const { peerConnectionId } = peerConnection; + let state = this._states.get(peerConnectionId); + + visitedPeerConnectionIds.add(peerConnectionId); + if (!state) { state = { - measurements: [], - sum: 0, - sumSquares: 0, - visited: false, - ticks: 0, + congested: false, + // outgoingBitrateBeforeCongestion: 0, + // outgoingBitrateAfterCongestion: 0, + // incomingBitrateBeforeCongestion: 0, + // incomingBitrateAfterCongestion: 0, }; - peerConnectionStates.set(peerConnectionId, state); + this._states.set(peerConnectionId, state); } - state.visited = true; + const wasCongested = state.congested; + let isCongested = false; - const wasCongested = state.congested !== undefined; - const congested = isCongested(now, state, peerConnection); - if (wasCongested && congested) { - // no change - continue; + for (const outboundRtp of peerConnection.outboundRtps()) { + isCongested ||= outboundRtp.stats.qualityLimitationReason === 'bandwidth'; } - if (!wasCongested && congested) { - // it become congested now - state.congested = now; - congestedPeerConnectionIds.add(peerConnectionId); - continue; + + let selectedCandidatePair: IceCandidatePairEntry | undefined; + for (const transport of peerConnection.transports()) { + selectedCandidatePair = transport.getSelectedIceCandidatePair(); + if (selectedCandidatePair) break; } - if (wasCongested && !congested) { - state.congested = undefined; - congestedPeerConnectionIds.delete(peerConnectionId); - continue; + + if (!selectedCandidatePair) { + selectedCandidatePair = Array.from(peerConnection.iceCandidatePairs()).find(pair => pair.stats.nominated === true); } - // it was not congested, and has not become congested - highestSeenSendingBitrate = Math.max( - highestSeenSendingBitrate, - (storage.sendingAudioBitrate ?? 0) + (storage.sendingVideoBitrate ?? 0) - ); - highestSeenReceivingBitrate = Math.max( - highestSeenReceivingBitrate, - (storage.receivingAudioBitrate ?? 0) + (storage.receivingVideoBitrate ?? 0) - ); - highestSeenAvailableOutgoingBitrate = Math.max( - highestSeenAvailableOutgoingBitrate, - (storage.totalAvailableOutgoingBitrate ?? 0) - ); - highestSeenAvailableIncomingBitrate = Math.max( - highestSeenAvailableIncomingBitrate, - (storage.totalAvailableIncomingBitrate ?? 0) - ); - } + selectedCandidatePair?.stats.availableIncomingBitrate; + selectedCandidatePair?.stats.availableOutgoingBitrate; - for (const [pcId, state] of Array.from(peerConnectionStates)) { - if (state.visited) { - state.visited = false; + if (isCongested) { + state.incomingBitrateAfterCongestion = selectedCandidatePair?.stats.availableIncomingBitrate ?? state.incomingBitrateAfterCongestion; + state.outgoingBitrateAfterCongestion = selectedCandidatePair?.stats.availableOutgoingBitrate ?? state.outgoingBitrateAfterCongestion; + state.congested = true; } else { - peerConnectionStates.delete(pcId); + state.incomingBitrateBeforeCongestion = selectedCandidatePair?.stats.availableIncomingBitrate ?? state.incomingBitrateBeforeCongestion; + state.outgoingBitrateBeforeCongestion = selectedCandidatePair?.stats.availableOutgoingBitrate ?? state.outgoingBitrateBeforeCongestion; + state.congested = false; + } + + if (wasCongested === isCongested) { + if (isCongested) { + state.incomingBitrateBeforeCongestion = undefined; + state.outgoingBitrateBeforeCongestion = undefined; + } else { + state.incomingBitrateAfterCongestion = undefined; + state.outgoingBitrateAfterCongestion = undefined; + } + } else if (!wasCongested && isCongested) { + gotCongested = true; } } - const prevState = alertState; - alertState = congestedPeerConnectionIds.size < 1 ? 'off' : 'on'; - if (prevState !== alertState) { - clientMonitor.emit('congestion-alert', alertState); - if (alertState === 'off') { - highestSeenSendingBitrate = 0; - highestSeenReceivingBitrate = 0; - highestSeenAvailableOutgoingBitrate = 0; - highestSeenAvailableIncomingBitrate = 0; + + gotCongested && this.emit('congestion', Array.from(this._states.values())); + + for (const [peerConnectionId] of Array.from(this._states)) { + if (!visitedPeerConnectionIds.has(peerConnectionId)) { + this._states.delete(peerConnectionId); } } } - return { - id: 'congestion-detector', - get alert() { - return alertState; - }, - get congestedPeerConnectionIds() { - return congestedPeerConnectionIds; - }, - get highestSeenAvailableIncomingBitrate() { - return highestSeenAvailableIncomingBitrate; - }, - get highestSeenAvailableOutgoingBitrate() { - return highestSeenAvailableOutgoingBitrate; - }, - get highestSeenReceivingBitrate() { - return highestSeenReceivingBitrate; - }, - get highestSeenSendingBitrate() { - return highestSeenSendingBitrate; - }, - update, - }; -} \ No newline at end of file + + public close() { + if (this._closed) return; + this._closed = true; + + this.emit('close'); + } +} diff --git a/src/detectors/CpuPerformanceDetector.ts b/src/detectors/CpuPerformanceDetector.ts index bd68858..9e26114 100644 --- a/src/detectors/CpuPerformanceDetector.ts +++ b/src/detectors/CpuPerformanceDetector.ts @@ -67,7 +67,7 @@ export function createCpuPerformanceDetector(config: CpuPerformanceDetectorConfi const previousAlertState = alertState; alertState = inboundIsOk && outboundIsOk ? 'off' : 'on'; if (previousAlertState !== alertState) { - clientMonitor.emit('cpu-performance-alert', alertState); + // clientMonitor.emit('cpu-performance-alert', alertState); } return { id: 'cpu-issue-detector', diff --git a/src/index.ts b/src/index.ts index e0154ad..6bec7ed 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,10 +8,9 @@ export type { ClientMonitorEvents, } from "./ClientMonitor"; -export type { CongestionDetectorConfig } from './detectors/CongestionDetector'; +export type { CongestionDetector } from './detectors/CongestionDetector'; export type { CpuPerformanceDetectorConfig } from './detectors/CpuPerformanceDetector'; -export type { AudioDesyncDetectorConfig } from './detectors/AudioDesyncDetector'; -export type { Detectors } from './Detectors'; +export type { AudioDesyncDetector, AudioDesyncDetectorConfig } from './detectors/AudioDesyncDetector'; export type { Collectors } from './Collectors'; export type { diff --git a/src/utils/TypedEmitter.ts b/src/utils/TypedEmitter.ts index a8e9f96..0f9f742 100644 --- a/src/utils/TypedEmitter.ts +++ b/src/utils/TypedEmitter.ts @@ -5,6 +5,7 @@ export class TypedEventEmitter extends EventEmitter { public constructor( ) { super(); + this.setMaxListeners(Infinity); } public on>(type: U, listener: (data: T[U]) => void): this { From a5a423209a247a85e28cbf870385badb74ae0a8b Mon Sep 17 00:00:00 2001 From: Balazs Kreith Date: Wed, 20 Mar 2024 16:35:38 +0200 Subject: [PATCH 3/5] save --- src/ClientMonitor.ts | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/src/ClientMonitor.ts b/src/ClientMonitor.ts index f0c9cb8..ed054f3 100644 --- a/src/ClientMonitor.ts +++ b/src/ClientMonitor.ts @@ -9,7 +9,7 @@ import { createAdapterMiddlewares } from './collectors/Adapter'; import * as validators from './utils/validators'; import { PeerConnectionEntry, TrackStats } from './entries/StatsEntryInterfaces'; import { AudioDesyncDetector, AudioDesyncDetectorConfig } from './detectors/AudioDesyncDetector'; -import { CongestionDetector } from './detectors/CongestionDetector'; +import { CongestionDetector, CongestionDetectorEvents } from './detectors/CongestionDetector'; const logger = createLogger('ClientMonitor'); @@ -46,6 +46,12 @@ export interface ClientMonitorEvents { elapsedSinceLastSampleInMs: number, clientSample: ClientSample, }, + 'congestion': { + incomingBitrateAfterCongestion: number | undefined; + incomingBitrateBeforeCongestion: number | undefined; + outgoingBitrateAfterCongestion: number | undefined; + outgoingBitrateBeforeCongestion: number | undefined; + } } export class ClientMonitor extends TypedEventEmitter { @@ -243,12 +249,43 @@ export class ClientMonitor extends TypedEventEmitter { const detector = new CongestionDetector(); const onUpdate = () => detector.update(this.storage.peerConnections()); + const onCongestion = (...event: CongestionDetectorEvents['congestion']) => { + const [ + peerConnectionStates + ] = event; + let incomingBitrateAfterCongestion: number | undefined; + let incomingBitrateBeforeCongestion: number | undefined; + let outgoingBitrateAfterCongestion: number | undefined; + let outgoingBitrateBeforeCongestion: number | undefined; + for (const state of peerConnectionStates) { + if (state.incomingBitrateAfterCongestion) { + incomingBitrateAfterCongestion = (incomingBitrateAfterCongestion ?? 0) + state.incomingBitrateAfterCongestion; + } + if (state.incomingBitrateBeforeCongestion) { + incomingBitrateBeforeCongestion = (incomingBitrateBeforeCongestion ?? 0) + state.incomingBitrateBeforeCongestion; + } + if (state.outgoingBitrateAfterCongestion) { + outgoingBitrateAfterCongestion = (outgoingBitrateAfterCongestion ?? 0) + state.outgoingBitrateAfterCongestion; + } + if (state.outgoingBitrateBeforeCongestion) { + outgoingBitrateBeforeCongestion = (outgoingBitrateBeforeCongestion ?? 0) + state.outgoingBitrateBeforeCongestion; + } + } + this.emit('congestion', { + incomingBitrateAfterCongestion, + incomingBitrateBeforeCongestion, + outgoingBitrateAfterCongestion, + outgoingBitrateBeforeCongestion, + }); + } detector.once('close', () => { this.off('stats-collected', onUpdate); + detector.off('congestion', onCongestion); this._detectors.delete(CongestionDetector.name); }); this.on('stats-collected', onUpdate); + detector.on('congestion', onCongestion); this._detectors.set(CongestionDetector.name, detector); return detector; From c8c6ad3be4424b6d3524459928a0342575415636 Mon Sep 17 00:00:00 2001 From: Balazs Kreith Date: Wed, 20 Mar 2024 16:36:45 +0200 Subject: [PATCH 4/5] save --- src/ClientMonitor.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ClientMonitor.ts b/src/ClientMonitor.ts index ed054f3..86dad6d 100644 --- a/src/ClientMonitor.ts +++ b/src/ClientMonitor.ts @@ -105,6 +105,8 @@ export class ClientMonitor extends TypedEventEmitter { adapterMiddlewares.forEach((middleware) => { this.collectors.processor.addMiddleware(middleware); }); + + this.createCongestionDetector(); } public get closed() { From 3940b793ec3d43f776f6065312553768ab36881f Mon Sep 17 00:00:00 2001 From: Balazs Kreith Date: Mon, 8 Apr 2024 16:34:18 +0300 Subject: [PATCH 5/5] add `Detectors and Issues` section --- README.md | 180 +++++++++++----- src/ClientMonitor.ts | 271 +++++++++++++++++++++++- src/detectors/CongestionDetector.ts | 3 +- src/detectors/CpuPerformanceDetector.ts | 119 ++++------- src/entries/StatsStorage.ts | 5 +- src/index.ts | 1 + 6 files changed, 436 insertions(+), 143 deletions(-) diff --git a/README.md b/README.md index 651f0a8..2df20b8 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,8 @@ Table of Contents: - [MediaStreamTrack Entry](#mediastreamtrack-entry) - [InboundRTP Entry](#inboundrtp-entry) - [OutboundRTP Entry](#outboundrtp-entry) -- [Detectors and Alerts](#detectors-and-alerts) +- [Detectors and Issues](#issues-and-detectors) + - [Congestion Detector](#congestion-detector) - [Audio Desync Detector](#audio-desync-detector) - [CPU Performance Detector](#cpu-performance-detector) - [Configurations](#configurations) @@ -149,6 +150,11 @@ monitor.on('stats-collected', () => { sendingVideoBitrate, receivingAudioBitrate, receivingVideoBitrate, + + highestSeenSendingBitrate, + highestSeenReceivingBitrate, + highestSeenAvailableOutgoingBitrate, + highestSeenAvailableIncomingBitrate, } = monitor.storage console.log(`Total inbound packets lost: ${totalInboundPacketsLost}`); @@ -180,6 +186,11 @@ monitor.on('stats-collected', () => { console.log(`Sending video bitrate: ${sendingVideoBitrate}`); console.log(`Receiving audio bitrate: ${receivingAudioBitrate}`); console.log(`Receiving video bitrate: ${receivingVideoBitrate}`); + + console.log(`Highest seen sending bitrate: ${highestSeenSendingBitrate}`); + console.log(`Highest seen receiving bitrate: ${highestSeenReceivingBitrate}`); + console.log(`Highest seen available outgoing bitrate: ${highestSeenAvailableOutgoingBitrate}`); + console.log(`Highest seen available incoming bitrate: ${highestSeenAvailableIncomingBitrate}`); }); @@ -391,83 +402,136 @@ monitor.on('stats-collected', () => { ``` -## Detectors and Alerts +## Detectors and Issues -Detectors and alerts provide events of tracking and responding to anomalies or performance issues based on the polled stats. Detectors are components that continuously monitor for specific conditions in the polled stats, and set an alert if certain thresholds are hit. You can subscribe to alerts of an instantiated client-monitor-js and configure detectors via initial configurations. +ClientMonitor comes with Detectors detect various issues and anomalies in the collected stats. These detectors are continuously monitoring the stats and set an alert if certain thresholds are hit. You can create detectors and subscribe to alerts of an instantiated client-monitor-js. -List of built-in alerts: +For example: -- `audio-desync-alert`: triggered when for an audio track several acceleration and deceleration is detected in a short amount of time indicating that the controlling system tries compensate discrepancies. -- `cpu-performance-alert`: triggered whenever a browser detects quality limitation becasue of CPU, or the number of decoded frames per sec hit a certain threshold +```javascript +const detector = monitor.createCongestionDetector(); -### Audio Desync Detector +detector.on('congestion', event => { + console.log('Congestion ', event); +}); + +// Once the detector is created, you can subscribe to the monitor congestion event + +monitor.on('congestion', event => { + // same as above +}); +``` +Detector can be created to add issue to the monitor whenever a certain condition is met. + ```javascript -import { createClientMonitor } from 'client-monitor-js'; +const detector = monitor.createCongestionDetector({ + createIssueOnDetection: { + severity: 'major', + attachments: { + // various custom data + }, + }, +}); +``` + +Issues can be added through the client monitor instance. For example: -// Create a ClientMonitor instance -const monitor = createClientMonitor({ - collectingPeriodInMs: 2000, +```javascript +monitor.addIssue({ + severity: 'critical', + description: 'Media device is crashed', + timestamp: Date.now(), + attachments: { + clientId, + reason, + }, }); +``` -monitor.detectors.addAudioDesyncDetector({ - /** - * The fractional threshold used to determine if the audio desynchronization - * correction is considered significant or not. - * It represents the minimum required ratio of corrected samples to total samples. - * For example, a value of 0.1 means that if the corrected samples ratio - * exceeds 0.1, it will be considered a significant audio desynchronization issue. - */ - fractionalCorrectionAlertOnThreshold: 0.1; - /** - * The fractional threshold used to determine if the audio desynchronization - * correction is considered negligible and the alert should be turned off. - * It represents the maximum allowed ratio of corrected samples to total samples. - * For example, a value of 0.05 means that if the corrected samples ratio - * falls below 0.05, the audio desynchronization alert will be turned off. - */ - fractionalCorrectionAlertOffThreshold: 0.2; +The severity of the issue can be one of the following values: `critical`, `major`, `minor`. + +As mentioned above, if a detector is created with an option to create an issue on detection, the detector will automatically add an issue to the monitor whenever the condition is met. + +Currently the following Detectors are added to ClientMonitor: + +### Congestion Detector + +```javascript +const detector = monitor.createCongestionDetector({ + createIssueOnDetection: { + severity: 'major', + attachments: { + // various custom data + }, + } }); -monitor.on('audio-desync-alert', (alertState) => { - if (alertState === 'on') { - console.log('Audio is desynced from video'); - } else if (alertState === 'off') { - console.log('Audio is synced back'); +const onCongestion = (event) => { + console.log('congestion detected on media streaming'); + console.log("Available incoming bitrate before congestion", event.incomingBitrateBeforeCongestion); + console.log("Available outgoing bitrate before congestion", event.outgoingBitrateBeforeCongestion); + console.log("Available incoming bitrate after congestion", event.incomingBitrateAfterCongestion); + console.log("Available outgoing bitrate after congestion", event.outgoingBitrateAfterCongestion); +}; + +detector.once('close', () => { + console.log('congestion detector is closed'); + detector.off('congestion', onCongestion); +}) +detector.on('congestion', onCongestion); + +setTimeout(() => { + detector.close(); +}, 10000); +``` + +### Audio Desync Detector + +```javascript +const detector = monitor.createAudioDesyncDetector({ + createIssueOnDetection: { + severity: 'major', + attachments: { + // various custom data + }, } }); + +const onDesync = (trackId) => { + console.log('Audio desync detected on track', trackId); +} + +detector.once('close', () => { + detector.off('desync', onDesync); +}); +detector.on('desync', onDesync); + ``` ### CPU Performance Detector -```javascript -monitor.detectors.addCpuPerformanceDetector({ - /** - * The fractional threshold used to determine if the incoming frames - * dropped fraction is considered significant or not. - * It represents the maximum allowed ratio of dropped frames to received frames. - * For example, a value of 0.1 means that if the dropped frames fraction - * exceeds 0.1, it will be considered a significant issue. - */ - droppedIncomingFramesFractionAlertOn: 0.5; - /** - * The fractional threshold used to determine if the incoming frames - * dropped fraction is considered negligible and the alert should be turned off. - * It represents the maximum allowed ratio of dropped frames to received frames. - * For example, a value of 0.05 means that if the dropped frames fraction - * falls below 0.05, the CPU issue alert will be turned off. - */ - droppedIncomingFramesFractionAlertOff: 0.8; -}) +```javascript -monitor.on('cpu-performance-alert', alertState => { - if (alertState === 'on') { - console.log('CPU performance problem is detected'); - } else if (alertState === 'off') { - console.log('CPU performance problem is gone'); +const detector = monitor.createCpuPerformanceIssueDetector({ + createIssueOnDetection: { + severity: 'major', + attachments: { + // various custom data + }, } -}) +}); + +const onStateChanged = (state) => { + console.log('CPU performance state changed', state); +}; + +detector.once('close', () => { + detector.off('statechanged', onStateChanged); +}); +detector.on('statechanged', onStateChanged); + ``` ## Configurations diff --git a/src/ClientMonitor.ts b/src/ClientMonitor.ts index 86dad6d..7ebc3e9 100644 --- a/src/ClientMonitor.ts +++ b/src/ClientMonitor.ts @@ -10,6 +10,7 @@ import * as validators from './utils/validators'; import { PeerConnectionEntry, TrackStats } from './entries/StatsEntryInterfaces'; import { AudioDesyncDetector, AudioDesyncDetectorConfig } from './detectors/AudioDesyncDetector'; import { CongestionDetector, CongestionDetectorEvents } from './detectors/CongestionDetector'; +import { CpuPerformanceDetector, CpuPerformanceDetectorConfig } from './detectors/CpuPerformanceDetector'; const logger = createLogger('ClientMonitor'); @@ -33,6 +34,15 @@ export type ClientMonitorConfig = { samplingTick?: number; }; +export type ClientIssue = { + severity: 'critical' | 'major' | 'minor'; + timestamp?: number, + description?: string, + peerConnectionId?: string, + mediaTrackId?: string, + attachments?: Record, +} + export type AlertState = 'on' | 'off'; export interface ClientMonitorEvents { @@ -51,7 +61,10 @@ export interface ClientMonitorEvents { incomingBitrateBeforeCongestion: number | undefined; outgoingBitrateAfterCongestion: number | undefined; outgoingBitrateBeforeCongestion: number | undefined; - } + }, + 'cpulimitation': AlertState, + 'audio-desync': AlertState, + 'issue': ClientIssue, } export class ClientMonitor extends TypedEventEmitter { @@ -105,8 +118,6 @@ export class ClientMonitor extends TypedEventEmitter { adapterMiddlewares.forEach((middleware) => { this.collectors.processor.addMiddleware(middleware); }); - - this.createCongestionDetector(); } public get closed() { @@ -234,6 +245,20 @@ export class ClientMonitor extends TypedEventEmitter { this._sampler.addLocalSDP(localSDP); } + public addIssue(issue: ClientIssue) { + this._sampler.addCustomCallEvent({ + name: 'CLIENT_ISSUE', + value: issue.severity, + peerConnectionId: issue.peerConnectionId, + mediaTrackId: issue.mediaTrackId, + message: issue.description, + timestamp: issue.timestamp ?? Date.now(), + attachments: issue.attachments ? JSON.stringify(issue.attachments): undefined, + }); + + this.emit('issue', issue); + } + public setCollectingPeriod(collectingPeriodInMs: number): void { this._config.collectingPeriodInMs = collectingPeriodInMs; this._setupTimer(); @@ -244,10 +269,18 @@ export class ClientMonitor extends TypedEventEmitter { this._setupTimer(); } - public createCongestionDetector(): CongestionDetector { - const exxistingDetector = this._detectors.get(CongestionDetector.name); + public createCongestionDetector(options?: { + createIssueOnDetection?: { + attachments?: Record, + severity: 'critical' | 'major' | 'minor', + }, + }): CongestionDetector { + const existingDetector = this._detectors.get(CongestionDetector.name); + const { + createIssueOnDetection, + } = options ?? {}; - if (exxistingDetector) return exxistingDetector as CongestionDetector; + if (existingDetector) return existingDetector as CongestionDetector; const detector = new CongestionDetector(); const onUpdate = () => detector.update(this.storage.peerConnections()); @@ -279,6 +312,22 @@ export class ClientMonitor extends TypedEventEmitter { outgoingBitrateAfterCongestion, outgoingBitrateBeforeCongestion, }); + + if (createIssueOnDetection) { + this.addIssue({ + severity: createIssueOnDetection.severity, + description: 'Congestion detected', + timestamp: Date.now(), + attachments: { + ...(createIssueOnDetection.attachments ?? {}), + incomingBitrateAfterCongestion, + incomingBitrateBeforeCongestion, + outgoingBitrateAfterCongestion, + outgoingBitrateBeforeCongestion, + + }, + }) + } } detector.once('close', () => { @@ -293,27 +342,99 @@ export class ClientMonitor extends TypedEventEmitter { return detector; } - public createAudioDesyncDetector(config?: AudioDesyncDetectorConfig): AudioDesyncDetector { - const exxistingDetector = this._detectors.get(AudioDesyncDetector.name); + public createAudioDesyncDetector(config?: AudioDesyncDetectorConfig & { + createIssueOnDetection?: { + attachments?: Record, + severity: 'critical' | 'major' | 'minor', + }, + }): AudioDesyncDetector { + const existingDetector = this._detectors.get(AudioDesyncDetector.name); - if (exxistingDetector) return exxistingDetector as AudioDesyncDetector; + if (existingDetector) return existingDetector as AudioDesyncDetector; const detector = new AudioDesyncDetector({ fractionalCorrectionAlertOnThreshold: config?.fractionalCorrectionAlertOnThreshold ?? 0.1, fractionalCorrectionAlertOffThreshold: config?.fractionalCorrectionAlertOffThreshold ?? 0.05, }); const onUpdate = () => detector.update(this.storage.inboundRtps()); + const { + createIssueOnDetection, + } = config ?? {}; + + const onDesync = (trackId: string) => { + this.emit('audio-desync', 'on'); + if (!createIssueOnDetection) return; + + this.addIssue({ + severity: createIssueOnDetection.severity, + description: 'Audio desync detected', + timestamp: Date.now(), + peerConnectionId: this.storage.getTrack(trackId)?.getPeerConnection()?.peerConnectionId, + mediaTrackId: trackId, + attachments: createIssueOnDetection.attachments, + }); + }; + const onSync = () => { + this.emit('audio-desync', 'off'); + } detector.once('close', () => { this.off('stats-collected', onUpdate); + detector.off('desync', onDesync); + detector.off('sync', onSync); this._detectors.delete(AudioDesyncDetector.name); }); this.on('stats-collected', onUpdate); + detector.on('desync', onDesync); + detector.on('sync', onSync); + this._detectors.set(AudioDesyncDetector.name, detector); return detector; } + public createCpuPerformanceIssueDetector(config?: CpuPerformanceDetectorConfig & { + createIssueOnDetection?: { + attachments?: Record, + severity: 'critical' | 'major' | 'minor', + }, + }): CpuPerformanceDetector { + const existingDetector = this._detectors.get(CpuPerformanceDetector.name); + + if (existingDetector) return existingDetector as CpuPerformanceDetector; + + const detector = new CpuPerformanceDetector(config ?? {}); + const onUpdate = () => detector.update(this.storage.peerConnections()); + const { + createIssueOnDetection, + } = config ?? {}; + + const onStateChanged = (state: AlertState) => { + this.emit('cpulimitation', state); + + if (!createIssueOnDetection || state !== 'on') return; + + this.addIssue({ + severity: createIssueOnDetection.severity, + description: 'Audio desync detected', + timestamp: Date.now(), + attachments: createIssueOnDetection.attachments, + }); + }; + + detector.once('close', () => { + this.off('stats-collected', onUpdate); + detector.off('statechanged', onStateChanged); + this._detectors.delete(CpuPerformanceDetector.name); + }); + this.on('stats-collected', onUpdate); + detector.on('statechanged', onStateChanged); + + this._detectors.set(CpuPerformanceDetector.name, detector); + + return detector; + } + public getTrackStats(trackId: string): TrackStats | undefined { return this.storage.getTrack(trackId); @@ -403,6 +524,138 @@ export class ClientMonitor extends TypedEventEmitter { return [...this.storage.tracks()]; } + public get sendingAudioBitrate() { + return this.storage.sendingAudioBitrate; + } + + public get sendingVideoBitrate() { + return this.storage.sendingVideoBitrate; + } + + public get receivingAudioBitrate() { + return this.storage.receivingAudioBitrate; + } + + public get receivingVideoBitrate() { + return this.storage.receivingVideoBitrate; + } + + public get totalInboundPacketsLost() { + return this.storage.totalInboundPacketsLost; + } + + public get totalInboundPacketsReceived() { + return this.storage.totalInboundPacketsReceived; + } + + public get totalOutboundPacketsSent() { + return this.storage.totalOutboundPacketsSent; + } + + public get totalOutboundPacketsReceived() { + return this.storage.totalOutboundPacketsReceived; + } + + public get totalOutboundPacketsLost() { + return this.storage.totalOutboundPacketsLost; + } + + public get totalDataChannelBytesSent() { + return this.storage.totalDataChannelBytesSent; + } + + public get totalDataChannelBytesReceived() { + return this.storage.totalDataChannelBytesReceived; + } + + public get totalSentAudioBytes() { + return this.storage.totalSentAudioBytes; + } + + public get totalSentVideoBytes() { + return this.storage.totalSentVideoBytes; + } + + public get totalReceivedAudioBytes() { + return this.storage.totalReceivedAudioBytes; + } + + public get totalReceivedVideoBytes() { + return this.storage.totalReceivedVideoBytes; + } + + public get totalAvailableIncomingBitrate() { + return this.storage.totalAvailableIncomingBitrate; + } + + public get totalAvailableOutgoingBitrate() { + return this.storage.totalAvailableOutgoingBitrate; + } + + public get deltaInboundPacketsLost() { + return this.storage.deltaInboundPacketsLost; + } + + public get deltaInboundPacketsReceived() { + return this.storage.deltaInboundPacketsReceived; + } + + public get deltaOutboundPacketsSent() { + return this.storage.deltaOutboundPacketsSent; + } + + public get deltaOutboundPacketsReceived() { + return this.storage.deltaOutboundPacketsReceived; + } + + public get deltaOutboundPacketsLost() { + return this.storage.deltaOutboundPacketsLost; + } + + public get deltaDataChannelBytesSent() { + return this.storage.deltaDataChannelBytesSent; + } + + public get deltaDataChannelBytesReceived() { + return this.storage.deltaDataChannelBytesReceived; + } + + public get deltaSentAudioBytes() { + return this.storage.deltaSentAudioBytes; + } + + public get deltaSentVideoBytes() { + return this.storage.deltaSentVideoBytes; + } + + public get deltaReceivedAudioBytes() { + return this.storage.deltaReceivedAudioBytes; + } + + public get deltaReceivedVideoBytes() { + return this.storage.deltaReceivedVideoBytes; + } + + public get avgRttInSec() { + return this.storage.avgRttInS; + } + + public get highestSeenSendingBitrate() { + return this.storage.highestSeenSendingBitrate; + } + + public get highestSeenReceivingBitrate() { + return this.storage.highestSeenReceivingBitrate; + } + + public get highestSeenAvailableOutgoingBitrate() { + return this.storage.highestSeenAvailableOutgoingBitrate; + } + + public get highestSeenAvailableIncomingBitrate() { + return this.storage.highestSeenAvailableIncomingBitrate; + } + private _setupTimer(): void { this._timer && clearInterval(this._timer); this._timer = undefined; diff --git a/src/detectors/CongestionDetector.ts b/src/detectors/CongestionDetector.ts index 8724702..10d194d 100644 --- a/src/detectors/CongestionDetector.ts +++ b/src/detectors/CongestionDetector.ts @@ -1,5 +1,4 @@ import EventEmitter from "events"; -import { ClientMonitor } from "../ClientMonitor"; import { IceCandidatePairEntry, PeerConnectionEntry } from "../entries/StatsEntryInterfaces"; type PeerConnectionState = { @@ -30,7 +29,7 @@ export class CongestionDetector extends EventEmitter { ) { super(); this.setMaxListeners(Infinity); - + } public update(peerConnections: IterableIterator) { diff --git a/src/detectors/CpuPerformanceDetector.ts b/src/detectors/CpuPerformanceDetector.ts index 9e26114..7ca8687 100644 --- a/src/detectors/CpuPerformanceDetector.ts +++ b/src/detectors/CpuPerformanceDetector.ts @@ -1,79 +1,54 @@ -import { AlertState, ClientMonitor } from "../ClientMonitor"; -/** - * Configuration for the dropped frames detector. - */ +import EventEmitter from "events"; +import { AlertState } from "../ClientMonitor"; +import { PeerConnectionEntry } from "../entries/StatsEntryInterfaces"; + export type CpuPerformanceDetectorConfig = { - /** - * The fractional threshold used to determine if the incoming frames - * dropped fraction is considered significant or not. - * It represents the maximum allowed ratio of dropped frames to received frames. - * For example, a value of 0.1 means that if the dropped frames fraction - * exceeds 0.1, it will be considered a significant issue. - */ - droppedIncomingFramesFractionAlertOn: number; - /** - * The fractional threshold used to determine if the incoming frames - * dropped fraction is considered negligible and the alert should be turned off. - * It represents the maximum allowed ratio of dropped frames to received frames. - * For example, a value of 0.05 means that if the dropped frames fraction - * falls below 0.05, the CPU issue alert will be turned off. - */ - droppedIncomingFramesFractionAlertOff: number; -}; - - /** - * Creates a CPU issue detector process. - * @param emitter The event emitter used to emit CPU issue events. - * @param config The configuration for the dropped frames detector. - * @returns The evaluator process function. - */ -export function createCpuPerformanceDetector(config: CpuPerformanceDetectorConfig & { - clientMonitor: ClientMonitor, -}) { - const { - clientMonitor, - } = config; - let alertState: AlertState = 'off'; - let outboundIsOk = true; - let inboundIsOk = true; - async function update() { - const { storage } = clientMonitor; - inboundIsOk = true; - for (const inboundRtp of storage.inboundRtps()) { - const receivedFrames = inboundRtp.receivedFrames ?? 0; - const decodedFrames = inboundRtp.decodedFrames ?? 0; - if (receivedFrames < 1 || decodedFrames < 1) { - continue; - } - const droppedFrames = inboundRtp.droppedFrames ?? 0; - const framesDroppedFraction = droppedFrames / receivedFrames; - if (!inboundIsOk) { - if (framesDroppedFraction < config.droppedIncomingFramesFractionAlertOff) { - continue; - } - } else if (framesDroppedFraction < config.droppedIncomingFramesFractionAlertOn) { - continue; + // empty +} + +export type CpuPerformanceDetectorEvents = { + statechanged: [AlertState], + close: [], +} + +export declare interface CpuPerformanceDetector { + on(event: K, listener: (...events: CpuPerformanceDetectorEvents[K]) => void): this; + off(event: K, listener: (...events: CpuPerformanceDetectorEvents[K]) => void): this; + once(event: K, listener: (...events: CpuPerformanceDetectorEvents[K]) => void): this; + emit(event: K, ...events: CpuPerformanceDetectorEvents[K]): boolean; +} + +export class CpuPerformanceDetector extends EventEmitter { + private _closed = false; + private _alertState: AlertState = 'off'; + + public constructor( + public readonly config: CpuPerformanceDetectorConfig + ) { + super(); + this.setMaxListeners(Infinity); + } + + public update(peerConnections: IterableIterator) { + let gotLimited = false; + + for (const peerConnection of peerConnections) { + for (const outboundRtp of peerConnection.outboundRtps()) { + gotLimited ||= outboundRtp.stats.qualityLimitationReason === 'cpu'; } - inboundIsOk = false; } - outboundIsOk = true; - for (const outboundRtp of storage.outboundRtps()) { - if (outboundRtp.stats.qualityLimitationReason !== 'cpu') { - continue; - } - outboundIsOk = false; + + const wasLimited = this._alertState === 'on'; + this._alertState = gotLimited ? 'on' : 'off'; + + if (wasLimited !== gotLimited) { + this.emit('statechanged', this._alertState); } } - const previousAlertState = alertState; - alertState = inboundIsOk && outboundIsOk ? 'off' : 'on'; - if (previousAlertState !== alertState) { - // clientMonitor.emit('cpu-performance-alert', alertState); + + public close() { + if (this._closed) return; + this._closed = true; + this.emit('close'); } - return { - id: 'cpu-issue-detector', - update, - get alert() { - return alertState; - }, - }; } \ No newline at end of file diff --git a/src/entries/StatsStorage.ts b/src/entries/StatsStorage.ts index 257b3ab..58b93f7 100644 --- a/src/entries/StatsStorage.ts +++ b/src/entries/StatsStorage.ts @@ -56,6 +56,8 @@ export class StatsStorage { public totalSentVideoBytes = 0; public totalReceivedAudioBytes = 0; public totalReceivedVideoBytes = 0; + public totalAvailableIncomingBitrate?: number; + public totalAvailableOutgoingBitrate?: number; public deltaInboundPacketsLost?: number; public deltaInboundPacketsReceived?: number; @@ -68,8 +70,7 @@ export class StatsStorage { public deltaSentVideoBytes?: number; public deltaReceivedAudioBytes?: number; public deltaReceivedVideoBytes?: number; - public totalAvailableIncomingBitrate?: number; - public totalAvailableOutgoingBitrate?: number; + public avgRttInS?: number; public highestSeenSendingBitrate?: number; diff --git a/src/index.ts b/src/index.ts index 6bec7ed..3948e1e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ export type { StatsProvider } from "./collectors/StatsProvider"; export type { MediasoupStatsCollector } from "./collectors/MediasoupStatsCollector"; export type { PeerConnectionStatsCollector } from "./collectors/PeerConnectionStatsCollector"; export type { + ClientIssue, ClientMonitor, ClientMonitorConfig, ClientMonitorEvents,