Skip to content

Commit

Permalink
Correctly handle disconnects (#8)
Browse files Browse the repository at this point in the history
* Correctly handle disconnects

* Add changeset
  • Loading branch information
ilijaNL authored Jul 27, 2024
1 parent 6d2ba8c commit cdf1360
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/quiet-houses-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'pg-task': patch
---

Deal with unexpected connection errors
4 changes: 2 additions & 2 deletions __utils__/db.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Pool } from 'pg';
import { QueryClient } from '../src';

export async function cleanupSchema(pool: Pool, schema: string) {
export async function cleanupSchema(pool: QueryClient, schema: string) {
await pool.query(`DROP SCHEMA ${schema} CASCADE`);
}

Expand Down
2 changes: 1 addition & 1 deletion src/maintaince.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const createMaintainceWorker = async (pool: Pool, schema: string, options
poolInternvalInMs: 60000,
onResolve(_, err, _result) {
if (err) {
console.error('failed maintaince task', err);
console.log('failed maintaince task', err);
}
},
},
Expand Down
64 changes: 61 additions & 3 deletions src/manager.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { PostgreSqlContainer, StartedPostgreSqlContainer } from '@testcontainers/postgresql';
import { Pool } from 'pg';
import { cleanupSchema, createRandomSchema } from '../__utils__/db';
import { createManager } from './manager';
import { createManager, WorkerManager } from './manager';
import EventEmitter, { once } from 'node:events';
import { executeQuery } from './utils/sql';
import { createPlans } from './plans';
import { setTimeout } from 'timers/promises';
import { DeferredPromise } from './utils/common';

describe('pg worker', () => {
jest.setTimeout(30000);
let pool: Pool;
let container: StartedPostgreSqlContainer;
let manager: WorkerManager;

beforeAll(async () => {
container = await new PostgreSqlContainer().start();
Expand All @@ -26,16 +28,18 @@ describe('pg worker', () => {
pool = new Pool({
connectionString: container.getConnectionUri(),
});

schema = createRandomSchema();
});

afterEach(async () => {
await manager?.stop();
await cleanupSchema(pool, schema);
await pool?.end();
});

it('smoke test', async () => {
const manager = createManager({
manager = createManager({
pgClient: pool,
schema,
});
Expand All @@ -48,7 +52,7 @@ describe('pg worker', () => {
});

it('smoke test with task', async () => {
const manager = createManager({
manager = createManager({
pgClient: pool,
schema,
});
Expand Down Expand Up @@ -96,4 +100,58 @@ describe('pg worker', () => {
// we should not have any pending tasks left
await expect(executeQuery(pool, plans.popTasks(queue, 100))).resolves.toHaveLength(0);
});

it('smoke test with connection drop', async () => {
manager = createManager({
pgClient: pool,
schema,
});

const queue = 'test-queue';

const deferredPromise = new DeferredPromise();

await manager.register({
queue: queue,
options: {
poolInternvalInMs: 20,
maxConcurrency: 10,
refillThresholdPct: 0.33,
},
handler(data) {
deferredPromise.resolve(data);
},
});

await setTimeout(200);

pool
.query(
'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename = current_user'
)
.catch(() => {});

await setTimeout(200);

const plans = createPlans(schema);

await executeQuery(
pool,
plans.enqueueTasks([
{
data: { nosmoke: true },
expireInSeconds: 1,
maxAttempts: 1,
metaData: {},
queue: queue,
retryBackoff: false,
retryDelayInSeconds: 10,
singletonKey: null,
startAfterSeconds: 0,
},
])
);

await expect(deferredPromise.promise).resolves.toEqual({ nosmoke: true });
});
});
7 changes: 7 additions & 0 deletions src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ export const createManager = (properties: WorkerManagerProperties): WorkerManage

let startPromise: Promise<void> | null = null;

const onError = (err: any) => {
console.log('pgClient error', err?.message ?? err);
};

pgClient.on('error', onError);

async function init() {
state = 'starting';

Expand Down Expand Up @@ -118,6 +124,7 @@ export const createManager = (properties: WorkerManagerProperties): WorkerManage

state = 'idle';
startPromise = null;
pgClient.off('error', onError);
},
};
};
5 changes: 4 additions & 1 deletion src/task-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ export const createTaskWorker = (implementation: WorkerImpl, config: TaskWorkerC
}

const requestedAmount = maxConcurrency - activeTasks.size;
const tasks = await implementation.popTasks(requestedAmount);
const tasks = await implementation.popTasks(requestedAmount).catch((err) => {
console.log('error popping tasks', 'message' in err ? err.message : err);
return [] as SelectedTask[];
});

// high chance that there are more tasks when requested amount is same as fetched
probablyHasMoreTasks = tasks.length === requestedAmount;
Expand Down
16 changes: 14 additions & 2 deletions src/utils/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ export type TypedQuery<TRow = QueryResultRow> = {
[rowTypeSymbol]: TRow;
};

export interface Notification {
processId: number;
channel: string;
payload?: string | undefined;
}

export interface QueryClient {
query(query: string, values?: any[]): Promise<any>;
query<T = unknown>(props: {
Expand All @@ -25,10 +31,16 @@ export interface QueryClient {

export interface Pool extends QueryClient {
connect(): Promise<ClientFromPool>;
on(event: 'error', listener: (...args: any[]) => void): this;
off(event: 'error', listener: (...args: any[]) => void): this;
}

export interface ClientFromPool extends QueryClient {
release(err?: boolean | Error | undefined): void;
on(event: 'notification', listener: (message: Notification) => void): unknown;
off(event: 'notification', listener: (message: Notification) => void): any;
on(event: 'error', listener: (...args: any[]) => void): unknown;
off(event: 'error', listener: (...args: any[]) => void): unknown;
}

export async function executeQuery<TRowResult extends QueryResultRow>(
Expand Down Expand Up @@ -69,12 +81,12 @@ export async function runTransaction<T>(pool: Pool, handler: (client: ClientFrom
/**
* Values supported by SQL engine.
*/
export type Value = unknown;
type Value = unknown;

/**
* Supported value or SQL instance.
*/
export type RawValue = Value | Sql;
type RawValue = Value | Sql;

/**
* A SQL instance can be nested within each other to build SQL strings.
Expand Down
5 changes: 4 additions & 1 deletion src/utils/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ export function createBaseWorker(run: () => Promise<ShouldContinue>, props: { lo
while (state.polling) {
const started = Date.now();
state.executing = true;
const shouldContinue = await run();
const shouldContinue = await run().catch((err) => {
console.log('error in worker loop', err?.message ?? err);
return false;
});
state.executing = false;
const duration = Date.now() - started;

Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@
"strict": true
},
"exclude": ["**/node_modules**/"],

"include": ["./src/*", "__utils__"]
}

0 comments on commit cdf1360

Please sign in to comment.