Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add eventstore adapter runtimeInfo and setReconnectionMode. Add connection tests #2179

Merged
merged 2 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
AdapterPoolBoundProps,
AdapterPrimalPool,
AdapterBoundPool,
ReconnectionMode,
} from './types'
import type { LeveledDebugger } from '@resolve-js/debug-levels'
import bindMethod from './bind-method'
Expand Down Expand Up @@ -87,6 +88,8 @@ const createAdapter = <
describe,
establishTimeLimit,
getEventLoaderNative,
runtimeInfo,
setReconnectionMode,
}: AdapterFunctions<ConfiguredProps>,
options: Config,
configure: (props: AdapterPrimalPool<ConfiguredProps>, config: Config) => void
Expand Down Expand Up @@ -204,6 +207,22 @@ const createAdapter = <
adapterPool as AdapterBoundPool<ConfiguredProps>
),
getEventLoader: bindMethod(adapterPool, getEventLoader),

runtimeInfo:
runtimeInfo === undefined
? () => {
return { connectionCount: 0, disposed: adapterPool.disposed }
}
: runtimeInfo.bind(
null,
adapterPool as AdapterBoundPool<ConfiguredProps>
),
setReconnectionMode:
setReconnectionMode === undefined
? (mode: ReconnectionMode) => {
return
}
: bindMethod(adapterPool, setReconnectionMode),
}

Object.assign<AdapterPrimalPool<ConfiguredProps>, Adapter>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ export type {
GatheredSecrets,
EventLoaderFilter,
EventLoader,
AdapterRuntimeInfo,
ReconnectionMode,
} from './types'

export type {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ export interface AdapterFunctions<ConfiguredProps extends {}> {
ConfiguredProps,
NonNullable<AdapterPoolBoundProps['getEventLoaderNative']>
>

runtimeInfo: PoolMethod<ConfiguredProps, Adapter['runtimeInfo']>
setReconnectionMode?: PoolMethod<
ConfiguredProps,
Adapter['setReconnectionMode']
>
}

export interface EventLoader {
Expand All @@ -465,6 +471,17 @@ export type EventLoaderOptions = {
preferRegular: boolean // prefer regular implementation via loadEvents over native one
}

export type AdapterRuntimeInfo = {
connectionCount: number
disposed: boolean
[key: string]: any
}

export type ReconnectionMode = {
maxReconnectionTimes?: number
delayBeforeReconnection?: number
}

export interface Adapter extends CoreEventstore {
importEvents: (options?: Partial<ImportOptions>) => ImportEventsStream
exportEvents: (options?: Partial<ExportOptions>) => ExportEventsStream
Expand Down Expand Up @@ -504,4 +521,7 @@ export interface Adapter extends CoreEventstore {
filter: EventLoaderFilter,
options?: EventLoaderOptions
) => Promise<EventLoader>

runtimeInfo: () => AdapterRuntimeInfo
setReconnectionMode: (mode: ReconnectionMode) => void
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import type { AdapterPool } from './types'

const dispose = async ({ database }: AdapterPool): Promise<void> => {
if (database) await database.close()
const dispose = async (pool: AdapterPool): Promise<void> => {
if (pool.database) {
await pool.database.close()
pool.database = undefined
}
}

export default dispose
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import resetReplication from './reset-replication'
import setReplicationLock from './set-replication-lock'
import getCursorUntilEventTypes from './get-cursor-until-event-types'
import describe from './describe'
import runtimeInfo from './runtime-info'

import configure from './configure'

Expand Down Expand Up @@ -89,6 +90,7 @@ const createSqliteAdapter = (options: SqliteAdapterConfig): Adapter => {
setReplicationLock,
getCursorUntilEventTypes,
describe,
runtimeInfo,
},
options,
configure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { AdapterPool } from './types'
import type { AdapterRuntimeInfo } from '@resolve-js/eventstore-base'

const runtimeInfo = (pool: AdapterPool): AdapterRuntimeInfo => {
return {
disposed: pool.disposed,
connectionCount: pool.database === undefined ? 0 : 1,
}
}

export default runtimeInfo
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ const dispose = async (pool: AdapterPool): Promise<any> => {

log.debug(`disposing the event store`)

if (pool.connection) await pool.connection.end()
if (pool.connection) {
await pool.connection.end()
pool.connection = undefined
}

log.debug(`the event store disposed`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import dropSecrets from './drop-secrets'
import dropFinal from './drop-final'

import describe from './describe'
import runtimeInfo from './runtime-info'

import type { Adapter } from '@resolve-js/eventstore-base'
import type { MysqlAdapterConfig } from './types'
Expand Down Expand Up @@ -71,6 +72,7 @@ const createMysqlAdapter = (options: MysqlAdapterConfig): Adapter => {
getSecret,
setSecret,
describe,
runtimeInfo,
},
options,
configure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { AdapterPool } from './types'
import type { AdapterRuntimeInfo } from '@resolve-js/eventstore-base'

const runtimeInfo = (pool: AdapterPool): AdapterRuntimeInfo => {
return {
disposed: pool.disposed,
connectionCount: pool.connection === undefined ? 0 : 1,
}
}

export default runtimeInfo
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const configure = (
executeStatement: executeStatement.bind(null, pool as AdapterPool),
createGetConnectPromise,
getConnectPromise: createGetConnectPromise(),
extraConnections: new Set(),
eventLoaders: new Set(),
}

Object.assign<AdapterPoolPrimal, ConfiguredProps>(pool, props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ export const DEFAULT_QUERY_TIMEOUT = 45000
export const MINIMAL_QUERY_TIMEOUT = 1000

export const MAX_RECONNECTIONS = 5
export const SERVICE_WAIT_TIME = 1000
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,30 @@ import { getLog } from './get-log'

const MAX_DISCONNECT_TIME = 10000

const dispose = async ({ connection }: AdapterPool): Promise<void> => {
if (connection != null) {
const dispose = async (pool: AdapterPool): Promise<void> => {
if (pool.connection != null) {
const log = getLog('dispose')

const extraClient = pool.extraConnections.values()
pool.extraConnections.clear()

//TODO: Promise.allSettled
for (const client of extraClient) {
try {
await client.end()
} catch (error) {
log.error(`Could not end extra client connection: ${error}`)
}
}

//TODO: Promise.allSettled
const loaders = pool.eventLoaders.values()
for (const loader of loaders) {
await loader.close()
}

const connection = pool.connection
pool.connection = undefined
let timeout: NodeJS.Timeout | null = null
try {
await Promise.race([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { AdapterPool, PostgresConnection } from './types'
import {
RequestTimeoutError,
ServiceBusyError,
AlreadyDisposedError,
} from '@resolve-js/eventstore-base'
import {
isTimeoutError,
Expand All @@ -11,10 +12,10 @@ import {
makeUnrecognizedError,
} from './errors'
import { isAlreadyExistsError, isNotExistError } from './resource-errors'
import { MAX_RECONNECTIONS } from './constants'
import { MAX_RECONNECTIONS, SERVICE_WAIT_TIME } from './constants'
import makePostgresClient from './make-postgres-client'

const SERVICE_WAIT_TIME = 1000
const DISPOSED_WHILE_RUNNING_MSG = 'Adapter disposed while operation is running'

const executeStatement = async (
pool: AdapterPool,
Expand All @@ -25,13 +26,22 @@ const executeStatement = async (
let useDistinctConnection = distinctConnection
let distinctConnectionMade = false

const maxReconnectionTimes = pool.maxReconnectionTimes ?? MAX_RECONNECTIONS
const delayBeforeReconnection =
pool.delayBeforeReconnection ?? SERVICE_WAIT_TIME

while (true) {
if (pool.disposed) {
throw new AlreadyDisposedError(DISPOSED_WHILE_RUNNING_MSG)
}

let connection: PostgresConnection | undefined

try {
if (useDistinctConnection) {
connection = makePostgresClient(pool)
await connection.connect()
pool.extraConnections.add(connection)
distinctConnectionMade = true
} else {
connection = await pool.getConnectPromise()
Expand All @@ -41,7 +51,7 @@ const executeStatement = async (
pool.getConnectPromise = pool.createGetConnectPromise()
}
if (isConnectionTerminatedError(error) || isServiceBusyError(error)) {
if (reconnectionTimes >= MAX_RECONNECTIONS) {
if (reconnectionTimes >= maxReconnectionTimes) {
throw new ServiceBusyError(error.message)
}
reconnectionTimes++
Expand All @@ -55,6 +65,10 @@ const executeStatement = async (
let shouldWaitForServiceFree = false

try {
if (pool.disposed) {
throw new AlreadyDisposedError(DISPOSED_WHILE_RUNNING_MSG)
}

const result = await connection.query(sql)

if (result != null && Array.isArray(result.rows)) {
Expand All @@ -63,7 +77,11 @@ const executeStatement = async (

return []
} catch (error) {
if (isAlreadyExistsError(error) || isNotExistError(error)) {
if (
isAlreadyExistsError(error) ||
isNotExistError(error) ||
AlreadyDisposedError.is(error)
) {
throw error
} else if (isServiceBusyError(error)) {
throw new ServiceBusyError(error.message)
Expand All @@ -73,8 +91,8 @@ const executeStatement = async (
if (!useDistinctConnection) {
pool.getConnectPromise = pool.createGetConnectPromise()
}
if (reconnectionTimes >= MAX_RECONNECTIONS) {
throw new ServiceBusyError(error.message)
if (reconnectionTimes >= maxReconnectionTimes) {
throw new ServiceBusyError(error.message + `${useDistinctConnection}`)
}
useDistinctConnection = true
shouldWaitForServiceFree = true
Expand All @@ -83,7 +101,10 @@ const executeStatement = async (
error != null &&
error.message === 'Client was closed and is not queryable'
) {
if (reconnectionTimes >= MAX_RECONNECTIONS) {
if (pool.disposed) {
throw new AlreadyDisposedError(DISPOSED_WHILE_RUNNING_MSG)
}
if (reconnectionTimes >= maxReconnectionTimes) {
throw new ServiceBusyError(error.message)
}
useDistinctConnection = true
Expand All @@ -94,12 +115,13 @@ const executeStatement = async (
}
} finally {
if (distinctConnectionMade) {
connection.end((err) => {
return
})
pool.extraConnections.delete(connection)
await connection.end()
}
if (shouldWaitForServiceFree) {
await new Promise((resolve) => setTimeout(resolve, SERVICE_WAIT_TIME))
await new Promise((resolve) =>
setTimeout(resolve, delayBeforeReconnection)
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ const getEventLoaderNative = async (
new PgCursor(sqlQuery)
) as unknown) as QueriedPgCursor
} catch (error) {
await client.end()
throw makeKnownError(error) ?? error
}

return {
const eventLoader: EventLoader = {
async close() {
pool.eventLoaders.delete(eventLoader)
// await pgCursor.close() // may never resolve due to https://github.com/brianc/node-postgres/issues/2642
// client.end is enough anyway since cursor lives as long as the connection
// client end may still hang if connection terminated and pgcursor exists
Expand All @@ -93,6 +95,10 @@ const getEventLoaderNative = async (
},
isNative: true,
}

pool.eventLoaders.add(eventLoader)

return eventLoader
}

export default getEventLoaderNative
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import getCursorUntilEventTypes from './get-cursor-until-event-types'
import describe from './describe'
import establishTimeLimit from './establish-time-limit'
import getEventLoaderNative from './get-event-loader-native'
import runtimeInfo from './runtime-info'
import setReconnectionMode from './set-reconnection-mode'

import type { Adapter } from '@resolve-js/eventstore-base'
import type { PostgresqlAdapterConfig } from './types'
Expand Down Expand Up @@ -99,6 +101,8 @@ const createPostgresqlAdapter = (options: PostgresqlAdapterConfig): Adapter => {
describe,
establishTimeLimit,
getEventLoaderNative,
runtimeInfo,
setReconnectionMode,
},
options,
configure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { AdapterPool } from './types'
import type { AdapterRuntimeInfo } from '@resolve-js/eventstore-base'
import { MAX_RECONNECTIONS, SERVICE_WAIT_TIME } from './constants'

const runtimeInfo = (pool: AdapterPool): AdapterRuntimeInfo => {
return {
disposed: pool.disposed,
connectionCount:
pool.extraConnections.size +
pool.eventLoaders.size +
(pool.connection === undefined ? 0 : 1),
maxReconnectionTimes: pool.maxReconnectionTimes ?? MAX_RECONNECTIONS,
delayBeforeReconnection: pool.delayBeforeReconnection ?? SERVICE_WAIT_TIME,
processID: pool.connection ? (pool.connection as any).processID : undefined,
}
}

export default runtimeInfo
Loading