Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added maester-library to re-assembling messages action #70

Merged
merged 11 commits into from
Jul 13, 2021
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.2.1 (July 23, 2021)
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)

## 1.2.0 (July 9, 2021)
* Add Ability to pass data from the individual messages to re-assembled message
* Add Ability to pass data from the individual messages to `Re-assembled message` action

## 1.1.9 (February 12, 2021)
* Update sailor version to 2.6.24
Expand Down
2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"title": "Splitter",
"version": "1.2.0",
"version": "1.2.1",
"description": "Splits a message into multiple messages.",
"buildType":"docker",
"actions": {
Expand Down
59 changes: 44 additions & 15 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,71 @@
// eslint-disable-next-line
const { messages } = require('elasticio-node');

const groupsSeen = {};
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');

async function processAction(msg) {
const storage = new ObjectStorageWrapperExtended(this);
const {
groupSize,
groupId,
messageId,
messageData,
} = msg.body;
const incomingData = {};
const object = {
messageId,
groupId,
messageData,
};

if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
if (!messageData) {
incomingData[messageId] = undefined;
}

if (!groupsSeen[groupId]) {
groupsSeen[groupId] = {
groupSize,
messageIdsSeen: new Set(),
incomingData: {},
};
const {
messageGroup,
messageGroupId,
messageGroupSize,
isCreated,
} = await storage.createMessageGroupIfNotExists(groupId, groupSize);

if (isCreated) {
await storage.createNewObjectInMessageGroup(object, messageGroupId);
this.logger.info('New Group created. Added message');
}
if (!isCreated) {
await storage.createNewObjectInMessageGroup(object, messageGroupId);
this.logger.info('Existed Group found. Added message');
this.logger.info(`Saved messages: ${Object.keys(messageGroup.messageIdsSeen).join(', ')}`);
}

groupsSeen[groupId].messageIdsSeen.add(messageId);
groupsSeen[groupId].incomingData[messageId] = messageData;
const numberSeen = groupsSeen[groupId].messageIdsSeen.size;
const parsedMessageGroup = await storage.lookupParsedObjectById(messageGroupId);
const filteredMessages = parsedMessageGroup.messages
.filter((message) => message.messageId !== messageId);
filteredMessages.push(object);
parsedMessageGroup.messages = filteredMessages;
await storage.updateObject(messageGroupId, parsedMessageGroup);
const messagesNumberSeen = Object.keys(parsedMessageGroup.messageIdsSeen).length;

this.logger.info(
`Saw message ${messageId} of group ${groupId} Currently the group has ${numberSeen} of ${groupSize} message(s).`,
`Saw message ${messageId} of group ${groupId}.
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
);

if (numberSeen >= groupSize) {
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: groupsSeen[groupId].incomingData,
messageData: incomingData,
}));
delete groupsSeen[groupId];
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
}
}

Expand Down
58 changes: 58 additions & 0 deletions lib/actions/utils-wrapper/ObjectStorageWrapperExtended.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
const { ObjectStorageWrapper } = require('@elastic.io/maester-client/dist/ObjectStorageWrapper');

class ObjectStorageWrapperExtended extends ObjectStorageWrapper {
constructor(context) {
super(context);
this.logger = context.logger;
this.EXTERNAL_ID_QUERY_HEADER_NAME = 'externalid';
this.TTL_TWO_DAYS = 172800;
}

async lookupParsedObjectById(messageGroupId) {
const messageGroup = await this.lookupObjectById(messageGroupId);
return JSON.parse(messageGroup);
}

async createMessageGroupIfNotExists(externalId, messageGroupSize) {
this.logger.info('Processing creation of the new message group');
const messageGroups = await this.lookupObjectsByQueryParameters(
[{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
);
if (messageGroups.length > 1) {
throw new Error('Several message groups with the same ids can not exist');
}
if (!messageGroups.length) {
this.logger.info('No message groups found');
const newMessageGroup = {
messages: [],
messageIdsSeen: {},
};
const { objectId: messageGroupId } = await this.createObject(
newMessageGroup, [{ key: this.EXTERNAL_ID_QUERY_HEADER_NAME, value: externalId }],
[], this.TTL_TWO_DAYS,
);
this.logger.info('Created new message group');
return {
messageGroup: newMessageGroup, messageGroupSize, messageGroupId, isCreated: true,
};
}
this.logger.info('MessageGroup found');
const messageGroupId = messageGroups[0].objectId;
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
return {
messageGroup: parsedMessageGroup, messageGroupSize, messageGroupId, isCreated: false,
};
}

async createNewObjectInMessageGroup(object, messageGroupId) {
this.logger.info('Processing creation of the new object');
const parsedMessageGroup = await this.lookupParsedObjectById(messageGroupId);
this.logger.info('...Updating message group');
parsedMessageGroup.messageIdsSeen[object.messageId] = object.messageId;
return this.updateObject(messageGroupId, {
...parsedMessageGroup, messages: [...parsedMessageGroup.messages, object],
});
}
}

module.exports = ObjectStorageWrapperExtended;
Loading