Skip to content

Commit

Permalink
Streams: split into client and server streams
Browse files Browse the repository at this point in the history
  • Loading branch information
aantron committed Dec 11, 2021
1 parent 4afbe19 commit f69b956
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/http/adapt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let forward_body_general
let bytes_since_flush = ref 0 in

let rec send () =
Dream.body_stream response
Dream.client_stream response
|> fun stream ->
Stream.read
stream
Expand Down
7 changes: 7 additions & 0 deletions src/http/http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ let websocket_handler user's_websocket_handler socket =
let reader = Stream.reader ~read ~close
and writer = Stream.writer ~ready ~write ~flush ~ping ~pong ~close in
let websocket = Stream.stream reader writer in
(* TODO Change WebSockets to use two pipes in the response body, rather than
a weird stream hanging out in the heap. That way, a client and server can
immediately communicate with each other if they are in process, without the
need to interpet the WebSocket response with an HTTP layer. This will also
simplify the WebSocket writing code, as this HTTP adapter code will read
from a pipe rather than implement a writer from scratch. At that point,
Stream.writer can be removed from stream.mli. *)

(* TODO Needs error handling like the top-level app has! *)
Lwt.async (fun () ->
Expand Down
50 changes: 37 additions & 13 deletions src/pure/inmost.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ and response = outgoing message
and 'a message = {
specific : 'a;
headers : (string * string) list;
body : Stream.stream;
client_stream : Stream.stream;
server_stream : Stream.stream;
locals : Scope.t;
first : 'a message;
last : 'a message ref;
Expand Down Expand Up @@ -284,14 +285,20 @@ let cookie name request =
try Some (cookie_exn name request)
with Not_found -> None *)

(* TODO NOTE On the client, this will read the client stream until close. *)
let body message =
Stream.read_until_close message.body
Stream.read_until_close message.server_stream

let read message =
Stream.read_convenience message.body
Stream.read_convenience message.server_stream

let body_stream message =
message.body
message.server_stream

(* TODO Temporary internal function so that the HTTP layer can read response
streams. *)
let client_stream message =
message.client_stream

(* TODO Pending the dream.mli interface reorganization for the new stream
API. *)
Expand All @@ -303,19 +310,32 @@ let next =
are setting a new body. Indeed, there might be a concurrent read going on.
That read should not override the new body. So let it mutate the old
request's ref; we generate a new request with a new body ref. *)
(* TODO NOTE In Dream, this should operate on response server_streams. In Hyper,
it should operate on request client_streams, although there is no very good
reason why it can't operate on general messages, which might be useful in
middlewares that preprocess requests on the server and postprocess responses
on the client. Or.... shouldn't this affect the client stream on the server,
replacing its read end? *)
let with_body body message =
(* TODO This is partially redundant with a length check in Stream.string, but
that check is no longer useful as it prevents allocation of only a reader,
rather than a complete stream. *)
let body =
if String.length body = 0 then
(* TODO Should probably preallocate this as a stream. *)
Stream.(stream empty no_writer)
else
Stream.(stream (string body) no_writer)
in
update {message with body}
update {message with server_stream = body}

(* TODO The critical piece: the pipe should be split between the client and
server streams. adapt.ml should be reading from the client stream. *)
let with_stream message =
let reader, writer = Stream.pipe () in
update {message with body = Stream.stream reader writer}
let client_stream = Stream.stream reader Stream.no_writer in
let server_stream = Stream.stream Stream.no_reader writer in
update {message with client_stream; server_stream}

(* TODO Need to expose FIN. However, it can't have any effect even on
WebSockets, because websocket/af does not offer the ability to pass FIN. It
Expand All @@ -327,7 +347,7 @@ let write message chunk =
let buffer = Bigstringaf.of_string ~off:0 ~len:length chunk in
(* TODO Better handling of close? But it can't even occur with http/af. *)
Stream.write
message.body
message.server_stream
buffer 0 length true false
~close:(fun _code -> Lwt.wakeup_later_exn resolver End_of_file)
(Lwt.wakeup_later resolver);
Expand All @@ -344,7 +364,7 @@ let write_buffer ?(offset = 0) ?length message chunk =
(* TODO As above, properly expose FIN. *)
(* TODO Also expose binary/text. *)
Stream.write
message.body
message.server_stream
chunk offset length true false
~close:(fun _code -> Lwt.wakeup_later_exn resolver End_of_file)
(Lwt.wakeup_later resolver);
Expand All @@ -355,13 +375,13 @@ let write_buffer ?(offset = 0) ?length message chunk =
let flush message =
let promise, resolver = Lwt.wait () in
Stream.flush
message.body
message.server_stream
~close:(fun _code -> Lwt.wakeup_later_exn resolver End_of_file)
(Lwt.wakeup_later resolver);
promise

let close_stream message =
Stream.close message.body 1000;
Stream.close message.server_stream 1000;
Lwt.return_unit

(* TODO Rename. *)
Expand Down Expand Up @@ -439,7 +459,8 @@ let request_from_http
upload = initial_multipart_state ();
};
headers;
body;
client_stream = Stream.(stream no_reader no_writer);
server_stream = body;
locals = Scope.empty;
first = request; (* TODO LATER What OCaml version is required for this? *)
last = ref request;
Expand Down Expand Up @@ -480,7 +501,8 @@ let request
upload = initial_multipart_state ();
};
headers;
body = Stream.(stream (string body) no_writer);
client_stream = Stream.(stream (string body) no_writer);
server_stream = Stream.(stream no_reader no_writer);
locals = Scope.empty;
first = request;
last = ref request;
Expand All @@ -507,7 +529,9 @@ let response
websocket = None;
};
headers;
body = Stream.(stream (string body) no_writer);
client_stream = Stream.(stream (string body) no_writer);
server_stream = Stream.(stream no_reader no_writer);
(* TODO This fully dead stream should be preallocated. *)
locals = Scope.empty;
first = response;
last = ref response;
Expand Down

0 comments on commit f69b956

Please sign in to comment.