Skip to content

Commit

Permalink
Update response apis to be able to work for connection upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
anuragsoni committed Jul 31, 2023
1 parent 485a0e6 commit 7edf4b8
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 40 deletions.
3 changes: 2 additions & 1 deletion example/dune
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
core_unix.filename_unix
core_unix.command_unix
async
shuttle_http))
shuttle_http
async_websocket))
28 changes: 27 additions & 1 deletion example/http_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,36 @@ open! Core
open! Async
open Shuttle_http

let websocket_handler request =
Log.Global.info !"Request: %{sexp: Request.t}" request;
match Request.header request "Sec-WebSocket-Key" with
| None -> return (Response.create `Bad_request)
| Some v ->
let accept_key = Websocket.sec_websocket_accept_header_value ~sec_websocket_key:v in
let handler ?unconsumed_data fd =
Log.Global.info !"Unconsumed data: %{sexp: string option}" unconsumed_data;
let reader = Reader.create fd in
let writer = Writer.create fd in
let ws = Websocket.create ~role:Websocket.Websocket_role.Server reader writer in
let rd, wr = Websocket.pipes ws in
Pipe.transfer rd wr ~f:(fun x ->
Log.Global.info "received: %S" x;
x)
in
return
(Response.upgrade
~headers:
[ "Upgrade", "websocket"
; "Connection", "Upgrade"
; "Sec-WebSocket-Accept", accept_key
]
handler)
;;

let service context request =
Log.Global.info "Peer address: %s" (Socket.Address.to_string (Server.peer_addr context));
match Request.path request, Request.meth request with
| "/echo", `POST -> return (Response.create ~body:(Request.body request) `Ok)
| "/echo", `GET -> websocket_handler request
| "/", `GET -> return (Response.create ~body:(Body.string "Hello World") `Ok)
| ("/echo" | "/"), _ -> return (Response.create `Method_not_allowed)
| _ -> return (Response.create `Not_found)
Expand Down
2 changes: 1 addition & 1 deletion http/src/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ open! Async

module Stream = struct
include Body0.Stream

let of_pipe encoding reader = { encoding; reader; read_started = false }
let close t = Pipe.close_read t.reader
let encoding t = t.encoding


let iter_without_pushback t ~f =
if t.read_started then raise_s [%message "Only one consumer can read from a stream"];
t.read_started <- true;
Expand Down
2 changes: 1 addition & 1 deletion http/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ module Connection = struct
(match parse_body conn.reader (Response.transfer_encoding response) with
| Error error -> Error.raise error
| Ok body ->
let response = Response.with_body response body in
let response = Response0.with_body response body in
if not (Response.keep_alive response && Request.keep_alive request)
then close t;
Ivar.fill ivar response;
Expand Down
30 changes: 18 additions & 12 deletions http/src/response.ml
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
open Core

type t =
{ version : Version.t
; status : Status.t
; reason_phrase : string
; headers : Headers.t
; body : Body.t
}
[@@deriving sexp_of]
include Response0

let create
?(version = Version.Http_1_1)
Expand All @@ -17,15 +9,29 @@ let create
status
=
let reason_phrase = Option.value reason_phrase ~default:(Status.to_string status) in
{ version; status; reason_phrase; headers; body }
{ version; status; reason_phrase; headers; body = Response body }
;;

let upgrade ?(headers = []) handler =
let reason_phrase = Status.to_reason_phrase `Switching_protocols in
{ version = Http_1_1
; status = `Switching_protocols
; reason_phrase
; headers
; body = Upgrade handler
}
;;

let version t = t.version
let status t = t.status
let reason_phrase t = t.reason_phrase
let headers t = t.headers
let body t = t.body
let with_body t body = if phys_equal t.body body then t else { t with body }

let body t =
match t.body with
| Response b -> b
| Upgrade _ -> Body.empty
;;

let transfer_encoding t =
match List.rev @@ Headers.find_multi t.headers "Transfer-Encoding" with
Expand Down
11 changes: 6 additions & 5 deletions http/src/response.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(** [t] Represents a HTTP 1.1 response. *)
type t [@@deriving sexp_of]
type t = Response0.t [@@deriving sexp_of]

val create
: ?version:Version.t
Expand All @@ -9,6 +9,11 @@ val create
-> Status.t
-> t

val upgrade
: ?headers:Headers.t
-> (?unconsumed_data:string -> Async_unix.Fd.t -> unit Async_kernel.Deferred.t)
-> t

(** [version] returns the HTTP version number for the response. *)
val version : t -> Version.t

Expand All @@ -24,10 +29,6 @@ val headers : t -> (string * string) list
(** [body] returns the body payload of this response. *)
val body : t -> Body.t

(** [with_body] returns a new response where every value is the same as the input response
but the body is replaced with the function input. *)
val with_body : t -> Body.t -> t

(** [transfer_encoding] returns the inferred transfer encoding based on the response's
http headers. *)
val transfer_encoding : t -> [> `Bad_response | `Chunked | `Fixed of int ]
Expand Down
23 changes: 23 additions & 0 deletions http/src/response0.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
open Core
open Async

type body =
| Response of Body.t
| Upgrade of (?unconsumed_data:string -> Fd.t -> unit Deferred.t)
[@@deriving sexp_of]

type t =
{ version : Version.t
; status : Status.t
; reason_phrase : string
; headers : Headers.t
; body : body
}
[@@deriving sexp_of]

let with_body t body =
match t.body with
| Response existing_body ->
if phys_equal existing_body body then t else { t with body = Response body }
| Upgrade _ -> raise_s [%message "Attempting to set a body for an upgrade response"]
;;
47 changes: 30 additions & 17 deletions http/src/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -227,27 +227,40 @@ let run_server_loop t handler =
if Deferred.is_determined promise
then write_response_and_continue t req (Deferred.value_exn promise)
else promise >>> fun response -> write_response_and_continue t req response
and write_response_and_continue t req response =
and write_response_and_continue t req (response : Response0.t) =
let is_keep_alive = Request.keep_alive req && Response.keep_alive response in
(write_response t response;
Body0.writer (Response.body response) t.writer)
>>> fun () ->
if is_keep_alive
then (
match Request.body req with
| Body0.Empty | Fixed _ ->
if Time_ns.Span.is_positive t.read_header_timeout
then parse_request_with_timeout t t.read_header_timeout
else parse_request t
| Stream stream ->
(if Body.Stream.read_started stream
then Body.Stream.closed stream
else Body.Stream.drain stream)
>>> fun () ->
if Time_ns.Span.is_positive t.read_header_timeout
then parse_request_with_timeout t t.read_header_timeout
else parse_request t)
else Ivar.fill t.closed ()
match response.body with
| Upgrade handler ->
let (view : Slice.t) = Input_channel.view t.reader in
let unconsumed_data =
if view.len = 0
then None
else Some (Bigstring.to_string view.buf ~pos:view.pos ~len:view.len)
in
let reader_fd = Input_channel.fd t.reader in
let writer_fd = Output_channel.fd t.writer in
assert (phys_equal reader_fd writer_fd);
handler ?unconsumed_data reader_fd >>> fun () -> Ivar.fill t.closed ()
| Response _ ->
if is_keep_alive
then (
match Request.body req with
| Body0.Empty | Fixed _ ->
if Time_ns.Span.is_positive t.read_header_timeout
then parse_request_with_timeout t t.read_header_timeout
else parse_request t
| Stream stream ->
(if Body.Stream.read_started stream
then Body.Stream.closed stream
else Body.Stream.drain stream)
>>> fun () ->
if Time_ns.Span.is_positive t.read_header_timeout
then parse_request_with_timeout t t.read_header_timeout
else parse_request t)
else Ivar.fill t.closed ()
in
Monitor.detach t.monitor;
Scheduler.within ~priority:Priority.normal ~monitor:t.monitor (fun () ->
Expand Down
2 changes: 1 addition & 1 deletion http/test/test_http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ let%expect_test "Simple http endpoint with http client" =
[%expect
{|
((version Http_1_1) (status Ok) (reason_phrase "")
(headers ((Content-Length 11))) (body (Fixed "Hello World"))) |}])
(headers ((Content-Length 11))) (body (Response (Fixed "Hello World")))) |}])
;;

let%expect_test "Test default error handler" =
Expand Down
2 changes: 1 addition & 1 deletion http/test/test_parser.ml
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ let%expect_test "can parse a single response" =
((consumed 49)
(value
((version Http_1_1) (status Ok) (reason_phrase OK)
(headers ((Content-Length 21) (Foo bar))) (body Empty)))))
(headers ((Content-Length 21) (Foo bar))) (body (Response Empty))))))
49) |}]
;;

Expand Down

0 comments on commit 7edf4b8

Please sign in to comment.