Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api-service): Fix api e2e deadlock #7575

Merged
merged 12 commits into from
Feb 6, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe('Create expireAt - TTL support - with pending jobs', function () {
});

it('should add expireAt to pending events that were digested', async function () {
await session.awaitRunningJobs(digestTemplate?._id, false, 5);
await session.waitForJobCompletion(digestTemplate?._id, false, 5);

await notificationExpireAt(query);

Expand Down
6 changes: 3 additions & 3 deletions apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
"pretest": "pnpm build:metadata",
"generate:swagger": "cross-env NODE_ENV=test PORT=1336 ts-node exportOpenAPIJSON.ts",
"generate:sdk": " (cd ../../libs/internal-sdk && speakeasy run --skip-compile --minimal --skip-versioning) && (cd ../../libs/internal-sdk && pnpm build) ",
"test": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test mocha --require ts-node/register --exit 'src/**/*.spec.ts'",
"test:e2e:novu-v1": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test mocha --grep '#novu-v1' --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e{,-ee}.ts",
"test:e2e:novu-v2": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test CI_EE_TEST=true CLERK_ENABLED=true NODE_OPTIONS=--max_old_space_size=8192 mocha --grep '#novu-v2' --require ts-node/register --exit --file e2e/setup.ts 'src/**/*.e2e{,-ee}.ts'",
"test": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test mocha --timeout 5000 --require ts-node/register --exit 'src/**/*.spec.ts'",
"test:e2e:novu-v1": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test mocha --timeout 5000 --grep '#novu-v1' --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e{,-ee}.ts ",
"test:e2e:novu-v2": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test CI_EE_TEST=true CLERK_ENABLED=true NODE_OPTIONS=--max_old_space_size=8192 mocha --timeout 5000 --grep '#novu-v2' --require ts-node/register --exit --file e2e/setup.ts 'src/**/*.e2e{,-ee}.ts'",
"migration": "cross-env NODE_ENV=local MIGRATION=true ts-node --transpileOnly",
"link:submodules": "pnpm link ../../enterprise/packages/auth && pnpm link ../../enterprise/packages/translation && pnpm link ../../enterprise/packages/billing",
"admin:remove-user-account": "cross-env NODE_ENV=local MIGRATION=true ts-node --transpileOnly ./admin/remove-user-account.ts",
Expand Down
65 changes: 36 additions & 29 deletions apps/api/src/app/events/e2e/bridge-trigger.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ contexts.forEach((context: Context) => {
}
);

console.time('CHECKPOINT:> Starting mock bridge server');
await bridgeServer.start({ workflows: [newWorkflow] });
console.timeEnd('CHECKPOINT:> Starting mock bridge server');

if (context.isStateful) {
await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer);
Expand All @@ -141,8 +143,13 @@ contexts.forEach((context: Context) => {
}
}

console.time('CHECKPOINT:> Starting triggering event');
await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 'test_name' }, bridge);
await session.awaitRunningJobs();
console.timeEnd('CHECKPOINT:> Starting triggering event');

console.time('CHECKPOINT:> Starting waiting for job completion');
await session.waitForJobCompletion();
console.timeEnd('CHECKPOINT:> Starting waiting for job completion');

const messages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -202,7 +209,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowIdSkipByStatic, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const executedMessageByStatic = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -266,7 +273,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowIdSkipByVariable, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const executedMessage = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -318,7 +325,7 @@ contexts.forEach((context: Context) => {

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);

await session.awaitRunningJobs(undefined);
await session.waitForJobCompletion(undefined);

const messagesAfter = await messageRepository.find({
_environmentId: session.environment._id,
Expand All @@ -340,7 +347,7 @@ contexts.forEach((context: Context) => {
await executionDetailsRepository.delete({ _environmentId: session.environment._id });

await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 4 }, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const executionDetailsInvalidType = await executionDetailsRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -399,7 +406,7 @@ contexts.forEach((context: Context) => {

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);

await session.awaitRunningJobs();
await session.waitForJobCompletion();

const messagesAfterInApp = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -478,7 +485,7 @@ contexts.forEach((context: Context) => {
await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 'John' }, bridge);
await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 'Bela' }, bridge);

await session.awaitRunningJobs();
await session.waitForJobCompletion();

const messages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -559,7 +566,7 @@ contexts.forEach((context: Context) => {

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);

await session.awaitRunningJobs();
await session.waitForJobCompletion();

const messagesAfter = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -595,7 +602,7 @@ contexts.forEach((context: Context) => {
}

const result = await triggerEvent(session, exceedMaxTierDurationWorkflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const executionDetails = await executionDetailsRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -649,9 +656,9 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();
await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 'payload_name' }, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessage = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -714,7 +721,7 @@ contexts.forEach((context: Context) => {

const controls = { steps: { [stepId]: { name: 'stored_control_name' } } };
await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge, controls);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessage = await messageRepository.find({
_environmentId: session.environment._id,
Expand All @@ -740,7 +747,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -776,7 +783,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -813,7 +820,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -850,7 +857,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -901,7 +908,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -936,7 +943,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1000,7 +1007,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1053,7 +1060,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1115,7 +1122,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1166,7 +1173,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1231,7 +1238,7 @@ contexts.forEach((context: Context) => {
});

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1286,7 +1293,7 @@ contexts.forEach((context: Context) => {
});

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1361,7 +1368,7 @@ contexts.forEach((context: Context) => {
in_app: true,
});
await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1426,7 +1433,7 @@ contexts.forEach((context: Context) => {
});

await triggerEvent(session, workflowId, subscriber._id, {}, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down Expand Up @@ -1510,7 +1517,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, { userName: 'John Doe' }, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

// Verify inApp message was skipped
const inAppMessages = await messageRepository.find({
Expand Down Expand Up @@ -1589,7 +1596,7 @@ contexts.forEach((context: Context) => {
}

await triggerEvent(session, workflowId, subscriber.subscriberId, { userName: 'Jane Doe' }, bridge);
await session.awaitRunningJobs();
await session.waitForJobCompletion();

// Verify inApp message was not skipped
const inAppMessages = await messageRepository.find({
Expand Down Expand Up @@ -1657,7 +1664,7 @@ describe('Novu-Hosted Bridge Trigger #novu-v2', () => {
const responseData = response.body.data as WorkflowResponseDto;

await triggerEvent(session, responseData.workflowId, subscriber._id, {});
await session.awaitRunningJobs();
await session.waitForJobCompletion();

const sentMessages = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down
4 changes: 2 additions & 2 deletions apps/api/src/app/events/e2e/bulk-trigger.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ describe('Trigger bulk events - /v1/events/trigger/bulk (POST) #novu-v2', functi
],
});

await session.awaitRunningJobs(template._id);
await session.awaitRunningJobs(secondTemplate._id);
await session.waitForJobCompletion(template._id);
await session.waitForJobCompletion(secondTemplate._id);

const notifications = await notificationRepository.findBySubscriberId(session.environment._id, subscriber._id);
expect(notifications.length).to.equal(1);
Expand Down
Loading
Loading