Skip to content

Commit

Permalink
Replace sqlite3 with better-sqlite3 (#2027)
Browse files Browse the repository at this point in the history
  • Loading branch information
FreeSlave authored Sep 2, 2021
1 parent 977f19c commit 3ac7e71
Show file tree
Hide file tree
Showing 64 changed files with 454 additions and 399 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ module.exports = {
'postcss',
'postgres',
'postgresql',
'pragma',
'prefetch',
'principial',
'Postfix',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
"@resolve-js/debug-levels": "0.31.9",
"@resolve-js/eventstore-base": "0.31.9",
"crc-32": "^1.2.0",
"sqlite": "3.0.2",
"tmp": "0.2.1"
"tmp": "0.2.1",
"better-sqlite3": "7.4.3",
"@types/better-sqlite3": "7.4.0"
},
"devDependencies": {
"jest": "27.0.3",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { AdapterPool } from './types'

const beginIncrementalImport = async ({
database,
eventsTableName,
escapeId,
escape,
executeQuery,
}: AdapterPool): Promise<string> => {
try {
const incrementalImportTableAsId = escapeId(
Expand All @@ -13,7 +13,7 @@ const beginIncrementalImport = async ({
const importId = Buffer.from(`${Date.now()}${Math.random()}`)
.toString('base64')
.replace(/\/|\+|=/gi, 'z')
await database.exec(
await executeQuery(
`CREATE TABLE ${incrementalImportTableAsId}(
-- RESOLVE INCREMENTAL-IMPORT ${escape(importId)} OWNED TABLE
${escapeId('threadId')} BIGINT NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AdapterPool } from './types'
import type { AdapterPool } from './types'
import isIntegerOverflowError from './integer-overflow-error'

const commitIncrementalImport = async (
{ database, eventsTableName, escapeId, escape }: AdapterPool,
{ executeQuery, eventsTableName, escapeId, escape }: AdapterPool,
importId: string
): Promise<void> => {
const incrementalImportTableAsId = escapeId(
Expand All @@ -13,7 +14,7 @@ const commitIncrementalImport = async (
const eventsTableAsId = escapeId(eventsTableName)

try {
await database.exec(`BEGIN IMMEDIATE;
await executeQuery(`BEGIN IMMEDIATE;
SELECT ABS("CTE1"."IncrementalImportFailed") FROM (
SELECT 0 AS "IncrementalImportFailed"
UNION ALL
Expand All @@ -37,10 +38,10 @@ const commitIncrementalImport = async (
WHERE "MaybeEqualEvents"."payloadA" = "MaybeEqualEvents"."payloadB"
);
SELECT ABS("CTE2"."IncrementalImportFailed") FROM (
SELECT 0 AS "IncrementalImportFailed"
SELECT json_type("CTE2"."IncrementalImportFailed") FROM (
SELECT '{}' AS "IncrementalImportFailed"
UNION ALL
SELECT -9223372036854775808 AS "IncrementalImportFailed"
SELECT 'Malformed' AS "IncrementalImportFailed"
FROM ${eventsTableAsId}
WHERE ${eventsTableAsId}."timestamp" > (
SELECT MIN(${incrementalImportTableAsId}."timestamp") FROM ${incrementalImportTableAsId}
Expand Down Expand Up @@ -111,21 +112,19 @@ const commitIncrementalImport = async (
`)
} catch (error) {
try {
await database.exec(`ROLLBACK;`)
await executeQuery(`ROLLBACK;`)
} catch (e) {}
if (
error != null &&
(error.message === 'SQLITE_ERROR: integer overflow' ||
/^SQLITE_ERROR:.*? not exists$/.test(error.message))
) {
throw new Error(
`Either event batch has timestamps from the past or incremental importId=${importId} does not exist`
)
if (error != null && isIntegerOverflowError(error.message)) {
throw new Error(`Incremental importId=${importId} does not exist`)
} else if (error != null && /^.*? not exists$/.test(error.message)) {
throw new Error('Incremental import table does not exist')
} else if (error != null && error.message.endsWith('malformed JSON')) {
throw new Error('Event batch has timestamps from the past')
} else {
throw error
}
} finally {
await database.exec(`DROP TABLE IF EXISTS ${incrementalImportTableAsId};`)
await executeQuery(`DROP TABLE IF EXISTS ${incrementalImportTableAsId};`)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import type {
ConnectionDependencies,
SqliteAdapterPoolConnectedProps,
SqliteAdapterConfig,
AdapterPool,
BetterSqliteDb,
} from './types'
import { SqliteAdapterConfigSchema } from './types'
import { validate } from '@resolve-js/eventstore-base'
import executeStatement from './execute-statement'
import executeQuery from './execute-query'

const SQLITE_BUSY = 'SQLITE_BUSY'
const randRange = (min: number, max: number): number =>
Expand All @@ -17,7 +21,7 @@ const fullJitter = (retries: number): number =>

const connect = async (
pool: AdapterPoolPrimal,
{ sqlite, tmp, os, fs }: ConnectionDependencies,
{ BetterSqlite, tmp, os, fs }: ConnectionDependencies,
config: SqliteAdapterConfig
): Promise<void> => {
const log = getLog('connect')
Expand Down Expand Up @@ -47,7 +51,7 @@ const connect = async (
log.verbose(`secretsTableName: ${secretsTableName}`)
log.verbose(`subscribersTableName: ${subscribersTableName}`)

let connector
let connector: () => Promise<SqliteAdapterPoolConnectedProps['database']>
if (databaseFile === ':memory:') {
log.debug(`using memory connector`)
if (process.env.RESOLVE_LAUNCH_ID != null) {
Expand Down Expand Up @@ -79,14 +83,19 @@ const connect = async (
}
}

connector = sqlite.open.bind(sqlite, pool.memoryStore.name)
const memoryStoreFileName = pool.memoryStore.name
connector = async () => {
return new BetterSqlite(memoryStoreFileName)
}
} else {
log.debug(`using disk file connector`)
connector = sqlite.open.bind(sqlite, databaseFile)
connector = async () => {
return new BetterSqlite(databaseFile)
}
}

log.debug(`connecting`)
let database
let database: SqliteAdapterPoolConnectedProps['database']
for (let retry = 0; ; retry++) {
try {
database = await connector()
Expand All @@ -96,33 +105,30 @@ const connect = async (
log.warn(`received SQLITE_BUSY error code, retrying`)
await new Promise((resolve) => setTimeout(resolve, fullJitter(retry)))
} else {
log.error(error.message)
log.verbose(error.stack)
if (error != null) {
log.error(error.message)
log.verbose(error.stack)
}
throw error
}
}
}

log.debug(`adjusting connection`)

log.verbose(`Entering driver serialize mode`)
await (database as any)?.driver?.serialize?.()

log.verbose(`PRAGMA busy_timeout=1000000`)
await database.exec(`PRAGMA busy_timeout=1000000`)

log.verbose(`PRAGMA encoding=${escape('UTF-8')}`)
await database.exec(`PRAGMA encoding=${escape('UTF-8')}`)
const pragma = async (pragmaSetting: string) => {
log.verbose(`PRAGMA ${pragmaSetting}`)
database.pragma(pragmaSetting)
}

log.verbose(`PRAGMA synchronous=EXTRA`)
await database.exec(`PRAGMA synchronous=EXTRA`)
await pragma('busy_timeout=1000000')
await pragma(`encoding=${escape('UTF-8')}`)
await pragma('synchronous=EXTRA')

if (databaseFile === ':memory:') {
log.verbose(`PRAGMA journal_mode=MEMORY`)
await database.exec(`PRAGMA journal_mode=MEMORY`)
await pragma('journal_mode=MEMORY')
} else {
log.verbose(`PRAGMA journal_mode=DELETE`)
await database.exec(`PRAGMA journal_mode=DELETE`)
await pragma('journal_mode=DELETE')
}

Object.assign<AdapterPoolPrimal, Partial<SqliteAdapterPoolConnectedProps>>(
Expand All @@ -134,6 +140,8 @@ const connect = async (
snapshotsTableName,
secretsTableName,
subscribersTableName,
executeStatement: executeStatement.bind(null, pool as AdapterPool),
executeQuery: executeQuery.bind(null, pool as AdapterPool),
}
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import { getLog } from './get-log'
import { AdapterPool } from './types'
import type { AdapterPool } from './types'
import {
EventstoreFrozenError,
InputEvent,
makeDeleteSecretEvent,
THREAD_COUNT,
} from '@resolve-js/eventstore-base'
import isIntegerOverflowError from './integer-overflow-error'

const deleteSecret = async (
pool: AdapterPool,
selector: string
): Promise<boolean> => {
const { database, secretsTableName, escapeId, escape, eventsTableName } = pool
const {
executeQuery,
secretsTableName,
escapeId,
escape,
eventsTableName,
} = pool

const log = getLog('secretsManager:deleteSecret')
log.debug(`removing secret from the database`)
Expand All @@ -30,7 +37,7 @@ const deleteSecret = async (
log.debug(`executing SQL query`)

try {
await database.exec(
await executeQuery(
`BEGIN IMMEDIATE;
SELECT ABS("CTE"."EventStoreIsFrozen") FROM (
Expand Down Expand Up @@ -91,23 +98,24 @@ const deleteSecret = async (
return true
} catch (error) {
const errorMessage =
error != null && error.message != null ? error.message : ''
const errorCode = error != null && error.code != null ? error.code : ''
error != null && error.message != null ? (error.message as string) : ''
const errorCode =
error != null && error.code != null ? (error.code as string) : ''

if (errorMessage.indexOf('transaction within a transaction') > -1) {
return await deleteSecret(pool, selector)
}

try {
await database.exec('ROLLBACK;')
await executeQuery('ROLLBACK;')
} catch (e) {}

if (errorMessage === 'SQLITE_ERROR: integer overflow') {
if (isIntegerOverflowError(errorMessage)) {
throw new EventstoreFrozenError()
} else if (errorMessage === 'SQLITE_ERROR: malformed JSON') {
} else if (errorMessage.endsWith('malformed JSON')) {
return false
} else if (
errorCode === 'SQLITE_CONSTRAINT' &&
errorCode.startsWith('SQLITE_CONSTRAINT') &&
errorMessage.indexOf('PRIMARY') > -1
) {
return await deleteSecret(pool, selector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import assert from 'assert'
import { THREAD_COUNT, threadArrayToCursor } from '@resolve-js/eventstore-base'

const describe = async (pool: AdapterPool): Promise<EventStoreDescription> => {
const { database, secretsTableName, escapeId, escape, eventsTableName } = pool
const {
executeStatement,
secretsTableName,
escapeId,
escape,
eventsTableName,
} = pool

const eventsTableNameAsId = escapeId(eventsTableName)
const secretsTableNameAsId = escapeId(secretsTableName)
const freezeTableName = `${eventsTableName}-freeze`

const existingThreads = (await database.all(`
const existingThreads = (await executeStatement(`
SELECT "threadId", MAX("threadCounter") AS "threadCounter" FROM
${eventsTableNameAsId} GROUP BY "threadId" ORDER BY "threadId" ASC`)) as Array<{
threadId: EventThreadData['threadId']
Expand All @@ -30,7 +36,7 @@ const describe = async (pool: AdapterPool): Promise<EventStoreDescription> => {
threadCounters[i]++
}

const rows = await database.all(`SELECT
const rows = await executeStatement(`SELECT
(SELECT COUNT(*) FROM ${eventsTableNameAsId}) AS "eventCount",
(SELECT COUNT(*) FROM ${secretsTableNameAsId}) AS "secretCount",
(SELECT COUNT(*) FROM ${secretsTableNameAsId} WHERE "secret" IS NOT NULL) AS "setSecretCount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import executeSequence from './execute-sequence'
import { isNotExistError } from './resource-errors'

const dropEvents = async ({
database,
executeQuery,
databaseFile,
eventsTableName,
snapshotsTableName,
Expand All @@ -28,7 +28,7 @@ const dropEvents = async ({
]

const errors: any[] = await executeSequence(
database,
executeQuery,
statements,
log,
(error) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import executeSequence from './execute-sequence'
import { isNotExistError } from './resource-errors'

const dropSecrets = async ({
database,
executeQuery,
databaseFile,
secretsTableName,
escapeId,
Expand All @@ -16,7 +16,7 @@ const dropSecrets = async ({
const statements: string[] = [`DROP TABLE ${escapeId(secretsTableName)}`]

const errors: any[] = await executeSequence(
database,
executeQuery,
statements,
log,
(error) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { AdapterPool } from './types'

const dropSnapshot = async (
{ database, escape, escapeId, snapshotsTableName }: AdapterPool,
{ executeQuery, escape, escapeId, snapshotsTableName }: AdapterPool,
snapshotKey: string
): Promise<void> => {
await database.exec(
await executeQuery(
`DELETE FROM ${escapeId(snapshotsTableName)}
WHERE ${escapeId('snapshotKey')}=
${escape(snapshotKey)}`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AdapterPool } from './types'
import type { AdapterPool } from './types'
import isIntegerOverflowError from './integer-overflow-error'

const ensureEventSubscriber = async (
pool: AdapterPool,
Expand All @@ -17,7 +18,7 @@ const ensureEventSubscriber = async (
status,
updateOnly,
} = params
const { subscribersTableName, database, escapeId, escape } = pool
const { subscribersTableName, executeQuery, escapeId, escape } = pool
const subscribersTableNameAsId = escapeId(subscribersTableName)
if (
(!!updateOnly && destination != null) ||
Expand All @@ -29,7 +30,7 @@ const ensureEventSubscriber = async (
}

try {
await database.exec(`
await executeQuery(`
INSERT OR REPLACE INTO ${subscribersTableNameAsId}(
"applicationName",
"eventSubscriber",
Expand Down Expand Up @@ -68,7 +69,7 @@ const ensureEventSubscriber = async (
const errorMessage =
error != null && error.message != null ? error.message : ''

if (errorMessage === 'SQLITE_ERROR: integer overflow') {
if (isIntegerOverflowError(errorMessage)) {
return false
}

Expand Down
Loading

0 comments on commit 3ac7e71

Please sign in to comment.