Skip to content

Commit

Permalink
cacheable - adding in stampede protection via coalesce async (#891)
Browse files Browse the repository at this point in the history
* cacheable - adding in stampede protection via coalesce async

* Update README.md

* updating development packages

* version bump to v1.8.4
  • Loading branch information
jaredwray authored Nov 10, 2024
1 parent 66c57aa commit 9a7b140
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 15 deletions.
26 changes: 25 additions & 1 deletion packages/cacheable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Scalable and trusted storage engine by Keyv
* Memory Caching with LRU and Expiration `CacheableMemory`
* Resilient to failures with try/catch and offline
* Wrap / Memoization for Sync and Async Functions
* Wrap / Memoization for Sync and Async Functions with Stampede Protection
* Hooks and Events to extend functionality
* Shorthand for ttl in milliseconds `(1m = 60000) (1h = 3600000) (1d = 86400000)`
* Non-blocking operations for layer 2 caching
Expand Down Expand Up @@ -310,6 +310,30 @@ const wrappedFunction = cache.wrap(asyncFunction, options);
console.log(await wrappedFunction(2)); // 4
console.log(await wrappedFunction(2)); // 4 from cache
```
With `Cacheable` we have also included stampede protection so that a `Promise` based call will only be called once if multiple requests of the same are executed at the same time. Here is an example of how to test for stampede protection:

```javascript
import { Cacheable } from 'cacheable';
const asyncFunction = async (value: number) => {
return value;
};

const cache = new Cacheable();
const options = {
ttl: '1h', // 1 hour
keyPrefix: 'p1', // key prefix. This is used if you have multiple functions and need to set a unique prefix.
}

const wrappedFunction = cache.wrap(asyncFunction, options);
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(wrappedFunction(i));
}

const results = await Promise.all(promises); // all results should be the same

console.log(results); // [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```

In this example we are wrapping an `async` function in a cache with a `ttl` of `1 hour`. This will cache the result of the function for `1 hour` and then expire the value. You can also wrap a `sync` function in a cache:

Expand Down
6 changes: 3 additions & 3 deletions packages/cacheable/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "cacheable",
"version": "1.8.3",
"version": "1.8.4",
"description": "Simple Caching Engine using Keyv",
"type": "module",
"main": "./dist/index.cjs",
Expand Down Expand Up @@ -29,9 +29,9 @@
},
"devDependencies": {
"@keyv/redis": "^3.0.1",
"@types/node": "^22.8.4",
"@types/node": "^22.9.0",
"@vitest/coverage-v8": "^2.1.4",
"lru-cache": "^11.0.1",
"lru-cache": "^11.0.2",
"rimraf": "^6.0.1",
"tsup": "^8.3.5",
"typescript": "^5.6.3",
Expand Down
92 changes: 92 additions & 0 deletions packages/cacheable/src/coalesce-async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
type PromiseCallback<T = any, E = Error> = {
resolve: (value: T | PromiseLike<T>) => void;
reject: (reason: E) => void;
};

const callbacks = new Map<string, PromiseCallback[]>();

function hasKey(key: string): boolean {
return callbacks.has(key);
}

function addKey(key: string): void {
callbacks.set(key, []);
}

function removeKey(key: string): void {
callbacks.delete(key);
}

function addCallbackToKey<T>(key: string, callback: PromiseCallback<T>): void {
const stash = getCallbacksByKey<T>(key);
stash.push(callback);
callbacks.set(key, stash);
}

function getCallbacksByKey<T>(key: string): Array<PromiseCallback<T>> {
return callbacks.get(key) ?? [];
}

async function enqueue<T>(key: string): Promise<T> {
return new Promise<T>((resolve, reject) => {
const callback: PromiseCallback<T> = {resolve, reject};
addCallbackToKey(key, callback);
});
}

function dequeue<T>(key: string): Array<PromiseCallback<T>> {
const stash = getCallbacksByKey<T>(key);
removeKey(key);
return stash;
}

function coalesce<T>(options: {key: string; error?: Error; result?: T}): void {
const {key, error, result} = options;

for (const callback of dequeue(key)) {
if (error) {
/* c8 ignore next 3 */
callback.reject(error);
} else {
callback.resolve(result);
}
}
}

/**
* Enqueue a promise for the group identified by `key`.
*
* All requests received for the same key while a request for that key
* is already being executed will wait. Once the running request settles
* then all the waiting requests in the group will settle, too.
* This minimizes how many times the function itself runs at the same time.
* This function resolves or rejects according to the given function argument.
*
* @url https://github.com/douglascayers/promise-coalesce
*/
export async function coalesceAsync<T>(
/**
* Any identifier to group requests together.
*/
key: string,
/**
* The function to run.
*/
fnc: () => T | PromiseLike<T>,
): Promise<T> {
if (!hasKey(key)) {
addKey(key);
try {
const result = await Promise.resolve(fnc());
coalesce({key, result});
return result;
} catch (error: any) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
coalesce({key, error});
// eslint-disable-next-line @typescript-eslint/only-throw-error
throw error;
}
}

return enqueue(key);
}
26 changes: 17 additions & 9 deletions packages/cacheable/src/wrap.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {hash} from './hash.js';
import {coalesceAsync} from './coalesce-async.js';
import {type Cacheable, type CacheableMemory} from './index.js';

export type WrapFunctionOptions = {
Expand Down Expand Up @@ -39,15 +40,22 @@ export function wrap<T>(function_: AnyFunction, options: WrapOptions): AnyFuncti
const {ttl, keyPrefix, cache} = options;

return async function (...arguments_: any[]) {
const cacheKey = createWrapKey(function_, arguments_, keyPrefix);

let value = await cache.get(cacheKey) as T | undefined;

if (value === undefined) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
value = await function_(...arguments_) as T;

await cache.set(cacheKey, value, ttl);
let value;
try {
const cacheKey = createWrapKey(function_, arguments_, keyPrefix);

value = await cache.get(cacheKey) as T | undefined;

if (value === undefined) {
value = await coalesceAsync(cacheKey, async () => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const result = await function_(...arguments_) as T;
await cache.set(cacheKey, result, ttl);
return result;
});
}
} catch {
// ignore
}

return value;
Expand Down
39 changes: 37 additions & 2 deletions packages/cacheable/test/wrap.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import {
describe, it, expect,
describe, it, expect, vi,
} from 'vitest';
import {Cacheable, CacheableMemory} from '../src/index.js';
import {Cacheable, CacheableMemory, KeyvCacheableMemory} from '../src/index.js';
import {
wrap, createWrapKey, wrapSync, type WrapOptions, type WrapSyncOptions,
} from '../src/wrap.js';
Expand Down Expand Up @@ -162,3 +162,38 @@ describe('wrap function', () => {
expect(cacheResult).toBe(undefined);
});
});

describe('wrap function with stampede protection', () => {
it('should only execute the wrapped function once when called concurrently with the same key', async () => {
const cache = new Cacheable();
const mockFunction = vi.fn().mockResolvedValue('result');
const mockedKey = createWrapKey(mockFunction, ['arg1'], 'test');
const wrappedFunction = wrap(mockFunction, {cache, keyPrefix: 'test'});

// Call the wrapped function concurrently
const [result1, result2, result3, result4] = await Promise.all([wrappedFunction('arg1'), wrappedFunction('arg1'), wrappedFunction('arg2'), wrappedFunction('arg2')]);

// Verify that the wrapped function was only called two times do to arg1 and arg2
expect(mockFunction).toHaveBeenCalledTimes(2);

// Verify that both calls returned the same result
expect(result1).toBe('result');
expect(result2).toBe('result');
expect(result3).toBe('result');

// Verify that the result was cached
expect(await cache.has(mockedKey)).toBe(true);
});

it('should handle error if the function fails', async () => {
const cache = new Cacheable();
const mockFunction = vi.fn().mockRejectedValue(new Error('Function failed'));
const mockedKey = createWrapKey(mockFunction, ['arg1'], 'test');
const wrappedFunction = wrap(mockFunction, {cache, keyPrefix: 'test'});

await wrappedFunction('arg1');

// Verify that the wrapped function was only called once
expect(mockFunction).toHaveBeenCalledTimes(1);
});
});

0 comments on commit 9a7b140

Please sign in to comment.