Skip to content

Commit

Permalink
websocket: Fix websocket data being modelled as a stream internally
Browse files Browse the repository at this point in the history
Currently WebSocket implementation has the following input data path:

input_stream -> websocket parser -> queue -> data_sink -> input_stream
wrapper

On the output side, the data path is as follows:

output_stream wrapper -> data_source -> queue -> websocket serializer ->
output_stream

The input_stream and output_stream wrappers are what is exposed to the
user. This is problematic, because WebSocket is a message-based protocol
and streams do not have the concept of messages, they model data as a
stream of bytes. As a result, the payloads that the user sends or
receives will not correspond to a single websocket message, breaking the
underlying protocol in most cases. E.g. json or protobuf payloads no
longer contain valid data and so on.

Currently this behavior is seen on high load, when the stream wrappers
start to coalesce and split messages in the write path due to more than
one message being available at a time.

The solution is to expose data_sink and data_source that are backed by
message queues directly to the user.
  • Loading branch information
p12tic committed Feb 7, 2025
1 parent 384661a commit 0543087
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 25 deletions.
7 changes: 3 additions & 4 deletions demos/websocket_server_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ int main(int argc, char** argv) {

return async([port] {
websocket::server ws;
ws.register_handler("echo", [] (input_stream<char>& in,
output_stream<char>& out) {
ws.register_handler("echo", [](data_source& in, data_sink& out) {
return repeat([&in, &out]() {
return in.read().then([&out](temporary_buffer<char> f) {
return in.get().then([&out](temporary_buffer<char> f) {
std::cerr << "f.size(): " << f.size() << "\n";
if (f.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
} else {
return out.write(std::move(f)).then([&out]() {
return out.put(std::move(f)).then([&out]() {
return out.flush().then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
Expand Down
9 changes: 3 additions & 6 deletions doc/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,16 @@ Here's an example of how to register a simple echo protocol:
```cpp
using namespace seastar;
static experimental::websocket::server ws;
ws.register_handler("echo", [] (input_stream<char>& in, output_stream<char>& out) -> future<> {
ws.register_handler("echo", [] (data_source& in, data_sink& out) -> future<> {
while (true) {
auto buf = co_await in.read();
if (buf.empty()) {
co_return;
}
auto buf = co_await in.get();
co_await out.write(std::move(buf));
co_await out.flush();
}
});
```

Note: the developers should assume that the input stream provides decoded and unmasked data - so the stream should be treated as if it was backed by a TCP socket. Similarly, responses should be sent to the output stream as is, and the WebSocket server implementation will handle its proper serialization, masking and so on.
Note: the developers should assume that the data source provides decoded and unmasked data. Similarly, responses should be sent to the output stream as is, and the WebSocket server implementation will handle its proper serialization, masking and so on.

## Error handling

Expand Down
12 changes: 5 additions & 7 deletions include/seastar/websocket/common.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace seastar::experimental::websocket {

extern sstring magic_key_suffix;

using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;
using handler_t = std::function<future<>(data_source&, data_sink&)>;

class server;

Expand Down Expand Up @@ -126,9 +126,9 @@ protected:

websocket_parser _websocket_parser;
queue <temporary_buffer<char>> _input_buffer;
input_stream<char> _input;
data_source _input;
queue <temporary_buffer<char>> _output_buffer;
output_stream<char> _output;
data_sink _output;

sstring _subprotocol;
handler_t _handler;
Expand All @@ -143,10 +143,8 @@ public:
, _input_buffer{PIPE_SIZE}
, _output_buffer{PIPE_SIZE}
{
_input = input_stream<char>{data_source{
std::make_unique<connection_source_impl>(&_input_buffer)}};
_output = output_stream<char>{data_sink{
std::make_unique<connection_sink_impl>(&_output_buffer)}};
_input = data_source{std::make_unique<connection_source_impl>(&_input_buffer)};
_output = data_sink{std::make_unique<connection_sink_impl>(&_output_buffer)};
}

/*!
Expand Down
14 changes: 6 additions & 8 deletions tests/unit/websocket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ future<> test_websocket_handshake_common(std::string subprotocol) {
auto output = sock.output();

websocket::server dummy;
dummy.register_handler(subprotocol, [] (input_stream<char>& in,
output_stream<char>& out) {
dummy.register_handler(subprotocol, [] (data_source& in, data_sink& out) {
return repeat([&in, &out]() {
return in.read().then([&out](temporary_buffer<char> f) {
return in.get().then([&out](temporary_buffer<char> f) {
std::cerr << "f.size(): " << f.size() << "\n";
if (f.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
} else {
return out.write(std::move(f)).then([&out]() {
return out.put(std::move(f)).then([&out]() {
return out.flush().then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
Expand Down Expand Up @@ -115,15 +114,14 @@ future<> test_websocket_handler_registration_common(std::string subprotocol) {

// Setup server
websocket::server ws;
ws.register_handler(subprotocol, [] (input_stream<char>& in,
output_stream<char>& out) {
ws.register_handler(subprotocol, [] (data_source& in, data_sink& out) {
return repeat([&in, &out]() {
return in.read().then([&out](temporary_buffer<char> f) {
return in.get().then([&out](temporary_buffer<char> f) {
std::cerr << "f.size(): " << f.size() << "\n";
if (f.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
} else {
return out.write(std::move(f)).then([&out]() {
return out.put(std::move(f)).then([&out]() {
return out.flush().then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
Expand Down

0 comments on commit 0543087

Please sign in to comment.