Skip to content

Commit

Permalink
Fixed a timer issue (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
shulkaolka authored Mar 29, 2023
1 parent 73d0341 commit 097ab87
Show file tree
Hide file tree
Showing 13 changed files with 776 additions and 840 deletions.
16 changes: 9 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2.1
parameters:
node-version:
type: string
default: "16.13.2"
default: "18.13.0"
orbs:
node: circleci/[email protected]
slack: circleci/[email protected]
Expand Down Expand Up @@ -72,23 +72,23 @@ commands:
jobs:
test:
docker:
- image: circleci/node:16-stretch
- image: cimg/node:18.13.0
steps:
- checkout
- node/install:
node-version: << pipeline.parameters.node-version >>
- run:
name: Audit Dependencies
command: npm audit --production --audit-level=high
- node/install-packages:
cache-path: ./node_modules
override-ci-command: npm install
- run:
name: Audit Dependencies
command: npm run audit
- run:
name: Running Mocha Tests
command: npm test
build:
docker:
- image: circleci/node:16-stretch
- image: cimg/node:18.13.0
user: root
steps:
- checkout
Expand Down Expand Up @@ -118,8 +118,10 @@ workflows:
jobs:
- build:
name: "Build and publish docker image"
context:
- componentspusher
filters:
branches:
ignore: /.*/
tags:
only: /^([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$/
only: /^([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$/
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ELASTICIO_OBJECT_STORAGE_TOKEN=token
ELASTICIO_OBJECT_STORAGE_URI=http://127.0.0.1:3002
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ build/Release
node_modules

.idea
.env
10 changes: 10 additions & 0 deletions .grype-ignore.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ignore:
- vulnerability: CVE-2022-3996
package:
name: libssl3
version: 3.0.7-r0

- vulnerability: CVE-2022-3996
package:
name: libcrypto3
version: 3.0.7-r0
6 changes: 6 additions & 0 deletions .nsprc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"GHSA-27h2-hvpr-p74q": {
"active": true,
"notes": "We don't use verify function from jsonwebtoken, so not affected"
}
}
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.4.4 (March 28, 2023)
* Fix [issue](https://github.com/elasticio/splitter-component/issues/97) with timer

## 1.4.3 (November 04, 2022)
* Update Sailor version to 2.7.1

Expand Down
2 changes: 1 addition & 1 deletion component.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"title": "Splitter",
"description": "Splits a message into multiple messages.",
"version": "1.4.3",
"version": "1.4.4",
"actions": {
"split": {
"deprecated": true,
Expand Down
68 changes: 34 additions & 34 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ const { messages } = require('elasticio-node');
const { v4: uuidv4 } = require('uuid');
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');

let timeHandle;
let groupList = [];
const groupList = {};
let delay;

async function timer(this_) {
for (let i = 0; i < groupList.length; i += 1) {
async function timer(this_, groupId) {
try {
const storage = new ObjectStorageWrapperExtended(this_);
// eslint-disable-next-line no-await-in-loop
const results = await storage.lookupObjectById(groupList[i]);
const results = await storage.lookupObjectById(groupId);
const incomingData = {};
results.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

// eslint-disable-next-line no-await-in-loop
await this_.emit('data', messages.newMessageWithBody({
groupSize: Object.keys(results.messageIdsSeen).length,
groupId: results.messages[0].groupId,
messageData: incomingData,
}));
// eslint-disable-next-line no-await-in-loop
await storage.deleteObjectById(groupList[i]);
await storage.deleteObjectById(groupId);
delete groupList[groupId];
} catch (e) {
this_.emit('error', e.message);
this_.logger.error(e);
}
groupList = [];
}

async function processAction(msg, cfg) {
Expand Down Expand Up @@ -65,23 +63,17 @@ async function processAction(msg, cfg) {
messageGroupSize,
isCreated,
} = await storage.createMessageGroupIfNotExists(groupId, groupSize);
await storage.createNewObjectInMessageGroup(object, messageGroupId);

if (isCreated) {
await storage.createNewObjectInMessageGroup(object, messageGroupId);
this.logger.info('New Group created. Added message');
}
if (!isCreated) {
await storage.createNewObjectInMessageGroup(object, messageGroupId);
this.logger.info('Existed Group found. Added message');
this.logger.info(`Saved messages: ${Object.keys(messageGroup.messageIdsSeen).join(', ')}`);
}
const parsedMessageGroup = await storage.lookupObjectById(messageGroupId);
const filteredMessages = parsedMessageGroup.messages
.filter((message) => message.messageId !== messageId);
filteredMessages.push(object);
parsedMessageGroup.messages = filteredMessages;
await storage.updateObjectById(messageGroupId, parsedMessageGroup);
const messagesNumberSeen = Object.keys(parsedMessageGroup.messageIdsSeen).length;
const updatedMessageGroup = await storage.lookupObjectById(messageGroupId);
const messagesNumberSeen = Object.keys(updatedMessageGroup.messageIdsSeen).length;

this.logger.info(
`Saw message ${messageId} of group ${groupId}.
Expand All @@ -90,7 +82,7 @@ async function processAction(msg, cfg) {
// when groupSized option is selected
if (mode === 'groupSize') {
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
updatedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
await this.emit('data', messages.newMessageWithBody({
Expand All @@ -106,25 +98,30 @@ async function processAction(msg, cfg) {
// When delay timer option is selected
if (mode === 'timeout') {
delay = (timersec >= 20000) ? 20000 : timersec;
clearTimeout(timeHandle);
timeHandle = setTimeout(timer, delay, this);
if (!groupList.includes(messageGroupId)) {
groupList.push(messageGroupId);
if (!groupList[messageGroupId]) {
groupList[messageGroupId] = {};
}
const group = groupList[messageGroupId];
if (group.timeoutId) {
clearTimeout(group.timeoutId);
}
group.timeoutId = setTimeout(timer, delay, this, messageGroupId);
}

// When both groupSize and delay timer option is selected
if (mode === 'groupSize&timeout') {
delay = (timersec >= 20000) ? 20000 : timersec;
clearTimeout(timeHandle);
timeHandle = setTimeout(timer, delay, this);

if (!groupList.includes(messageGroupId)) {
groupList.push(messageGroupId);
if (!groupList[messageGroupId]) {
groupList[messageGroupId] = {};
}
const group = groupList[messageGroupId];
if (group.timeoutId) {
clearTimeout(group.timeoutId);
}
group.timeoutId = setTimeout(timer, delay, this, messageGroupId);

if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
updatedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

Expand All @@ -133,9 +130,12 @@ async function processAction(msg, cfg) {
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
groupList = groupList.filter((def) => def !== messageGroupId);
if (groupList[messageGroupId]) {
clearTimeout(groupList[messageGroupId].timeoutId);
await storage.deleteObjectById(messageGroupId);
delete groupList[messageGroupId];
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
}
}
}
}
Expand Down
Loading

0 comments on commit 097ab87

Please sign in to comment.