Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DepLibUring improvements #1440

Merged
merged 11 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-rust.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
ci:
- os: ubuntu-20.04
- os: ubuntu-22.04
- os: ubuntu-24.04
- os: macos-12
- os: macos-14
- os: windows-2022
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/mediasoup-worker-fuzzer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ jobs:
strategy:
matrix:
build:
- os: ubuntu-22.04
- os: ubuntu-24.04
cc: clang
cxx: clang++
pip-break-system-packages: true
build-type:
- Release
- Debug
Expand All @@ -34,12 +35,12 @@ jobs:
uses: actions/checkout@v4

# We need to install pip invoke manually.
- if: runner.os != 'macOS'
- if: ${{ !matrix.build.pip-break-system-packages }}
name: pip3 install invoke
run: pip3 install invoke

# In macOS we need to specify this option.
- if: runner.os == 'macOS'
# In modern OSs we need to run pip with this option.
- if: ${{ matrix.build.pip-break-system-packages }}
name: pip3 install --break-system-packages invoke
run: pip3 install --break-system-packages invoke

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/mediasoup-worker-prebuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
cc: gcc
cxx: g++
# Worker prebuild for Linux with kernel version 6 Ubuntu (22.04).
# Let's not use Ubutu 24.04 to avoid same potential problem as described
# above.
- os: ubuntu-22.04
cc: gcc
cxx: g++
Expand Down
16 changes: 13 additions & 3 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,22 @@ jobs:
- os: ubuntu-22.04
cc: clang
cxx: clang++
- os: ubuntu-24.04
cc: gcc
cxx: g++
pip-break-system-packages: true
- os: ubuntu-24.04
cc: clang
cxx: clang++
pip-break-system-packages: true
- os: macos-12
cc: gcc
cxx: g++
pip-break-system-packages: true
- os: macos-14
cc: clang
cxx: clang++
pip-break-system-packages: true
- os: windows-2022
cc: cl
cxx: cl
Expand Down Expand Up @@ -75,12 +85,12 @@ jobs:
${{ matrix.build.os }}-node-${{matrix.build.cc}}-

# We need to install pip invoke manually.
- if: runner.os != 'macOS'
- if: ${{ !matrix.build.pip-break-system-packages }}
name: pip3 install invoke
run: pip3 install invoke

# In macOS we need to specify this option.
- if: runner.os == 'macOS'
# In modern OSs we need to run pip with this option.
- if: ${{ matrix.build.pip-break-system-packages }}
name: pip3 install --break-system-packages invoke
run: pip3 install --break-system-packages invoke

Expand Down
10 changes: 9 additions & 1 deletion worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ class DepLibUring

using SendBuffer = uint8_t[SendBufferSize];

static bool IsRuntimeSupported();
static void ClassInit();
static void ClassDestroy();
static void CheckRuntimeSupport();
static bool IsEnabled();
static flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder);
static void StartPollingCQEs();
static void StopPollingCQEs();
Expand All @@ -50,9 +51,12 @@ class DepLibUring

class LibUring;

// Whether liburing is enabled or not after runtime checks.
static bool enabled;
thread_local static LibUring* liburing;

public:
// Singleton.
class LibUring
{
public:
Expand Down Expand Up @@ -98,6 +102,10 @@ class DepLibUring
}

private:
void SetInactive()
{
this->active = false;
}
UserData* GetUserData();
bool IsDataInSendBuffers(const uint8_t* data) const
{
Expand Down
98 changes: 44 additions & 54 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <sys/utsname.h>

/* Static variables. */

bool DepLibUring::enabled{ false };
/* liburing instance per thread. */
thread_local DepLibUring::LibUring* DepLibUring::liburing{ nullptr };
/* Completion queue entry array used to retrieve processes tasks. */
Expand Down Expand Up @@ -114,7 +114,36 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)

/* Static class methods */

bool DepLibUring::IsRuntimeSupported()
void DepLibUring::ClassInit()
{
const auto mayor = io_uring_major_version();
const auto minor = io_uring_minor_version();

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

// This must be called first.
DepLibUring::CheckRuntimeSupport();

if (DepLibUring::IsEnabled())
{
DepLibUring::liburing = new LibUring();

MS_DEBUG_TAG(info, "liburing enabled");
}
else
{
MS_DEBUG_TAG(info, "liburing not enabled");
}
}

void DepLibUring::ClassDestroy()
{
MS_TRACE();

delete DepLibUring::liburing;
}

void DepLibUring::CheckRuntimeSupport()
{
// clang-format off
struct utsname buffer{};
Expand All @@ -134,43 +163,19 @@ bool DepLibUring::IsRuntimeSupported()

// liburing `sento` capabilities are supported for kernel versions greather
// than or equal to 6.
return kernelMayorLong >= 6;
}

void DepLibUring::ClassInit()
{
const auto mayor = io_uring_major_version();
const auto minor = io_uring_minor_version();

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

if (DepLibUring::IsRuntimeSupported())
{
DepLibUring::liburing = new LibUring();

MS_DEBUG_TAG(info, "liburing supported, enabled");
}
else
{
MS_DEBUG_TAG(info, "liburing not supported, not enabled");
}
DepLibUring::enabled = kernelMayorLong >= 6;
}

void DepLibUring::ClassDestroy()
bool DepLibUring::IsEnabled()
{
MS_TRACE();

delete DepLibUring::liburing;
return DepLibUring::enabled;
}

flatbuffers::Offset<FBS::LibUring::Dump> DepLibUring::FillBuffer(flatbuffers::FlatBufferBuilder& builder)
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return 0;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->FillBuffer(builder);
}
Expand All @@ -179,10 +184,7 @@ void DepLibUring::StartPollingCQEs()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->StartPollingCQEs();
}
Expand All @@ -191,10 +193,7 @@ void DepLibUring::StopPollingCQEs()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->StopPollingCQEs();
}
Expand All @@ -203,7 +202,7 @@ uint8_t* DepLibUring::GetSendBuffer()
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->GetSendBuffer();
}
Expand All @@ -213,7 +212,7 @@ bool DepLibUring::PrepareSend(
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->PrepareSend(sockfd, data, len, addr, cb);
}
Expand All @@ -223,7 +222,7 @@ bool DepLibUring::PrepareWrite(
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->PrepareWrite(sockfd, data1, len1, data2, len2, cb);
}
Expand All @@ -232,10 +231,7 @@ void DepLibUring::Submit()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->Submit();
}
Expand All @@ -244,10 +240,7 @@ void DepLibUring::SetActive()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

DepLibUring::liburing->SetActive();
}
Expand All @@ -256,10 +249,7 @@ bool DepLibUring::IsActive()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return false;
}
MS_ASSERT(DepLibUring::enabled, "DepLibUring::liburing not enabled");

return DepLibUring::liburing->IsActive();
}
Expand Down Expand Up @@ -580,7 +570,7 @@ void DepLibUring::LibUring::Submit()
MS_TRACE();

// Unset active flag.
this->active = false;
SetInactive();

auto err = io_uring_submit(std::addressof(this->ring));

Expand Down
22 changes: 14 additions & 8 deletions worker/src/DepUsrSCTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,25 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/)
const int elapsedMs = this->lastCalledAtMs ? static_cast<int>(nowMs - this->lastCalledAtMs) : 0;

#ifdef MS_LIBURING_SUPPORTED
// Activate liburing usage.
// 'usrsctp_handle_timers()' will synchronously call the send/recv
// callbacks for the pending data. If there are multiple messages to be
// sent over the network then we will send those messages within a single
// system call.
DepLibUring::SetActive();
if (DepLibUring::IsEnabled())
{
// Activate liburing usage.
// 'usrsctp_handle_timers()' will synchronously call the send/recv
// callbacks for the pending data. If there are multiple messages to be
// sent over the network then we will send those messages within a single
// system call.
DepLibUring::SetActive();
}
#endif

usrsctp_handle_timers(elapsedMs);

#ifdef MS_LIBURING_SUPPORTED
// Submit all prepared submission entries.
DepLibUring::Submit();
if (DepLibUring::IsEnabled())
{
// Submit all prepared submission entries.
DepLibUring::Submit();
}
#endif

this->lastCalledAtMs = nowMs;
Expand Down
Loading
Loading