Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance monitoring #1829

Merged
merged 34 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
192b28c
Implement collecting of duration metrics
timbset Apr 5, 2021
6ee9e7f
Rework errors metrics collecting
timbset Apr 6, 2021
411cc2b
Add some unit tests
timbset Apr 7, 2021
18e3e8a
Rename publish metrics function
timbset Apr 7, 2021
ec236cf
Extract monitoring. Add tests
timbset Apr 7, 2021
81c0d1e
Add more tests. Remove await for monitoring error
timbset Apr 8, 2021
fe7de6b
Implement collecting of duration metrics
timbset Apr 9, 2021
21804b6
Add resolve version into server assemblies
timbset Apr 9, 2021
d7d65b8
Pass monitoring into adapters
timbset Apr 12, 2021
0c15c17
Add notification data into read model
timbset Apr 12, 2021
ced8006
Add postgres serverless metrics collecting points
timbset Apr 19, 2021
9e4515b
Implement grouping in monitoring
timbset Apr 19, 2021
f8c4cb9
Use monitoring group for error collecting
timbset Apr 20, 2021
84bdd17
Use group for monitoring instead of meta objects
timbset Apr 20, 2021
d35ee83
Merge remote-tracking branch 'origin/dev' into feature/monitoring-dur…
timbset Apr 20, 2021
d773bc6
Fix compiling errors
timbset Apr 21, 2021
838b877
Fix some errors. Improve monitoring to get some limits around
timbset Apr 22, 2021
29e6c1c
Add more tests
timbset Apr 22, 2021
9e36586
Fix some tests
timbset Apr 22, 2021
26faa68
Fix tests
timbset Apr 22, 2021
cf52e1b
Merge remote-tracking branch 'origin/dev' into feature/monitoring-dur…
timbset Apr 22, 2021
b8eaf34
Fix prettier
timbset Apr 22, 2021
10213a3
Fix adapter tests
timbset Apr 22, 2021
37b081f
Fix testing tools tests
timbset Apr 26, 2021
506a0a8
Fix some monitoring issues
timbset Apr 26, 2021
6fcaf5a
Improve read model monitoring
timbset Apr 26, 2021
dc8a39b
Fix unit tests
timbset Apr 26, 2021
cc85c1a
Merge remote-tracking branch 'origin/dev' into feature/monitoring-dur…
timbset Apr 26, 2021
105accb
wip
max-vasin Apr 26, 2021
6778bc6
Improve metrics in readmodel adapter
timbset Apr 26, 2021
81116a3
Merge remote-tracking branch 'origin/feature/monitoring-duration' int…
timbset Apr 26, 2021
c63271f
Fix prettier
timbset Apr 26, 2021
7bddbdb
Fix unit tests
timbset Apr 26, 2021
8773303
Fix read model adapter tests
timbset Apr 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import {
CommandResult,
} from '../types/core'

import { makeMonitoringSafe } from '../helpers'

type AggregateData = {
aggregateVersion: number
aggregateId: string
Expand Down Expand Up @@ -372,13 +370,24 @@ const executeCommand = async (
runtime: AggregateRuntime,
command: Command
): Promise<CommandResult> => {
const monitoring = makeMonitoringSafe(runtime.monitoring)
const monitoringGroup =
runtime.monitoring != null
? runtime.monitoring
.group({ Part: 'Command' })
.group({ AggregateName: command.aggregateName })
.group({ Type: command.type })
: null

if (monitoringGroup != null) {
monitoringGroup.time('Execution')
}

const { jwt: actualJwt, jwtToken: deprecatedJwt } = command

const jwt = actualJwt || deprecatedJwt

const subSegment = getPerformanceTracerSubsegment(
monitoring,
runtime.monitoring,
'executeCommand'
)

Expand Down Expand Up @@ -419,7 +428,7 @@ const executeCommand = async (
context: CommandContext
): Promise<CommandResult> => {
const subSegment = getPerformanceTracerSubsegment(
monitoring,
runtime.monitoring,
'processCommand'
)
try {
Expand Down Expand Up @@ -483,7 +492,10 @@ const executeCommand = async (
}

await (async (): Promise<void> => {
const subSegment = getPerformanceTracerSubsegment(monitoring, 'saveEvent')
const subSegment = getPerformanceTracerSubsegment(
runtime.monitoring,
'saveEvent'
)

try {
return await saveEvent(runtime, aggregate, command, processedEvent)
Expand All @@ -498,9 +510,16 @@ const executeCommand = async (
return processedEvent
} catch (error) {
subSegment.addError(error)
await monitoring?.error?.(error, 'command', { command })

if (monitoringGroup != null) {
monitoringGroup.group({ AggregateId: command.aggregateId }).error(error)
}
throw error
} finally {
if (monitoringGroup != null) {
monitoringGroup.timeEnd('Execution')
}

subSegment.close()
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/core/src/aggregate/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export type AggregateRuntimeHooks = {
}

export type AggregateRuntime = {
monitoring: Monitoring
monitoring?: Monitoring
secretsManager: SecretsManager
eventstore: Eventstore
hooks?: AggregateRuntimeHooks
Expand Down
8 changes: 4 additions & 4 deletions packages/core/core/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ export function firstOfType<T>(
return vars.find((i) => selector(i)) as T
}

const createSafeHandler = <T extends (...args: any[]) => Promise<void>>(
fn: (...args: Parameters<T>) => Promise<void>
) => async (...args: Parameters<T>): Promise<void> => {
const createSafeHandler = <T extends (...args: any[]) => void>(
fn: (...args: Parameters<T>) => void
) => (...args: Parameters<T>): void => {
try {
await fn(...args)
fn(...args)
} catch (e) {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@ import { getPerformanceTracerSubsegment } from '../utils'
import { ReadModelMeta } from '../types/runtime'
import getLog from '../get-log'

const monitoredError = async (
const monitoredError = (
runtime: ReadModelRuntime,
error: Error,
meta: any
readModelName: string,
resolverName: string
) => {
await runtime.monitoring?.error?.(error, 'readModelResolver', meta)
if (runtime.monitoring != null) {
const monitoringGroup = runtime.monitoring
.group({ Part: 'ReadModelResolver' })
.group({ ReadModel: readModelName })
.group({ Resolver: resolverName })

monitoringGroup.error(error)
}
return error
}

Expand All @@ -43,16 +51,14 @@ const getReadModelInterop = (
const invoker = resolverInvokerMap[resolver]
if (invoker == null) {
log.error(`unable to find invoker for the resolver`)
throw await monitoredError(
throw monitoredError(
runtime,
createHttpError(
HttpStatusCodes.UnprocessableEntity,
`Resolver "${resolver}" does not exist`
),
{
readModelName: name,
resolverName: resolver,
}
name,
resolver
)
}

Expand All @@ -69,6 +75,18 @@ const getReadModelInterop = (
}
)

const monitoringGroup =
monitoring != null
? monitoring
.group({ Part: 'ReadModelResolver' })
.group({ ReadModel: name })
.group({ Resolver: resolver })
: null

if (monitoringGroup != null) {
monitoringGroup.time('Execution')
}

try {
log.debug(`invoking the resolver`)
const data = await invoker(connection, args, {
Expand All @@ -84,12 +102,16 @@ const getReadModelInterop = (
if (subSegment != null) {
subSegment.addError(error)
}
await monitoring?.error?.(error, 'readModelResolver', {
readModelName: name,
resolverName: resolver,
})

if (monitoringGroup != null) {
monitoringGroup.error(error)
}
throw error
} finally {
if (monitoringGroup != null) {
monitoringGroup.timeEnd('Execution')
}

if (subSegment != null) {
subSegment.close()
}
Expand All @@ -113,10 +135,14 @@ const getReadModelInterop = (
try {
return await handler()
} catch (error) {
await monitoring?.error?.(error, 'readModelProjection', {
readModelName: readModel.name,
eventType,
})
if (monitoring != null) {
const monitoringGroup = monitoring
.group({ Part: 'ReadModelProjection' })
.group({ ReadModel: readModel.name })
.group({ EventType: eventType })

monitoringGroup.error(error)
}
throw error
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/core/src/read-model/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export type ReadModelRuntimeEventHandler = () => Promise<void>

export type ReadModelRuntime = {
secretsManager: SecretsManager
monitoring: Monitoring
monitoring?: Monitoring
}

export type ReadModelInterop = {
Expand Down
14 changes: 10 additions & 4 deletions packages/core/core/src/saga/get-sagas-interop-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,16 @@ const getInterop = (
try {
return await handler()
} catch (error) {
await runtime.monitoring?.error?.(error, 'readModelProjection', {
readModelName: saga.name,
eventType,
})
if (runtime.monitoring != null) {
const monitoringGroup = runtime.monitoring
.group({
Part: 'SagaProjection',
})
.group({ Saga: saga.name })
.group({ EventType: eventType })

monitoringGroup.error(error)
}
throw error
}
}
Expand Down
44 changes: 9 additions & 35 deletions packages/core/core/src/types/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
AggregateProjection,
EventHandlerEncryptionFactory,
ReadModelResolvers,
Command,
} from './core'

export type PerformanceSubsegment = {
Expand All @@ -27,6 +26,15 @@ export type PerformanceTracer = {
getSegment: () => PerformanceSegment
}

export type Monitoring = {
group: (config: Record<string, any>) => Monitoring
error: (error: Error) => void
time: (name: string, timestamp?: number) => void
timeEnd: (name: string, timestamp?: number) => void
publish: () => Promise<void>
performance?: PerformanceTracer
}

export type Eventstore = {
saveEvent: (event: any) => Promise<void>
saveSnapshot: Function
Expand Down Expand Up @@ -82,37 +90,3 @@ export type ViewModelMeta = {
encryption: EventHandlerEncryptionFactory
invariantHash: string
}

export type MonitoringPartMap = {
command: {
command: Command
}
readModelProjection: {
readModelName: string
eventType: string
}
readModelResolver: {
readModelName: string
resolverName: string
}
viewModelProjection: {
name: string
eventType: string
}
viewModelResolver: {
name: string
}
}
export type MonitoringPart = keyof MonitoringPartMap
export type MonitoringMeta<
TPart extends MonitoringPart
> = MonitoringPartMap[TPart]

export type Monitoring = {
error?: <K extends MonitoringPart, U extends MonitoringMeta<K>>(
error: Error,
part: K,
meta: U
) => Promise<void>
performance?: PerformanceTracer
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,15 @@ const buildViewModel = async (
} catch (error) {
subSegment.addError(error)
log.error(error.message)
await monitoring?.error?.(error, 'viewModelProjection', {
name,
eventType: event.type,
})

if (monitoring != null) {
const monitoringGroup = monitoring
.group({ Part: 'ViewModelProjection' })
.group({ ViewModel: name })
.group({ EventType: event.type })

monitoringGroup.error(error)
}
throw error
} finally {
subSegment.close()
Expand Down Expand Up @@ -220,9 +225,13 @@ const getViewModelInterop = (
} catch (error) {
subSegment.addError(error)

await monitoring?.error?.(error, 'viewModelResolver', {
name,
})
if (monitoring != null) {
const monitoringGroup = monitoring
.group({ Part: 'ViewModelResolver' })
.group({ ViewModel: name })

monitoringGroup.error(error)
}
throw error
} finally {
subSegment.close()
Expand Down
2 changes: 1 addition & 1 deletion packages/core/core/src/view-model/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export type ViewModelBuildResult = {
}

export type ViewModelRuntime = {
monitoring: Monitoring
monitoring?: Monitoring
eventstore: Eventstore
secretsManager: SecretsManager
}
Expand Down
Loading