Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cohttp websocket upgrade connection #45

Merged
merged 1 commit into from
Mar 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .merlin
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ PKG conduit.lwt-unix
PKG nocrypto
PKG core
PKG async
PKG cohttp.async
PKG cohttp.async
20 changes: 19 additions & 1 deletion _tags
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ true: bin_annot, debug, safe_string
package(nocrypto.lwt), \
package(conduit)

<lib/websocket_cohttp_lwt.*>: package(lwt), \
package(lwt.ppx), \
package(uri), \
package(cohttp.lwt), \
package(containers), \
package(conduit)

<lib/websocket_async.*>: package(async), \
package(uri), \
package(cohttp.async), \
Expand All @@ -43,4 +50,15 @@ true: bin_annot, debug, safe_string
package(uri), \
package(cohttp.async), \
package(nocrypto.unix), \
package(containers)
package(containers)

<tests/upgrade_connection.*>: \
package(core), \
package(containers), \
package(cohttp), \
package(cohttp.lwt), \
package(conduit), \
package(lwt), \
package(lwt.ppx), \
package(nocrypto), \
package(ppx_deriving)
66 changes: 66 additions & 0 deletions lib/websocket_cohttp_lwt.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module C = Cohttp
module Lwt_IO = Websocket.IO(Cohttp_lwt_unix_io)

open Lwt

let send_frames stream oc =
let buf = Buffer.create 128 in
let send_frame fr =
Buffer.clear buf;
Lwt_IO.write_frame_to_buf ~masked:false buf fr;
Lwt_io.write oc @@ Buffer.contents buf
in
Lwt_stream.iter_s send_frame stream
;;

let read_frames icoc handler_fn =
let read_frame () =
let rf = Lwt_IO.make_read_frame ~masked:false icoc () in
match%lwt rf with
| `Ok frame -> Lwt.return frame
| `Error msg -> Lwt.fail_with msg
in
while%lwt true do
read_frame () >>= Lwt.wrap1 handler_fn
done
;;

let upgrade_connection request conn incoming_handler =
let headers = Cohttp.Request.headers request in
let key = CCOpt.get_exn @@ Cohttp.Header.get headers "sec-websocket-key" in
let hash = key ^ Websocket.websocket_uuid |> Websocket.b64_encoded_sha1sum in
let response_headers =
Cohttp.Header.of_list
["Upgrade", "websocket"
;"Connection", "Upgrade"
;"Sec-WebSocket-Accept", hash]
in
let resp =
Cohttp.Response.make
~status:`Switching_protocols
~encoding:Cohttp.Transfer.Unknown
~headers:response_headers
~flush:true
()
in

let frames_out_stream, frames_out_fn = Lwt_stream.create () in

let body_stream, stream_push = Lwt_stream.create () in
let _ =
let open Conduit_lwt_unix in
match conn with
| TCP (tcp : tcp_flow) ->
let oc = Lwt_io.of_fd ~mode:Lwt_io.output tcp.fd in
let ic = Lwt_io.of_fd ~mode:Lwt_io.input tcp.fd in
Lwt.join [
(* input: data from the client is read from the input channel
* of the tcp connection; pass it to handler function *)
read_frames (ic, oc) incoming_handler;
(* output: data for the client is written to the output
* channel of the tcp connection *)
send_frames frames_out_stream oc;
]
| _ -> Lwt.fail_with "expected TCP Websocket connection"
in
Lwt.return (resp, Cohttp_lwt_body.of_stream body_stream, frames_out_fn)
5 changes: 5 additions & 0 deletions lib/websocket_cohttp_lwt.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
val upgrade_connection:
Cohttp.Request.t ->
Conduit_lwt_unix.flow ->
(Websocket.Frame.t -> unit) ->
(Cohttp.Response.t * Cohttp_lwt_body.t * (Websocket.Frame.t option -> unit)) Lwt.t
1 change: 1 addition & 0 deletions pkg/build.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ let () =
Pkg.bin ~cond:lwt ~auto:true "tests/wscat";
Pkg.bin ~cond:async ~auto:true "tests/wscat_async";
Pkg.bin ~cond:lwt ~auto:true "tests/reynir";
Pkg.bin ~cond:lwt ~auto:true "tests/upgrade_connection";
]
2 changes: 1 addition & 1 deletion pkg/topkg.ml
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ module Pkg : Pkg = struct
let describe pkg ~builder mvs =
let mvs = List.sort compare (List.flatten mvs) in
let btool, bdir = match builder with
| `OCamlbuild -> "ocamlbuild -use-ocamlfind -classic-display", "_build"
| `OCamlbuild -> "ocamlbuild -tag thread -use-ocamlfind -classic-display", "_build"
| `Other (btool, bdir) -> btool, bdir
in
match Topkg.cmd with
Expand Down
102 changes: 102 additions & 0 deletions tests/upgrade_connection.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
open Lwt
open Core.Std

let handler
(conn : Conduit_lwt_unix.flow * Cohttp.Connection.t)
(req : Cohttp_lwt_unix.Request.t)
(body : Cohttp_lwt_body.t) =
Lwt_io.eprintf
"[CONN] %s\n%!" (Cohttp.Connection.to_string @@ snd conn)
>>= fun _ ->
let uri = Cohttp.Request.uri req in
match Uri.path uri with
| "/" ->
Lwt_io.eprintf "[PATH] /\n%!"
>>= fun () ->
Cohttp_lwt_unix.Server.respond_string
~status:`OK
~body: {|
<html>
<head>
<meta charset="utf-8">
<script src="//code.jquery.com/jquery-1.11.3.min.js"></script>
<script>
$(window).on('load', function(){
ws = new WebSocket('ws://localhost:7777/ws');
ws.onmessage = function(x) {
console.log(x.data);
var m = "<- Pong " + parseInt((x.data.substring(8)) - 1);
$('#msg').html("<p>" + x.data + "</p><p>" + m + "</p>");
ws.send(m);
};
});
</script>
</head>
<body>
<div id='msg'></div>
</body>
</html>
|}
()
| "/ws" ->
Lwt_io.eprintf "[PATH] /ws\n%!"
>>= fun () ->
Cohttp_lwt_body.drain_body body
>>= fun () ->
Websocket_cohttp_lwt.upgrade_connection req (fst conn) (
fun f ->
match f.Websocket.Frame.opcode with
| Websocket.Frame.Opcode.Close ->
Printf.eprintf "[RECV] CLOSE\n%!"
| _ ->
Printf.eprintf "[RECV] %s\n%!" f.Websocket.Frame.content
)
>>= fun (resp, body, frames_out_fn) ->
(* send a message to the client every second *)
let _ =
let num_ref = ref 10 in
let rec go () =
if !num_ref > 0 then
let msg = Printf.sprintf "-> Ping %d" !num_ref in
Lwt_io.eprintf "[SEND] %s\n%!" msg
>>= fun () ->
Lwt.wrap1 frames_out_fn @@
Some (
Websocket.Frame.of_bytes @@
BytesLabels.of_string @@
msg
)
>>= fun () ->
Lwt.return (num_ref := !num_ref - 1)
>>= fun () ->
Lwt_unix.sleep 1.
>>= go
else
Lwt_io.eprintf "[INFO] Test done\n%!"
>>= Lwt.return
in
go ()
in
Lwt.return (resp, (body :> Cohttp_lwt_body.t))
| _ ->
Lwt_io.eprintf "[PATH] Catch-all\n%!"
>>= fun () ->
Cohttp_lwt_unix.Server.respond_string
~status:`Not_found
~body:(Sexp.to_string_hum (Cohttp.Request.sexp_of_t req))
()

let start_server host port () =
let conn_closed (ch,_) =
Printf.eprintf "[SERV] connection %s closed\n%!"
(Sexplib.Sexp.to_string_hum (Conduit_lwt_unix.sexp_of_flow ch))
in
Lwt_io.eprintf "[SERV] Listening for HTTP on port %d\n%!" port
>>= fun _ ->
Cohttp_lwt_unix.Server.create
~mode:(`TCP (`Port port))
(Cohttp_lwt_unix.Server.make ~callback:handler ~conn_closed ())

(* main *)
let () =
Lwt_main.run (start_server "localhost" 7777 ())