Skip to content

Commit

Permalink
don't block shutdown on any waiting executions
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Feb 6, 2025
1 parent bb8c22d commit 37bb649
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 26 deletions.
6 changes: 5 additions & 1 deletion packages/cli/src/__tests__/active-executions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ describe('ActiveExecutions', () => {
activeExecutions.setStatus(waitingExecutionId2, 'waiting');
});

test('Should cancel only new and waiting executions with response-promises by default', async () => {
test('Should cancel only executions with response-promises by default', async () => {
const stopExecutionSpy = jest.spyOn(activeExecutions, 'stopExecution');

expect(activeExecutions.getActiveExecutions()).toHaveLength(4);
Expand All @@ -263,6 +263,10 @@ describe('ActiveExecutions', () => {
expect(stopExecutionSpy).toHaveBeenCalledWith(waitingExecutionId1);
expect(stopExecutionSpy).not.toHaveBeenCalledWith(newExecutionId2);
expect(stopExecutionSpy).not.toHaveBeenCalledWith(waitingExecutionId2);

await new Promise(setImmediate);
// the other two executions aren't cancelled, but still removed from memory
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
});

test('Should cancel all executions when cancelAll is true', async () => {
Expand Down
41 changes: 16 additions & 25 deletions packages/cli/src/active-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,24 +214,31 @@ export class ActiveExecutions {

/** Wait for all active executions to finish */
async shutdown(cancelAll = false) {
const { withResponsePromise, others } = this.groupExecutionIds();
const executionIdsToCancel = [...withResponsePromise];

if (config.getEnv('executions.mode') === 'regular') {
const isRegularMode = config.getEnv('executions.mode') === 'regular';
if (isRegularMode) {
// removal of active executions will no longer release capacity back,
// so that throttled executions cannot resume during shutdown
this.concurrencyControl.disable();
}

if (cancelAll) {
executionIdsToCancel.push(...others);
let executionIds = Object.keys(this.activeExecutions);
const toCancel: string[] = [];
for (const executionId of executionIds) {
const { responsePromise, status } = this.activeExecutions[executionId];
if (!!responsePromise || (isRegularMode && cancelAll)) {
// Cancel all exectutions that have a response promise, because these promises can't be retained between restarts
this.stopExecution(executionId);
toCancel.push(executionId);
} else if (status === 'waiting' || status === 'new') {
// Remove waiting and new executions to not block shutdown
delete this.activeExecutions[executionId];
}
}

executionIdsToCancel.forEach((executionId) => this.stopExecution(executionId));
await this.concurrencyControl.removeAll(executionIdsToCancel);
await this.concurrencyControl.removeAll(toCancel);

let count = 0;
let executionIds = Object.keys(this.activeExecutions);
executionIds = Object.keys(this.activeExecutions);
while (executionIds.length !== 0) {
if (count++ % 4 === 0) {
this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`);
Expand All @@ -242,22 +249,6 @@ export class ActiveExecutions {
}
}

private groupExecutionIds() {
const groups: Record<'withResponsePromise' | 'others', string[]> = {
withResponsePromise: [],
others: [],
};
return Object.entries(this.activeExecutions).reduce((acc, [executionId, execution]) => {
const { status, responsePromise } = execution;
const group =
responsePromise && (status === 'new' || status === 'waiting')
? acc.withResponsePromise
: acc.others;
group.push(executionId);
return acc;
}, groups);
}

getExecutionOrFail(executionId: string): IExecutingWorkflowData {
const execution = this.activeExecutions[executionId];
if (!execution) {
Expand Down

0 comments on commit 37bb649

Please sign in to comment.