diff --git a/.changeset/large-experts-provide.md b/.changeset/large-experts-provide.md new file mode 100644 index 0000000000000..44df68080aac8 --- /dev/null +++ b/.changeset/large-experts-provide.md @@ -0,0 +1,9 @@ +--- +"@medusajs/cache-redis": patch +"@medusajs/event-bus-redis": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/locking-redis": patch +"@medusajs/workflows-sdk": patch +--- + +chore(): Prevent sub workflow events release early + redis unlink diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index 828f01656f8f7..806cfe5a8ad90 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -93,7 +93,9 @@ function createContextualWorkflowRunner< const { eventGroupId, parentStepIdempotencyKey } = context - attachOnFinishReleaseEvents(events, flow, { logOnError }) + if (!parentStepIdempotencyKey) { + attachOnFinishReleaseEvents(events, flow, { logOnError }) + } const flowMetadata = { eventGroupId, diff --git a/packages/modules/cache-redis/src/services/redis-cache.ts b/packages/modules/cache-redis/src/services/redis-cache.ts index 217af91fa181a..fc2cb687d1015 100644 --- a/packages/modules/cache-redis/src/services/redis-cache.ts +++ b/packages/modules/cache-redis/src/services/redis-cache.ts @@ -66,7 +66,7 @@ class RedisCacheService implements ICacheService { return JSON.parse(cached) } } catch (err) { - await this.redis.del(cacheKey) + await this.redis.unlink(cacheKey) } return null } @@ -92,7 +92,7 @@ class RedisCacheService implements ICacheService { if (keys.length > 0) { const deletePipeline = this.redis.pipeline() for (const key of keys) { - deletePipeline.del(key) + deletePipeline.unlink(key) } await deletePipeline.exec() diff --git a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts index a504b9d84b649..2e39bb4270c8d 100644 --- a/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts +++ b/packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts @@ -27,6 +27,7 @@ const redisMock = { lrange: () => jest.fn(), disconnect: () => jest.fn(), expire: () => jest.fn(), + unlink: () => jest.fn(), } as unknown as Redis const simpleModuleOptions = { redisUrl: "test-url" } @@ -63,7 +64,7 @@ describe("RedisEventBusService", () => { { connection: expect.any(Object), prefix: "RedisEventBusService", - autorun: false + autorun: false, } ) }) @@ -269,7 +270,7 @@ describe("RedisEventBusService", () => { }, ] - redis.del = jest.fn() + redis.unlink = jest.fn() await eventBus.emit(events, options) @@ -277,7 +278,7 @@ describe("RedisEventBusService", () => { // Expect 2 pushes to redis as there are 2 groups of events to push expect(queue.addBulk).toHaveBeenCalledTimes(1) expect(redis.rpush).toHaveBeenCalledTimes(2) - expect(redis.del).not.toHaveBeenCalled() + expect(redis.unlink).not.toHaveBeenCalled() const [testGroup1Event] = (eventBus as any).buildEvents( [events[0]], @@ -314,12 +315,12 @@ describe("RedisEventBusService", () => { expect(queue.addBulk).toHaveBeenCalledTimes(1) expect(queue.addBulk).toHaveBeenCalledWith([testGroup1Event]) - expect(redis.del).toHaveBeenCalledTimes(1) - expect(redis.del).toHaveBeenCalledWith("staging:test-group-1") + expect(redis.unlink).toHaveBeenCalledTimes(1) + expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-1") queue = (eventBus as any).queue_ queue.addBulk = jest.fn() - redis.del = jest.fn() + redis.unlink = jest.fn() await eventBus.releaseGroupedEvents("test-group-2") @@ -328,8 +329,8 @@ describe("RedisEventBusService", () => { testGroup2Event, testGroup2Event2, ]) - expect(redis.del).toHaveBeenCalledTimes(1) - expect(redis.del).toHaveBeenCalledWith("staging:test-group-2") + expect(redis.unlink).toHaveBeenCalledTimes(1) + expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-2") }) }) }) diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index ba27a3d7576a0..b921ea05725e9 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -223,7 +223,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService return } - await this.eventBusRedisConnection_.del(`staging:${eventGroupId}`) + await this.eventBusRedisConnection_.unlink(`staging:${eventGroupId}`) } /** diff --git a/packages/modules/providers/locking-redis/src/services/redis-lock.ts b/packages/modules/providers/locking-redis/src/services/redis-lock.ts index 81c1e365176d4..52caf135a96c9 100644 --- a/packages/modules/providers/locking-redis/src/services/redis-lock.ts +++ b/packages/modules/providers/locking-redis/src/services/redis-lock.ts @@ -258,7 +258,7 @@ export class RedisLockingProvider implements ILockingProvider { const currentOwner = currentOwners?.[idx]?.[1] if (currentOwner === ownerId) { - deletePipeline.del(key) + deletePipeline.unlink(key) } }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts index 7b027f5995841..aeb91c562307c 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts @@ -33,7 +33,7 @@ async function deleteKeysByPattern(pattern) { for await (const keys of stream) { if (keys.length) { const pipeline = redis.pipeline() - keys.forEach((key) => pipeline.del(key)) + keys.forEach((key) => pipeline.unlink(key)) await pipeline.exec() } }