diff --git a/README.md b/README.md index 01ff1d0..3c7acb5 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ Inverse of the split action: Given a stream of incoming messages a sum message i #### List of Expected Config fields ```Behavior``` - Has 3 different behaviour variants(options): * Produce Groups of Fixed Size (Don't Emit Partial Groups): A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group will not be emitted. -* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages at which point messages will be emitted for each group. +* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages in the specifeid timeframe (delay timer) at which point messages will be emitted for each group. * Produce Groups of Fixed Size (Emit Partial Groups): Specify both group size and delay timer. Once a group is complete, that group will be emitted. Once there are no more incoming messages, then partially completed groups will also be emitted. Supported: @@ -120,7 +120,9 @@ If all the messages in the group do not arrive, then the group will not be emitt ```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group. ```messageId``` - Id for a message to distinguish it from other messages in the group. -Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group. +Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group. +In case a messageId occures multiple times, only the messageData of the latest message survives. +If the messageId is not defined, a random guid will be generated and used as messageID. ```messageData``` - Data from individual messages can be inserted here in form of an object. This object is then inserted into an array which is available in the message emitted for this group. diff --git a/lib/actions/reassemble.js b/lib/actions/reassemble.js index 8162aa4..057370c 100644 --- a/lib/actions/reassemble.js +++ b/lib/actions/reassemble.js @@ -1,5 +1,6 @@ // eslint-disable-next-line const { messages } = require('elasticio-node'); +const { v4: uuidv4 } = require('uuid'); const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended'); let timeHandle; @@ -18,13 +19,14 @@ async function timer(this_) { // eslint-disable-next-line no-await-in-loop await this_.emit('data', messages.newMessageWithBody({ - groupSize: results.messages.length, + groupSize: Object.keys(results.messageIdsSeen).length, groupId: results.messages[0].groupId, messageData: incomingData, })); // eslint-disable-next-line no-await-in-loop await storage.deleteObjectById(groupList[i]); } + groupList = []; } async function processAction(msg, cfg) { @@ -33,7 +35,7 @@ async function processAction(msg, cfg) { const { groupSize, groupId, - messageId, + messageId = uuidv4(), messageData, } = msg.body; const incomingData = {}; @@ -157,7 +159,7 @@ async function getMetaModel(cfg) { }, messageId: { type: 'string', - required: true, + required: false, title: 'Unique ID to describe this message', order: 4, }, @@ -195,7 +197,7 @@ async function getMetaModel(cfg) { }, messageId: { type: 'string', - required: true, + required: false, title: 'Unique ID to describe this message', order: 4, }, @@ -236,7 +238,7 @@ async function getMetaModel(cfg) { }, messageId: { type: 'string', - required: true, + required: false, title: 'Unique ID to describe this message', order: 4, }, diff --git a/package-lock.json b/package-lock.json index 7a54bc3..dca4996 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1051,6 +1051,13 @@ "request": "^2.85.0", "stream-counter": "1.0.0", "uuid": "3.0.1" + }, + "dependencies": { + "uuid": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", + "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" + } } }, "elasticio-rest-node": { @@ -1233,6 +1240,11 @@ "psl": "^1.1.28", "punycode": "^2.1.1" } + }, + "uuid": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", + "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" } } }, @@ -3356,9 +3368,9 @@ } }, "uuid": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", - "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==" }, "v8-compile-cache": { "version": "2.1.1", diff --git a/package.json b/package.json index 52dd2ea..d87a795 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,8 @@ "@elastic.io/maester-client": "3.3.0", "elasticio-node": "0.0.9", "elasticio-sailor-nodejs": "2.6.26", - "lodash": "4.17.19" + "lodash": "4.17.19", + "uuid": "8.3.2" }, "devDependencies": { "@elastic.io/component-logger": "0.0.1", diff --git a/spec/reassemble.spec.js b/spec/reassemble.spec.js index 0a45107..d8befee 100644 --- a/spec/reassemble.spec.js +++ b/spec/reassemble.spec.js @@ -289,4 +289,247 @@ describe('Split on JSONata ', () => { expect(getMessageGroup2.isDone()).to.equal(true); expect(deleteMessageGroup.isDone()).to.equal(true); }); + + it('Base Case: Group Size is 2, with different messageId and messageData', async () => { + const msg = [ + { + groupId: 'a', groupSize: 2, messageId: '1', messageData: '1-1', + }, + { + groupId: 'a', groupSize: 2, messageId: '2', messageData: '1-2', + }, + ]; + + // First Run + nock('https://ma.estr').get('/objects?query[externalid]=a').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'a') + .reply(200, { objectId: 'a' }); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { + messages: [{ + groupSize: 2, messageId: '1', groupId: 'a', messageData: '1-1', + }], + messageIdsSeen: { 1: '1' }, + }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr').delete('/objects/a').reply(200, {}); + + // Second Run + nock('https://ma.estr').get('/objects?query[externalid]=a').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'a') + .reply(200, { objectId: 'a' }); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr') + .get('/objects/a') + .reply(200, { + messages: [{ + groupSize: 2, groupId: '1', messageId: '1', messageData: '1-1', + }, { + groupSize: 2, groupId: '2', messageId: '2', messageData: '1-2', + }], + messageIdsSeen: { 1: '1', 2: '2' }, + }); + nock('https://ma.estr').put('/objects/a').reply(200, {}); + nock('https://ma.estr').delete('/objects/a').reply(200, {}); + + for (let i = 0; i < msg.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await reassemble.process.call(self, { body: msg[i] }, { mode: 'groupSize' }); + + // eslint-disable-next-line default-case + switch (i) { + case 0: + expect(self.emit.callCount).to.be.equal(0); + break; + case 1: + expect(self.emit.callCount).to.be.equal(1); + expect(self.emit.lastCall.args[1].body).to.deep.equal({ + groupSize: 2, + groupId: 'a', + messageData: { + 1: '1-1', + 2: '1-2', + }, + }); + break; + } + } + }); + + it('Base Case: Group Size is 2, with messageId UNDEFINED and messageData defined', async () => { + const msg = [ + { + groupId: 'b', groupSize: 2, messageData: '1-1', + }, + { + groupId: 'b', groupSize: 2, messageData: '1-2', + }, + ]; + + // First Run + nock('https://ma.estr').get('/objects?query[externalid]=b').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'b') + .reply(200, { objectId: 'b' }); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { + messages: [{ + groupSize: 2, groupId: 'b', messageData: '1-1', + }], + messageIdsSeen: { 1: '1' }, + }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr').delete('/objects/b').reply(200, {}); + + // Second Run + nock('https://ma.estr').get('/objects?query[externalid]=b').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'b') + .reply(200, { objectId: 'b' }); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr') + .get('/objects/b') + .reply(200, { + messages: [{ + groupSize: 2, groupId: 'b', messageData: '1-1', + }, { + groupSize: 2, groupId: 'b', messageData: '1-2', + }], + messageIdsSeen: { 1: '1', 2: '2' }, + }); + nock('https://ma.estr').put('/objects/b').reply(200, {}); + nock('https://ma.estr').delete('/objects/b').reply(200, {}); + + for (let i = 0; i < msg.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await reassemble.process.call(self, { body: msg[i] }, { mode: 'groupSize' }); + + // eslint-disable-next-line default-case + switch (i) { + case 0: + expect(self.emit.callCount).to.be.equal(0); + break; + case 1: + expect(self.emit.callCount).to.be.equal(1); + // eslint-disable-next-line no-case-declarations + const results = self.emit.lastCall.args[1].body; + expect(results).to.deep.equal({ + groupSize: 2, + groupId: 'b', + messageData: results.messageData, + }); + break; + } + } + }); + + it('Base Case: Using time delay, with messageId UNDEFINED and messageData defined', async () => { + const msg = [ + { + groupId: 'c', timersec: 1000, messageData: '1-1', + }, + { + groupId: 'c', timersec: 1000, messageData: '1-2', + }, + ]; + + // First Run + nock('https://ma.estr').get('/objects?query[externalid]=c').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'c') + .reply(200, { objectId: 'c' }); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { + messages: [{ + groupId: 'c', messageData: '1-1', + }], + messageIdsSeen: { }, + }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + + // Second Run + nock('https://ma.estr').get('/objects?query[externalid]=c').reply(200, []); + nock('https://ma.estr') + .post('/objects', { messages: [], messageIdsSeen: {} }) + .matchHeader('x-query-externalid', 'c') + .reply(200, { objectId: 'c' }); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { messages: [], messageIdsSeen: {} }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { + messages: [{ + groupId: 'c', messageData: '1-1', + }, { + groupId: 'c', messageData: '1-2', + }], + messageIdsSeen: { }, + }); + nock('https://ma.estr').put('/objects/c').reply(200, {}); + nock('https://ma.estr') + .get('/objects/c') + .reply(200, { + messages: [{ + groupId: 'c', messageData: '1-1', + }, { + groupId: 'c', messageData: '1-2', + }], + messageIdsSeen: { 1: '1', 2: '2' }, + }); + nock('https://ma.estr').delete('/objects/c').reply(200, {}); + + for (let i = 0; i < msg.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await reassemble.process.call(self, { body: msg[i] }, { mode: 'timeout' }); + + // eslint-disable-next-line default-case + switch (i) { + case 0: + expect(self.emit.callCount).to.be.equal(0); + break; + case 1: + // eslint-disable-next-line no-await-in-loop + await sleep(1500); + expect(self.emit.callCount).to.be.equal(1); + // eslint-disable-next-line no-case-declarations + const results = self.emit.lastCall.args[1].body; + expect(results).to.deep.equal({ + groupId: 'c', + groupSize: 2, + messageData: results.messageData, + }); + break; + } + } + }); });