Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Fix worker shutdown errors when active executions #10353

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ export abstract class BaseCommand extends Command {
this.logger.info(`Received ${signal}. Shutting down...`);
this.shutdownService.shutdown();

await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]);
await this.shutdownService.waitForShutdown();

await this.stopProcess();
Comment on lines +336 to +338
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also changes the shutdown behaviour of the start command. It seems to have some duplication (some functions are called in both ShutdownService and StartCommand.stopProcess()). We should also fix that as a follow-up.


clearTimeout(forceShutdownTimer);
};
Expand Down
19 changes: 1 addition & 18 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Container } from 'typedi';
import { Flags, type Config } from '@oclif/core';
import express from 'express';
import http from 'http';
import { sleep, ApplicationError } from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';

import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
Expand Down Expand Up @@ -61,23 +61,6 @@ export class Worker extends BaseCommand {

try {
await this.externalHooks?.run('n8n.stop', []);

const hardStopTimeMs = Date.now() + this.gracefulShutdownTimeoutInS * 1000;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this time was being misreported: the graceful shutdown had already started at BaseCommand.onTerminationSignal.


// Wait for active workflow executions to finish
let count = 0;
while (this.jobProcessor.getRunningJobIds().length !== 0) {
if (count++ % 4 === 0) {
const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000);
this.logger.info(
`Waiting for ${
Object.keys(this.jobProcessor.getRunningJobIds()).length
} active executions to finish... (max wait ${waitLeft} more seconds)`,
);
}

await sleep(500);
}
} catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error);
}
Expand Down
13 changes: 9 additions & 4 deletions packages/cli/src/scaling/__tests__/scaling.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { Job, JobData, JobOptions, JobQueue } from '../types';
import { ApplicationError } from 'n8n-workflow';
import { mockInstance } from '@test/mocking';
import { GlobalConfig } from '@n8n/config';
import type { JobProcessor } from '../job-processor';

const queue = mock<JobQueue>({
client: { ping: jest.fn() },
Expand Down Expand Up @@ -100,23 +101,27 @@ describe('ScalingService', () => {
});
});

describe('pauseQueue', () => {
it('should pause the queue', async () => {
describe('stop', () => {
it('should pause the queue and check for running jobs', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
const jobProcessor = mock<JobProcessor>();
const scalingService = new ScalingService(mock(), mock(), jobProcessor, globalConfig);
await scalingService.setupQueue();
jobProcessor.getRunningJobIds.mockReturnValue([]);
const getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount');

/**
* Act
*/
await scalingService.pauseQueue();
await scalingService.stop();

/**
* Assert
*/
expect(queue.pause).toHaveBeenCalledWith(true, true);
expect(getRunningJobsCountSpy).toHaveBeenCalled();
});
});

Expand Down
20 changes: 18 additions & 2 deletions packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Container, { Service } from 'typedi';
import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow';
import { ApplicationError, BINARY_ENCODING, sleep } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import { Logger } from '@/Logger';
Expand Down Expand Up @@ -59,10 +59,22 @@ export class ScalingService {
}

@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
async pauseQueue() {
/**
 * Shutdown hook: pauses the queue, then polls until `getRunningJobsCount()`
 * reports zero running jobs.
 *
 * NOTE(review): the polling loop has no timeout of its own; it presumably
 * relies on the caller's force-shutdown timer to bound the wait — confirm.
 */
async stop() {
// NOTE(review): arguments are presumably Bull's (isLocal, doNotWaitActive) — confirm against the queue type.
await this.queue.pause(true, true);

this.logger.debug('[ScalingService] Queue paused');

// Iteration counter, used only to throttle the progress log below.
let count = 0;

while (this.getRunningJobsCount() !== 0) {
// Log on every 4th iteration; with the 500 ms sleep that is roughly once every 2 seconds.
if (count++ % 4 === 0) {
this.logger.info(
`Waiting for ${this.getRunningJobsCount()} active executions to finish...`,
);
}

await sleep(500);
}
}

async pingQueue() {
Expand Down Expand Up @@ -113,6 +125,10 @@ export class ScalingService {
}
}

/** Returns the number of job IDs the job processor currently reports as running. */
getRunningJobsCount() {
return this.jobProcessor.getRunningJobIds().length;
}

// #endregion

// #region Listeners
Expand Down
Loading