Skip to content

Commit

Permalink
add ability to pass options to adapter upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
paed01 committed Dec 30, 2024
1 parent 12b3d57 commit 163a0e8
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 15 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## [0.15.2] - 2024-12-30

- add some save state options, more docs to come

## [0.15.1] - 2024-12-28

- accept options when deleting from storaga adapter
Expand Down
4 changes: 3 additions & 1 deletion docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Create BPMN engine middleware.
Options:

- `adapter`: Optional [storage adapter](#storage-adapter). Defaults to in-memory adapter based on [LRU cache](https://www.npmjs.com/package/lru-cache)
- `engineOptions`: Optional BPMN Engine [options](https://github.com/paed01/bpmn-engine/blob/master/docs/API.md)
- `engineOptions`: Optional BPMN Engine [options](https://github.com/paed01/bpmn-engine/blob/master/docs/API.md) with some optional properties
- `settings`: optional engine settings
- `saveEngineStateOptions`: optional object passed to adapter options
- `maxRunning`: Optional number declaring number of max running engines per instance, passed to engines LRU Cache as max, defaults to 1000
- `engineCache`: Optional engine [LRU](https://www.npmjs.com/package/lru-cache) in-memory cache, defaults to `new LRUCache({ max: 1000, disposeAfter(engine) })`
- `broker`: Optional [smqp](https://npmjs.com/package/smqp) broker, used for forwarding events from executing engines, events are shoveled on middleware name topic exchange
Expand Down
19 changes: 14 additions & 5 deletions example/processes/save-state.bpmn
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_19hstvm" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.28.0" modeler:executionPlatform="Camunda Platform" modeler:executionPlatformVersion="7.21.0">
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_19hstvm" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.30.0" modeler:executionPlatform="Camunda Platform" modeler:executionPlatformVersion="7.21.0">
<bpmn:process id="Process_0" isExecutable="true" camunda:historyTimeToLive="P1D">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1nwcici</bpmn:outgoing>
Expand All @@ -21,6 +21,7 @@
<camunda:connector>
<camunda:inputOutput>
<camunda:inputParameter name="ttl">${30000}</camunda:inputParameter>
<camunda:inputParameter name="mandatoryProp">${true}</camunda:inputParameter>
</camunda:inputOutput>
<camunda:connectorId>saveState</camunda:connectorId>
</camunda:connector>
Expand Down Expand Up @@ -84,7 +85,15 @@
<bpmn:incoming>Flow_06oy5wv</bpmn:incoming>
<bpmn:terminateEventDefinition id="TerminateEventDefinition_0m6c4q0" />
</bpmn:endEvent>
<bpmn:serviceTask id="enable-save-state" name="Enable save state" camunda:expression="${environment.services.enableSaveState}">
<bpmn:serviceTask id="enable-save-state" name="Enable save state">
<bpmn:extensionElements>
<camunda:connector>
<camunda:inputOutput>
<camunda:inputParameter name="mandatoryProp">${true}</camunda:inputParameter>
</camunda:inputOutput>
<camunda:connectorId>enableSaveState</camunda:connectorId>
</camunda:connector>
</bpmn:extensionElements>
<bpmn:incoming>to-enable-save-state</bpmn:incoming>
<bpmn:outgoing>to-end</bpmn:outgoing>
</bpmn:serviceTask>
Expand All @@ -101,9 +110,6 @@
<bpmndi:BPMNShape id="Event_1ezzjai_di" bpmnElement="Event_1ezzjai">
<dc:Bounds x="360" y="202" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0wws7pu_di" bpmnElement="save-state">
<dc:Bounds x="450" y="180" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0z27whi_di" bpmnElement="Gateway_0qosno0">
<dc:Bounds x="605" y="195" width="50" height="50" />
</bpmndi:BPMNShape>
Expand All @@ -119,6 +125,9 @@
<bpmndi:BPMNShape id="Event_0e6c6xr_di" bpmnElement="Event_0e6c6xr">
<dc:Bounds x="812" y="202" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0wws7pu_di" bpmnElement="save-state">
<dc:Bounds x="450" y="180" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0ntsbez_di" bpmnElement="Flow_0ntsbez">
<di:waypoint x="396" y="220" />
<di:waypoint x="450" y="220" />
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bpmn-middleware",
"version": "0.15.1",
"version": "0.15.2",
"description": "BPMN engine express middleware",
"type": "module",
"main": "./dist/main.cjs",
Expand Down Expand Up @@ -76,7 +76,6 @@
"bpmn-engine": ">=15",
"debug": "*",
"express": ">=4",
"moddle-context-serializer": "*",
"smqp": "*"
},
"files": [
Expand Down
7 changes: 5 additions & 2 deletions src/bpmn-middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,9 @@ BpmnEngineMiddleware.prototype._resumeOptions = function resumeOptions(req, res,
case 'autosaveenginestate': {
options.autosaveEngineState = v === 'false' ? false : true;
}
default: {
options[k] = v;
}
}
}

Expand Down Expand Up @@ -669,12 +672,12 @@ BpmnEngineMiddleware.prototype._startProcessByCallActivity = async function star
* @returns {Promise<{id:string}>} Started with id token
*/
BpmnEngineMiddleware.prototype._startDeployment = async function startDeployment(deploymentName, options) {
const deployment = await this.adapter.fetch(STORAGE_TYPE_DEPLOYMENT, deploymentName);
const deployment = await this.adapter.fetch(STORAGE_TYPE_DEPLOYMENT, deploymentName, options);
if (!deployment) {
throw new HttpError(`deployment with name ${deploymentName} does not exist`, 404, 'BPMN_DEPLOYMENT_NOT_FOUND');
}

const deploymentSource = await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path);
const deploymentSource = await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path, options);

const { listener, variables, businessKey, caller, idleTimeout } = options;
const token = randomUUID();
Expand Down
11 changes: 7 additions & 4 deletions src/engines.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Engines.prototype.resume = async function resume(token, listener, options) {
/** @type {MiddlewareEngine} */
let engine = engineCache.get(token);
/** @type {import('types').MiddlewareEngineState} */
const state = await this.adapter.fetch(STORAGE_TYPE_STATE, token);
const state = await this.adapter.fetch(STORAGE_TYPE_STATE, token, options);

if (!state && !engine) {
throw new HttpError(`Token ${token} not found`, 404);
Expand Down Expand Up @@ -191,7 +191,6 @@ Engines.prototype.resumeAndCancelActivity = async function cancelActivity(token,
* @param {string} token
* @param {import('bpmn-engine').IListenerEmitter} listener
* @param {import('types').SignalBody} body
* @param {import('types').ResumeOptions} [options]
*/
Engines.prototype.resuemAndFailActivity = async function failActivity(token, listener, body, options) {
Expand Down Expand Up @@ -541,17 +540,21 @@ Engines.prototype._onStateMessage = async function onStateMessage(routingKey, me

let saveState = autosaveEngineState;
let saveStateIfExists = false;
let saveStateOptions;
let saveStateOptions = { ...engine.environment.settings.saveEngineStateOptions };

try {
switch (routingKey) {
case SAVE_STATE_ROUTINGKEY: {
saveState = true;
saveStateOptions = message.content;
saveStateOptions = { ...saveStateOptions, ...message.content };
break;
}
case ENABLE_SAVE_STATE_ROUTINGKEY: {
engine.environment.settings.autosaveEngineState = true;
engine.environment.settings.saveEngineStateOptions = Object.assign(
engine.environment.settings.saveEngineStateOptions || {},
message.content
);
saveState = false;
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/memory-adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ MemoryAdapter.prototype.delete = function deleteByKey(type, key) {
* @param {any} [options] Passed as fetch options to LRU cache
*/
MemoryAdapter.prototype.fetch = async function fetch(type, key, options) {
const value = await this.storage.fetch(`${type}:${key}`, options);
const value = await this.storage.fetch(`${type}:${key}`, { ...options, context: { ...options } });
if (!value) return value;

const data = JSON.parse(value);
Expand Down
165 changes: 165 additions & 0 deletions test/features/resume-feature.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import request from 'supertest';

import * as testHelpers from '../helpers/test-helpers.js';
import { MemoryAdapter, STORAGE_TYPE_STATE, StorageError } from '../../src/index.js';

const saveStateResource = testHelpers.getExampleResource('save-state.bpmn');

class MyStorageAdapter extends MemoryAdapter {
update(type, key, value, options) {
this.assertStateAndMandatoryProps(type, options);
return super.update(type, key, value, options);
}
upsert(type, key, value, options) {
this.assertStateAndMandatoryProps(type, options);
return super.upsert(type, key, value, options);
}
fetch(type, key, options) {
this.assertStateAndMandatoryProps(type, options);
return super.fetch(type, key, options);
}
assertStateAndMandatoryProps(type, options) {
if (type === STORAGE_TYPE_STATE && !options?.mandatoryProp)
throw new StorageError('cannot use adapter if mandatory prop is not present');
}
}

Feature('resume from state', () => {
Scenario('adapter with special needs', () => {
/** @type {MemoryAdapter} */
let adapter;
/** @type {ReturnType<testHelpers.horizontallyScaled>} */
let apps;
/** @type {ReturnType<testHelpers.horizontallyScaled>} */
let appsWithAutosave;
before(() => {
adapter = new MyStorageAdapter();
apps = testHelpers.horizontallyScaled(2, { autosaveEngineState: false, adapter });
});
after(() => {
apps?.stop();
});

let deploymentName;
Given('a source matching scenario is deployed', async () => {
deploymentName = 'save-state-with-props';
await testHelpers.createDeployment(apps.balance(), deploymentName, saveStateResource);
});

let startingApp;
let timer;
let token;
let response;
When('process is started with disabled auto save state', async () => {
startingApp = apps.balance();
timer = testHelpers.waitForProcess(startingApp, deploymentName).timer();

response = await request(startingApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201);

token = response.body.id;
});

Then('timer is started', () => {
return timer;
});

let completed;
When('timer times out', () => {
const [engine] = apps.getRunningByToken(token);
completed = engine.waitFor('end');
const timer = engine.environment.timers.executing.find((t) => t.owner.id === 'timeout');
timer.callback();
});

Then('run completes', () => {
return completed;
});

let signalApp;
When('attempting to signal message event without any query parameters', async () => {
signalApp = apps.balance();
response = await request(signalApp)
.post('/rest/signal/' + token)
.send({
id: 'Message_0',
});
});

Then('bad request is returned since adapter has special needs', () => {
expect(response.statusCode, response.text).to.equal(502);
expect(response.body.message).to.match(/cannot use adapter/);
});

When('signalling message event with mandatory adapter props as query parameters', async () => {
completed = testHelpers.waitForProcess(signalApp, deploymentName).end();

response = await request(signalApp)
.post('/rest/signal/' + token)
.query({ mandatoryProp: true })
.send({
id: 'Message_0',
});
});

Then('resumed run completes', () => {
return completed;
});

describe('auto-save is enabled', () => {
Given('a new middleware is added with auto save enabled and engine settings addressing save state', () => {
appsWithAutosave = testHelpers.horizontallyScaled(2, {
adapter,
autosaveEngineState: true,
engineOptions: {
settings: {
saveEngineStateOptions: {
mandatoryProp: true,
},
},
},
});
});

When('process is started', async () => {
startingApp = appsWithAutosave.balance();
timer = testHelpers.waitForProcess(startingApp, deploymentName).timer();

response = await request(startingApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201);

token = response.body.id;
});

Then('timer is started', () => {
return timer;
});

When('timer times out', () => {
const [engine] = appsWithAutosave.getRunningByToken(token);
completed = engine.waitFor('end');
const timer = engine.environment.timers.executing.find((t) => t.owner.id === 'timeout');
timer.callback();
});

Then('run completes by termination event', () => {
return completed;
});

When('signalling message event with mandatory adapter props as query parameters', async () => {
signalApp = appsWithAutosave.balance();

completed = testHelpers.waitForProcess(signalApp, deploymentName).end();

response = await request(signalApp)
.post('/rest/signal/' + token)
.query({ mandatoryProp: true })
.send({
id: 'Message_0',
});
});

Then('bad request is returned', () => {
expect(response.statusCode, response.text).to.equal(400);
});
});
});
});

0 comments on commit 163a0e8

Please sign in to comment.