From 692100c2feb47b5c1fe8b95125b0609c65bbfe21 Mon Sep 17 00:00:00 2001
From: George Djabarov <39195835+djabarovgeorge@users.noreply.github.com>
Date: Thu, 23 Jan 2025 15:55:10 +0200
Subject: [PATCH] fix(worker): digest by key (#7569)
---
.../steps/digest/digest-control-values.tsx | 6 ++---
.../usecases/add-job/add-job.usecase.ts | 14 ++++++-----
.../add-job/merge-or-create-digest.usecase.ts | 10 +++++---
.../src/repositories/job/job.repository.ts | 23 +++++++++++--------
libs/dal/src/repositories/job/job.schema.ts | 3 +++
packages/shared/src/entities/step/index.ts | 1 +
6 files changed, 36 insertions(+), 21 deletions(-)
diff --git a/apps/dashboard/src/components/workflow-editor/steps/digest/digest-control-values.tsx b/apps/dashboard/src/components/workflow-editor/steps/digest/digest-control-values.tsx
index a5e4eae3bd1..edfe866de89 100644
--- a/apps/dashboard/src/components/workflow-editor/steps/digest/digest-control-values.tsx
+++ b/apps/dashboard/src/components/workflow-editor/steps/digest/digest-control-values.tsx
@@ -12,11 +12,11 @@ export const DigestControlValues = () => {
return null;
}
- const { ['amount']: amount, ['unit']: unit, ['cron']: cron } = uiSchema.properties ?? {};
+ const { ['amount']: amount, ['digestKey']: digestKey, ['unit']: unit, ['cron']: cron } = uiSchema.properties ?? {};
return (
- {/* {digestKey && (
+ {digestKey && (
<>
{getComponentByType({
@@ -25,7 +25,7 @@ export const DigestControlValues = () => {
>
- )} */}
+ )}
{((amount && unit) || cron) && (
<>
diff --git a/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts b/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
index 453b5428e1c..2dc4d6e81a5 100644
--- a/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
+++ b/apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
@@ -261,12 +261,14 @@ export class AddJob {
private async updateMetadata(response: ExecuteOutput, command: AddJobCommand) {
let metadata = {} as IWorkflowStepMetadata;
const outputs = response.outputs as DigestOutput;
+ // digest value is pre-computed by framework and passed as digestKey
+ const outputDigestValue = outputs?.digestKey;
const digestType = getDigestType(outputs);
if (isTimedDigestOutput(outputs)) {
metadata = {
type: DigestTypeEnum.TIMED,
- digestKey: outputs?.digestKey,
+ digestValue: outputDigestValue,
timed: { cronExpression: outputs?.cron },
} as IDigestTimedMetadata;
@@ -278,7 +280,7 @@ export class AddJob {
{
$set: {
'digest.type': metadata.type,
- 'digest.digestKey': metadata.digestKey,
+ 'digest.digestValue': metadata.digestValue,
'digest.amount': metadata.amount,
'digest.unit': metadata.unit,
'digest.timed.cronExpression': metadata.timed?.cronExpression,
@@ -291,7 +293,7 @@ export class AddJob {
metadata = {
type: digestType,
amount: outputs?.amount,
- digestKey: outputs?.digestKey,
+ digestValue: outputDigestValue,
unit: outputs.unit ? castUnitToDigestUnitEnum(outputs?.unit) : undefined,
backoff: digestType === DigestTypeEnum.BACKOFF,
backoffAmount: outputs.lookBackWindow?.amount,
@@ -306,7 +308,7 @@ export class AddJob {
{
$set: {
'digest.type': metadata.type,
- 'digest.digestKey': metadata.digestKey,
+ 'digest.digestValue': metadata.digestValue,
'digest.amount': metadata.amount,
'digest.unit': metadata.unit,
'digest.backoff': metadata.backoff,
@@ -321,7 +323,7 @@ export class AddJob {
metadata = {
type: digestType,
amount: outputs?.amount,
- digestKey: outputs?.digestKey,
+ digestValue: outputDigestValue,
unit: outputs.unit ? castUnitToDigestUnitEnum(outputs?.unit) : undefined,
} as IDigestRegularMetadata;
@@ -333,7 +335,7 @@ export class AddJob {
{
$set: {
'digest.type': metadata.type,
- 'digest.digestKey': metadata.digestKey,
+ 'digest.digestValue': metadata.digestValue,
'digest.amount': metadata.amount,
'digest.unit': metadata.unit,
},
diff --git a/apps/worker/src/app/workflow/usecases/add-job/merge-or-create-digest.usecase.ts b/apps/worker/src/app/workflow/usecases/add-job/merge-or-create-digest.usecase.ts
index efc8742c72d..f8cdb977c37 100644
--- a/apps/worker/src/app/workflow/usecases/add-job/merge-or-create-digest.usecase.ts
+++ b/apps/worker/src/app/workflow/usecases/add-job/merge-or-create-digest.usecase.ts
@@ -47,7 +47,7 @@ export class MergeOrCreateDigest {
const digestMeta = job.digest as IDigestBaseMetadata;
const digestKey = digestMeta?.digestKey;
- const digestValue = getNestedValue(job.payload, digestKey);
+ const digestValue = digestMeta?.digestValue ?? getNestedValue(job.payload, digestKey);
const digestAction = command.filtered
? { digestResult: DigestCreationResultEnum.SKIPPED }
@@ -150,9 +150,13 @@ export class MergeOrCreateDigest {
}
private getLockKey(job: JobEntity, digestKey: string | undefined, digestValue: string | number | undefined): string {
- let resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
+ const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
if (digestKey && digestValue) {
- resource = `${resource}:digestKey:${digestKey}:digestValue:${digestValue}`;
+ return `${resource}:digestKey:${digestKey}:digestValue:${digestValue}`;
+ }
+
+ if (digestValue) {
+ return `${resource}:digestValue:${digestValue}`;
}
return resource;
diff --git a/libs/dal/src/repositories/job/job.repository.ts b/libs/dal/src/repositories/job/job.repository.ts
index b5db39c6236..f295bc9a9a1 100644
--- a/libs/dal/src/repositories/job/job.repository.ts
+++ b/libs/dal/src/repositories/job/job.repository.ts
@@ -189,9 +189,10 @@ export class JobRepository extends BaseRepository) {
const query = {
updatedAt: {
$gte: this.getBackoffDate(metadata),
@@ -276,7 +281,7 @@ export class JobRepository extends BaseRepository(
digestKey: {
type: Schema.Types.String,
},
+ digestValue: {
+ type: Schema.Types.String,
+ },
type: {
type: Schema.Types.String,
},
diff --git a/packages/shared/src/entities/step/index.ts b/packages/shared/src/entities/step/index.ts
index ed408e14904..9e57ebb1c58 100644
--- a/packages/shared/src/entities/step/index.ts
+++ b/packages/shared/src/entities/step/index.ts
@@ -88,6 +88,7 @@ export interface IAmountAndUnitDigest {
export interface IDigestBaseMetadata extends IAmountAndUnitDigest {
digestKey?: string;
+ digestValue?: string;
}
export interface IDigestRegularMetadata extends IDigestBaseMetadata {