Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(): Prevent sub workflow events release early + redis unlink #11641

Merged
merged 7 commits into from
Feb 27, 2025
9 changes: 9 additions & 0 deletions .changeset/large-experts-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@medusajs/cache-redis": patch
"@medusajs/event-bus-redis": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/locking-redis": patch
"@medusajs/workflows-sdk": patch
---

chore(): Prevent sub workflow events release early + redis unlink
4 changes: 3 additions & 1 deletion packages/core/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ function createContextualWorkflowRunner<

const { eventGroupId, parentStepIdempotencyKey } = context

attachOnFinishReleaseEvents(events, flow, { logOnError })
if (!parentStepIdempotencyKey) {
attachOnFinishReleaseEvents(events, flow, { logOnError })
}

const flowMetadata = {
eventGroupId,
Expand Down
4 changes: 2 additions & 2 deletions packages/modules/cache-redis/src/services/redis-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class RedisCacheService implements ICacheService {
return JSON.parse(cached)
}
} catch (err) {
await this.redis.del(cacheKey)
await this.redis.unlink(cacheKey)
}
return null
}
Expand All @@ -92,7 +92,7 @@ class RedisCacheService implements ICacheService {
if (keys.length > 0) {
const deletePipeline = this.redis.pipeline()
for (const key of keys) {
deletePipeline.del(key)
deletePipeline.unlink(key)
}

await deletePipeline.exec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const redisMock = {
lrange: () => jest.fn(),
disconnect: () => jest.fn(),
expire: () => jest.fn(),
unlink: () => jest.fn(),
} as unknown as Redis

const simpleModuleOptions = { redisUrl: "test-url" }
Expand Down Expand Up @@ -63,7 +64,7 @@ describe("RedisEventBusService", () => {
{
connection: expect.any(Object),
prefix: "RedisEventBusService",
autorun: false
autorun: false,
}
)
})
Expand Down Expand Up @@ -269,15 +270,15 @@ describe("RedisEventBusService", () => {
},
]

redis.del = jest.fn()
redis.unlink = jest.fn()

await eventBus.emit(events, options)

// Expect 1 event to have been send
// Expect 2 pushes to redis as there are 2 groups of events to push
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(redis.rpush).toHaveBeenCalledTimes(2)
expect(redis.del).not.toHaveBeenCalled()
expect(redis.unlink).not.toHaveBeenCalled()

const [testGroup1Event] = (eventBus as any).buildEvents(
[events[0]],
Expand Down Expand Up @@ -314,12 +315,12 @@ describe("RedisEventBusService", () => {

expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([testGroup1Event])
expect(redis.del).toHaveBeenCalledTimes(1)
expect(redis.del).toHaveBeenCalledWith("staging:test-group-1")
expect(redis.unlink).toHaveBeenCalledTimes(1)
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-1")

queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
redis.del = jest.fn()
redis.unlink = jest.fn()

await eventBus.releaseGroupedEvents("test-group-2")

Expand All @@ -328,8 +329,8 @@ describe("RedisEventBusService", () => {
testGroup2Event,
testGroup2Event2,
])
expect(redis.del).toHaveBeenCalledTimes(1)
expect(redis.del).toHaveBeenCalledWith("staging:test-group-2")
expect(redis.unlink).toHaveBeenCalledTimes(1)
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-2")
})
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
return
}

await this.eventBusRedisConnection_.del(`staging:${eventGroupId}`)
await this.eventBusRedisConnection_.unlink(`staging:${eventGroupId}`)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ export class RedisLockingProvider implements ILockingProvider {
const currentOwner = currentOwners?.[idx]?.[1]

if (currentOwner === ownerId) {
deletePipeline.del(key)
deletePipeline.unlink(key)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async function deleteKeysByPattern(pattern) {
for await (const keys of stream) {
if (keys.length) {
const pipeline = redis.pipeline()
keys.forEach((key) => pipeline.del(key))
keys.forEach((key) => pipeline.unlink(key))
await pipeline.exec()
}
}
Expand Down
Loading