Skip to content

Commit

Permalink
Optional message (#78)
Browse files Browse the repository at this point in the history
* building an optional messageID
  • Loading branch information
FranckTala authored Nov 24, 2021
1 parent 1d9cfb6 commit de3e82e
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 11 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ Inverse of the split action: Given a stream of incoming messages a sum message i
#### List of Expected Config fields
```Behavior``` - Has 3 different behaviour variants(options):
* Produce Groups of Fixed Size (Don't Emit Partial Groups): A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group will not be emitted.
* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages at which point messages will be emitted for each group.
* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages in the specifeid timeframe (delay timer) at which point messages will be emitted for each group.
* Produce Groups of Fixed Size (Emit Partial Groups): Specify both group size and delay timer. Once a group is complete, that group will be emitted. Once there are no more incoming messages, then partially completed groups will also be emitted.

Supported:
Expand All @@ -120,7 +120,9 @@ If all the messages in the group do not arrive, then the group will not be emitt
```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.

```messageId``` - Id for a message to distinguish it from other messages in the group.
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.
In case a messageId occures multiple times, only the messageData of the latest message survives.
If the messageId is not defined, a random guid will be generated and used as messageID.

```messageData``` - Data from individual messages can be inserted here in form of an object. This object is then inserted into an array which is available in the message emitted for this group.

Expand Down
12 changes: 7 additions & 5 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// eslint-disable-next-line
const { messages } = require('elasticio-node');
const { v4: uuidv4 } = require('uuid');
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');

let timeHandle;
Expand All @@ -18,13 +19,14 @@ async function timer(this_) {

// eslint-disable-next-line no-await-in-loop
await this_.emit('data', messages.newMessageWithBody({
groupSize: results.messages.length,
groupSize: Object.keys(results.messageIdsSeen).length,
groupId: results.messages[0].groupId,
messageData: incomingData,
}));
// eslint-disable-next-line no-await-in-loop
await storage.deleteObjectById(groupList[i]);
}
groupList = [];
}

async function processAction(msg, cfg) {
Expand All @@ -33,7 +35,7 @@ async function processAction(msg, cfg) {
const {
groupSize,
groupId,
messageId,
messageId = uuidv4(),
messageData,
} = msg.body;
const incomingData = {};
Expand Down Expand Up @@ -157,7 +159,7 @@ async function getMetaModel(cfg) {
},
messageId: {
type: 'string',
required: true,
required: false,
title: 'Unique ID to describe this message',
order: 4,
},
Expand Down Expand Up @@ -195,7 +197,7 @@ async function getMetaModel(cfg) {
},
messageId: {
type: 'string',
required: true,
required: false,
title: 'Unique ID to describe this message',
order: 4,
},
Expand Down Expand Up @@ -236,7 +238,7 @@ async function getMetaModel(cfg) {
},
messageId: {
type: 'string',
required: true,
required: false,
title: 'Unique ID to describe this message',
order: 4,
},
Expand Down
18 changes: 15 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"@elastic.io/maester-client": "3.3.0",
"elasticio-node": "0.0.9",
"elasticio-sailor-nodejs": "2.6.26",
"lodash": "4.17.19"
"lodash": "4.17.19",
"uuid": "8.3.2"
},
"devDependencies": {
"@elastic.io/component-logger": "0.0.1",
Expand Down
243 changes: 243 additions & 0 deletions spec/reassemble.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,247 @@ describe('Split on JSONata ', () => {
expect(getMessageGroup2.isDone()).to.equal(true);
expect(deleteMessageGroup.isDone()).to.equal(true);
});

it('Base Case: Group Size is 2, with different messageId and messageData', async () => {
const msg = [
{
groupId: 'a', groupSize: 2, messageId: '1', messageData: '1-1',
},
{
groupId: 'a', groupSize: 2, messageId: '2', messageData: '1-2',
},
];

// First Run
nock('https://ma.estr').get('/objects?query[externalid]=a').reply(200, []);
nock('https://ma.estr')
.post('/objects', { messages: [], messageIdsSeen: {} })
.matchHeader('x-query-externalid', 'a')
.reply(200, { objectId: 'a' });
nock('https://ma.estr')
.get('/objects/a')
.reply(200, { messages: [], messageIdsSeen: {} });
nock('https://ma.estr').put('/objects/a').reply(200, {});
nock('https://ma.estr')
.get('/objects/a')
.reply(200, {
messages: [{
groupSize: 2, messageId: '1', groupId: 'a', messageData: '1-1',
}],
messageIdsSeen: { 1: '1' },
});
nock('https://ma.estr').put('/objects/a').reply(200, {});
nock('https://ma.estr').delete('/objects/a').reply(200, {});

// Second Run
nock('https://ma.estr').get('/objects?query[externalid]=a').reply(200, []);
nock('https://ma.estr')
.post('/objects', { messages: [], messageIdsSeen: {} })
.matchHeader('x-query-externalid', 'a')
.reply(200, { objectId: 'a' });
nock('https://ma.estr')
.get('/objects/a')
.reply(200, { messages: [], messageIdsSeen: {} });
nock('https://ma.estr').put('/objects/a').reply(200, {});
nock('https://ma.estr')
.get('/objects/a')
.reply(200, {
messages: [{
groupSize: 2, groupId: '1', messageId: '1', messageData: '1-1',
}, {
groupSize: 2, groupId: '2', messageId: '2', messageData: '1-2',
}],
messageIdsSeen: { 1: '1', 2: '2' },
});
nock('https://ma.estr').put('/objects/a').reply(200, {});
nock('https://ma.estr').delete('/objects/a').reply(200, {});

for (let i = 0; i < msg.length; i += 1) {
// eslint-disable-next-line no-await-in-loop
await reassemble.process.call(self, { body: msg[i] }, { mode: 'groupSize' });

// eslint-disable-next-line default-case
switch (i) {
case 0:
expect(self.emit.callCount).to.be.equal(0);
break;
case 1:
expect(self.emit.callCount).to.be.equal(1);
expect(self.emit.lastCall.args[1].body).to.deep.equal({
groupSize: 2,
groupId: 'a',
messageData: {
1: '1-1',
2: '1-2',
},
});
break;
}
}
});

it('Base Case: Group Size is 2, with messageId UNDEFINED and messageData defined', async () => {
const msg = [
{
groupId: 'b', groupSize: 2, messageData: '1-1',
},
{
groupId: 'b', groupSize: 2, messageData: '1-2',
},
];

// First Run
nock('https://ma.estr').get('/objects?query[externalid]=b').reply(200, []);
nock('https://ma.estr')
.post('/objects', { messages: [], messageIdsSeen: {} })
.matchHeader('x-query-externalid', 'b')
.reply(200, { objectId: 'b' });
nock('https://ma.estr')
.get('/objects/b')
.reply(200, { messages: [], messageIdsSeen: {} });
nock('https://ma.estr').put('/objects/b').reply(200, {});
nock('https://ma.estr')
.get('/objects/b')
.reply(200, {
messages: [{
groupSize: 2, groupId: 'b', messageData: '1-1',
}],
messageIdsSeen: { 1: '1' },
});
nock('https://ma.estr').put('/objects/b').reply(200, {});
nock('https://ma.estr').delete('/objects/b').reply(200, {});

// Second Run
nock('https://ma.estr').get('/objects?query[externalid]=b').reply(200, []);
nock('https://ma.estr')
.post('/objects', { messages: [], messageIdsSeen: {} })
.matchHeader('x-query-externalid', 'b')
.reply(200, { objectId: 'b' });
nock('https://ma.estr')
.get('/objects/b')
.reply(200, { messages: [], messageIdsSeen: {} });
nock('https://ma.estr').put('/objects/b').reply(200, {});
nock('https://ma.estr')
.get('/objects/b')
.reply(200, {
messages: [{
groupSize: 2, groupId: 'b', messageData: '1-1',
}, {
groupSize: 2, groupId: 'b', messageData: '1-2',
}],
messageIdsSeen: { 1: '1', 2: '2' },
});
nock('https://ma.estr').put('/objects/b').reply(200, {});
nock('https://ma.estr').delete('/objects/b').reply(200, {});

for (let i = 0; i < msg.length; i += 1) {
// eslint-disable-next-line no-await-in-loop
await reassemble.process.call(self, { body: msg[i] }, { mode: 'groupSize' });

// eslint-disable-next-line default-case
switch (i) {
case 0:
expect(self.emit.callCount).to.be.equal(0);
break;
case 1:
expect(self.emit.callCount).to.be.equal(1);
// eslint-disable-next-line no-case-declarations
const results = self.emit.lastCall.args[1].body;
expect(results).to.deep.equal({
groupSize: 2,
groupId: 'b',
messageData: results.messageData,
});
break;
}
}
});

it('Base Case: Using time delay, with messageId UNDEFINED and messageData defined', async () => {
const msg = [
{
groupId: 'c', timersec: 1000, messageData: '1-1',
},
{
groupId: 'c', timersec: 1000, messageData: '1-2',
},
];

// First Run
nock('https://ma.estr').get('/objects?query[externalid]=c').reply(200, []);
nock('https://ma.estr')
.post('/objects', { messages: [], messageIdsSeen: {} })
.matchHeader('x-query-externalid', 'c')
.reply(200, { objectId: 'c' });
nock('https://ma.estr')
.get('/objects/c')
.reply(200, { messages: [], messageIdsSeen: {} });
nock('https://ma.estr').put('/objects/c').reply(200, {});
nock('https://ma.estr')
.get('/objects/c')
.reply(200, {
messages: [{
groupId: 'c', messageData: '1-1',
}],
messageIdsSeen: { },
});
nock('https://ma.estr').put('/objects/c').reply(200, {});

// Second Run
nock('https://ma.estr').get('/objects?query[externalid]=c').reply(200, []);
nock('https://ma.estr')
.post('/objects', { messages: [], messageIdsSeen: {} })
.matchHeader('x-query-externalid', 'c')
.reply(200, { objectId: 'c' });
nock('https://ma.estr')
.get('/objects/c')
.reply(200, { messages: [], messageIdsSeen: {} });
nock('https://ma.estr').put('/objects/c').reply(200, {});
nock('https://ma.estr')
.get('/objects/c')
.reply(200, {
messages: [{
groupId: 'c', messageData: '1-1',
}, {
groupId: 'c', messageData: '1-2',
}],
messageIdsSeen: { },
});
nock('https://ma.estr').put('/objects/c').reply(200, {});
nock('https://ma.estr')
.get('/objects/c')
.reply(200, {
messages: [{
groupId: 'c', messageData: '1-1',
}, {
groupId: 'c', messageData: '1-2',
}],
messageIdsSeen: { 1: '1', 2: '2' },
});
nock('https://ma.estr').delete('/objects/c').reply(200, {});

for (let i = 0; i < msg.length; i += 1) {
// eslint-disable-next-line no-await-in-loop
await reassemble.process.call(self, { body: msg[i] }, { mode: 'timeout' });

// eslint-disable-next-line default-case
switch (i) {
case 0:
expect(self.emit.callCount).to.be.equal(0);
break;
case 1:
// eslint-disable-next-line no-await-in-loop
await sleep(1500);
expect(self.emit.callCount).to.be.equal(1);
// eslint-disable-next-line no-case-declarations
const results = self.emit.lastCall.args[1].body;
expect(results).to.deep.equal({
groupId: 'c',
groupSize: 2,
messageData: results.messageData,
});
break;
}
}
});
});

0 comments on commit de3e82e

Please sign in to comment.