Skip to content

Commit

Permalink
refactor: rename enum and flow run response
Browse files Browse the repository at this point in the history
  • Loading branch information
abuaboud committed Mar 4, 2024
1 parent 33a5028 commit b350fa4
Show file tree
Hide file tree
Showing 27 changed files with 134 additions and 140 deletions.
21 changes: 10 additions & 11 deletions packages/engine/src/lib/handler/context/flow-execution-context.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { ActionType, LoopStepOutput, PauseMetadata, StepOutput, StepOutputStatus, StopResponse, assertEqual, isNil } from '@activepieces/shared'
import { ActionType, FlowRunResponse, FlowRunStatus, LoopStepOutput, PauseMetadata, StepOutput, StepOutputStatus, StopResponse, assertEqual, isNil } from '@activepieces/shared'
import { StepExecutionPath } from './step-execution-path'
import { loggingUtils } from '../../helper/logging-utils'
import { nanoid } from 'nanoid'
import { FlowExecutionResponse, FlowExecutionStatus } from '@activepieces/shared'

export enum ExecutionVerdict {
RUNNING = 'RUNNING',
Expand All @@ -12,10 +11,10 @@ export enum ExecutionVerdict {
}

type VerdictResponse = {
reason: FlowExecutionStatus.PAUSED
reason: FlowRunStatus.PAUSED
pauseMetadata: PauseMetadata
} | {
reason: FlowExecutionStatus.STOPPED
reason: FlowRunStatus.STOPPED
stopResponse: StopResponse
}

Expand Down Expand Up @@ -166,7 +165,7 @@ export class FlowExecutorContext {
})
}

public async toResponse(): Promise<FlowExecutionResponse> {
public async toResponse(): Promise<FlowRunResponse> {
const baseExecutionOutput = {
duration: this.duration,
tasks: this.tasks,
Expand All @@ -177,32 +176,32 @@ export class FlowExecutorContext {
case ExecutionVerdict.FAILED:
return {
...baseExecutionOutput,
status: FlowExecutionStatus.FAILED,
status: FlowRunStatus.FAILED,
}
case ExecutionVerdict.PAUSED: {
const verdictResponse = this.verdictResponse
if (verdictResponse?.reason !== FlowExecutionStatus.PAUSED) {
if (verdictResponse?.reason !== FlowRunStatus.PAUSED) {
throw new Error('Verdict Response should have pause metadata response')
}
return {
...baseExecutionOutput,
status: FlowExecutionStatus.PAUSED,
status: FlowRunStatus.PAUSED,
pauseMetadata: verdictResponse.pauseMetadata,
}
}
case ExecutionVerdict.RUNNING:
case ExecutionVerdict.SUCCEEDED: {
const verdictResponse = this.verdictResponse
if (verdictResponse?.reason === FlowExecutionStatus.STOPPED) {
if (verdictResponse?.reason === FlowRunStatus.STOPPED) {
return {
...baseExecutionOutput,
status: FlowExecutionStatus.STOPPED,
status: FlowRunStatus.STOPPED,
stopResponse: verdictResponse.stopResponse,
}
}
return {
...baseExecutionOutput,
status: FlowExecutionStatus.SUCCEEDED,
status: FlowRunStatus.SUCCEEDED,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions packages/engine/src/lib/handler/piece-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AUTHENTICATION_PROPERTY_NAME, GenericStepOutput, ActionType, PieceAction, StepOutputStatus, assertNotNullOrUndefined, isNil, ExecutionType, PauseType, FlowExecutionStatus } from '@activepieces/shared'
import { AUTHENTICATION_PROPERTY_NAME, GenericStepOutput, ActionType, PieceAction, StepOutputStatus, assertNotNullOrUndefined, isNil, ExecutionType, PauseType, FlowRunStatus } from '@activepieces/shared'
import { ActionHandler, BaseExecutor } from './base-executor'
import { ExecutionVerdict, FlowExecutorContext } from './context/flow-execution-context'
import { ActionContext, ConnectionsManager, PauseHook, PauseHookParams, PiecePropertyMap, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework'
Expand Down Expand Up @@ -113,19 +113,19 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
const runMethodToExecute = (constants.testSingleStepMode && !isNil(pieceAction.test)) ? pieceAction.test : pieceAction.run
const output = await runMethodToExecute(context)
const newExecutionContext = executionState.addTags(hookResponse.tags)

if (hookResponse.stopped) {
assertNotNullOrUndefined(hookResponse.stopResponse, 'stopResponse')
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output)).setVerdict(ExecutionVerdict.SUCCEEDED, {
reason: FlowExecutionStatus.STOPPED,
reason: FlowRunStatus.STOPPED,
stopResponse: hookResponse.stopResponse.response,
}).increaseTask()
}
if (hookResponse.paused) {
assertNotNullOrUndefined(hookResponse.pauseResponse, 'pauseResponse')
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.PAUSED))
.setVerdict(ExecutionVerdict.PAUSED, {
reason: FlowExecutionStatus.PAUSED,
reason: FlowRunStatus.PAUSED,
pauseMetadata: hookResponse.pauseResponse.pauseMetadata,
})
}
Expand Down
4 changes: 2 additions & 2 deletions packages/engine/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
Action,
ActionType,
isNil,
FlowExecutionResponse,
FlowRunResponse,
} from '@activepieces/shared'
import { pieceHelper } from './lib/helper/piece-helper'
import { triggerHelper } from './lib/helper/trigger-helper'
Expand All @@ -29,7 +29,7 @@ import { ExecutionVerdict, FlowExecutorContext } from './lib/handler/context/flo
import { EngineConstants } from './lib/handler/context/engine-constants'
import { testExecutionContext } from './lib/handler/context/test-execution-context'

const executeFlow = async (input: ExecuteFlowOperation, context: FlowExecutorContext): Promise<EngineResponse<FlowExecutionResponse>> => {
const executeFlow = async (input: ExecuteFlowOperation, context: FlowExecutorContext): Promise<EngineResponse<FlowRunResponse>> => {
const output = await flowExecutor.execute({
action: input.flowVersion.trigger.nextAction,
executionState: context,
Expand Down
3 changes: 2 additions & 1 deletion packages/engine/test/handler/flow-with-response.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ExecutionVerdict, FlowExecutorContext } from '../../src/lib/handler/context/flow-execution-context'
import { buildPieceAction, generateMockEngineConstants } from './test-helper'
import { flowExecutor } from '../../src/lib/handler/flow-executor'
import { FlowRunStatus } from '@activepieces/shared'

describe('flow with response', () => {

Expand All @@ -24,7 +25,7 @@ describe('flow with response', () => {
})
expect(result.verdict).toBe(ExecutionVerdict.SUCCEEDED)
expect(result.verdictResponse).toEqual({
reason: 'STOPPED',
reason: FlowRunStatus.STOPPED,
stopResponse: response,
})
expect(result.steps.http.output).toEqual(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {
ActivepiecesError,
ApEdition,
ErrorCode,
FlowExecutionStatus,
FlowRunStatus,
} from '@activepieces/shared'
import { getEdition } from '../../helper/secret-helper'
import { flowRunService } from '../../flows/flow-run/flow-run-service'
Expand Down Expand Up @@ -32,7 +32,7 @@ export const platformWorkerHooks: FlowWorkerHooks = {
) {
await flowRunService.finish({
flowRunId: runId,
status: FlowExecutionStatus.QUOTA_EXCEEDED,
status: FlowRunStatus.QUOTA_EXCEEDED,
tasks: 0,
logsFileId: null,
tags: [],
Expand Down
24 changes: 12 additions & 12 deletions packages/server/api/src/app/flows/flow-run/flow-response-watcher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { logger } from '@sentry/utils'
import {
FlowExecutionResponse,
FlowExecutionStatus,
FlowRunResponse,
FlowRunStatus,
PauseType,
apId,
} from '@activepieces/shared'
Expand Down Expand Up @@ -68,7 +68,7 @@ export const flowResponseWatcher = {
async publish(
flowRunId: string,
handlerId: string,
result: FlowExecutionResponse,
result: FlowRunResponse,
): Promise<void> {
logger.info(`[flowRunWatcher#publish] flowRunId=${flowRunId}`)
const flowResponse = await getFlowResponse(result)
Expand All @@ -81,10 +81,10 @@ export const flowResponseWatcher = {
}

async function getFlowResponse(
result: FlowExecutionResponse,
result: FlowRunResponse,
): Promise<FlowResponse> {
switch (result.status) {
case FlowExecutionStatus.PAUSED:
case FlowRunStatus.PAUSED:
if (result.pauseMetadata && result.pauseMetadata.type === PauseType.WEBHOOK) {
return {
status: StatusCodes.OK,
Expand All @@ -97,39 +97,39 @@ async function getFlowResponse(
body: {},
headers: {},
}
case FlowExecutionStatus.STOPPED:
case FlowRunStatus.STOPPED:
return {
status: result.stopResponse?.status ?? StatusCodes.OK,
body: result.stopResponse?.body,
headers: result.stopResponse?.headers ?? {},
}
case FlowExecutionStatus.INTERNAL_ERROR:
case FlowRunStatus.INTERNAL_ERROR:
return {
status: StatusCodes.INTERNAL_SERVER_ERROR,
body: {
message: 'An internal error has occurred',
},
headers: {},
}
case FlowExecutionStatus.FAILED:
case FlowRunStatus.FAILED:
return {
status: StatusCodes.INTERNAL_SERVER_ERROR,
body: {
message: 'The flow has failed and there is no response returned',
},
headers: {},
}
case FlowExecutionStatus.TIMEOUT:
case FlowExecutionStatus.RUNNING:
case FlowRunStatus.TIMEOUT:
case FlowRunStatus.RUNNING:
return {
status: StatusCodes.GATEWAY_TIMEOUT,
body: {
message: 'The request took too long to reply',
},
headers: {},
}
case FlowExecutionStatus.SUCCEEDED:
case FlowExecutionStatus.QUOTA_EXCEEDED:
case FlowRunStatus.SUCCEEDED:
case FlowRunStatus.QUOTA_EXCEEDED:
return {
status: StatusCodes.NO_CONTENT,
body: {},
Expand Down
10 changes: 5 additions & 5 deletions packages/server/api/src/app/flows/flow-run/flow-run-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
FlowRetryStrategy,
PauseType,
ResumePayload,
FlowExecutionStatus,
FlowRunStatus,
ExecutioOutputFile,
} from '@activepieces/shared'
import {
Expand Down Expand Up @@ -186,7 +186,7 @@ export const flowRunService = {
tags,
}: {
flowRunId: FlowRunId
status: FlowExecutionStatus
status: FlowRunStatus
tasks: number
tags: string[]
logsFileId: FileId | null
Expand Down Expand Up @@ -240,7 +240,7 @@ export const flowRunService = {
flowDisplayName: flowVersion.displayName,
})

flowRun.status = FlowExecutionStatus.RUNNING
flowRun.status = FlowRunStatus.RUNNING

const savedFlowRun = await flowRunRepo.save(flowRun)

Expand Down Expand Up @@ -293,7 +293,7 @@ export const flowRunService = {
const { flowRunId, logFileId, pauseMetadata } = params

await flowRunRepo.update(flowRunId, {
status: FlowExecutionStatus.PAUSED,
status: FlowRunStatus.PAUSED,
logsFileId: logFileId,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
pauseMetadata: pauseMetadata as any,
Expand Down Expand Up @@ -360,7 +360,7 @@ type GetOrCreateParams = {
type ListParams = {
projectId: ProjectId
flowId: FlowId | undefined
status: FlowExecutionStatus | undefined
status: FlowRunStatus | undefined
cursor: Cursor | null
tags?: string[]
limit: number
Expand Down
4 changes: 2 additions & 2 deletions packages/server/api/src/app/helper/engine-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
FlowVersion,
ExecuteFlowOperation,
PlatformRole,
FlowExecutionResponse,
FlowRunResponse,
} from '@activepieces/shared'
import { Sandbox } from 'server-worker'
import { accessTokenManager } from '../authentication/lib/access-token-manager'
Expand All @@ -55,7 +55,7 @@ type GenerateWorkerTokenParams = {
projectId: ProjectId
}

export type EngineHelperFlowResult = FlowExecutionResponse
export type EngineHelperFlowResult = FlowRunResponse

export type EngineHelperTriggerResult<
T extends TriggerHookType = TriggerHookType,
Expand Down
6 changes: 3 additions & 3 deletions packages/server/api/src/app/helper/notifications.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {
FlowExecutionStatus,
FlowRunStatus,
FlowRun,
NotificationStatus,
RunEnvironment,
Expand All @@ -19,8 +19,8 @@ export const notifications = {
}
if (
![
FlowExecutionStatus.FAILED,
FlowExecutionStatus.INTERNAL_ERROR,
FlowRunStatus.FAILED,
FlowRunStatus.INTERNAL_ERROR,
].includes(flowRun.status)
) {
return
Expand Down
24 changes: 12 additions & 12 deletions packages/server/api/src/app/workers/flow-worker/flow-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import {
FileCompression,
FileId,
FileType,
FlowExecutionResponse,
FlowExecutionStatus,
FlowRunStatus,
flowHelper,
FlowRunId,
FlowVersion,
Expand All @@ -25,6 +24,7 @@ import {
SourceCode,
Trigger,
TriggerType,
FlowRunResponse,
} from '@activepieces/shared'
import { Sandbox } from 'server-worker'
import { flowVersionService } from '../../flows/flow-version/flow-version.service'
Expand All @@ -48,7 +48,7 @@ import { logSerializer } from 'server-worker'
type FinishExecutionParams = {
flowRunId: FlowRunId
logFileId: FileId
result: FlowExecutionResponse
result: FlowRunResponse
}

type LoadInputAndLogFileIdParams = {
Expand Down Expand Up @@ -94,7 +94,7 @@ const finishExecution = async (

const { flowRunId, logFileId, result } = params

if (result.status === FlowExecutionStatus.PAUSED) {
if (result.status === FlowRunStatus.PAUSED) {
await flowRunService.pause({
flowRunId,
logFileId,
Expand All @@ -113,10 +113,10 @@ const finishExecution = async (
}

const getTerminalStatus = (
status: FlowExecutionStatus,
): FlowExecutionStatus => {
return status == FlowExecutionStatus.STOPPED
? FlowExecutionStatus.SUCCEEDED
status: FlowRunStatus,
): FlowRunStatus => {
return status == FlowRunStatus.STOPPED
? FlowRunStatus.SUCCEEDED
: status
}

Expand Down Expand Up @@ -160,7 +160,7 @@ const loadInputAndLogFileId = async ({
}
case ExecutionType.BEGIN:
if (!isNil(flowRun.logsFileId)) {
if (flowRun.status !== FlowExecutionStatus.INTERNAL_ERROR) {
if (flowRun.status !== FlowRunStatus.INTERNAL_ERROR) {
const trigger = Object.values(flowRun.steps).find((step) => flowHelper.isTrigger(step.type))
assertNotNullOrUndefined(
trigger,
Expand Down Expand Up @@ -280,7 +280,7 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
) {
await flowRunService.finish({
flowRunId: jobData.runId,
status: FlowExecutionStatus.QUOTA_EXCEEDED,
status: FlowRunStatus.QUOTA_EXCEEDED,
tasks: 0,
logsFileId: null,
tags: [],
Expand All @@ -292,7 +292,7 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
) {
await flowRunService.finish({
flowRunId: jobData.runId,
status: FlowExecutionStatus.TIMEOUT,
status: FlowRunStatus.TIMEOUT,
// TODO REVIST THIS
tasks: 10,
logsFileId: null,
Expand All @@ -302,7 +302,7 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
else {
await flowRunService.finish({
flowRunId: jobData.runId,
status: FlowExecutionStatus.INTERNAL_ERROR,
status: FlowRunStatus.INTERNAL_ERROR,
tasks: 0,
logsFileId: null,
tags: [],
Expand Down
Loading

0 comments on commit b350fa4

Please sign in to comment.