Skip to content

Commit

Permalink
rewrite-bouncer
Browse files Browse the repository at this point in the history
  • Loading branch information
uriva committed Nov 10, 2024
1 parent b74d64b commit b95a6c4
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 101 deletions.
4 changes: 0 additions & 4 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ export const { inject: injectBotPhone, access: botPhone } = context(
},
);

export const { inject: injectUrl, access: accessUrl } = context((): string => {
throw new Error("no URL in context");
});

type Medium =
| "whatsapp"
| "instantdb"
Expand Down
2 changes: 1 addition & 1 deletion src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export const makeDatabaseHandler = (
path: string,
botName: string,
authenticate: (token: string, userId: string) => Promise<boolean>,
): Endpoint => ({
): Endpoint<ClientRequest> => ({
bounce: true,
predicate: ({ url, method }) => url === path && method === "POST",
handler: async ({ from, text, token }: ClientRequest) => {
Expand Down
2 changes: 1 addition & 1 deletion src/greenApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export const greenApiHandler = (
credentials: GreenCredentials,
path: string,
doTask: TaskHandler,
): Endpoint => ({
): Endpoint<GreenApiMessage> => ({
bounce: true,
predicate: ({ url, method }) => url === path && method === "POST",
handler: (msg: GreenApiMessage) =>
Expand Down
181 changes: 96 additions & 85 deletions src/taskBouncer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { Buffer } from "https://deno.land/[email protected]/io/buffer.ts";
import http from "node:http";
import querystring from "node:querystring";
import url from "node:url";
import { gamla } from "../deps.ts";
import { injectUrl } from "./api.ts";
import type { Buffer } from "https://deno.land/[email protected]/io/buffer.ts";

const { coerce } = gamla;

Expand Down Expand Up @@ -77,108 +76,120 @@ const getJson = async <T>(req: http.IncomingMessage): Promise<T> => {
throw new Error(`Unsupported incoming type: ${contentType}`);
};

const success = (res: http.ServerResponse, output: string | null) => {
res.writeHead(200, {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET,POST,PUT,DELETE",
"Access-Control-Allow-Headers": "Content-Type",
});
res.end(output ?? JSON.stringify({ message: "Data received successfully" }));
};
const parseUrlParamsAsJson = <T>(requestURL: string) =>
querystring.parse(url.parse(requestURL).query || "") as T;

// deno-lint-ignore no-explicit-any
type Task = { url: string; payload: any; method: string };
const pathForDeferred = "/abstract-bot-api-deferred";

const parseUrlParamsAsJson = (requestURL: string) =>
querystring.parse(url.parse(requestURL).query || "");
const reqToPayload = <T>(req: http.IncomingMessage): Promise<T> =>
req.method === "POST"
? getJson<T>(req)
: Promise.resolve(parseUrlParamsAsJson<T>(coerce(req.url)));

const bouncer = (
domain: string,
shouldDefer: (task: Task) => boolean,
deferredHandler: (task: Task) => Promise<void>,
) =>
(req: http.IncomingMessage, res: http.ServerResponse) => {
if (req.method === "POST" && req.url === "/") {
getJson<Task>(req)
.then(deferredHandler)
.then(() => success(res, null))
.catch((e) => {
type TaskAddress = { method: string; url: string };

const runEndpoint =
<T>(addTask: (payload: T) => void, { bounce, handler }: Endpoint<T>) =>
async (req: http.IncomingMessage, res: http.ServerResponse) => {
try {
const payload: T = await reqToPayload<T>(req);
if (bounce) {
addTask(payload);
res.writeHead(200);
res.end();
} else {
handler(payload, res);
}
} catch (e) {
console.error(e);
res.writeHead(500);
res.end();
}
};

type Task<T> = { payload: T; address: TaskAddress };

export type Endpoint<T> = {
bounce: false;
handler: (task: T, res: http.ServerResponse) => void;
predicate: (task: TaskAddress) => boolean;
} | {
bounce: true;
handler: (payload: T) => Promise<void>;
predicate: (address: TaskAddress) => boolean;
};

const deferredHandlerEndpoint = <T>(eps: Endpoint<T>[]): Endpoint<Task<T>> => ({
bounce: false,
predicate: ({ method, url }) => method === "POST" && url === pathForDeferred,
handler: async ({ address, payload }, res) => {
for (
const relevantEndpoint of eps.filter(({ predicate }) =>
predicate(address)
)
) {
if (!relevantEndpoint.bounce) continue;
try {
await relevantEndpoint.handler(payload);
res.writeHead(200);
res.end();
} catch (e) {
console.error(e);
res.writeHead(500);
res.end();
});
return;
}
const params = req.method === "POST"
? getJson(req)
: Promise.resolve(parseUrlParamsAsJson(coerce(req.url)));
params.then(
(payload) => {
const task = {
method: coerce(req.method),
url: coerce(url.parse(coerce(req.url), true).pathname),
payload,
};
if (shouldDefer(task)) {
// Don't await on this, so telegram won't retry when task takes a long time to finish.
addTask(domain, task)
.catch(
(e) => {
console.error("error submitting task", e);
},
);
return success(res, null);
}
return deferredHandler(task).then((x) =>
success(res, typeof x === "string" ? x : null)
);
},
).catch((e: Error) => {
console.error(e);
res.writeHead(500);
return;
}
res.writeHead(404);
res.end();
});
};
},
});

const reqToTaskAddress = (req: http.IncomingMessage): TaskAddress => ({
method: coerce(req.method),
url: coerce(url.parse(coerce(req.url), true).pathname),
});

const addTask = (domain: string, msg: Task) =>
fetch(domain, {
const addTask = (deferralUrl: string) => <T>(task: Task<T>) =>
fetch(deferralUrl + pathForDeferred, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(msg),
body: JSON.stringify(task),
});

// deno-lint-ignore no-explicit-any
type Handler = (payload: Task["payload"]) => Promise<any>;

export type Endpoint = {
bounce: boolean;
handler: Handler;
predicate: (task: Task) => boolean;
};

export const bouncerHandler = (domain: string, endpoints: Endpoint[]) =>
bouncer(
domain,
(task: Task) =>
endpoints.find(({ predicate }) => predicate(task))?.bounce ?? false,
(task: Task) => {
for (
const { handler } of endpoints.filter(({ predicate }) =>
predicate(task)
)
) return injectUrl(() => task.url)(handler)(task.payload);
throw new Error("No handler for request");
},
);
const selectAndRunEndpoint =
(addTask: <T>(task: Task<T>) => void, endpoints: Endpoint<any>[]) =>
(req: http.IncomingMessage, res: http.ServerResponse) => {
const address = reqToTaskAddress(req);
for (
const endpoint of endpoints.filter(({ predicate }) => predicate(address))
) {
runEndpoint((payload) => addTask({ address, payload }), endpoint)(
req,
res,
);
return;
}
res.writeHead(404);
res.end();
};

export const bouncerServer = (
domain: string,
port: string,
endpoints: Endpoint[],
// deno-lint-ignore no-explicit-any
endpoints: Endpoint<any>[],
): Promise<http.Server> =>
new Promise((resolve) => {
const server = http.createServer(bouncerHandler(domain, endpoints));
const server = http.createServer(
selectAndRunEndpoint((x) =>
addTask(domain)(x).catch((e) => {
console.error("Failed processing deferred task", e);
}), [
deferredHandlerEndpoint(endpoints),
...endpoints,
]),
);
server.listen(port, () => {
resolve(server);
});
Expand Down
2 changes: 1 addition & 1 deletion src/telegram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export const makeTelegramHandler = (
telegramToken: string,
path: string,
doTask: TaskHandler,
): Endpoint => (
): Endpoint<grammy.Update> => (
{
bounce: true,
predicate: ({ url, method }) => url === path && method === "POST",
Expand Down
22 changes: 13 additions & 9 deletions src/whatsapp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,20 @@ const toNumber = (
export const whatsappWebhookVerificationHandler = (
verifyToken: string,
path: string,
): Endpoint => ({
): Endpoint<WebhookVerification> => ({
predicate: ({ url, method }) => url === path && method === "POST",
bounce: false,
handler: (msg: WebhookVerification) =>
(
msg["hub.mode"] === "subscribe" &&
verifyToken === msg["hub.verify_token"]
)
? Promise.resolve(msg["hub.challenge"])
: Promise.resolve(),
handler: (msg, res) => {
if (
msg["hub.mode"] === "subscribe" && verifyToken === msg["hub.verify_token"]
) {
res.writeHead(200, msg["hub.challenge"]);
res.end();
} else {
res.writeHead(404);
res.end();
}
},
});

const getText = (msg: WhatsappMessage) =>
Expand All @@ -307,7 +311,7 @@ export const whatsappBusinessHandler = (
token: string,
path: string,
doTask: TaskHandler,
): Endpoint => ({
): Endpoint<WhatsappMessage> => ({
bounce: true,
predicate: ({ url, method }) => url === path && method === "POST",
handler: (msg: WhatsappMessage) =>
Expand Down

0 comments on commit b95a6c4

Please sign in to comment.