cacheable - adding in stampede protection via coalesce async #891

Merged
26 changes: 25 additions & 1 deletion packages/cacheable/README.md
@@ -17,7 +17,7 @@
* Scalable and trusted storage engine by Keyv
* Memory Caching with LRU and Expiration `CacheableMemory`
* Resilient to failures with try/catch and offline
* Wrap / Memoization for Sync and Async Functions
* Wrap / Memoization for Sync and Async Functions with Stampede Protection
* Hooks and Events to extend functionality
* Shorthand for ttl in milliseconds `(1m = 60000) (1h = 3600000) (1d = 86400000)`
* Non-blocking operations for layer 2 caching
@@ -310,6 +310,30 @@ const wrappedFunction = cache.wrap(asyncFunction, options);
console.log(await wrappedFunction(2)); // 4
console.log(await wrappedFunction(2)); // 4 from cache
```
With `Cacheable` we have also included stampede protection, so that a `Promise`-based function is only executed once when multiple requests for the same key are made at the same time. Here is an example of how to test for stampede protection:

```javascript
import { Cacheable } from 'cacheable';
const asyncFunction = async (value: number) => {
  return value;
};

const cache = new Cacheable();
const options = {
  ttl: '1h', // 1 hour
  keyPrefix: 'p1', // key prefix. This is used if you have multiple functions and need to set a unique prefix.
};

const wrappedFunction = cache.wrap(asyncFunction, options);
const promises = [];
for (let i = 0; i < 10; i++) {
  // Use the same argument each time so every call maps to the same cache key.
  promises.push(wrappedFunction(0));
}

const results = await Promise.all(promises); // all results should be the same

console.log(results); // [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

In this example we wrap an `async` function with a `ttl` of `1 hour`; the result is cached for one hour and then expires. You can also wrap a `sync` function in a cache:
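As a rough sketch only (this example is not part of the diff; it assumes the in-memory `CacheableMemory` API and that its `wrap` method memoizes synchronous functions via the `wrapSync` helper added in this package), wrapping a sync function could look like:

```javascript
import { CacheableMemory } from 'cacheable';

const syncFunction = (value: number) => value * 2;

const cache = new CacheableMemory();
// Hypothetical usage: CacheableMemory.wrap is assumed to memoize sync functions.
const wrappedFunction = cache.wrap(syncFunction, { ttl: '1h', keyPrefix: 'p2' });

console.log(wrappedFunction(2)); // 4
console.log(wrappedFunction(2)); // 4 from cache
```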

6 changes: 3 additions & 3 deletions packages/cacheable/package.json
@@ -1,6 +1,6 @@
{
"name": "cacheable",
"version": "1.8.3",
"version": "1.8.4",
"description": "Simple Caching Engine using Keyv",
"type": "module",
"main": "./dist/index.cjs",
@@ -29,9 +29,9 @@
},
"devDependencies": {
"@keyv/redis": "^3.0.1",
"@types/node": "^22.8.4",
"@types/node": "^22.9.0",
"@vitest/coverage-v8": "^2.1.4",
"lru-cache": "^11.0.1",
"lru-cache": "^11.0.2",
"rimraf": "^6.0.1",
"tsup": "^8.3.5",
"typescript": "^5.6.3",
92 changes: 92 additions & 0 deletions packages/cacheable/src/coalesce-async.ts
@@ -0,0 +1,92 @@
type PromiseCallback<T = any, E = Error> = {
	resolve: (value: T | PromiseLike<T>) => void;
	reject: (reason: E) => void;
};

const callbacks = new Map<string, PromiseCallback[]>();

function hasKey(key: string): boolean {
	return callbacks.has(key);
}

function addKey(key: string): void {
	callbacks.set(key, []);
}

function removeKey(key: string): void {
	callbacks.delete(key);
}

function addCallbackToKey<T>(key: string, callback: PromiseCallback<T>): void {
	const stash = getCallbacksByKey<T>(key);
	stash.push(callback);
	callbacks.set(key, stash);
}

function getCallbacksByKey<T>(key: string): Array<PromiseCallback<T>> {
	return callbacks.get(key) ?? [];
}

async function enqueue<T>(key: string): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const callback: PromiseCallback<T> = {resolve, reject};
		addCallbackToKey(key, callback);
	});
}

function dequeue<T>(key: string): Array<PromiseCallback<T>> {
	const stash = getCallbacksByKey<T>(key);
	removeKey(key);
	return stash;
}

function coalesce<T>(options: {key: string; error?: Error; result?: T}): void {
	const {key, error, result} = options;

	for (const callback of dequeue(key)) {
		if (error) {
			/* c8 ignore next 3 */
			callback.reject(error);
		} else {
			callback.resolve(result);
		}
	}
}

/**
 * Enqueue a promise for the group identified by `key`.
 *
 * All requests received for the same key while a request for that key
 * is already being executed will wait. Once the running request settles
 * then all the waiting requests in the group will settle, too.
 * This minimizes how many times the function itself runs at the same time.
 * This function resolves or rejects according to the given function argument.
 *
 * @url https://github.com/douglascayers/promise-coalesce
 */
export async function coalesceAsync<T>(
	/**
	 * Any identifier to group requests together.
	 */
	key: string,
	/**
	 * The function to run.
	 */
	fnc: () => T | PromiseLike<T>,
): Promise<T> {
	if (!hasKey(key)) {
		addKey(key);
		try {
			const result = await Promise.resolve(fnc());
			coalesce({key, result});
			return result;
		} catch (error: any) {
			// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
			coalesce({key, error});
			// eslint-disable-next-line @typescript-eslint/only-throw-error
			throw error;
		}
	}

	return enqueue(key);
}
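For context on how this new module behaves, here is a minimal, hypothetical usage sketch (not part of this diff; the key name, counter, and returned object are made up for illustration): concurrent callers that pass the same key share a single execution of the supplied function.

```javascript
import { coalesceAsync } from './coalesce-async.js';

let executions = 0;

const fetchConfig = async () =>
  coalesceAsync('config', async () => {
    executions++;
    // Simulate a slow lookup; callers arriving while this runs are queued on the same promise.
    await new Promise(resolve => setTimeout(resolve, 50));
    return { featureFlag: true };
  });

// Three concurrent calls with the same key resolve to the same result,
// but the inner function only runs once.
const results = await Promise.all([fetchConfig(), fetchConfig(), fetchConfig()]);
console.log(executions); // 1
console.log(results.length); // 3
```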
26 changes: 17 additions & 9 deletions packages/cacheable/src/wrap.ts
@@ -1,4 +1,5 @@
import {hash} from './hash.js';
import {coalesceAsync} from './coalesce-async.js';
import {type Cacheable, type CacheableMemory} from './index.js';

export type WrapFunctionOptions = {
@@ -39,15 +40,22 @@ export function wrap<T>(function_: AnyFunction, options: WrapOptions): AnyFunction {
const {ttl, keyPrefix, cache} = options;

return async function (...arguments_: any[]) {
		const cacheKey = createWrapKey(function_, arguments_, keyPrefix);

		let value = await cache.get(cacheKey) as T | undefined;

		if (value === undefined) {
			// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
			value = await function_(...arguments_) as T;

			await cache.set(cacheKey, value, ttl);
		let value;
		try {
			const cacheKey = createWrapKey(function_, arguments_, keyPrefix);

			value = await cache.get(cacheKey) as T | undefined;

			if (value === undefined) {
				value = await coalesceAsync(cacheKey, async () => {
					// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
					const result = await function_(...arguments_) as T;
					await cache.set(cacheKey, result, ttl);
					return result;
				});
			}
		} catch {
			// Ignore errors from the cache or the wrapped function; value remains undefined.
		}

		return value;
39 changes: 37 additions & 2 deletions packages/cacheable/test/wrap.test.ts
@@ -1,8 +1,8 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import {
describe, it, expect,
describe, it, expect, vi,
} from 'vitest';
import {Cacheable, CacheableMemory} from '../src/index.js';
import {Cacheable, CacheableMemory, KeyvCacheableMemory} from '../src/index.js';
import {
wrap, createWrapKey, wrapSync, type WrapOptions, type WrapSyncOptions,
} from '../src/wrap.js';
@@ -162,3 +162,38 @@ describe('wrap function', () => {
		expect(cacheResult).toBe(undefined);
	});
});

describe('wrap function with stampede protection', () => {
	it('should only execute the wrapped function once when called concurrently with the same key', async () => {
		const cache = new Cacheable();
		const mockFunction = vi.fn().mockResolvedValue('result');
		const mockedKey = createWrapKey(mockFunction, ['arg1'], 'test');
		const wrappedFunction = wrap(mockFunction, {cache, keyPrefix: 'test'});

		// Call the wrapped function concurrently
		const [result1, result2, result3, result4] = await Promise.all([wrappedFunction('arg1'), wrappedFunction('arg1'), wrappedFunction('arg2'), wrappedFunction('arg2')]);

		// Verify that the wrapped function was only called twice (once for 'arg1' and once for 'arg2')
		expect(mockFunction).toHaveBeenCalledTimes(2);

		// Verify that all calls returned the same result
		expect(result1).toBe('result');
		expect(result2).toBe('result');
		expect(result3).toBe('result');
		expect(result4).toBe('result');

		// Verify that the result was cached
		expect(await cache.has(mockedKey)).toBe(true);
	});

	it('should handle error if the function fails', async () => {
		const cache = new Cacheable();
		const mockFunction = vi.fn().mockRejectedValue(new Error('Function failed'));
		const mockedKey = createWrapKey(mockFunction, ['arg1'], 'test');
		const wrappedFunction = wrap(mockFunction, {cache, keyPrefix: 'test'});

		await wrappedFunction('arg1');

		// Verify that the wrapped function was only called once
		expect(mockFunction).toHaveBeenCalledTimes(1);
	});
});