Skip to content

Commit

Permalink
fix(core): "Respond to Webhook" should work with workflows with waiti…
Browse files Browse the repository at this point in the history
…ng nodes (#12806)
  • Loading branch information
netroy authored Feb 3, 2025
1 parent 18b6867 commit e8635f2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
40 changes: 35 additions & 5 deletions packages/cli/src/__tests__/active-executions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ describe('ActiveExecutions', () => {
});

test('Should initialize activeExecutions with empty list', () => {
expect(activeExecutions.getActiveExecutions().length).toBe(0);
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
});

test('Should add execution to active execution list', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);

expect(executionId).toBe(FAKE_EXECUTION_ID);
expect(activeExecutions.getActiveExecutions().length).toBe(1);
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
expect(createNewExecution).toHaveBeenCalledTimes(1);
expect(updateExistingExecution).toHaveBeenCalledTimes(0);
});
Expand All @@ -59,7 +59,7 @@ describe('ActiveExecutions', () => {
const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID);

expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID);
expect(activeExecutions.getActiveExecutions().length).toBe(1);
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
expect(createNewExecution).toHaveBeenCalledTimes(0);
expect(updateExistingExecution).toHaveBeenCalledTimes(1);
});
Expand Down Expand Up @@ -93,6 +93,37 @@ describe('ActiveExecutions', () => {
await expect(deferredPromise.promise).resolves.toEqual(fakeResponse);
});

test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
activeExecutions.setStatus(executionId, 'waiting');
activeExecutions.attachResponsePromise(executionId, mockDeferredPromise());

const waitingExecution = activeExecutions.getExecution(executionId);
expect(waitingExecution.responsePromise).toBeDefined();

// Resume the execution
await activeExecutions.add(newExecution, executionId);

const resumedExecution = activeExecutions.getExecution(executionId);
expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt);
expect(resumedExecution.responsePromise).toBe(waitingExecution.responsePromise);
});

test('Should not remove a waiting execution', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
activeExecutions.setStatus(executionId, 'waiting');
activeExecutions.finalizeExecution(executionId);

// Wait until the next tick to ensure that the post-execution promise has settled
await new Promise(setImmediate);

// Execution should still be in activeExecutions
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
expect(activeExecutions.getStatus(executionId)).toBe('waiting');
});

test('Should remove an existing execution', async () => {
// ARRANGE
const newExecution = mockExecutionData();
Expand All @@ -105,11 +136,10 @@ describe('ActiveExecutions', () => {
await new Promise(setImmediate);

// ASSERT
expect(activeExecutions.getActiveExecutions().length).toBe(0);
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
});

test('Should not try to resolve a post-execute promise for an inactive execution', async () => {
// @ts-expect-error Private method
const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecution');

activeExecutions.finalizeExecution('inactive-execution-id', mockFullRunData());
Expand Down
12 changes: 8 additions & 4 deletions packages/cli/src/active-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,15 @@ export class ActiveExecutions {
await this.executionRepository.updateExistingExecution(executionId, execution);
}

const resumingExecution = this.activeExecutions[executionId];
const postExecutePromise = createDeferredPromise<IRun | undefined>();

this.activeExecutions[executionId] = {
executionData,
startedAt: new Date(),
startedAt: resumingExecution?.startedAt ?? new Date(),
postExecutePromise,
status: executionStatus,
responsePromise: resumingExecution?.responsePromise,
};

// Automatically remove execution once the postExecutePromise settles
Expand All @@ -111,8 +113,10 @@ export class ActiveExecutions {
})
.finally(() => {
this.concurrencyControl.release({ mode: executionData.executionMode });
delete this.activeExecutions[executionId];
this.logger.debug('Execution removed', { executionId });
if (this.activeExecutions[executionId]?.status !== 'waiting') {
delete this.activeExecutions[executionId];
this.logger.debug('Execution removed', { executionId });
}
});

this.logger.debug('Execution added', { executionId });
Expand Down Expand Up @@ -227,7 +231,7 @@ export class ActiveExecutions {
}
}

private getExecution(executionId: string): IExecutingWorkflowData {
getExecution(executionId: string): IExecutingWorkflowData {
const execution = this.activeExecutions[executionId];
if (!execution) {
throw new ExecutionNotFoundError(executionId);
Expand Down

0 comments on commit e8635f2

Please sign in to comment.