Skip to content

Commit

Permalink
custom adapter per engine possibility
Browse files Browse the repository at this point in the history
  • Loading branch information
paed01 committed Jan 25, 2025
1 parent f7a9c16 commit 345cf17
Show file tree
Hide file tree
Showing 24 changed files with 1,032 additions and 223 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## Unreleased

## [0.17.0] - 2025-01-22

Playing around with custom adapters requiring specials options per engine revealed some problems. An attempt to solve just that.

- forward query parameters as resume options when calling `(*)?/status/:token`
- forward query parameters as resume options when calling `(*)?/status/:token/:activityId`
- add ability to clone current `res.locals.engines` with override options to facilitate passing custom adapter per engine
- pass engine save state options along when tinkering with call activities

## [0.16.1] - 2025-01-10

- `DELETE (*)?/state/:token` forwards body to adapter delete as options
Expand Down
8 changes: 8 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ Resume process run
- `idleTimeout`: idle timeout delay, positive number of milliseconds or as ISO 8601 duration
- `sync`: run until end instruction, any value will do, or `false` to disable, returns 504 if not completed within idle timeout

Any other query parametes will also be passed as resume options.

### `POST (*)?/signal/:token`

Signal process activity.
Expand All @@ -247,6 +249,8 @@ Signal process activity.
- `idleTimeout`: idle timeout delay, positive number of milliseconds or as ISO 8601 duration
- `sync`: run until end instruction, any value will do, or `false` to disable, returns 504 if not completed within idle timeout

Any other query parametes will also be passed as resume options.

**Request body:**

- `id`: activity id
Expand All @@ -261,6 +265,8 @@ Cancel process activity.

- `autosaveEngineState`: force autosave engine state, any value will do, or `false` to disable auto save engine state

Any other query parametes will also be passed as resume options.

**Request body:**

- `id`: activity id
Expand All @@ -274,6 +280,8 @@ Fail process activity.

- `autosaveEngineState`: force autosave engine state, any value will do, or `false` to disable auto save engine state

Any other query parametes will also be passed as resume options.

**Request body:**

- `id`: activity id
Expand Down
55 changes: 55 additions & 0 deletions example/adapters/custom-adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import fs, { promises as fsp } from 'node:fs';
import { basename, join } from 'node:path';
import { MemoryAdapter, STORAGE_TYPE_FILE, STORAGE_TYPE_DEPLOYMENT, HttpError } from '../../src/index.js';
import { LRUCache } from 'lru-cache';

export class CustomAdapter extends MemoryAdapter {
/**
* @param {string} rootFolder
* @param {import('lru-cache').LRUCache} [storage]
*/
constructor(rootFolder, storage) {
super(storage ?? new LRUCache({ max: 1000, allowStale: false, fetchMethod: fetchMethod.bind(null, rootFolder) }));
this.rootFolder = rootFolder;
}
}

async function fetchMethod(rootFolder, fetchKey) {
const [type, key, ...restKey] = fetchKey.split(':');

if (type === STORAGE_TYPE_DEPLOYMENT && key === 'fs' && restKey.length) {
const fileName = `${basename(restKey.join(''))}.bpmn`;
const filePath = join(rootFolder, fileName);

await fsp.stat(filePath).catch((err) => {
/* c8 ignore next */
if (err.code !== 'ENOENT') throw err;
throw new HttpError(`deployment ${filePath} not found`, 404);
});

return JSON.stringify([{ path: filePath }]);
}
if (type === STORAGE_TYPE_FILE) {
return readFileContent(key).catch((err) => {
/* c8 ignore next */
if (err.code !== 'ENOENT') throw err;
throw new HttpError(`file ${key} not found`, 404);
});
}
}

function readFileContent(file) {
return new Promise((resolve, reject) => {
let content = '';
let size = 0;
fs.createReadStream(file)
.on('data', (chunk) => {
size += chunk.byteLength;
content += chunk;
})
.on('end', () => {
return resolve(JSON.stringify({ mimetype: 'text/xml', size, content }));
})
.on('error', reject);
});
}
2 changes: 1 addition & 1 deletion example/middleware-scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class MiddlewareScripts extends FlowScripts {
* @param {import('@onify/flow-extensions/FlowScripts').FlowScriptOptions} [options]
*/
constructor(adapter, deploymentName, resourceBase, runContext, options) {
super(deploymentName, resourceBase, runContext, options);
super(deploymentName, resourceBase, runContext, options?.timeout);
this.adapter = adapter;
}
/**
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bpmn-middleware",
"version": "0.16.1",
"version": "0.17.0",
"description": "BPMN engine express middleware",
"type": "module",
"main": "./dist/main.cjs",
Expand Down Expand Up @@ -61,7 +61,7 @@
"globals": "^15.3.0",
"mocha": "^11.0.1",
"mocha-cakes-2": "^3.3.0",
"nock": "^13.3.1",
"nock": "^14.0.0",
"prettier": "^3.3.3",
"rollup": "^4.17.2",
"supertest": "^7.0.0",
Expand All @@ -76,7 +76,7 @@
"bpmn-elements": "*",
"bpmn-engine": ">=15",
"debug": "*",
"express": ">=4",
"express": "4.x",
"smqp": "*"
},
"files": [
Expand Down
114 changes: 83 additions & 31 deletions src/bpmn-middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Engines } from './engines.js';
import { MemoryAdapter } from './memory-adapter.js';
import { HttpError, StorageError } from './errors.js';
import { MiddlewareEngine } from './middleware-engine.js';
import { fromActivityMessage } from './caller.js';
import { createCallerFromActivityMessage } from './caller.js';
import debug from './debug.js';
import { DeferredCallback } from './deferred.js';

Expand All @@ -28,17 +28,20 @@ const snakeReplacePattern = /\W/g;
/**
* Bpmn Engine Middleware
* @param {import('types').BpmnMiddlewareOptions} options
* @param {Engines} [engines]
*/
export function BpmnEngineMiddleware(options, engines) {
export function BpmnEngineMiddleware(options) {
/** @type {string} BPMN Middleware name */
const name = (this.name = options.name || MIDDLEWARE_DEFAULT_EXCHANGE);
this.adapter = options.adapter;
this.engines = engines ?? new Engines({ ...options });
this.engineOptions = { ...options.engineOptions };

/** @type {import('types').IStorageAdapter} */
const adapter = (this.adapter = options.adapter || new MemoryAdapter());

/** @type {Broker} */
const broker = (this.broker = options.broker || new Broker(this));

this.engines = new Engines({ ...options, name, adapter, broker });
this.engineOptions = { ...options.engineOptions };

broker.assertExchange(name, 'topic', { autoDelete: false, durable: false });

broker.subscribeTmp(name, 'activity.call', (_, msg) => this._startProcessByCallActivity(msg), { noAck: true });
Expand Down Expand Up @@ -171,9 +174,9 @@ BpmnEngineMiddleware.prototype.addResponseLocals = function addResponseLocals()
BpmnEngineMiddleware.prototype.addEngineLocals = function addEngineLocals(req, res, next) {
res.locals.middlewareName = this.name;
res.locals.token = res.locals.token ?? req.params.token;
res.locals.engines = res.locals.engines ?? this.engines;
res.locals.adapter = res.locals.adapter ?? this.adapter;
res.locals.broker = res.locals.broker ?? this.broker;
const engines = (res.locals.engines = res.locals.engines ?? this.engines);
res.locals.adapter = res.locals.adapter ?? engines.adapter;
res.locals.broker = res.locals.broker ?? engines.broker;
res.locals.listener = res.locals.listener ?? this._bpmnEngineListener;
next();
};
Expand Down Expand Up @@ -345,7 +348,7 @@ BpmnEngineMiddleware.prototype.getRunning = async function getRunning(req, res,
BpmnEngineMiddleware.prototype.getStatusByToken = async function getStatusByToken(req, res, next) {
try {
const token = req.params.token;
const status = await res.locals.engines.getStatusByToken(token);
const status = await res.locals.engines.getStatusByToken(token, req.query);
if (!status) throw new HttpError(`Token ${token} not found`, 404);
return res.send(status);
} catch (err) {
Expand Down Expand Up @@ -406,8 +409,16 @@ BpmnEngineMiddleware.prototype.signalActivity = async function signalActivity(re
BpmnEngineMiddleware.prototype.cancelActivity = async function cancelActivity(req, res, next) {
try {
const { token, engines, listener, executeOptions } = res.locals;
await engines.resumeAndCancelActivity(token, listener, req.body, executeOptions);
return res.send(engines.getEngineStatusByToken(token));

const sync = executeOptions.sync && new DeferredCallback(syncExecutionCallback);

const engine = await engines.resumeAndCancelActivity(token, listener, req.body, executeOptions, sync?.callback);

if (!sync) return res.send(engines.getEngineStatusByToken(token));

await sync;

return res.send({ ...engines.getEngineStatus(engine), result: engine.environment.output });
} catch (err) {
next(err);
}
Expand Down Expand Up @@ -695,21 +706,23 @@ BpmnEngineMiddleware.prototype._parseQueryToEngineOptions = function parseQueryT
*/
BpmnEngineMiddleware.prototype._startProcessByCallActivity = async function startProcessByCallActivity(callActivityMessage) {
try {
const { content } = callActivityMessage;
const content = callActivityMessage.content;

const [category, ...rest] = content.calledElement.split(':');

if (category !== STORAGE_TYPE_DEPLOYMENT || !rest.length) return;

// eslint-disable-next-line no-var
var deploymentName = rest.join(':');

if (content.isRecovered) return;

// eslint-disable-next-line no-var
var caller = fromActivityMessage(callActivityMessage);
var caller = createCallerFromActivityMessage(callActivityMessage);

return await this._startDeployment(deploymentName, {
listener: this._bpmnEngineListener,
settings: { caller: { ...caller } },
settings: { ...content.settings, caller: { ...caller } },
variables: { ...content.input },
caller,
});
Expand Down Expand Up @@ -745,7 +758,7 @@ BpmnEngineMiddleware.prototype._startDeployment = async function startDeployment

const deploymentSource = await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path, options);

const { listener, variables, businessKey, caller, idleTimeout } = options;
const { listener, settings, variables, businessKey, caller, idleTimeout } = options;

const token = randomUUID();

Expand All @@ -755,6 +768,10 @@ BpmnEngineMiddleware.prototype._startDeployment = async function startDeployment
token,
source: deploymentSource.content,
listener,
settings: {
...this.engineOptions.settings,
...settings,
},
variables: {
...this.engineOptions.variables,
...variables,
Expand All @@ -778,12 +795,27 @@ BpmnEngineMiddleware.prototype._cancelProcessByCallActivity = async function can

if (category !== STORAGE_TYPE_DEPLOYMENT || !rest.length) return;

const caller = fromActivityMessage(callActivityMessage);
const deploymentName = rest.join(':');

try {
// eslint-disable-next-line no-var
var caller = createCallerFromActivityMessage(callActivityMessage);

const { records } = await this.adapter.query(STORAGE_TYPE_STATE, { state: 'running', caller });
if (!records?.length) return;
const saveEngineStateOptions = callActivityMessage.content.settings?.saveEngineStateOptions;

this.engines.discardByToken(records[0].token);
const { records } = await this.adapter.query(STORAGE_TYPE_STATE, { state: 'running', caller }, saveEngineStateOptions);

if (!records?.length) {
return debug(`cancel called process ignored, found no running process with token ${caller.token}`);
}

debug(`discard ${deploymentName} token ${records[0].token} by call activity ${caller?.executionId}`);

await this.engines.discardByToken(records[0].token, this._bpmnEngineListener, { ...saveEngineStateOptions, resumedBy: caller.token });
} catch (err) {
debug(`failed to discard ${deploymentName} token ${caller.token} by call activity ${caller?.executionId}`, err);
this._bpmnEngineListener.emit('warn', err);
}
};

/**
Expand All @@ -793,24 +825,44 @@ BpmnEngineMiddleware.prototype._cancelProcessByCallActivity = async function can
*/
BpmnEngineMiddleware.prototype._postProcessDefinitionRun = async function postProcessDefinitionRun(definitionEndMessage) {
const { fields, content, properties } = definitionEndMessage;
const { caller } = content;
const { caller, resumedBy, settings } = content;
if (!caller) return;

try {
if (resumedBy === caller.token && !this.engines.getByToken(resumedBy)) {
return debug(
`process ${properties.deployment} (${properties.token}), resumed by ${caller.deployment} (${caller.token}), has already completed`
);
}

if (fields.routingKey === 'definition.error') {
await this.engines.resumeAndFailActivity(caller.token, this._bpmnEngineListener, {
...caller,
fromToken: properties.token,
message: content.error,
});
await this.engines.resumeAndFailActivity(
caller.token,
this._bpmnEngineListener,
{
...caller,
fromToken: properties.token,
message: content.error,
},
settings?.saveEngineStateOptions
);
} else {
await this.engines.resumeAndSignalActivity(caller.token, this._bpmnEngineListener, {
...caller,
fromToken: properties.token,
message: content.output,
});
await this.engines.resumeAndSignalActivity(
caller.token,
this._bpmnEngineListener,
{
...caller,
fromToken: properties.token,
message: content.output,
},
settings?.saveEngineStateOptions
);
}
} catch (err) {
debug(
`failed to post process ${properties.deployment} token ${properties.token} addressing ${caller.deployment} (${caller.token})`,
err
);
this._bpmnEngineListener.emit('warn', err);
}
};
Expand Down
10 changes: 6 additions & 4 deletions src/caller.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,24 @@ export class Caller {
* @param {string} id Calling activity id
* @param {string} type Calling activity type
* @param {string} executionId Calling activity execution id
* @param {number} [index] Calling activity multi-instance index
*/
constructor(token, deployment, id, type, executionId) {
constructor(token, deployment, id, type, executionId, index) {
this.token = token;
this.deployment = deployment;
this.id = id;
this.type = type;
this.executionId = executionId;
this.index = index;
}
}

/**
* Create caller from activity message
* @param {import('smqp').Message} activityApi
*/
export function fromActivityMessage(activityApi) {
export function createCallerFromActivityMessage(activityApi) {
const { token, deployment } = activityApi.properties;
const { id, type, executionId } = activityApi.content;
return new Caller(token, deployment, id, type, executionId);
const { id, type, executionId, index } = activityApi.content;
return new Caller(token, deployment, id, type, executionId, index);
}
4 changes: 3 additions & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ export const DEFAULT_IDLE_TIMER = 120000;
export const SAVE_STATE_ROUTINGKEY = 'activity.state.save';
export const ENABLE_SAVE_STATE_ROUTINGKEY = 'activity.state.save.enable';
export const DISABLE_SAVE_STATE_ROUTINGKEY = 'activity.state.save.disable';
export const ERR_STORAGE_KEY_NOT_FOUND = 'ERR_BPMN_MIDDLEWARE_STORAGE_KEY_NOT_FOUND';
export const MIDDLEWARE_DEFAULT_EXCHANGE = 'default';

export const ERR_STORAGE_KEY_NOT_FOUND = 'ERR_BPMN_MIDDLEWARE_STORAGE_KEY_NOT_FOUND';
export const ERR_COMPLETED = 'ERR_BPMN_MIDDLEWARE_COMPLETED';
Loading

0 comments on commit 345cf17

Please sign in to comment.