Skip to content

Commit

Permalink
More refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
nibanks committed Jan 3, 2025
1 parent 4974f2d commit 7b10d2a
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 47 deletions.
27 changes: 27 additions & 0 deletions src/inc/quic_platform_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,19 @@ CxPlatEventQEnqueue(
return kevent(*queue, &event, 1, NULL, 0, NULL) == 0;
}

inline
BOOLEAN
CxPlatEventQEnqueueEx(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_SQE* sqe,
_In_ short filter,
_In_ unsigned short flags,
)
{
struct kevent event = {.ident = sqe->Handle, .filter = filter, .flags = a, .fflags = 0, .data = 0, .udata = sqe};
return kevent(*queue, &event, 1, NULL, 0, NULL) == 0;
}

inline
uint32_t
CxPlatEventQDequeue(
Expand Down Expand Up @@ -1276,6 +1289,20 @@ CxPlatSqeInitialize(
return TRUE;
}

inline
void
CxPlatSqeInitializeEx(
_In_ CXPLAT_EVENTQ* queue,
_In_ uintptr_t handle,
_In_ CXPLAT_EVENT_COMPLETION completion,
_Out_ CXPLAT_SQE* sqe
)
{
UNREFERENCED_PARAMETER(queue);
sqe->Handle = handle;
sqe->Completion = completion;
}

inline
void
CxPlatSqeCleanup(
Expand Down
93 changes: 46 additions & 47 deletions src/platform/datapath_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ typedef struct QUIC_CACHEALIGN CXPLAT_SOCKET_CONTEXT {
CXPLAT_SQE ShutdownSqe;

//
// The user data for the IO event.
// The event for the IO event.
//
uint32_t IoCqeType;
CXPLAT_SQE IoSqe;

//
// The I/O vector for receive datagrams.
Expand Down Expand Up @@ -376,6 +376,9 @@ typedef struct CXPLAT_DATAPATH {
CXPLAT_DATAPATH_PARTITION Partitions[];

} CXPLAT_DATAPATH;

CXPLAT_EVENT_COMPLETION CxPlatSocketContextUninitializeEventComplete;
CXPLAT_EVENT_COMPLETION CxPlatSocketContextIoEventComplete;

QUIC_STATUS
CxPlatSocketSendInternal(
Expand Down Expand Up @@ -646,19 +649,15 @@ CxPlatSocketContextInitialize(
goto Exit;
}

if (!CxPlatSqeInitialize(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe.Sqe,
&SocketContext->ShutdownSqe)) {
Status = errno;
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
Binding,
Status,
"CxPlatSqeInitialize failed");
goto Exit;
}
CxPlatSqeInitialize(
SocketContext->DatapathPartition->EventQ,
CxPlatSocketContextUninitializeEventComplete,
&SocketContext->ShutdownSqe);
CxPlatSqeInitializeEx(
SocketContext->DatapathPartition->EventQ,
SocketContext->SocketFd,
CxPlatSocketContextIoEventComplete,
&SocketContext->IoSqe);

//
// Set dual (IPv4 & IPv6) socket mode unless we operate in pure IPv4 mode
Expand Down Expand Up @@ -972,9 +971,11 @@ CxPlatSocketContextUninitializeComplete(
}

if (SocketContext->SocketFd != INVALID_SOCKET) {
struct kevent DeleteEvent = {0};
EV_SET(&DeleteEvent, SocketContext->SocketFd, EVFILT_READ, EV_DELETE, 0, 0, &SocketContext->IoCqeType);
(void)kevent(*SocketContext->DatapathPartition->EventQ, &DeleteEvent, 1, NULL, 0, NULL);
CxPlatEventQEnqueueEx(
SocketContext->DatapathPartition->EventQ,
&SocketContext->IoSqe,
EVFILT_READ,
EV_DELETE);
close(SocketContext->SocketFd);
}

Expand All @@ -987,6 +988,16 @@ CxPlatSocketContextUninitializeComplete(
CxPlatSocketRelease(SocketContext->Binding);
}

void
CxPlatSocketContextUninitializeEventComplete(
_In_ CXPLAT_CQE* Cqe
)
{
CXPLAT_SOCKET_CONTEXT* SocketContext =
CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe);
CxPlatSocketContextUninitializeComplete(SocketContext);
}

void
CxPlatSocketContextUninitialize(
_In_ CXPLAT_SOCKET_CONTEXT* SocketContext
Expand All @@ -1005,10 +1016,11 @@ CxPlatSocketContextUninitialize(
//
// Cancel and clean up any pending IO.
//
struct kevent DeleteEvent = {0};
EV_SET(&DeleteEvent, SocketContext->SocketFd, EVFILT_READ, EV_DELETE, 0, 0, &SocketContext->IoCqeType);
(void)kevent(*SocketContext->DatapathPartition->EventQ, &DeleteEvent, 1, NULL, 0, NULL);

CxPlatEventQEnqueueEx(
SocketContext->DatapathPartition->EventQ,
&SocketContext->IoSqe,
EVFILT_READ,
EV_DELETE);
CxPlatEventQEnqueue(
SocketContext->DatapathPartition->EventQ,
&SocketContext->ShutdownSqe);
Expand Down Expand Up @@ -1062,25 +1074,18 @@ CxPlatSocketContextStartReceive(
goto Error;
}

struct kevent Event = {0};
EV_SET(&Event, SocketContext->SocketFd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &SocketContext->IoCqeType);

int Ret =
kevent(
*SocketContext->DatapathPartition->EventQ,
&Event,
1,
NULL,
0,
NULL);
if (Ret < 0) {
if (!CxPlatEventQEnqueueEx(
SocketContext->DatapathPartition->EventQ,
&SocketContext->IoSqe,
EVFILT_READ,
EV_ADD | EV_ENABLE)) {
Status = errno;
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
Status,
"kevent failed");
"CxPlatEventQEnqueueEx failed");
goto Error;
}

Expand Down Expand Up @@ -2026,24 +2031,18 @@ CxPlatSocketSendInternal(
CxPlatLockRelease(&SocketContext->PendingSendDataLock);
}
SendPending = TRUE;
struct kevent Event = {0};
EV_SET(&Event, SocketContext->SocketFd, EVFILT_WRITE, EV_ADD | EV_ONESHOT | EV_CLEAR, 0, 0, &SocketContext->IoCqeType);
int Ret =
kevent(
*SocketContext->DatapathPartition->EventQ,
&Event,
1,
NULL,
0,
NULL);
if (Ret < 1) {
if (!CxPlatEventQEnqueueEx(
SocketContext->DatapathPartition->EventQ,
&SocketContext->IoSqe,
EVFILT_WRITE,
EV_ADD | EV_ONESHOT | EV_CLEAR)) {
Status = errno;
QuicTraceEvent(
DatapathErrorStatus,
"[data][%p] ERROR, %u, %s.",
SocketContext->Binding,
Status,
"kevent failed");
"CxPlatEventQEnqueueEx failed");
goto Exit;
}
Status = QUIC_STATUS_PENDING;
Expand Down
16 changes: 16 additions & 0 deletions src/platform/inline.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ CxPlatEventQEnqueue(
_In_ CXPLAT_SQE* sqe
);

BOOLEAN
CxPlatEventQEnqueueEx(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_SQE* sqe,
_In_ short filter,
_In_ unsigned short flags,
);

uint32_t
CxPlatEventQDequeue(
_In_ CXPLAT_EVENTQ* queue,
Expand All @@ -438,6 +446,14 @@ CxPlatSqeInitialize(
_Out_ CXPLAT_SQE* sqe
);

void
CxPlatSqeInitializeEx(
_In_ CXPLAT_EVENTQ* queue,
_In_ uintptr_t handle,
_In_ CXPLAT_EVENT_COMPLETION completion,
_Out_ CXPLAT_SQE* sqe
);

void
CxPlatSqeCleanup(
_In_ CXPLAT_EVENTQ* queue,
Expand Down

0 comments on commit 7b10d2a

Please sign in to comment.