diff --git a/CHANGELOG.md b/CHANGELOG.md index 42c4a76..a19340c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.4.0 (November 26, 2021) +* Update `Re-assembled message` action: Make Message ID optional +* Update sailor version to 2.6.26 + ## 1.3.0 (October 1, 2021) * Update `Re-assembled message` action: Implemented option to select processing behavior of incoming messages. diff --git a/README.md b/README.md index 01ff1d0..c4bb457 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ The splitting expression is "users", action will return output: ### Split on JSONata Expression -This component takes the incoming message body and applies the configured JSONata tranformation on it. The evaluated transformation must be an array. This array is split and emitted into multiple messages. +This component takes the incoming message body and applies the configured JSONata transformation on it. The evaluated transformation must be an array. This array is split and emitted into multiple messages. For example, given the following message: @@ -98,7 +98,7 @@ Inverse of the split action: Given a stream of incoming messages a sum message i #### List of Expected Config fields ```Behavior``` - Has 3 different behaviour variants(options): * Produce Groups of Fixed Size (Don't Emit Partial Groups): A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group will not be emitted. -* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages at which point messages will be emitted for each group. +* Group All Incoming Messages: All incoming messages will be gathered until there are no more incoming messages in the specifeid timeframe (delay timer) at which point messages will be emitted for each group. * Produce Groups of Fixed Size (Emit Partial Groups): Specify both group size and delay timer. Once a group is complete, that group will be emitted. Once there are no more incoming messages, then partially completed groups will also be emitted. Supported: @@ -120,7 +120,9 @@ If all the messages in the group do not arrive, then the group will not be emitt ```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group. ```messageId``` - Id for a message to distinguish it from other messages in the group. -Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group. +Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group. +In case a messageId occures multiple times, only the messageData of the latest message survives. +If the messageId is not defined, a random guid will be generated and used as messageID. ```messageData``` - Data from individual messages can be inserted here in form of an object. This object is then inserted into an array which is available in the message emitted for this group. diff --git a/component.json b/component.json index b0a03a2..cb1f531 100644 --- a/component.json +++ b/component.json @@ -1,6 +1,6 @@ { "title": "Splitter", - "version": "1.3.0", + "version": "1.4.0", "description": "Splits a message into multiple messages.", "buildType":"docker", "actions": { diff --git a/lib/actions/reassemble.js b/lib/actions/reassemble.js index 8162aa4..5c2848b 100644 --- a/lib/actions/reassemble.js +++ b/lib/actions/reassemble.js @@ -1,5 +1,6 @@ // eslint-disable-next-line const { messages } = require('elasticio-node'); +const { v4: uuidv4 } = require('uuid'); const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended'); let timeHandle; @@ -18,13 +19,14 @@ async function timer(this_) { // eslint-disable-next-line no-await-in-loop await this_.emit('data', messages.newMessageWithBody({ - groupSize: results.messages.length, + groupSize: Object.keys(results.messageIdsSeen).length, groupId: results.messages[0].groupId, messageData: incomingData, })); // eslint-disable-next-line no-await-in-loop await storage.deleteObjectById(groupList[i]); } + groupList = []; } async function processAction(msg, cfg) { @@ -33,7 +35,7 @@ async function processAction(msg, cfg) { const { groupSize, groupId, - messageId, + messageId = uuidv4(), messageData, } = msg.body; const incomingData = {}; @@ -157,14 +159,14 @@ async function getMetaModel(cfg) { }, messageId: { type: 'string', - required: true, + required: false, title: 'Unique ID to describe this message', order: 4, }, groupSize: { type: 'number', required: true, - title: 'Number of messages produced by splitter', + title: 'Number of messages expected to be reassembled into the group', order: 3, }, messageData: { @@ -195,7 +197,7 @@ async function getMetaModel(cfg) { }, messageId: { type: 'string', - required: true, + required: false, title: 'Unique ID to describe this message', order: 4, }, @@ -236,13 +238,13 @@ async function getMetaModel(cfg) { }, messageId: { type: 'string', - required: true, + required: false, title: 'Unique ID to describe this message', order: 4, }, groupSize: { type: 'number', - title: 'Number of messages produced by splitter', + title: 'Number of messages expected to be reassembled into the group', order: 3, }, timersec: { diff --git a/package-lock.json b/package-lock.json index 7a54bc3..dca4996 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1051,6 +1051,13 @@ "request": "^2.85.0", "stream-counter": "1.0.0", "uuid": "3.0.1" + }, + "dependencies": { + "uuid": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", + "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" + } } }, "elasticio-rest-node": { @@ -1233,6 +1240,11 @@ "psl": "^1.1.28", "punycode": "^2.1.1" } + }, + "uuid": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", + "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" } } }, @@ -3356,9 +3368,9 @@ } }, "uuid": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", - "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==" }, "v8-compile-cache": { "version": "2.1.1", diff --git a/package.json b/package.json index 7411c31..d87a795 100644 --- a/package.json +++ b/package.json @@ -18,8 +18,9 @@ "@elastic.io/component-commons-library": "1.1.4", "@elastic.io/maester-client": "3.3.0", "elasticio-node": "0.0.9", - "elasticio-sailor-nodejs": "2.6.24", - "lodash": "4.17.19" + "elasticio-sailor-nodejs": "2.6.26", + "lodash": "4.17.19", + "uuid": "8.3.2" }, "devDependencies": { "@elastic.io/component-logger": "0.0.1", diff --git a/spec/reassemble.spec.js b/spec/reassemble.spec.js index 0a45107..d8befee 100644 --- a/spec/reassemble.spec.js +++ b/spec/reassemble.spec.js @@ -289,4 +289,247 @@ describe('Split on JSONata ', () => { expect(getMessageGroup2.isDone()).to.equal(true); expect(deleteMessageGroup.isDone()).to.equal(true); }); + + it('Base Case: Group Size is 2, with different messageId and messageData', async () => { + const msg = [ + { + groupId: 'a', groupSize: 2, messageId: '1', messageData: '1-1', + }, + { + groupId: 'a', groupSize: 2, messageId: '2', messageData: '1-2', + }, + ]; + + // First Run + nock('https://ma.estr').get('/objects?query[externalid]=a').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'a') + .reply(200, { objectId: 'a' }); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { + messages: [{ + groupSize: 2, messageId: '1', groupId: 'a', messageData: '1-1', + }], + messageIdsSeen: { 1: '1' }, + }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr').delete('/objects/a').reply(200, {}); + + // Second Run + nock('https://ma.estr').get('/objects?query[externalid]=a').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'a') + .reply(200, { objectId: 'a' }); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { + messages: [{ + groupSize: 2, groupId: '1', messageId: '1', messageData: '1-1', + }, { + groupSize: 2, groupId: '2', messageId: '2', messageData: '1-2', + }], + messageIdsSeen: { 1: '1', 2: '2' }, + }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr').delete('/objects/a').reply(200, {}); + + for (let i = 0; i < msg.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await reassemble.process.call(self, { body: msg[i] }, { mode: 'groupSize' }); + + // eslint-disable-next-line default-case + switch (i) { + case 0: + expect(self.emit.callCount).to.be.equal(0); + break; + case 1: + expect(self.emit.callCount).to.be.equal(1); + expect(self.emit.lastCall.args[1].body).to.deep.equal({ + groupSize: 2, + groupId: 'a', + messageData: { + 1: '1-1', + 2: '1-2', + }, + }); + break; + } + } + }); + + it('Base Case: Group Size is 2, with messageId UNDEFINED and messageData defined', async () => { + const msg = [ + { + groupId: 'b', groupSize: 2, messageData: '1-1', + }, + { + groupId: 'b', groupSize: 2, messageData: '1-2', + }, + ]; + + // First Run + nock('https://ma.estr').get('/objects?query[externalid]=b').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'b') + .reply(200, { objectId: 'b' }); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { + messages: [{ + groupSize: 2, groupId: 'b', messageData: '1-1', + }], + messageIdsSeen: { 1: '1' }, + }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr').delete('/objects/b').reply(200, {}); + + // Second Run + nock('https://ma.estr').get('/objects?query[externalid]=b').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'b') + .reply(200, { objectId: 'b' }); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { + messages: [{ + groupSize: 2, groupId: 'b', messageData: '1-1', + }, { + groupSize: 2, groupId: 'b', messageData: '1-2', + }], + messageIdsSeen: { 1: '1', 2: '2' }, + }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr').delete('/objects/b').reply(200, {}); + + for (let i = 0; i < msg.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await reassemble.process.call(self, { body: msg[i] }, { mode: 'groupSize' }); + + // eslint-disable-next-line default-case + switch (i) { + case 0: + expect(self.emit.callCount).to.be.equal(0); + break; + case 1: + expect(self.emit.callCount).to.be.equal(1); + // eslint-disable-next-line no-case-declarations + const results = self.emit.lastCall.args[1].body; + expect(results).to.deep.equal({ + groupSize: 2, + groupId: 'b', + messageData: results.messageData, + }); + break; + } + } + }); + + it('Base Case: Using time delay, with messageId UNDEFINED and messageData defined', async () => { + const msg = [ + { + groupId: 'c', timersec: 1000, messageData: '1-1', + }, + { + groupId: 'c', timersec: 1000, messageData: '1-2', + }, + ]; + + // First Run + nock('https://ma.estr').get('/objects?query[externalid]=c').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'c') + .reply(200, { objectId: 'c' }); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { + messages: [{ + groupId: 'c', messageData: '1-1', + }], + messageIdsSeen: { }, + }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + + // Second Run + nock('https://ma.estr').get('/objects?query[externalid]=c').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'c') + .reply(200, { objectId: 'c' }); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { + messages: [{ + groupId: 'c', messageData: '1-1', + }, { + groupId: 'c', messageData: '1-2', + }], + messageIdsSeen: { }, + }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { + messages: [{ + groupId: 'c', messageData: '1-1', + }, { + groupId: 'c', messageData: '1-2', + }], + messageIdsSeen: { 1: '1', 2: '2' }, + }); + nock('https://ma.estr').delete('/objects/c').reply(200, {}); + + for (let i = 0; i < msg.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await reassemble.process.call(self, { body: msg[i] }, { mode: 'timeout' }); + + // eslint-disable-next-line default-case + switch (i) { + case 0: + expect(self.emit.callCount).to.be.equal(0); + break; + case 1: + // eslint-disable-next-line no-await-in-loop + await sleep(1500); + expect(self.emit.callCount).to.be.equal(1); + // eslint-disable-next-line no-case-declarations + const results = self.emit.lastCall.args[1].body; + expect(results).to.deep.equal({ + groupId: 'c', + groupSize: 2, + messageData: results.messageData, + }); + break; + } + } + }); });