Skip to content

Commit

Permalink
feat: WebSocket support (#81)
Browse files Browse the repository at this point in the history
- feat: WebSocket route for `Rotuer` is now supported
- fix: Upgraded connection should not be read from `processRequest`
- added: `markAsResponded()` and `markAsUpgraded()` to `Responder`
  • Loading branch information
keroxp authored Mar 1, 2020
1 parent 15277e9 commit ad37f57
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 19 deletions.
3 changes: 2 additions & 1 deletion modules-lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"/fmt/colors.ts",
"/media_types/mod.ts",
"/mime/multipart.ts",
"/util/async.ts"
"/util/async.ts",
"/ws/mod.ts"
]
},
"https://dev.jspm.io/react": {
Expand Down
3 changes: 2 additions & 1 deletion modules.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"/fmt/colors.ts",
"/media_types/mod.ts",
"/mime/multipart.ts",
"/util/async.ts"
"/util/async.ts",
"/ws/mod.ts"
]
},
"https://dev.jspm.io/react": {
Expand Down
26 changes: 25 additions & 1 deletion responder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@ export interface ServerResponder extends CookieSetter {
}
): Promise<void>;

/** Mark as responded manually */
markAsResponded(status: number): void;

isResponded(): boolean;

/** Mark as connection upgraded */
markAsUpgraded(): void;

isUpgraded(): boolean;

respondedStatus(): number | undefined;
}

Expand All @@ -32,9 +40,13 @@ export function createResponder(
const responseHeaders = new Headers();
const cookie = cookieSetter(responseHeaders);
let responseStatus: number | undefined;
let upgraded = false;
function isResponded() {
return responseStatus !== undefined;
}
function isUpgraded() {
return upgraded;
}
async function redirect(
url: string,
{
Expand All @@ -47,7 +59,10 @@ export function createResponder(
}
async function respond(response: ServerResponse): Promise<void> {
if (isResponded()) {
throw new Error("http: already responded");
throw new Error("Request already responded");
}
if (isUpgraded()) {
throw new Error("Request upgraded");
}
const { status, headers, body } = response;
responseStatus = status;
Expand All @@ -62,14 +77,23 @@ export function createResponder(
body
});
}
function markAsResponded(status: number) {
responseStatus = status;
}
function markAsUpgraded() {
upgraded = true;
}
function respondedStatus() {
return responseStatus;
}
return {
respond,
redirect,
isResponded,
isUpgraded,
respondedStatus,
markAsResponded,
markAsUpgraded,
...cookie
};
}
24 changes: 24 additions & 0 deletions responder_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,28 @@ it("responder", t => {
assertEquals(status, 302);
assertEquals(headers.get("location"), "/index.html");
});

t.run("markResponded()", async () => {
const w = new Buffer();
const res = createResponder(w);
res.markAsResponded(200);
assertEquals(res.isResponded(), true);
assertEquals(res.respondedStatus(), 200);
await assertThrowsAsync(
() => res.respond({ status: 404, body: "404" }),
Error,
"already"
);
});
t.run("markAsUpgraded()", async () => {
const w = new Buffer();
const res = createResponder(w);
res.markAsUpgraded();
assertEquals(res.isUpgraded(), true);
await assertThrowsAsync(
() => res.respond({ status: 404, body: "404" }),
Error,
"upgraded"
);
});
});
54 changes: 46 additions & 8 deletions router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import { kHttpStatusMessages } from "./serveio.ts";
import { createLogger, Logger, Loglevel, namedLogger } from "./logger.ts";
import ListenOptions = Deno.ListenOptions;
import ListenTLSOptions = Deno.ListenTLSOptions;
import { ServerResponder } from "./responder.ts";
import {
acceptable,
acceptWebSocket,
WebSocket
} from "./vendor/https/deno.land/std/ws/mod.ts";

export interface HttpRouter {
/**
Expand All @@ -41,6 +45,14 @@ export interface HttpRouter {
/** Register POST route */
post(patter: string | RegExp, ...handlers: HttpHandler[]): void;

/** Accept ws upgrade */
ws(pattern: string | RegExp, ...handler: WebSocketHandler[]): void;
ws(
pattern: string | RegExp,
handlers: HttpHandler[],
handler: WebSocketHandler
): void;

/**
* Set global error handler.
* All unhandled promise rejections while processing requests will be passed into this handler.
Expand All @@ -63,6 +75,11 @@ export type RoutedServerRequest = ServerRequest & {
/** Basic handler for http request */
export type HttpHandler = (req: RoutedServerRequest) => void | Promise<void>;

export type WebSocketHandler = (
sock: WebSocket,
req: RoutedServerRequest
) => void | Promise<void>;

/** Global error handler for requests */
export type ErrorHandler = (
e: any | RoutingError,
Expand All @@ -80,8 +97,12 @@ export function createRouter(
logger: createLogger()
}
): HttpRouter {
const middlewares: HttpHandler[] = [];
const routes: { pattern: string | RegExp; handlers: HttpHandler[] }[] = [];
const middlewareList: HttpHandler[] = [];
const routes: {
pattern: string | RegExp;
handlers: HttpHandler[];
wsHandler?: WebSocketHandler;
}[] = [];
const { info, error } = namedLogger("servest:router", opts.logger);
const finalErrorHandler = async (e: any, req: ServerRequest) => {
if (e instanceof RoutingError) {
Expand Down Expand Up @@ -134,7 +155,17 @@ export function createRouter(

function use(...middleware: HttpHandler[]) {
info(`use: ${handlerToString(middleware)}`);
middlewares.push(...middleware);
middlewareList.push(...middleware);
}

function ws(pattern: string | RegExp, ...args: any[]) {
if (Array.isArray(args[0])) {
routes.push({ pattern, handlers: args[0], wsHandler: args[1] });
} else if (typeof args[0] === "function") {
routes.push({ pattern, handlers: [], wsHandler: args[0] });
} else {
throw new Error("invalid function arguments");
}
}

function handleError(handler: ErrorHandler) {
Expand All @@ -143,7 +174,7 @@ export function createRouter(

function createHandler(): ServeHandler {
const handleInternal = async (req: ServerRequest) => {
for (const middleware of middlewares) {
for (const middleware of middlewareList) {
await middleware({ ...req, match: [] });
if (req.isResponded()) {
logRouteStatus(req, req.respondedStatus()!);
Expand All @@ -155,14 +186,21 @@ export function createRouter(
routes.map(v => v.pattern)
);
if (index > -1 && match) {
const { handlers } = routes[index];
const { handlers, wsHandler } = routes[index];
const routedReq = { ...req, match };
for (const handler of handlers) {
await handler({ ...req, match });
await handler(routedReq);
if (req.isResponded()) {
logRouteStatus(req, req.respondedStatus()!);
break;
}
}
if (wsHandler && acceptable(req)) {
const sock = await acceptWebSocket(req);
req.markAsResponded(101);
req.markAsUpgraded();
wsHandler(sock, routedReq);
}
if (!req.isResponded()) {
throw new RoutingError(404, kHttpStatusMessages[404]);
}
Expand Down Expand Up @@ -211,5 +249,5 @@ export function createRouter(
info(`listening on ${listenOptions.hostname || ""}:${listenOptions.port}`);
return listener;
}
return { handle, use, get, post, handleError, listen, listenTLS };
return { handle, use, get, post, ws, handleError, listen, listenTLS };
}
39 changes: 38 additions & 1 deletion router_test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright 2019 Yusuke Sakurai. All rights reserved. MIT license.
import { createRouter } from "./router.ts";
import { createRouter, HttpRouter } from "./router.ts";
import {
assertEquals,
assertMatch
} from "./vendor/https/deno.land/std/testing/asserts.ts";
import { it } from "./test_util.ts";
import { Loglevel, setLevel } from "./logger.ts";
import { writeResponse } from "./serveio.ts";
import { connectWebSocket } from "./vendor/https/deno.land/std/ws/mod.ts";
setLevel(Loglevel.NONE);

it("router", t => {
Expand Down Expand Up @@ -39,6 +41,10 @@ it("router", t => {
throw new Error("throw");
});
router.handle("/redirect", req => req.redirect("/index"));
router.handle("/respond-raw", async req => {
await writeResponse(req.bufWriter, { status: 200, body: "ok" });
req.markAsResponded(200);
});
router.handleError((e, req) => {
errorHandled = true;
});
Expand Down Expand Up @@ -101,4 +107,35 @@ it("router", t => {
assertEquals(res.status, 200);
assertEquals(await res.body.text(), "ok");
});
t.run(
"should not go global error handler when markResponded called",
async () => {
const res = await fetch("http://127.0.0.1:8898/respond-raw");
assertEquals(res.status, 200);
assertEquals(await res.body?.text(), "ok");
}
);
});

it("router/ws", t => {
t.beforeAfterAll(() => {
const router = createRouter();
router.ws("/ws", async sock => {
await sock.send("Hello");
await sock.close(1000);
});
const l = router.listen({ port: 8899 });
return () => l.close();
});
t.run("should accept ws", async () => {
const sock = await connectWebSocket("ws://127.0.0.1:8899/ws");
const it = sock.receive();
const { value: msg1 } = await it.next();
assertEquals(msg1, "Hello");
const { value: msg2 } = await it.next();
assertEquals(msg2, { code: 1000, reason: "" });
const { done } = await it.next();
assertEquals(done, true);
assertEquals(sock.isClosed, true);
});
});
24 changes: 17 additions & 7 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,38 @@ export function handleKeepAliveConn(

function scheduleReadRequest(opts: ServeOptions) {
processRequest(opts)
.then(scheduleReadRequest)
.catch(() => conn.close());
.then(v => {
if (v) scheduleReadRequest(v);
})
.catch(() => {
conn.close();
});
}

async function processRequest(opts: ServeOptions): Promise<ServeOptions> {
const req = await readRequest(bufReader, opts);
async function processRequest(
opts: ServeOptions
): Promise<ServeOptions | undefined> {
const baseReq = await readRequest(bufReader, opts);
let responded: Promise<void> = Promise.resolve();
const onResponse = (resp: ServerResponse) => {
responded = q.enqueue(resp);
return responded;
};
const responder = createResponder(bufWriter, onResponse);
const nextReq: ServerRequest = {
...req,
const req: ServerRequest = {
...baseReq,
bufWriter,
bufReader,
conn,
...responder
};
await handler(nextReq);
await handler(req);
await responded;
await req.finalize();
if (req.isUpgraded()) {
// Stop processing
return;
}
let keepAliveTimeout = originalOpts.keepAliveTimeout;
if (req.keepAlive && req.keepAlive.max <= 0) {
throw Deno.EOF;
Expand Down
1 change: 1 addition & 0 deletions site/components/content.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const Content: FC = ({ children }) => (
</SideBarLink>
<SideBarLink href={"/testing-handler"}>Testing Handler</SideBarLink>
<SideBarLink href={"/manage-cookie"}>Manage Cookie</SideBarLink>
<SideBarLink href={"/handle-ws"}>Handle WebSocket</SideBarLink>
</SideBarSection>
</SideBar>
{children}
Expand Down
38 changes: 38 additions & 0 deletions site/pages/handle-ws.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Article } from "../components/article.tsx";
import { Code } from "../components/code.tsx";
import React from "../../vendor/https/dev.jspm.io/react/index.js";
import { DFC } from "../../jsx.ts";
import { fetchExample } from "../content.ts";
import { Content } from "../components/content.tsx";
import { Q } from "../components/common.tsx";

const HandleWebSocket: DFC<{ codes: { [key: string]: string } }> = ({
codes
}) => (
<Content>
<Article>
<section id={"handle-ws"}>
<h2>Handle WebSocket</h2>
<p>
Servest provides WebSocket handler for Router API.
<Q>router.ws()</Q> is register for WebSocket route. <br />
Handler will be called after WebSocket upgrade finished.
</p>
<Code href={"/example/handle_ws.ts"} code={codes["handle_ws.ts"]} />
</section>
</Article>
</Content>
);

HandleWebSocket.getInitialProps = async () => {
const codes = Object.fromEntries(
await Promise.all(
["handle_ws.ts"].map(async v => {
return [v, await fetchExample(v)];
})
)
);
return { codes };
};

export default HandleWebSocket;
Loading

0 comments on commit ad37f57

Please sign in to comment.