Skip to content

Commit

Permalink
Added maester-library to re-assembling messages action (#70)
Browse files Browse the repository at this point in the history
Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)
  • Loading branch information
olegosh authored Jul 13, 2021
1 parent c358bfe commit 24bf657
Show file tree
Hide file tree
Showing 7 changed files with 687 additions and 60 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.2.1 (July 23, 2021)
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)

## 1.2.0 (July 9, 2021)
* Add Ability to pass data from the individual messages to re-assembled message
* Add Ability to pass data from the individual messages to `Re-assembled message` action

## 1.1.9 (February 12, 2021)
* Update sailor version to 2.6.24
Expand Down
2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"title": "Splitter",
"version": "1.2.0",
"version": "1.2.1",
"description": "Splits a message into multiple messages.",
"buildType":"docker",
"actions": {
Expand Down
59 changes: 44 additions & 15 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,71 @@
// eslint-disable-next-line
const { messages } = require('elasticio-node');

const groupsSeen = {};
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');

async function processAction(msg) {
const storage = new ObjectStorageWrapperExtended(this);
const {
groupSize,
groupId,
messageId,
messageData,
} = msg.body;
const incomingData = {};
const object = {
messageId,
groupId,
messageData,
};

if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
if (!messageData) {
incomingData[messageId] = undefined;
}

if (!groupsSeen[groupId]) {
groupsSeen[groupId] = {
groupSize,
messageIdsSeen: new Set(),
incomingData: {},
};
const {
messageGroup,
messageGroupId,
messageGroupSize,
isCreated,
} = await storage.createMessageGroupIfNotExists(groupId, groupSize);

if (isCreated) {
await storage.createNewObjectInMessageGroup(object, messageGroupId);
this.logger.info('New Group created. Added message');
}
if (!isCreated) {
await storage.createNewObjectInMessageGroup(object, messageGroupId);
this.logger.info('Existed Group found. Added message');
this.logger.info(`Saved messages: ${Object.keys(messageGroup.messageIdsSeen).join(', ')}`);
}

groupsSeen[groupId].messageIdsSeen.add(messageId);
groupsSeen[groupId].incomingData[messageId] = messageData;
const numberSeen = groupsSeen[groupId].messageIdsSeen.size;
const parsedMessageGroup = await storage.lookupParsedObjectById(messageGroupId);
const filteredMessages = parsedMessageGroup.messages
.filter((message) => message.messageId !== messageId);
filteredMessages.push(object);
parsedMessageGroup.messages = filteredMessages;
await storage.updateObject(messageGroupId, parsedMessageGroup);
const messagesNumberSeen = Object.keys(parsedMessageGroup.messageIdsSeen).length;

this.logger.info(
`Saw message ${messageId} of group ${groupId} Currently the group has ${numberSeen} of ${groupSize} message(s).`,
`Saw message ${messageId} of group ${groupId}.
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
);

if (numberSeen >= groupSize) {
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: groupsSeen[groupId].incomingData,
messageData: incomingData,
}));
delete groupsSeen[groupId];
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
}
}

Expand Down
58 changes: 58 additions & 0 deletions lib/actions/utils-wrapper/ObjectStorageWrapperExtended.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
const { ObjectStorageWrapper } = require('@elastic.io/maester-client/dist/ObjectStorageWrapper');

class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
constructor(context) {
super(context);
this.logger = context.logger;
this.EXTERNAL_ID_QUERY_HEADER_NAME = 'externalid';
this.TTL_TWO_DAYS = 172800;
}

async lookupParsedObjectById(messageGroupId) {
const messageGroup = await this.lookupObjectById(messageGroupId);
return JSON.parse(messageGroup);
}

async createMessageGroupIfNotExists(externalId, messageGroupSize) {
this.logger.info('Processing creation of the new message group');
const messageGroups = await this.lookupObjectsByQueryParameters(
[{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
);
if (messageGroups.length > 1) {
throw new Error('Several message groups with the same ids can not exist');
}
if (!messageGroups.length) {
this.logger.info('No message groups found');
const newMessageGroup = {
messages: [],
messageIdsSeen: {},
};
const { objectId: messageGroupId } = await this.createObject(
newMessageGroup, [{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
[], this.TTL_TWO_DAYS,
);
this.logger.info('Created new message group');
return {
messageGroup: newMessageGroup, messageGroupSize, messageGroupId, isCreated: true,
};
}
this.logger.info('MessageGroup found');
const messageGroupId = messageGroups[0].objectId;
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
return {
messageGroup: parsedMessageGroup, messageGroupSize, messageGroupId, isCreated: false,
};
}

async createNewObjectInMessageGroup(object, messageGroupId) {
this.logger.info('Processing creation of the new object');
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
this.logger.info('...Updating message group');
parsedMessageGroup.messageIdsSeen[object.messageId] = object.messageId;
return this.updateObject(messageGroupId, {
...parsedMessageGroup, messages: [...parsedMessageGroup.messages, object],
});
}
}

module.exports = ObjectStorageWrapperExtended;
Loading

0 comments on commit 24bf657

Please sign in to comment.