Skip to content

Commit

Permalink
Replace the old redis cache implementation
Browse files Browse the repository at this point in the history
I created a new version of the Redis cache engine.

Currently it supports the put, match and delete methods. The connection and methods will not block the load of the pages.

It was made to also support reconnection on failure; and even during a failure the website will keep running until the connection gets healthy again.

When an error occurs, the connection will be halted on purpose. Just like a open circuit breaker. The idea is to not fall into a reconnection loop. Once the website tries to cache again, the engine will try to reconnect.
  • Loading branch information
azisaka committed Oct 7, 2024
1 parent 5c682c4 commit 44a55a5
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 146 deletions.
12 changes: 6 additions & 6 deletions runtime/caches/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from "./fileSystem.ts";
// TODO(mcandeia) s3 and redis are not being used and together they are 30% of the bundle size of deco,
// so we should remove them for now and add it dinamically later.
// import { caches as redisCache, redis } from "./redis.ts";
import { caches as redisCache } from "./redis.ts";
// import { caches as cachesS3, isS3Available } from "./s3.ts";
import { createTieredCache } from "./tiered.ts";

Expand All @@ -24,18 +24,14 @@ export interface CacheStorageOption {

export type CacheEngine =
// TODO (mcandeia) see line 7.
// | "REDIS"
// | "S3";
// | "KV"
| "REDIS"
| "CACHE_API"
| "FILE_SYSTEM";

const cacheImplByEngine: Record<CacheEngine, CacheStorageOption> = {
// TODO (mcandeia) see line 7
// REDIS: {
// implementation: redisCache,
// isAvailable: redis !== null,
// },
// S3: {
// implementation: cachesS3,
// isAvailable: isS3Available,
Expand All @@ -54,6 +50,10 @@ const cacheImplByEngine: Record<CacheEngine, CacheStorageOption> = {
implementation: cachesFileSystem,
isAvailable: isFileSystemAvailable,
},
REDIS: {
implementation: redisCache,
isAvailable: redisCache !== null,
},
};

for (const [engine, cache] of Object.entries(cacheImplByEngine)) {
Expand Down
241 changes: 101 additions & 140 deletions runtime/caches/redis.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,28 @@
import { Redis } from "https://deno.land/x/[email protected]/mod.ts";

import { logger, tracer } from "../../observability/otel/config.ts";
import {
assertCanBeCached,
assertNoOptions,
withCacheNamespace,
requestURLSHA1,
} from "./common.ts";
import { createClient } from "npm:redis@^4.5";

const redisUrl = Deno.env.get("UPSTASH_REDIS_REST_URL");
const redisToken = Deno.env.get("UPSTASH_REDIS_REST_TOKEN");

export const redis = redisUrl && redisToken
? new Redis({
url: redisUrl,
token: redisToken,
enableTelemetry: true,
})
: null;

interface ResponseMetadata {
body: string;
status: number;
headers: [string, string][];
}
const REDIS_URL = Deno.env.get("REDIS_URL") || "redis://localhost:6379/0";

function base64encode(str: string): string {
return btoa(encodeURIComponent(str));
function serialize(response: Response): Promise<string> {
return response.text().then((body) =>
JSON.stringify({
body: body,
headers: response.headers,
status: response.status,
})
);
}

function base64decode(str: string): string {
return decodeURIComponent(atob(str));
function deserialize(raw: string): Promise<Response> {
return Promise.resolve(raw).then(JSON.parse).then((data) =>
new Response(data.body, { headers: data.headers, status: data.status })
);
}

export const caches: CacheStorage = {
delete: (_cacheName: string): Promise<boolean> => {
throw new Error("Not Implemented");
Expand All @@ -47,13 +39,33 @@ export const caches: CacheStorage = {
): Promise<Response | undefined> => {
throw new Error("Not Implemented");
},
open: (cacheName: string): Promise<Cache> => {
if (!redis) {
throw new Error(
"Redis coult not be used due to the lack of credentials.",
);
}
const requestURLSHA1 = withCacheNamespace(cacheName);
open: async (_: string): Promise<Cache> => {
const redis = createClient({
url: REDIS_URL,
}).on("error", (err: Error) => {
console.error(err);

redis.disconnect();
}).on("connect", () => {
console.debug("[cache]", "connecting");
}).on("reconnecting", () => {
console.warn("[cache]", "reconnecting");
});

const ensureConnection = () => {
if (!redis.isOpen && !redis.isReady) {
console.log("[cache]", "not ready");

redis.connect();

return false;
}

return true;
};

ensureConnection();

return Promise.resolve({
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/add) */
add: (_request: RequestInfo | URL): Promise<void> => {
Expand All @@ -63,16 +75,12 @@ export const caches: CacheStorage = {
addAll: (_requests: RequestInfo[]): Promise<void> => {
throw new Error("Not Implemented");
},
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/delete) */
delete: async (
request: RequestInfo | URL,
options?: CacheQueryOptions,
): Promise<boolean> => {
assertNoOptions(options);

return await redis.del(
await requestURLSHA1(request),
) > 0;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/matchAll) */
matchAll: (
_request?: RequestInfo | URL,
_options?: CacheQueryOptions,
): Promise<ReadonlyArray<Response>> => {
throw new Error("Not Implemented");
},
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/keys) */
keys: (
Expand All @@ -81,125 +89,78 @@ export const caches: CacheStorage = {
): Promise<ReadonlyArray<Request>> => {
throw new Error("Not Implemented");
},
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/match) */
delete: async (
request: RequestInfo | URL,
_?: CacheQueryOptions,
): Promise<boolean> => {
console.debug("[cache]", "delete");

ensureConnection();

return requestURLSHA1(request)
.then(redis.del)
.then(() => true)
.catch((err: Error) => {
console.error(err);

return false;
});
},
match: async (
request: RequestInfo | URL,
options?: CacheQueryOptions,
): Promise<Response | undefined> => {
console.debug("[cache]", "match");

assertNoOptions(options);
const cacheKey = await requestURLSHA1(request);
const span = tracer.startSpan("redis-get", {
attributes: {
cacheKey,
},
});
try {
const data = await redis.get(cacheKey);
if (data === null) {
span.addEvent("cache-miss");
return undefined;
}
span.addEvent("cache-hit");
if (data instanceof Error) {
logger.error(
`error when reading from redis, ${data.toString()}`,
);
return undefined;
}
ensureConnection();

return requestURLSHA1(request)
.then(redis.get)
.then((result: string | null) => {
if (!result) {
console.debug("[cache]", "miss");

return undefined;
}

console.debug("[cache]", "hit");

return deserialize(result);
})
.catch((err: Error) => {
console.error(err);

if (typeof data !== "object") {
logger.error(
`data for ${cacheKey} was stored in a invalid format, thus cache will not be used`,
);
return undefined;
}

const parsedData: ResponseMetadata = typeof data === "string"
? JSON.parse(data)
: data;
return new Response(base64decode(parsedData.body), {
status: parsedData.status,
headers: new Headers(parsedData.headers),
});
} catch (err) {
span.recordException(err);
throw err;
} finally {
span.end();
}
},
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/matchAll) */
matchAll: (
_request?: RequestInfo | URL,
_options?: CacheQueryOptions,
): Promise<ReadonlyArray<Response>> => {
throw new Error("Not Implemented");
},
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/Cache/put) */
put: async (
request: RequestInfo | URL,
response: Response,
): Promise<void> => {
console.debug("[cache]", "put");

const req = new Request(request);
assertCanBeCached(req, response);

if (!response.body) {
return;
}

const cacheKey = await requestURLSHA1(request);
const span = tracer.startSpan("redis-put", {
attributes: {
cacheKey,
},
});

try {
let expires = response.headers.get("expires");
if (!expires && (response.status >= 300 || response.status < 200)) { //cannot be cached
span.addEvent("cannot-be-cached", {
status: response.status,
expires: expires ?? "undefined",
});
return;
}
expires ??= new Date(Date.now() + (180_000)).toUTCString();

const expDate = new Date(expires);
const timeMs = expDate.getTime() - Date.now();
if (timeMs <= 0) {
span.addEvent("negative-time-ms", { timeMs: `${timeMs}` });
return;
}

response.text().then(base64encode).then((body) => {
const newMeta: ResponseMetadata = {
body,
status: response.status,
headers: [...response.headers.entries()],
};

const options = { px: timeMs };
const setSpan = tracer.startSpan("redis-set", {
attributes: { cacheKey },
});
redis.set(cacheKey, JSON.stringify(newMeta), options).catch(
(err) => {
console.error("redis error", err);
setSpan.recordException(err);
},
).finally(() => {
setSpan.end();
}); // do not await for setting cache
}).catch((err) => {
logger.error(`error saving to redis ${err?.message}`);
});
} catch (err) {
span.recordException(err);
throw err;
} finally {
span.end();
}
ensureConnection();

return Promise.all([requestURLSHA1(request), serialize(response)])
.then(([cacheKey, data]) => {
const expirationTimestamp = Date.parse(
response.headers.get("expires") ?? "",
);

const ttl = expirationTimestamp - Date.now();

return redis.set(cacheKey, data, { PX: ttl });
})
.then(() => console.log("[cache]", "store"))
.catch((err) => console.error("[cache]", err));
},
});
},
Expand Down

0 comments on commit 44a55a5

Please sign in to comment.