From 629d5c23dacc1040c03c5efcefeb35caa92fb871 Mon Sep 17 00:00:00 2001 From: ilija <276772+ilijaNL@users.noreply.github.com> Date: Mon, 22 Jul 2024 08:55:25 +0200 Subject: [PATCH] Rename + readme example (#3) * Rename + readme example * Reduce performance requirements * Fix ci pnpm cache * Init changeset * Add ci/cd * Fix cd * rename * Fix cicd --- .changeset/README.md | 8 +++++ .changeset/config.json | 11 ++++++ .changeset/good-seahorses-wave.md | 5 +++ .github/workflows/ci.yml | 57 +++++++++++++++++++++++++++++++ .github/workflows/cicd.yml | 55 +++++++++++++++++++++++++++++ .github/workflows/test.yml | 39 --------------------- README.md | 47 +++++++++++++++++++++++++ src/index.ts | 8 +++++ src/maintaince.ts | 4 +-- src/manager.spec.ts | 4 +-- src/plans.spec.ts | 39 ++++++++++----------- src/plans.ts | 4 +-- src/queue-worker.ts | 2 +- src/task-worker.spec.ts | 6 ++-- 14 files changed, 221 insertions(+), 68 deletions(-) create mode 100644 .changeset/README.md create mode 100644 .changeset/config.json create mode 100644 .changeset/good-seahorses-wave.md create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/cicd.yml delete mode 100644 .github/workflows/test.yml diff --git a/.changeset/README.md b/.changeset/README.md new file mode 100644 index 0000000..e5b6d8d --- /dev/null +++ b/.changeset/README.md @@ -0,0 +1,8 @@ +# Changesets + +Hello and welcome! This folder has been automatically generated by `@changesets/cli`, a build tool that works +with multi-package repos, or single-package repos to help you version and publish your code. You can +find the full documentation for it [in our repository](https://github.com/changesets/changesets) + +We have a quick list of common questions to get you started engaging with this project in +[our documentation](https://github.com/changesets/changesets/blob/main/docs/common-questions.md) diff --git a/.changeset/config.json b/.changeset/config.json new file mode 100644 index 0000000..ae82eba --- /dev/null +++ b/.changeset/config.json @@ -0,0 +1,11 @@ +{ + "$schema": "https://unpkg.com/@changesets/config@3.0.2/schema.json", + "changelog": "@changesets/cli/changelog", + "commit": false, + "fixed": [], + "linked": [], + "access": "restricted", + "baseBranch": "main", + "updateInternalDependencies": "patch", + "ignore": [] +} diff --git a/.changeset/good-seahorses-wave.md b/.changeset/good-seahorses-wave.md new file mode 100644 index 0000000..4a3938b --- /dev/null +++ b/.changeset/good-seahorses-wave.md @@ -0,0 +1,5 @@ +--- +'pg-task': patch +--- + +Add initial version diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..3eb6f6e --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,57 @@ +name: integration + +on: + workflow_call: + pull_request: + branches: + - "main" + paths-ignore: + - 'docs/**' + - 'example/**' + - '**/*.md' + +jobs: + # Label of the container job + test: + # Containers must run in Linux based operating systems + runs-on: ubuntu-latest + + strategy: + matrix: + node-version: [18.x, 20.x] + + steps: + - uses: actions/checkout@v4 + - name: Use Node.js ${{ matrix.node-version }} + id: setup-node + uses: actions/setup-node@v4 + with: + node-version: ${{ matrix.node-version }} + + - name: Install pnpm + uses: pnpm/action-setup@v4 + with: + version: 8 + + # See https://github.com/actions/setup-node/issues/641#issuecomment-1358859686 + - name: pnpm cache path + id: pnpm-cache-path + run: | + echo "STORE_PATH=$(pnpm store path)" >> $GITHUB_OUTPUT + + - name: pnpm 
cache + uses: actions/cache@v3 + with: + path: ${{ steps.pnpm-cache-path.outputs.STORE_PATH }} + key: ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store- + + - name: Install dependencies + run: pnpm install --frozen-lockfile + - name: typecheck + run: pnpm run typecheck + - name: test + run: pnpm run test + - name: ✅ Upload coverage to Codecov + uses: codecov/codecov-action@v3 \ No newline at end of file diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml new file mode 100644 index 0000000..c0de3e1 --- /dev/null +++ b/.github/workflows/cicd.yml @@ -0,0 +1,55 @@ +name: cicd + +on: + push: + branches: + - main + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +jobs: + test: + uses: ./.github/workflows/ci.yml + release: + name: Release + needs: test + runs-on: ubuntu-latest + permissions: + id-token: write + contents: write + packages: write + pull-requests: write + issues: read + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - uses: actions/setup-node@v4 + id: setup-node + with: + node-version: 20.x + - name: Install pnpm + uses: pnpm/action-setup@v4 + with: + version: 8 + # See https://github.com/actions/setup-node/issues/641#issuecomment-1358859686 + - name: pnpm cache path + id: pnpm-cache-path + run: | + echo "STORE_PATH=$(pnpm store path)" >> $GITHUB_OUTPUT + + - name: pnpm cache + uses: actions/cache@v3 + with: + path: ${{ steps.pnpm-cache-path.outputs.STORE_PATH }} + key: ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store- + + - name: Install dependencies + run: pnpm install --frozen-lockfile + + - name: Create Release Pull Request + uses: changesets/action@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml deleted file mode 100644 index ca35e55..0000000 --- a/.github/workflows/test.yml +++ /dev/null @@ -1,39 +0,0 @@ -name: test - -on: - workflow_call: - pull_request: - branches: - - "main" - paths-ignore: - - 'docs/**' - - 'example/**' - - '**/*.md' - -jobs: - # Label of the container job - test: - # Containers must run in Linux based operating systems - runs-on: ubuntu-latest - - strategy: - matrix: - node-version: [18.x, 20.x] - - steps: - - uses: actions/checkout@v4 - - name: Install pnpm - uses: pnpm/action-setup@v4 - with: - version: 8 - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v4 - with: - node-version: ${{ matrix.node-version }} - cache: 'pnpm' - - name: Install dependencies - run: pnpm install - - name: test - run: pnpm run test - # - name: ✅ Upload coverage to Codecov - # uses: codecov/codecov-action@v3 \ No newline at end of file diff --git a/README.md b/README.md index df26837..369278e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,50 @@ # pg-task A SQS like solution build on top of Postgres and NodeJS. 
+ +## Usage + +``` +npm install pg-task +``` + +```typescript +import { createManager, executeQuery, createPlans, createTaskQueueFactory } from 'pg-task'; +import { Pool } from 'pg'; + +const pool = new Pool({}); + +const manager = createManager({ + pgClient: pool, + schema, +}); +await manager.start(); + +// Register a worker for `worker-queue` task queue +const workerId = await manager.work({ + queue: 'worker-queue', + async handler(data) { + await Promise.resolve(); + }, +}); + +// enqueue tasks +const plans = createPlans(schema); +const taskFactory = createTaskQueueFactory('worker-queue'); +await executeQuery( + pool, + plans.enqueueTasks( + taskFactory([ + { + data: { somepayload: 'test' }, + }, + { + data: { somepayload: 'test' }, + }, + ]) + ) +); + +// On application shutdown +await manager.stop(); +``` diff --git a/src/index.ts b/src/index.ts index 2ab3336..67d39d5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,4 +10,12 @@ export { type TaskResult, type TaskResultState, } from './task'; +export { + executeQuery, + type Pool, + type ClientFromPool, + type QueryClient, + type TypedQuery, + type QueryResultRow, +} from './utils/sql'; export default createManager; diff --git a/src/maintaince.ts b/src/maintaince.ts index 51d386c..c5f1eec 100644 --- a/src/maintaince.ts +++ b/src/maintaince.ts @@ -57,7 +57,7 @@ export const createMaintainceWorker = async (pool: Pool, schema: string, options // complete this task, and reschedule it in future await transactionExecutor(plans.resolveTasks([{ task_id: id, result: data, state: TaskResultStates.success }])); await transactionExecutor( - plans.createTasks( + plans.enqueueTasks( taskFactory([ { data: null, @@ -73,7 +73,7 @@ export const createMaintainceWorker = async (pool: Pool, schema: string, options // ensure we try to create the maintaince tasks always await executor( - plans.createTasks( + plans.enqueueTasks( taskFactory([ { data: null, diff --git a/src/manager.spec.ts b/src/manager.spec.ts index 1d93f34..6b82867 100644 --- a/src/manager.spec.ts +++ b/src/manager.spec.ts @@ -73,7 +73,7 @@ describe('pg worker', () => { await executeQuery( pool, - plans.createTasks([ + plans.enqueueTasks([ { data: { nosmoke: true }, expireInSeconds: 1, @@ -94,6 +94,6 @@ describe('pg worker', () => { await manager.stop(); // we should not have any pending tasks left - await expect(executeQuery(pool, plans.getAndStartTasks(queue, 100))).resolves.toHaveLength(0); + await expect(executeQuery(pool, plans.popTasks(queue, 100))).resolves.toHaveLength(0); }); }); diff --git a/src/plans.spec.ts b/src/plans.spec.ts index d72c70f..c01787b 100644 --- a/src/plans.spec.ts +++ b/src/plans.spec.ts @@ -27,7 +27,7 @@ describe('plans', () => { const plans = createPlans('schema_a'); it('createTasks', () => { - const q = plans.createTasks(generateTasks(3, { works: true, value: '123' })); + const q = plans.enqueueTasks(generateTasks(3, { works: true, value: '123' })); expect(q.text).toMatchInlineSnapshot(` " SELECT @@ -105,8 +105,8 @@ describe('plans', () => { `); }); - it('getAndStartTasks', () => { - const q = plans.getAndStartTasks('queue', 20); + it('popTasks', () => { + const q = plans.popTasks('queue', 20); expect(q.text).toMatchInlineSnapshot(` " SELECT @@ -230,9 +230,9 @@ describe('plans', () => { }, ]; - await executeQuery(pool, plans.createTasks(tasks)); + await executeQuery(pool, plans.enqueueTasks(tasks)); - const fetchedTasks = await executeQuery(pool, plans.getAndStartTasks('test-queue', 11)); + const fetchedTasks = await executeQuery(pool, 
plans.popTasks('test-queue', 11)); expect(fetchedTasks).toHaveLength(tasks.length); expect(fetchedTasks.map((t) => t.data)).toEqual(tasks.map((t) => t.data)); @@ -271,9 +271,9 @@ describe('plans', () => { startAfterSeconds: 0, }; - await executeQuery(pool, plans.createTasks([task])); + await executeQuery(pool, plans.enqueueTasks([task])); - const fetchedTasks1 = await executeQuery(pool, plans.getAndStartTasks(queue, 11)); + const fetchedTasks1 = await executeQuery(pool, plans.popTasks(queue, 11)); // fails await executeQuery( @@ -287,13 +287,13 @@ describe('plans', () => { ) ); - const fetchedTasks2 = await executeQuery(pool, plans.getAndStartTasks(queue, 11)); + const fetchedTasks2 = await executeQuery(pool, plans.popTasks(queue, 11)); expect(fetchedTasks2.length).toBe(0); await setTimeout(1500); - const [fetchedTask3] = await executeQuery(pool, plans.getAndStartTasks(queue, 11)); + const [fetchedTask3] = await executeQuery(pool, plans.popTasks(queue, 11)); expect(fetchedTask3).toBeDefined(); const succeedTask = fetchedTask3!; @@ -408,7 +408,7 @@ describe('plans', () => { }); const start = process.hrtime(); - const createTaskQuery = plans.createTasks(taskBatch); + const createTaskQuery = plans.enqueueTasks(taskBatch); for (let i = 0; i < 2000; ++i) { await Promise.all([ executeQuery(pool, createTaskQuery), @@ -436,7 +436,7 @@ describe('plans', () => { 0 ); - const createTasksQuery = plans.createTasks(taskBatch); + const createTasksQuery = plans.enqueueTasks(taskBatch); const createPromises = []; // create for (let i = 0; i < 10000; ++i) { @@ -445,7 +445,7 @@ describe('plans', () => { await Promise.all(createPromises); - const getTaskQuery = plans.getAndStartTasks(taskBatch[0]!.queue, 10); + const getTaskQuery = plans.popTasks(taskBatch[0]!.queue, 10); const start = process.hrtime(); const getTasksPromises: Promise[] = []; @@ -459,7 +459,7 @@ describe('plans', () => { expect(seconds).toBeLessThan(10); }); - it('resolves 100000 tasks under 10 seconds', async () => { + it('resolves 50000 tasks under 10 seconds', async () => { jest.setTimeout(40000); // pre-generated batches @@ -471,27 +471,28 @@ describe('plans', () => { 0 ); - const createTasksQuery = plans.createTasks(taskBatch); + const createTasksQuery = plans.enqueueTasks(taskBatch); const createPromises = []; // create - for (let i = 0; i < 1000; ++i) { + for (let i = 0; i < 500; ++i) { createPromises.push(executeQuery(pool, createTasksQuery)); } await Promise.all(createPromises); - const getTaskQuery = plans.getAndStartTasks(taskBatch[0]!.queue, 50); + const getTaskQuery = plans.popTasks(taskBatch[0]!.queue, 50); const getTasksPromises: Promise[] = []; - for (let i = 0; i < 2000; ++i) { + for (let i = 0; i < 1000; ++i) { getTasksPromises.push(executeQuery(pool, getTaskQuery)); } const taskIds = (await Promise.all(getTasksPromises)).flat().map((t) => t.id); - expect(taskIds.length).toBe(100000); + expect(taskIds.length).toBe(50000); + const resolveBatcher = createBatcher({ - maxSize: 5, + maxSize: 10, maxTimeInMs: 100, onFlush: async (batch) => { await executeQuery(pool, plans.resolveTasks(batch.map((item) => item.data))); diff --git a/src/plans.ts b/src/plans.ts index ac59d64..2a87c6a 100644 --- a/src/plans.ts +++ b/src/plans.ts @@ -18,7 +18,7 @@ const itemsToKeys = >(items: T[], init: KeysToArr< }, init); export const createPlans = (schema: string) => ({ - createTasks: (tasks: ConfiguredTask[]) => { + enqueueTasks: (tasks: ConfiguredTask[]) => { const payload = itemsToKeys(tasks, { data: new Array(tasks.length), expireInSeconds: new 
Array(tasks.length), @@ -49,7 +49,7 @@ FROM ${rawSql(schema)}.create_tasks( ) `; }, - getAndStartTasks: (queue: string, amount: number): TypedQuery => { + popTasks: (queue: string, amount: number): TypedQuery => { return sql<{ id: string; data: JsonValue; diff --git a/src/queue-worker.ts b/src/queue-worker.ts index 96dc48c..0f081e7 100644 --- a/src/queue-worker.ts +++ b/src/queue-worker.ts @@ -41,7 +41,7 @@ export const createQueueWorker = ( return createTaskWorker( { async popTasks(amount) { - return queryExecutor(sqlPlans.getAndStartTasks(props.queue, amount)); + return queryExecutor(sqlPlans.popTasks(props.queue, amount)); }, async resolveTask(task) { await resolveTaskBatcher.add(task); diff --git a/src/task-worker.spec.ts b/src/task-worker.spec.ts index a98f590..305a18f 100644 --- a/src/task-worker.spec.ts +++ b/src/task-worker.spec.ts @@ -128,7 +128,7 @@ describe('task-worker', () => { } }, }, - { maxConcurrency: amountOfTasks, poolInternvalInMs: 20, refillThresholdPct: 0.33 } + { maxConcurrency: amountOfTasks, poolInternvalInMs: 80, refillThresholdPct: 0.33 } ); worker.start(); @@ -221,7 +221,7 @@ describe('task-worker', () => { await setTimeout(200); }, }, - { maxConcurrency: amountOfTasks, poolInternvalInMs: 20, refillThresholdPct: 0.33 } + { maxConcurrency: amountOfTasks, poolInternvalInMs: 100, refillThresholdPct: 0.33 } ); worker.start(); @@ -270,7 +270,7 @@ describe('task-worker', () => { }; }, }, - { maxConcurrency: amountOfTasks, poolInternvalInMs: 60, refillThresholdPct: 0.33 } + { maxConcurrency: amountOfTasks, poolInternvalInMs: 100, refillThresholdPct: 0.33 } ); const promise = once(ee, 'completed');
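
---

Note on the README usage example added in this patch: the snippet passes a `schema` value to `createManager` and `createPlans` without ever defining it. Below is a minimal, self-contained sketch of that same example under the assumption that `schema` is simply the name of the Postgres schema pg-task uses (consistent with `createPlans = (schema: string)` in `src/plans.ts`). The schema name `'pg_task'` and the connection string are illustrative placeholders, not values taken from this patch.

```typescript
// Minimal sketch of the README usage example, with `schema` defined.
// Assumptions (not part of this patch): the 'pg_task' schema name and
// the connection string are placeholders for your own configuration.
import { createManager, executeQuery, createPlans, createTaskQueueFactory } from 'pg-task';
import { Pool } from 'pg';

const pool = new Pool({ connectionString: 'postgres://localhost:5432/app' });
const schema = 'pg_task';

async function main() {
  const manager = createManager({ pgClient: pool, schema });
  await manager.start();

  // Register a worker for the `worker-queue` task queue.
  await manager.work({
    queue: 'worker-queue',
    async handler(data) {
      // Task handling logic goes here.
      console.log('received task payload', data);
    },
  });

  // Enqueue tasks via the renamed plan (createTasks -> enqueueTasks in this patch).
  const plans = createPlans(schema);
  const taskFactory = createTaskQueueFactory('worker-queue');
  await executeQuery(
    pool,
    plans.enqueueTasks(
      taskFactory([{ data: { somepayload: 'test' } }, { data: { somepayload: 'test' } }])
    )
  );

  // On application shutdown.
  await manager.stop();
  await pool.end();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```

As in the README, enqueued tasks are picked up by the registered worker; `plans.popTasks` (formerly `getAndStartTasks`) only needs to be called directly when fetching tasks manually, as the specs in this patch do.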