Skip to content

Commit

Permalink
http/server: send headers using io_uring
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxKellermann committed Jan 17, 2025
1 parent 29e348e commit c45aae2
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
19 changes: 19 additions & 0 deletions src/http/server/Internal.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SlicePool;
enum class HttpMethod : uint_least8_t;
struct HttpServerRequest;
class HttpHeaders;
class GrowingBuffer;
namespace Net::Log { enum class ContentType : uint8_t; }

struct HttpServerConnection final
Expand Down Expand Up @@ -277,6 +278,15 @@ struct HttpServerConnection final
} response;

#ifdef HAVE_URING
class UringSend;

void StartUringSend(Uring::Queue &queue, GrowingBuffer &&src);
void CancelUringSend() noexcept;
void OnUringSendDone() noexcept;
void OnUringSendError(int error) noexcept;

UringSend *uring_send = nullptr;

class UringSplice final : Uring::Operation {
HttpServerConnection &parent;
Uring::Queue &queue;
Expand Down Expand Up @@ -305,6 +315,14 @@ struct HttpServerConnection final
bool uring_splice_then_eof;
#endif

bool HaveUringSend() const noexcept {
#ifdef HAVE_URING
return uring_send != nullptr;
#else
return false;
#endif
}

enum http_server_score score = HTTP_SERVER_NEW;

bool date_header;
Expand All @@ -320,6 +338,7 @@ struct HttpServerConnection final
SlicePool &_request_slice_pool,
HttpServerConnectionHandler &_handler,
HttpServerRequestHandler &_request_handler) noexcept;
~HttpServerConnection() noexcept;

void Delete() noexcept;

Expand Down
8 changes: 8 additions & 0 deletions src/http/server/Public.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,14 @@ HttpServerConnection::HttpServerConnection(struct pool &_pool,
socket->DeferRead();
}

inline
HttpServerConnection::~HttpServerConnection() noexcept
{
#ifdef HAVE_URING
CancelUringSend();
#endif
}

void
HttpServerConnection::Delete() noexcept
{
Expand Down
11 changes: 10 additions & 1 deletion src/http/server/Response.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ HttpServerConnection::OnData(std::span<const std::byte> src) noexcept
if (!socket->IsConnected())
return 0;

if (HaveUringSend())
// wait for io_uring to complete
return 0;

ssize_t nbytes = socket->Write(src);

if (nbytes >= 0) [[likely]] {
Expand Down Expand Up @@ -82,6 +86,10 @@ HttpServerConnection::OnDirect(FdType type, FileDescriptor fd, off_t offset,
if (!socket->IsConnected())
return IstreamDirectResult::BLOCKING;

if (HaveUringSend())
// wait for io_uring to complete
return IstreamDirectResult::BLOCKING;

#ifdef HAVE_URING
if (uring_splice) {
if (uring_splice->IsUringPending())
Expand Down Expand Up @@ -135,7 +143,8 @@ HttpServerConnection::OnEof() noexcept

ClearInput();

ResponseIstreamFinished();
if (!HaveUringSend())
ResponseIstreamFinished();
}

void
Expand Down
16 changes: 13 additions & 3 deletions src/http/server/Send.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,24 @@ HttpServerConnection::SubmitResponse(HttpStatus status,

GrowingBuffer headers3 = headers.ToBuffer();
headers3.Write("\r\n"sv);
auto header_stream = istream_gb_new(request_pool, std::move(headers3));

response.length = - header_stream.GetAvailable(false);

/* make sure the access logger gets a negative value if there
is no response body */
response.length -= !body;

#ifdef HAVE_URING
if (auto *uring_queue = socket->GetUringQueue()) {
assert(uring_send == nullptr);

SetResponseIstream(std::move(body));
StartUringSend(*uring_queue, std::move(headers3));
return;
}
#endif
auto header_stream = istream_gb_new(request_pool, std::move(headers3));

response.length = - header_stream.GetAvailable(false);

SetResponseIstream(NewConcatIstream(request_pool, std::move(header_stream),
std::move(body)));
DeferWrite();
Expand Down
130 changes: 130 additions & 0 deletions src/http/server/Uring.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,142 @@

#ifdef HAVE_URING
#include "Internal.hxx"
#include "memory/GrowingBuffer.hxx"
#include "fs/FilteredSocket.hxx"
#include "system/Error.hxx"
#include "io/uring/Queue.hxx"

#include <cassert>

#include <fcntl.h> // for SPLICE_F_MOVE

class HttpServerConnection::UringSend final : Uring::Operation {
HttpServerConnection &parent;
Uring::Queue &queue;

GrowingBufferReader reader;

bool canceled = false;

public:
UringSend(HttpServerConnection &_parent,
Uring::Queue &_queue,
GrowingBuffer &&buffer) noexcept
:parent(_parent), queue(_queue), reader(std::move(buffer))
{
}

void Start();

void Cancel() noexcept;

private:
HttpServerConnection &Finish() noexcept {
assert(!IsUringPending());
assert(!canceled);
assert(parent.uring_send == this);

auto &_parent = parent;
parent.uring_send = nullptr;
delete this;
return _parent;
}

void OnUringCompletion(int res) noexcept override;
};

void
HttpServerConnection::UringSend::Start()
{
assert(!IsUringPending());
assert(!canceled);
assert(parent.uring_send == this);

const auto r = reader.Read();
if (r.empty()) {
Finish().OnUringSendDone();
return;
}

auto &s = queue.RequireSubmitEntry();
io_uring_prep_send(&s, parent.socket->GetSocket().Get(),
r.data(), r.size(), 0);

/* always go async; this way, the overhead for the operation
does not cause latency in the main thread */
io_uring_sqe_set_flags(&s, IOSQE_ASYNC);

queue.Push(s, *this);
}

void
HttpServerConnection::UringSend::Cancel() noexcept
{
assert(IsUringPending());
assert(!canceled);
assert(parent.uring_send == this);

canceled = true;
parent.uring_send = nullptr;

auto &s = queue.RequireSubmitEntry();
io_uring_prep_cancel(&s, GetUringData(), 0);
io_uring_sqe_set_data(&s, nullptr);
queue.Submit();
}

void
HttpServerConnection::UringSend::OnUringCompletion(int res) noexcept
{
if (canceled) [[unlikely]] {
delete this;
return;
}

assert(parent.uring_send == this);

if (res < 0) [[unlikely]] {
Finish().OnUringSendError(-res);
return;
}

reader.Consume(res);
Start();
}

void
HttpServerConnection::StartUringSend(Uring::Queue &queue, GrowingBuffer &&src)
{
assert(uring_send == nullptr);

uring_send = new UringSend(*this, queue, std::move(src));
uring_send->Start();
}

void
HttpServerConnection::CancelUringSend() noexcept
{
if (uring_send != nullptr) {
uring_send->Cancel();
assert(uring_send == nullptr);
}
}

inline void
HttpServerConnection::OnUringSendDone() noexcept
{
if (HasInput())
DeferWrite();
else
ResponseIstreamFinished();
}

inline void
HttpServerConnection::OnUringSendError(int error) noexcept
{
Error(std::make_exception_ptr(MakeErrno(error, "Send failed")));
}

HttpServerConnection::UringSplice::~UringSplice() noexcept
{
if (IsUringPending()) {
Expand Down

0 comments on commit c45aae2

Please sign in to comment.