Skip to content

Commit

Permalink
fix(worker): digest by key (#7569)
Browse files Browse the repository at this point in the history
  • Loading branch information
djabarovgeorge authored Jan 23, 2025
1 parent 75d2a5c commit 692100c
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ export const DigestControlValues = () => {
return null;
}

const { ['amount']: amount, ['unit']: unit, ['cron']: cron } = uiSchema.properties ?? {};
const { ['amount']: amount, ['digestKey']: digestKey, ['unit']: unit, ['cron']: cron } = uiSchema.properties ?? {};

return (
<div className="flex flex-col">
{/* {digestKey && (
{digestKey && (
<>
<SidebarContent size="lg">
{getComponentByType({
Expand All @@ -25,7 +25,7 @@ export const DigestControlValues = () => {
</SidebarContent>
<Separator />
</>
)} */}
)}
{((amount && unit) || cron) && (
<>
<SidebarContent size="lg">
Expand Down
14 changes: 8 additions & 6 deletions apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,14 @@ export class AddJob {
private async updateMetadata(response: ExecuteOutput, command: AddJobCommand) {
let metadata = {} as IWorkflowStepMetadata;
const outputs = response.outputs as DigestOutput;
// digest value is pre-computed by framework and passed as digestKey
const outputDigestValue = outputs?.digestKey;
const digestType = getDigestType(outputs);

if (isTimedDigestOutput(outputs)) {
metadata = {
type: DigestTypeEnum.TIMED,
digestKey: outputs?.digestKey,
digestValue: outputDigestValue,
timed: { cronExpression: outputs?.cron },
} as IDigestTimedMetadata;

Expand All @@ -278,7 +280,7 @@ export class AddJob {
{
$set: {
'digest.type': metadata.type,
'digest.digestKey': metadata.digestKey,
'digest.digestValue': metadata.digestValue,
'digest.amount': metadata.amount,
'digest.unit': metadata.unit,
'digest.timed.cronExpression': metadata.timed?.cronExpression,
Expand All @@ -291,7 +293,7 @@ export class AddJob {
metadata = {
type: digestType,
amount: outputs?.amount,
digestKey: outputs?.digestKey,
digestValue: outputDigestValue,
unit: outputs.unit ? castUnitToDigestUnitEnum(outputs?.unit) : undefined,
backoff: digestType === DigestTypeEnum.BACKOFF,
backoffAmount: outputs.lookBackWindow?.amount,
Expand All @@ -306,7 +308,7 @@ export class AddJob {
{
$set: {
'digest.type': metadata.type,
'digest.digestKey': metadata.digestKey,
'digest.digestValue': metadata.digestValue,
'digest.amount': metadata.amount,
'digest.unit': metadata.unit,
'digest.backoff': metadata.backoff,
Expand All @@ -321,7 +323,7 @@ export class AddJob {
metadata = {
type: digestType,
amount: outputs?.amount,
digestKey: outputs?.digestKey,
digestValue: outputDigestValue,
unit: outputs.unit ? castUnitToDigestUnitEnum(outputs?.unit) : undefined,
} as IDigestRegularMetadata;

Expand All @@ -333,7 +335,7 @@ export class AddJob {
{
$set: {
'digest.type': metadata.type,
'digest.digestKey': metadata.digestKey,
'digest.digestValue': metadata.digestValue,
'digest.amount': metadata.amount,
'digest.unit': metadata.unit,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class MergeOrCreateDigest {

const digestMeta = job.digest as IDigestBaseMetadata;
const digestKey = digestMeta?.digestKey;
const digestValue = getNestedValue(job.payload, digestKey);
const digestValue = digestMeta?.digestValue ?? getNestedValue(job.payload, digestKey);

const digestAction = command.filtered
? { digestResult: DigestCreationResultEnum.SKIPPED }
Expand Down Expand Up @@ -150,9 +150,13 @@ export class MergeOrCreateDigest {
}

private getLockKey(job: JobEntity, digestKey: string | undefined, digestValue: string | number | undefined): string {
let resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
if (digestKey && digestValue) {
resource = `${resource}:digestKey:${digestKey}:digestValue:${digestValue}`;
return `${resource}:digestKey:${digestKey}:digestValue:${digestValue}`;
}

if (digestValue) {
return `${resource}:digestValue:${digestValue}`;
}

return resource;
Expand Down
23 changes: 14 additions & 9 deletions libs/dal/src/repositories/job/job.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce
job.digest?.type === DigestTypeEnum.BACKOFF ||
(job.digest as IDigestRegularMetadata)?.backoff ||
(digestMeta?.backoff && digestMeta?.backoff);
const digestQuery = this.buildDigestQuery(digestKey, digestValue);

if (isBackoff) {
const trigger = await this.getTrigger(job, digestMeta, digestKey, digestValue);
const trigger = await this.getTriggerJob(job, digestMeta, digestQuery);
if (!trigger) {
return {
digestResult: DigestCreationResultEnum.SKIPPED,
Expand Down Expand Up @@ -219,7 +220,7 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce
_templateId: job._templateId,
_environmentId: this.convertStringToObjectId(job._environmentId),
_subscriberId: this.convertStringToObjectId(job._subscriberId),
...(digestKey && { [`payload.${digestKey}`]: digestValue }),
...digestQuery,
},
'_id _notificationId'
);
Expand Down Expand Up @@ -252,18 +253,22 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce
};
}

private buildDigestQuery(digestKey: string | undefined, digestValue: string | number | undefined) {
const digestQueryV1 = digestKey ? { [`payload.${digestKey}`]: digestValue } : null;
// Digest key parsing is handled by the framework, leaving only the digest value available here
const digestQueryV2 = !digestKey && digestValue ? { [`digest.digestValue`]: digestValue } : null;
const digestQuery = digestQueryV1 || digestQueryV2;

return digestQuery || {};
}

private getBackoffDate(metadata: IDigestRegularMetadata | undefined) {
return sub(new Date(), {
[metadata?.backoffUnit as string]: metadata?.backoffAmount,
});
}

private getTrigger(
job: JobEntity,
metadata?: IDigestRegularMetadata,
digestKey?: string,
digestValue?: string | number
) {
private getTriggerJob(job: JobEntity, metadata?: IDigestRegularMetadata, digestQuery?: Record<string, unknown>) {
const query = {
updatedAt: {
$gte: this.getBackoffDate(metadata),
Expand All @@ -276,7 +281,7 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce
type: StepTypeEnum.TRIGGER,
_environmentId: job._environmentId,
_subscriberId: job._subscriberId,
...(digestKey && { [`payload.${digestKey}`]: digestValue }),
...digestQuery,
};

return this.findOne(query);
Expand Down
3 changes: 3 additions & 0 deletions libs/dal/src/repositories/job/job.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ const jobSchema = new Schema<JobDBModel>(
digestKey: {
type: Schema.Types.String,
},
digestValue: {
type: Schema.Types.String,
},
type: {
type: Schema.Types.String,
},
Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/entities/step/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export interface IAmountAndUnitDigest {

export interface IDigestBaseMetadata extends IAmountAndUnitDigest {
digestKey?: string;
digestValue?: string;
}

export interface IDigestRegularMetadata extends IDigestBaseMetadata {
Expand Down

0 comments on commit 692100c

Please sign in to comment.