Skip to content

Commit

Permalink
fix(server): distinguish local mutex correctly (#9444)
Browse files Browse the repository at this point in the history
  • Loading branch information
darkskygit authored Dec 31, 2024
1 parent 1c6c219 commit f64d62d
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 5 deletions.
7 changes: 4 additions & 3 deletions packages/backend/server/src/base/mutex/locker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import { Lock } from './lock';
const lockScript = `local key = KEYS[1]
local owner = ARGV[1]
-- if lock is not exists or lock is owned by the owner
-- then set lock to the owner and return 1, otherwise return 0
-- if lock is not exists then set lock to the owner and return 1, otherwise return 0
-- if the lock is not released correctly due to unexpected reasons
-- lock will be released after 60 seconds
if redis.call("get", key) == owner or redis.call("set", key, owner, "NX", "EX", 60) then
if redis.call("get", key) == owner then
return 0
elseif redis.call("set", key, owner, "NX", "EX", 60) then
return 1
else
return 0
Expand Down
8 changes: 6 additions & 2 deletions packages/backend/server/src/base/mutex/mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { randomUUID } from 'node:crypto';
import { Inject, Injectable, Logger, Scope } from '@nestjs/common';
import { ModuleRef, REQUEST } from '@nestjs/core';
import type { Request } from 'express';
import { nanoid } from 'nanoid';

import { GraphqlContext } from '../graphql';
import { retryable } from '../utils/promise';
Expand All @@ -14,7 +15,7 @@ export const MUTEX_WAIT = 100;
@Injectable()
export class Mutex {
protected logger = new Logger(Mutex.name);
private readonly clusterIdentifier = `cluster:${randomUUID()}`;
private readonly clusterIdentifier = `cluster:${nanoid()}`;

constructor(protected readonly locker: Locker) {}

Expand All @@ -39,7 +40,10 @@ export class Mutex {
* @param key resource key
* @returns LockGuard
*/
async acquire(key: string, owner: string = this.clusterIdentifier) {
async acquire(
key: string,
owner: string = `${this.clusterIdentifier}:${nanoid()}`
) {
try {
return await retryable(
() => this.locker.lock(owner, key),
Expand Down
81 changes: 81 additions & 0 deletions packages/backend/server/tests/mutex.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { randomUUID } from 'node:crypto';

import { TestingModule } from '@nestjs/testing';
import ava, { TestFn } from 'ava';
import Sinon from 'sinon';

import { Locker, Mutex } from '../src/base/mutex';
import { SessionRedis } from '../src/base/redis';
import { createTestingModule, sleep } from './utils';

const test = ava as TestFn<{
module: TestingModule;
mutex: Mutex;
locker: Locker;
session: SessionRedis;
}>;

test.beforeEach(async t => {
const module = await createTestingModule();

t.context.module = module;
t.context.mutex = module.get(Mutex);
t.context.locker = module.get(Locker);
t.context.session = module.get(SessionRedis);
});

test.afterEach(async t => {
await t.context.module.close();
});

const lockerPrefix = randomUUID();
test('should be able to acquire lock', async t => {
const { mutex } = t.context;

{
t.truthy(
await mutex.acquire(`${lockerPrefix}1`),
'should be able to acquire lock'
);
t.falsy(
await mutex.acquire(`${lockerPrefix}1`),
'should not be able to acquire lock again'
);
}

{
const lock1 = await mutex.acquire(`${lockerPrefix}2`);
t.truthy(lock1);
await lock1?.release();
const lock2 = await mutex.acquire(`${lockerPrefix}2`);
t.truthy(lock2);
}
});

test('should be able to acquire lock parallel', async t => {
const { mutex, locker } = t.context;
const spyedLocker = Sinon.spy(locker, 'lock');
const requestLock = async (key: string) => {
const lock = mutex.acquire(key);
await using _lock = await lock;
const lastCall = spyedLocker.lastCall.returnValue;
try {
// in rare cases, the lock can be acquired
// in which case skip the error message check
await lastCall;
} catch {
await t.throwsAsync(lastCall, {
message: `Failed to acquire lock for resource [${key}]`,
});
}

await sleep(100);
};

await t.notThrowsAsync(
Promise.all(
Array.from({ length: 10 }, _ => requestLock(`${lockerPrefix}3`))
),
'should be able to acquire lock parallel'
);
});

0 comments on commit f64d62d

Please sign in to comment.