diff --git a/src/generated/linux/datapath_raw_xdp_linux.c.clog.h b/src/generated/linux/datapath_raw_xdp_linux.c.clog.h index f2456815ff..9893b21e2d 100644 --- a/src/generated/linux/datapath_raw_xdp_linux.c.clog.h +++ b/src/generated/linux/datapath_raw_xdp_linux.c.clog.h @@ -546,9 +546,9 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_LINUX_C, XdpPartitionShutdown , arg2);\ // Decoder Ring for XdpPartitionShutdownComplete // [ xdp][%p] XDP partition shutdown complete // QuicTraceLogVerbose( - XdpPartitionShutdownComplete, - "[ xdp][%p] XDP partition shutdown complete", - Partition); + XdpPartitionShutdownComplete, + "[ xdp][%p] XDP partition shutdown complete", + Partition); // arg2 = arg2 = Partition = arg2 ----------------------------------------------------------*/ #ifndef _clog_3_ARGS_TRACE_XdpPartitionShutdownComplete @@ -564,9 +564,9 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_LINUX_C, XdpPartitionShutdownComplete , arg2);\ // Decoder Ring for XdpQueueAsyncIoRxComplete // [ xdp][%p] XDP async IO complete (RX) // QuicTraceLogVerbose( - XdpQueueAsyncIoRxComplete, - "[ xdp][%p] XDP async IO complete (RX)", - Queue); + XdpQueueAsyncIoRxComplete, + "[ xdp][%p] XDP async IO complete (RX)", + Queue); // arg2 = arg2 = Queue = arg2 ----------------------------------------------------------*/ #ifndef _clog_3_ARGS_TRACE_XdpQueueAsyncIoRxComplete diff --git a/src/generated/linux/datapath_raw_xdp_linux.c.clog.h.lttng.h b/src/generated/linux/datapath_raw_xdp_linux.c.clog.h.lttng.h index 0a65d87672..5595345fa9 100644 --- a/src/generated/linux/datapath_raw_xdp_linux.c.clog.h.lttng.h +++ b/src/generated/linux/datapath_raw_xdp_linux.c.clog.h.lttng.h @@ -568,9 +568,9 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_LINUX_C, XdpPartitionShutdown, // Decoder Ring for XdpPartitionShutdownComplete // [ xdp][%p] XDP partition shutdown complete // QuicTraceLogVerbose( - XdpPartitionShutdownComplete, - "[ xdp][%p] XDP partition shutdown complete", - Partition); + XdpPartitionShutdownComplete, + "[ xdp][%p] XDP partition shutdown complete", + Partition); // arg2 = arg2 = Partition = arg2 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_LINUX_C, XdpPartitionShutdownComplete, @@ -587,9 +587,9 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_LINUX_C, XdpPartitionShutdownComplete, // Decoder Ring for XdpQueueAsyncIoRxComplete // [ xdp][%p] XDP async IO complete (RX) // QuicTraceLogVerbose( - XdpQueueAsyncIoRxComplete, - "[ xdp][%p] XDP async IO complete (RX)", - Queue); + XdpQueueAsyncIoRxComplete, + "[ xdp][%p] XDP async IO complete (RX)", + Queue); // arg2 = arg2 = Queue = arg2 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_LINUX_C, XdpQueueAsyncIoRxComplete, diff --git a/src/generated/linux/datapath_raw_xdp_win.c.clog.h b/src/generated/linux/datapath_raw_xdp_win.c.clog.h index b8e8a3d927..70075fd2b7 100644 --- a/src/generated/linux/datapath_raw_xdp_win.c.clog.h +++ b/src/generated/linux/datapath_raw_xdp_win.c.clog.h @@ -263,9 +263,9 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoTx , arg2);\ // Decoder Ring for XdpQueueAsyncIoRxComplete // [ xdp][%p] XDP async IO complete (RX) // QuicTraceLogVerbose( - XdpQueueAsyncIoRxComplete, - "[ xdp][%p] XDP async IO complete (RX)", - Queue); + XdpQueueAsyncIoRxComplete, + "[ xdp][%p] XDP async IO complete (RX)", + Queue); // arg2 = arg2 = Queue = arg2 ----------------------------------------------------------*/ #ifndef _clog_3_ARGS_TRACE_XdpQueueAsyncIoRxComplete @@ -281,9 +281,9 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoRxComplete , arg2);\ // Decoder Ring for XdpQueueAsyncIoTxComplete // [ xdp][%p] XDP async IO complete (TX) // QuicTraceLogVerbose( - XdpQueueAsyncIoTxComplete, - "[ xdp][%p] XDP async IO complete (TX)", - Queue); + XdpQueueAsyncIoTxComplete, + "[ xdp][%p] XDP async IO complete (TX)", + Queue); // arg2 = arg2 = Queue = arg2 ----------------------------------------------------------*/ #ifndef _clog_3_ARGS_TRACE_XdpQueueAsyncIoTxComplete @@ -299,9 +299,9 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoTxComplete , arg2);\ // Decoder Ring for XdpPartitionShutdownComplete // [ xdp][%p] XDP partition shutdown complete // QuicTraceLogVerbose( - XdpPartitionShutdownComplete, - "[ xdp][%p] XDP partition shutdown complete", - Partition); + XdpPartitionShutdownComplete, + "[ xdp][%p] XDP partition shutdown complete", + Partition); // arg2 = arg2 = Partition = arg2 ----------------------------------------------------------*/ #ifndef _clog_3_ARGS_TRACE_XdpPartitionShutdownComplete diff --git a/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h b/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h index 891a5d88f3..2273dd284d 100644 --- a/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h +++ b/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h @@ -261,9 +261,9 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoTx, // Decoder Ring for XdpQueueAsyncIoRxComplete // [ xdp][%p] XDP async IO complete (RX) // QuicTraceLogVerbose( - XdpQueueAsyncIoRxComplete, - "[ xdp][%p] XDP async IO complete (RX)", - Queue); + XdpQueueAsyncIoRxComplete, + "[ xdp][%p] XDP async IO complete (RX)", + Queue); // arg2 = arg2 = Queue = arg2 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoRxComplete, @@ -280,9 +280,9 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoRxComplete, // Decoder Ring for XdpQueueAsyncIoTxComplete // [ xdp][%p] XDP async IO complete (TX) // QuicTraceLogVerbose( - XdpQueueAsyncIoTxComplete, - "[ xdp][%p] XDP async IO complete (TX)", - Queue); + XdpQueueAsyncIoTxComplete, + "[ xdp][%p] XDP async IO complete (TX)", + Queue); // arg2 = arg2 = Queue = arg2 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoTxComplete, @@ -299,9 +299,9 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpQueueAsyncIoTxComplete, // Decoder Ring for XdpPartitionShutdownComplete // [ xdp][%p] XDP partition shutdown complete // QuicTraceLogVerbose( - XdpPartitionShutdownComplete, - "[ xdp][%p] XDP partition shutdown complete", - Partition); + XdpPartitionShutdownComplete, + "[ xdp][%p] XDP partition shutdown complete", + Partition); // arg2 = arg2 = Partition = arg2 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpPartitionShutdownComplete, diff --git a/src/generated/linux/datapath_winuser.c.clog.h b/src/generated/linux/datapath_winuser.c.clog.h index d77ffd43af..c285e4e96b 100644 --- a/src/generated/linux/datapath_winuser.c.clog.h +++ b/src/generated/linux/datapath_winuser.c.clog.h @@ -341,10 +341,10 @@ tracepoint(CLOG_DATAPATH_WINUSER_C, LibraryError , arg2);\ "[data][%p] ERROR, %u, %s.", SocketProc->Parent, LastError, - "CxPlatSocketEnqueueSqe"); + "CxPlatEventQEnqueueEx"); // arg2 = arg2 = SocketProc->Parent = arg2 // arg3 = arg3 = LastError = arg3 -// arg4 = arg4 = "CxPlatSocketEnqueueSqe" = arg4 +// arg4 = arg4 = "CxPlatEventQEnqueueEx" = arg4 ----------------------------------------------------------*/ #ifndef _clog_5_ARGS_TRACE_DatapathErrorStatus #define _clog_5_ARGS_TRACE_DatapathErrorStatus(uniqueId, encoded_arg_string, arg2, arg3, arg4)\ diff --git a/src/generated/linux/datapath_winuser.c.clog.h.lttng.h b/src/generated/linux/datapath_winuser.c.clog.h.lttng.h index 6dfd96927d..6af0327b8e 100644 --- a/src/generated/linux/datapath_winuser.c.clog.h.lttng.h +++ b/src/generated/linux/datapath_winuser.c.clog.h.lttng.h @@ -341,10 +341,10 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_WINUSER_C, LibraryError, "[data][%p] ERROR, %u, %s.", SocketProc->Parent, LastError, - "CxPlatSocketEnqueueSqe"); + "CxPlatEventQEnqueueEx"); // arg2 = arg2 = SocketProc->Parent = arg2 // arg3 = arg3 = LastError = arg3 -// arg4 = arg4 = "CxPlatSocketEnqueueSqe" = arg4 +// arg4 = arg4 = "CxPlatEventQEnqueueEx" = arg4 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_WINUSER_C, DatapathErrorStatus, TP_ARGS( diff --git a/src/inc/quic_platform.h b/src/inc/quic_platform.h index ceaa6cdcd8..4e56a87fd5 100644 --- a/src/inc/quic_platform.h +++ b/src/inc/quic_platform.h @@ -541,18 +541,6 @@ CxPlatWakeExecutionContext( ); #endif -// -// The "type" of the completion queue event is stored as the first uint32_t of -// the user data. Everything after that in the user data is type-specific. -// -#define CxPlatCqeType(cqe) (*(uint32_t*)CxPlatCqeUserData(cqe)) - -// -// All QUIC (and lower layer) completion queue events have a type starting with -// 0x8000. -// -#define CXPLAT_CQE_TYPE_QUIC_BASE 0x8000 // to 0xFFFF - // // Test Interface for loading a self-signed certificate. // diff --git a/src/inc/quic_platform_posix.h b/src/inc/quic_platform_posix.h index 9adc306306..f963770741 100644 --- a/src/inc/quic_platform_posix.h +++ b/src/inc/quic_platform_posix.h @@ -959,6 +959,16 @@ CxPlatInternalEventWaitWithTimeout( #include typedef struct io_uring CXPLAT_EVENTQ; typedef struct io_uring_cqe* CXPLAT_CQE; +typedef +_IRQL_requires_max_(PASSIVE_LEVEL) +void +(CXPLAT_EVENT_COMPLETION)( + _In_ CXPLAT_CQE* Cqe + ); +typedef CXPLAT_EVENT_COMPLETION *CXPLAT_EVENT_COMPLETION_HANDLER; +typedef struct CXPLAT_SQE { + CXPLAT_EVENT_COMPLETION_HANDLER Completion; +} CXPLAT_SQE; inline BOOLEAN @@ -980,21 +990,19 @@ CxPlatEventQCleanup( inline BOOLEAN -_CxPlatEventQEnqueue( +CxPlatEventQEnqueue( _In_ CXPLAT_EVENTQ* queue, - _In_opt_ void* user_data + _In_ CXPLAT_SQE* sqe ) { struct io_uring_sqe *io_sqe = io_uring_get_sqe(queue); if (io_sqe == NULL) return FALSE; // OOM io_uring_prep_nop(io_sqe); - io_uring_sqe_set_data(io_sqe, user_data); + io_uring_sqe_set_data(io_sqe, sqe); io_uring_submit(queue); // TODO - Extract to separate function? return TRUE; } -#define CxPlatEventQEnqueue(queue, sqe, user_data) _CxPlatEventQEnqueue(queue, user_data) - inline uint32_t CxPlatEventQDequeue( @@ -1028,12 +1036,36 @@ CxPlatEventQReturn( } inline -void* -CxPlatCqeUserData( +BOOLEAN +CxPlatSqeInitialize( + _In_ CXPLAT_EVENTQ* queue, + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe + ) +{ + UNREFERENCED_PARAMETER(queue); + sqe->Completion = completion; + return TRUE; +} + +inline +void +CxPlatSqeCleanup( + _In_ CXPLAT_EVENTQ* queue, + _In_ CXPLAT_SQE* sqe + ) +{ + UNREFERENCED_PARAMETER(queue); + UNREFERENCED_PARAMETER(sqe); +} + +inline +CXPLAT_SQE* +CxPlatCqeGetSqe( _In_ const CXPLAT_CQE* cqe ) { - return (void*)(uintptr_t)cqe->user_data; + return (CXPLAT_SQE*)(uintptr_t)cqe->user_data; } #else // epoll @@ -1042,9 +1074,17 @@ CxPlatCqeUserData( #include typedef int CXPLAT_EVENTQ; -#define CXPLAT_SQE int -#define CXPLAT_SQE_DEFAULT 0 typedef struct epoll_event CXPLAT_CQE; +typedef +void +(CXPLAT_EVENT_COMPLETION)( + _In_ CXPLAT_CQE* Cqe + ); +typedef CXPLAT_EVENT_COMPLETION *CXPLAT_EVENT_COMPLETION_HANDLER; +typedef struct CXPLAT_SQE { + int fd; + CXPLAT_EVENT_COMPLETION_HANDLER Completion; +} CXPLAT_SQE; inline BOOLEAN @@ -1068,13 +1108,11 @@ inline BOOLEAN CxPlatEventQEnqueue( _In_ CXPLAT_EVENTQ* queue, - _In_ CXPLAT_SQE* sqe, - _In_opt_ void* user_data + _In_ CXPLAT_SQE* sqe ) { UNREFERENCED_PARAMETER(queue); - UNREFERENCED_PARAMETER(user_data); - return eventfd_write(*sqe, 1) == 0; + return eventfd_write(sqe->fd, 1) == 0; } inline @@ -1105,19 +1143,18 @@ CxPlatEventQReturn( UNREFERENCED_PARAMETER(count); } -#define CXPLAT_SQE_INIT 1 - inline BOOLEAN CxPlatSqeInitialize( _In_ CXPLAT_EVENTQ* queue, - _Out_ CXPLAT_SQE* sqe, - _In_ void* user_data + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe ) { - struct epoll_event event = { .events = EPOLLIN | EPOLLET, .data = { .ptr = user_data } }; - if ((*sqe = eventfd(0, EFD_CLOEXEC)) == -1) return FALSE; - if (epoll_ctl(*queue, EPOLL_CTL_ADD, *sqe, &event) != 0) { close(*sqe); return FALSE; } + struct epoll_event event = { .events = EPOLLIN | EPOLLET, .data = { .ptr = sqe } }; + sqe->Completion = completion; + if ((sqe->fd = eventfd(0, EFD_CLOEXEC)) == -1) return FALSE; + if (epoll_ctl(*queue, EPOLL_CTL_ADD, sqe->fd, &event) != 0) { close(sqe->fd); return FALSE; } return TRUE; } @@ -1128,17 +1165,17 @@ CxPlatSqeCleanup( _In_ CXPLAT_SQE* sqe ) { - epoll_ctl(*queue, EPOLL_CTL_DEL, *sqe, NULL); - close(*sqe); + epoll_ctl(*queue, EPOLL_CTL_DEL, sqe->fd, NULL); + close(sqe->fd); } inline -void* -CxPlatCqeUserData( +CXPLAT_SQE* +CxPlatCqeGetSqe( _In_ const CXPLAT_CQE* cqe ) { - return (void*)cqe->data.ptr; + return (CXPLAT_SQE*)cqe->data.ptr; } #endif @@ -1149,9 +1186,17 @@ CxPlatCqeUserData( #include typedef int CXPLAT_EVENTQ; -#define CXPLAT_SQE uintptr_t -#define CXPLAT_SQE_DEFAULT 0 typedef struct kevent CXPLAT_CQE; +typedef +void +(CXPLAT_EVENT_COMPLETION)( + _In_ CXPLAT_CQE* Cqe + ); +typedef CXPLAT_EVENT_COMPLETION *CXPLAT_EVENT_COMPLETION_HANDLER; +typedef struct CXPLAT_SQE { + uintptr_t Handle; + CXPLAT_EVENT_COMPLETION_HANDLER Completion; +} CXPLAT_SQE; inline BOOLEAN @@ -1174,12 +1219,25 @@ CxPlatEventQCleanup( inline BOOLEAN CxPlatEventQEnqueue( + _In_ CXPLAT_EVENTQ* queue, + _In_ CXPLAT_SQE* sqe + ) +{ + // TODO - Should ident simply use the pointer value of sqe? + struct kevent event = {.ident = sqe->Handle, .filter = EVFILT_USER, .flags = EV_ADD | EV_ONESHOT, .fflags = NOTE_TRIGGER, .data = 0, .udata = sqe}; + return kevent(*queue, &event, 1, NULL, 0, NULL) == 0; +} + +inline +BOOLEAN +CxPlatEventQEnqueueEx( _In_ CXPLAT_EVENTQ* queue, _In_ CXPLAT_SQE* sqe, - _In_opt_ void* user_data + _In_ short filter, + _In_ unsigned short flags ) { - struct kevent event = {.ident = *sqe, .filter = EVFILT_USER, .flags = EV_ADD | EV_ONESHOT, .fflags = NOTE_TRIGGER, .data = 0, .udata = user_data}; + struct kevent event = {.ident = sqe->Handle, .filter = filter, .flags = flags, .fflags = 0, .data = 0, .udata = sqe}; return kevent(*queue, &event, 1, NULL, 0, NULL) == 0; } @@ -1215,24 +1273,36 @@ CxPlatEventQReturn( UNREFERENCED_PARAMETER(count); } -#define CXPLAT_SQE_INIT 1 - extern uintptr_t CxPlatCurrentSqe; inline BOOLEAN CxPlatSqeInitialize( _In_ CXPLAT_EVENTQ* queue, - _In_ CXPLAT_SQE* sqe, - _In_ void* user_data + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe ) { UNREFERENCED_PARAMETER(queue); - UNREFERENCED_PARAMETER(user_data); - *sqe = __sync_add_and_fetch(&CxPlatCurrentSqe, 1); + sqe->Handle = __sync_add_and_fetch(&CxPlatCurrentSqe, 1); + sqe->Completion = completion; return TRUE; } +inline +void +CxPlatSqeInitializeEx( + _In_ CXPLAT_EVENTQ* queue, + _In_ uintptr_t handle, + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe + ) +{ + UNREFERENCED_PARAMETER(queue); + sqe->Handle = handle; + sqe->Completion = completion; +} + inline void CxPlatSqeCleanup( @@ -1245,12 +1315,12 @@ CxPlatSqeCleanup( } inline -void* -CxPlatCqeUserData( +CXPLAT_SQE* +CxPlatCqeGetSqe( _In_ const CXPLAT_CQE* cqe ) { - return (void*)cqe->udata; + return (CXPLAT_SQE*)cqe->udata; } #else diff --git a/src/inc/quic_platform_winkernel.h b/src/inc/quic_platform_winkernel.h index 3314de5272..065f5e192a 100644 --- a/src/inc/quic_platform_winkernel.h +++ b/src/inc/quic_platform_winkernel.h @@ -498,16 +498,14 @@ CxPlatEventQCleanup( inline BOOLEAN _CxPlatEventQEnqueue( - _In_ CXPLAT_EVENTQ* queue, - _In_opt_ void* user_data + _In_ CXPLAT_EVENTQ* queue ) { - UNREFERENCED_PARAMETER(user_data); KeSetEvent(queue, IO_NO_INCREMENT, FALSE); return TRUE; } -#define CxPlatEventQEnqueue(queue, sqe, user_data) _CxPlatEventQEnqueue(queue, user_data) +#define CxPlatEventQEnqueue(queue, sqe) _CxPlatEventQEnqueue(queue) inline uint32_t diff --git a/src/inc/quic_platform_winuser.h b/src/inc/quic_platform_winuser.h index 52c47080b8..d04347ad76 100644 --- a/src/inc/quic_platform_winuser.h +++ b/src/inc/quic_platform_winuser.h @@ -702,11 +702,16 @@ CxPlatEventWaitWithTimeout( typedef HANDLE CXPLAT_EVENTQ; typedef OVERLAPPED_ENTRY CXPLAT_CQE; -#define CXPLAT_SQE CXPLAT_SQE -#define CXPLAT_SQE_DEFAULT {0} +typedef +_IRQL_requires_max_(PASSIVE_LEVEL) +void +(CXPLAT_EVENT_COMPLETION)( + _In_ CXPLAT_CQE* Cqe + ); +typedef CXPLAT_EVENT_COMPLETION *CXPLAT_EVENT_COMPLETION_HANDLER; typedef struct CXPLAT_SQE { - void* UserData; OVERLAPPED Overlapped; + CXPLAT_EVENT_COMPLETION_HANDLER Completion; #if DEBUG BOOLEAN IsQueued; // Debug flag to catch double queueing. #endif @@ -744,8 +749,7 @@ inline BOOLEAN CxPlatEventQEnqueue( _In_ CXPLAT_EVENTQ* queue, - _In_ CXPLAT_SQE* sqe, - _In_opt_ void* user_data + _In_ CXPLAT_SQE* sqe ) { #if DEBUG @@ -753,7 +757,6 @@ CxPlatEventQEnqueue( sqe->IsQueued; #endif CxPlatZeroMemory(&sqe->Overlapped, sizeof(sqe->Overlapped)); - sqe->UserData = user_data; return PostQueuedCompletionStatus(*queue, 0, 0, &sqe->Overlapped) != 0; } @@ -762,8 +765,7 @@ BOOLEAN CxPlatEventQEnqueueEx( // Windows specific extension _In_ CXPLAT_EVENTQ* queue, _In_ CXPLAT_SQE* sqe, - _In_ uint32_t num_bytes, - _In_opt_ void* user_data + _In_ uint32_t num_bytes ) { #if DEBUG @@ -771,7 +773,6 @@ CxPlatEventQEnqueueEx( // Windows specific extension sqe->IsQueued; #endif CxPlatZeroMemory(&sqe->Overlapped, sizeof(sqe->Overlapped)); - sqe->UserData = user_data; return PostQueuedCompletionStatus(*queue, num_bytes, 0, &sqe->Overlapped) != 0; } @@ -810,21 +811,52 @@ CxPlatEventQReturn( } inline -void* -CxPlatCqeUserData( - _In_ const CXPLAT_CQE* cqe +BOOLEAN +CxPlatSqeInitialize( + _In_ CXPLAT_EVENTQ* queue, + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe ) { - return CONTAINING_RECORD(cqe->lpOverlapped, CXPLAT_SQE, Overlapped)->UserData; + UNREFERENCED_PARAMETER(queue); + CxPlatZeroMemory(sqe, sizeof(*sqe)); + sqe->Completion = completion; + return TRUE; } -typedef struct DATAPATH_SQE DATAPATH_SQE; +inline +void +CxPlatSqeInitializeEx( + _In_ CXPLAT_EVENT_COMPLETION_HANDLER completion, + _Out_ CXPLAT_SQE* sqe + ) +{ + sqe->Completion = completion; + CxPlatZeroMemory(&sqe->Overlapped, sizeof(sqe->Overlapped)); +#if DEBUG + sqe->IsQueued = FALSE; +#endif +} +inline void -CxPlatDatapathSqeInitialize( - _Out_ DATAPATH_SQE* DatapathSqe, - _In_ uint32_t CqeType - ); +CxPlatSqeCleanup( + _In_ CXPLAT_EVENTQ* queue, + _In_ CXPLAT_SQE* sqe + ) +{ + UNREFERENCED_PARAMETER(queue); + UNREFERENCED_PARAMETER(sqe); +} + +inline +CXPLAT_SQE* +CxPlatCqeGetSqe( + _In_ const CXPLAT_CQE* cqe + ) +{ + return CONTAINING_RECORD(cqe->lpOverlapped, CXPLAT_SQE, Overlapped); +} // // Time Measurement Interfaces diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 8c95ae8fac..a91b82c532 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -205,6 +205,10 @@ typedef struct CXPLAT_RECV_MSG_CONTROL_BUFFER { #else #define CXPLAT_DBG_ASSERT_CMSG(CMsg, type) #endif + +CXPLAT_EVENT_COMPLETION CxPlatSocketContextUninitializeEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatSocketContextFlushTxEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatSocketContextIoEventComplete; void CxPlatDataPathCalculateFeatureSupport( @@ -572,13 +576,10 @@ CxPlatSocketContextSqeInitialize( BOOLEAN IoSqeInitialized = FALSE; BOOLEAN FlushTxInitialized = FALSE; - SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; - SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; - SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; if (!CxPlatSqeInitialize( SocketContext->DatapathPartition->EventQ, - &SocketContext->ShutdownSqe.Sqe, + CxPlatSocketContextUninitializeEventComplete, &SocketContext->ShutdownSqe)) { Status = errno; QuicTraceEvent( @@ -593,7 +594,7 @@ CxPlatSocketContextSqeInitialize( if (!CxPlatSqeInitialize( SocketContext->DatapathPartition->EventQ, - &SocketContext->IoSqe.Sqe, + CxPlatSocketContextIoEventComplete, &SocketContext->IoSqe)) { Status = errno; QuicTraceEvent( @@ -608,7 +609,7 @@ CxPlatSocketContextSqeInitialize( if (!CxPlatSqeInitialize( SocketContext->DatapathPartition->EventQ, - &SocketContext->FlushTxSqe.Sqe, + CxPlatSocketContextFlushTxEventComplete, &SocketContext->FlushTxSqe)) { Status = errno; QuicTraceEvent( @@ -627,13 +628,13 @@ CxPlatSocketContextSqeInitialize( if (QUIC_FAILED(Status)) { if (ShutdownSqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe); + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe); } if (IoSqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe); + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe); } if (FlushTxInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe); + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe); } } @@ -1154,9 +1155,9 @@ CxPlatSocketContextUninitializeComplete( } if (SocketContext->SqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe); - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe); - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe); + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe); + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe); + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe); } CxPlatLockUninitialize(&SocketContext->TxQueueLock); @@ -1168,6 +1169,16 @@ CxPlatSocketContextUninitializeComplete( CxPlatSocketRelease(SocketContext->Binding); } +void +CxPlatSocketContextUninitializeEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SOCKET_CONTEXT* SocketContext = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe); + CxPlatSocketContextUninitializeComplete(SocketContext); +} + void CxPlatSocketContextUninitialize( _In_ CXPLAT_SOCKET_CONTEXT* SocketContext @@ -1210,7 +1221,6 @@ CxPlatSocketContextUninitialize( CXPLAT_FRE_ASSERT( CxPlatEventQEnqueue( SocketContext->DatapathPartition->EventQ, - &SocketContext->ShutdownSqe.Sqe, &SocketContext->ShutdownSqe)); } } @@ -2425,7 +2435,6 @@ SocketSend( CXPLAT_FRE_ASSERT( CxPlatEventQEnqueue( SocketContext->DatapathPartition->EventQ, - &SocketContext->FlushTxSqe.Sqe, &SocketContext->FlushTxSqe)); } return; @@ -2787,6 +2796,16 @@ CxPlatSocketContextFlushTxQueue( } } +void +CxPlatSocketContextFlushTxEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SOCKET_CONTEXT* SocketContext = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, FlushTxSqe); + CxPlatSocketContextFlushTxQueue(SocketContext, FALSE); +} + _IRQL_requires_max_(DISPATCH_LEVEL) QUIC_STATUS CxPlatSocketGetTcpStatistics( @@ -2800,11 +2819,13 @@ CxPlatSocketGetTcpStatistics( } void -CxPlatDataPathSocketProcessIoCompletion( - _In_ CXPLAT_SOCKET_CONTEXT* SocketContext, +CxPlatSocketContextIoEventComplete( _In_ CXPLAT_CQE* Cqe ) { + CXPLAT_SOCKET_CONTEXT* SocketContext = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, IoSqe); + if (CxPlatRundownAcquire(&SocketContext->UpcallRundown)) { if (EPOLLERR & Cqe->events) { CxPlatSocketHandleErrors(SocketContext); @@ -2827,30 +2848,3 @@ CxPlatDataPathSocketProcessIoCompletion( CxPlatRundownRelease(&SocketContext->UpcallRundown); } } - -void -DataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ) -{ - switch (CxPlatCqeType(Cqe)) { - case CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN: { - CXPLAT_SOCKET_CONTEXT* SocketContext = - CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe); - CxPlatSocketContextUninitializeComplete(SocketContext); - break; - } - case CXPLAT_CQE_TYPE_SOCKET_IO: { - CXPLAT_SOCKET_CONTEXT* SocketContext = - CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, IoSqe); - CxPlatDataPathSocketProcessIoCompletion(SocketContext, Cqe); - break; - } - case CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX: { - CXPLAT_SOCKET_CONTEXT* SocketContext = - CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, FlushTxSqe); - CxPlatSocketContextFlushTxQueue(SocketContext, FALSE); - break; - } - } -} diff --git a/src/platform/datapath_kqueue.c b/src/platform/datapath_kqueue.c index 87efcf0428..e5a441ed3c 100644 --- a/src/platform/datapath_kqueue.c +++ b/src/platform/datapath_kqueue.c @@ -162,12 +162,12 @@ typedef struct QUIC_CACHEALIGN CXPLAT_SOCKET_CONTEXT { // // The event for the shutdown event. // - DATAPATH_SQE ShutdownSqe; + CXPLAT_SQE ShutdownSqe; // - // The user data for the IO event. + // The event for the IO event. // - uint32_t IoCqeType; + CXPLAT_SQE IoSqe; // // The I/O vector for receive datagrams. @@ -376,6 +376,9 @@ typedef struct CXPLAT_DATAPATH { CXPLAT_DATAPATH_PARTITION Partitions[]; } CXPLAT_DATAPATH; + +CXPLAT_EVENT_COMPLETION CxPlatSocketContextUninitializeEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatSocketContextIoEventComplete; QUIC_STATUS CxPlatSocketSendInternal( @@ -646,19 +649,15 @@ CxPlatSocketContextInitialize( goto Exit; } - if (!CxPlatSqeInitialize( - SocketContext->DatapathPartition->EventQ, - &SocketContext->ShutdownSqe.Sqe, - &SocketContext->ShutdownSqe)) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "CxPlatSqeInitialize failed"); - goto Exit; - } + CxPlatSqeInitialize( + SocketContext->DatapathPartition->EventQ, + CxPlatSocketContextUninitializeEventComplete, + &SocketContext->ShutdownSqe); + CxPlatSqeInitializeEx( + SocketContext->DatapathPartition->EventQ, + SocketContext->SocketFd, + CxPlatSocketContextIoEventComplete, + &SocketContext->IoSqe); // // Set dual (IPv4 & IPv6) socket mode unless we operate in pure IPv4 mode @@ -972,9 +971,11 @@ CxPlatSocketContextUninitializeComplete( } if (SocketContext->SocketFd != INVALID_SOCKET) { - struct kevent DeleteEvent = {0}; - EV_SET(&DeleteEvent, SocketContext->SocketFd, EVFILT_READ, EV_DELETE, 0, 0, &SocketContext->IoCqeType); - (void)kevent(*SocketContext->DatapathPartition->EventQ, &DeleteEvent, 1, NULL, 0, NULL); + CxPlatEventQEnqueueEx( + SocketContext->DatapathPartition->EventQ, + &SocketContext->IoSqe, + EVFILT_READ, + EV_DELETE); close(SocketContext->SocketFd); } @@ -987,6 +988,16 @@ CxPlatSocketContextUninitializeComplete( CxPlatSocketRelease(SocketContext->Binding); } +void +CxPlatSocketContextUninitializeEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SOCKET_CONTEXT* SocketContext = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe); + CxPlatSocketContextUninitializeComplete(SocketContext); +} + void CxPlatSocketContextUninitialize( _In_ CXPLAT_SOCKET_CONTEXT* SocketContext @@ -1005,13 +1016,13 @@ CxPlatSocketContextUninitialize( // // Cancel and clean up any pending IO. // - struct kevent DeleteEvent = {0}; - EV_SET(&DeleteEvent, SocketContext->SocketFd, EVFILT_READ, EV_DELETE, 0, 0, &SocketContext->IoCqeType); - (void)kevent(*SocketContext->DatapathPartition->EventQ, &DeleteEvent, 1, NULL, 0, NULL); - + CxPlatEventQEnqueueEx( + SocketContext->DatapathPartition->EventQ, + &SocketContext->IoSqe, + EVFILT_READ, + EV_DELETE); CxPlatEventQEnqueue( SocketContext->DatapathPartition->EventQ, - &SocketContext->ShutdownSqe.Sqe, &SocketContext->ShutdownSqe); } } @@ -1063,25 +1074,18 @@ CxPlatSocketContextStartReceive( goto Error; } - struct kevent Event = {0}; - EV_SET(&Event, SocketContext->SocketFd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &SocketContext->IoCqeType); - - int Ret = - kevent( - *SocketContext->DatapathPartition->EventQ, - &Event, - 1, - NULL, - 0, - NULL); - if (Ret < 0) { + if (!CxPlatEventQEnqueueEx( + SocketContext->DatapathPartition->EventQ, + &SocketContext->IoSqe, + EVFILT_READ, + EV_ADD | EV_ENABLE)) { Status = errno; QuicTraceEvent( DatapathErrorStatus, "[data][%p] ERROR, %u, %s.", SocketContext->Binding, Status, - "kevent failed"); + "CxPlatEventQEnqueueEx failed"); goto Error; } @@ -1307,11 +1311,13 @@ CxPlatSocketContextSendComplete( } void -CxPlatDataPathSocketProcessIoCompletion( - _In_ CXPLAT_SOCKET_CONTEXT* SocketContext, +CxPlatSocketContextIoEventComplete( _In_ CXPLAT_CQE* Cqe ) { + CXPLAT_SOCKET_CONTEXT* SocketContext = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_SOCKET_CONTEXT, IoSqe); + if (!CxPlatRundownAcquire(&SocketContext->UpcallRundown)) { return; } @@ -1425,8 +1431,6 @@ CxPlatSocketCreateUdp( for (uint32_t i = 0; i < SocketCount; i++) { Binding->SocketContexts[i].Binding = Binding; Binding->SocketContexts[i].SocketFd = INVALID_SOCKET; - Binding->SocketContexts[i].ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; - Binding->SocketContexts[i].IoCqeType = CXPLAT_CQE_TYPE_SOCKET_IO; Binding->SocketContexts[i].RecvIov.iov_len = Binding->Mtu - CXPLAT_MIN_IPV4_HEADER_SIZE - CXPLAT_UDP_HEADER_SIZE; Binding->SocketContexts[i].DatapathPartition = @@ -2029,24 +2033,18 @@ CxPlatSocketSendInternal( CxPlatLockRelease(&SocketContext->PendingSendDataLock); } SendPending = TRUE; - struct kevent Event = {0}; - EV_SET(&Event, SocketContext->SocketFd, EVFILT_WRITE, EV_ADD | EV_ONESHOT | EV_CLEAR, 0, 0, &SocketContext->IoCqeType); - int Ret = - kevent( - *SocketContext->DatapathPartition->EventQ, - &Event, - 1, - NULL, - 0, - NULL); - if (Ret < 1) { + if (!CxPlatEventQEnqueueEx( + SocketContext->DatapathPartition->EventQ, + &SocketContext->IoSqe, + EVFILT_WRITE, + EV_ADD | EV_ONESHOT | EV_CLEAR)) { Status = errno; QuicTraceEvent( DatapathErrorStatus, "[data][%p] ERROR, %u, %s.", SocketContext->Binding, Status, - "kevent failed"); + "CxPlatEventQEnqueueEx failed"); goto Exit; } Status = QUIC_STATUS_PENDING; @@ -2129,27 +2127,6 @@ CxPlatSocketGetTcpStatistics( return QUIC_STATUS_NOT_SUPPORTED; } -void -CxPlatDataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ) -{ - switch (CxPlatCqeType(Cqe)) { - case CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN: { - CXPLAT_SOCKET_CONTEXT* SocketContext = - CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, ShutdownSqe); - CxPlatSocketContextUninitializeComplete(SocketContext); - break; - } - case CXPLAT_CQE_TYPE_SOCKET_IO: { - CXPLAT_SOCKET_CONTEXT* SocketContext = - CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), CXPLAT_SOCKET_CONTEXT, IoCqeType); - CxPlatDataPathSocketProcessIoCompletion(SocketContext, Cqe); - break; - } - } -} - _IRQL_requires_max_(PASSIVE_LEVEL) void QuicCopyRouteInfo( diff --git a/src/platform/datapath_linux.c b/src/platform/datapath_linux.c index 714d1162d2..b0c6be1259 100644 --- a/src/platform/datapath_linux.c +++ b/src/platform/datapath_linux.c @@ -29,18 +29,6 @@ CxPlatSocketUpdateQeo( return QUIC_STATUS_NOT_SUPPORTED; } -void -CxPlatDataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ) -{ - if (CXPLAT_CQE_TYPE_XDP_SHUTDOWN <= CxPlatCqeType(Cqe)) { - RawDataPathProcessCqe(Cqe); - } else { - DataPathProcessCqe(Cqe); - } -} - _IRQL_requires_max_(PASSIVE_LEVEL) void CxPlatUpdateRoute( diff --git a/src/platform/datapath_raw_dummy.c b/src/platform/datapath_raw_dummy.c index 146feb637a..2db9a43e52 100644 --- a/src/platform/datapath_raw_dummy.c +++ b/src/platform/datapath_raw_dummy.c @@ -244,14 +244,6 @@ RawResolveRoute( return QUIC_STATUS_NOT_SUPPORTED; } -void -RawDataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ) -{ - UNREFERENCED_PARAMETER(Cqe); -} - _IRQL_requires_max_(PASSIVE_LEVEL) void RawUpdateRoute( diff --git a/src/platform/datapath_raw_xdp.h b/src/platform/datapath_raw_xdp.h index d264ce5d4a..c2ef397d5b 100644 --- a/src/platform/datapath_raw_xdp.h +++ b/src/platform/datapath_raw_xdp.h @@ -26,14 +26,6 @@ typedef struct XDP_PARTITION XDP_PARTITION; typedef struct XDP_DATAPATH XDP_DATAPATH; typedef struct XDP_QUEUE XDP_QUEUE; -// -// IO header for SQE->CQE based completions. -// -typedef struct DATAPATH_XDP_IO_SQE { - DATAPATH_XDP_IO_TYPE IoType; - DATAPATH_SQE DatapathSqe; -} DATAPATH_XDP_IO_SQE; - typedef struct XDP_INTERFACE_COMMON { CXPLAT_INTERFACE; uint16_t QueueCount; @@ -52,7 +44,7 @@ typedef struct XDP_QUEUE_COMMON { typedef struct QUIC_CACHEALIGN XDP_PARTITION { CXPLAT_EXECUTION_CONTEXT Ec; - DATAPATH_SQE ShutdownSqe; + CXPLAT_SQE ShutdownSqe; const struct XDP_DATAPATH* Xdp; CXPLAT_EVENTQ* EventQ; XDP_QUEUE* Queues; // A linked list of queues, accessed by Next. diff --git a/src/platform/datapath_raw_xdp_linux.c b/src/platform/datapath_raw_xdp_linux.c index fd5d1a5470..14471f91eb 100644 --- a/src/platform/datapath_raw_xdp_linux.c +++ b/src/platform/datapath_raw_xdp_linux.c @@ -27,7 +27,6 @@ #include "datapath_raw_xdp_linux.c.clog.h" #endif - #define NUM_FRAMES 8192 * 2 #define CONS_NUM_DESCS NUM_FRAMES / 2 #define PROD_NUM_DESCS NUM_FRAMES / 2 @@ -87,8 +86,8 @@ typedef struct XDP_INTERFACE { typedef struct XDP_QUEUE { XDP_QUEUE_COMMON; - DATAPATH_SQE RxIoSqe; - DATAPATH_SQE FlushTxSqe; + CXPLAT_SQE RxIoSqe; + CXPLAT_SQE FlushTxSqe; CXPLAT_LIST_ENTRY PartitionTxQueue; CXPLAT_SLIST_ENTRY PartitionRxPool; @@ -128,6 +127,10 @@ typedef struct __attribute__((aligned(64))) XDP_TX_PACKET { uint8_t FrameBuffer[MAX_ETH_FRAME_SIZE]; } XDP_TX_PACKET; +CXPLAT_EVENT_COMPLETION CxPlatPartitionShutdownEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatQueueRxIoEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatQueueTxIoEventComplete; + void XdpSocketContextSetEvents( _In_ XDP_QUEUE* Queue, @@ -254,10 +257,10 @@ CxPlatDpRawInterfaceUninitialize( if (Queue->XskInfo->Xsk) { if (Queue->Partition && Queue->Partition->EventQ) { epoll_ctl(*Queue->Partition->EventQ, EPOLL_CTL_DEL, xsk_socket__fd(Queue->XskInfo->Xsk), NULL); - CxPlatSqeCleanup(Queue->Partition->EventQ, &Queue->RxIoSqe.Sqe); - CxPlatSqeCleanup(Queue->Partition->EventQ, &Queue->FlushTxSqe.Sqe); + CxPlatSqeCleanup(Queue->Partition->EventQ, &Queue->RxIoSqe); + CxPlatSqeCleanup(Queue->Partition->EventQ, &Queue->FlushTxSqe); if (i == 0) { - CxPlatSqeCleanup(Queue->Partition->EventQ, &Queue->Partition->ShutdownSqe.Sqe); + CxPlatSqeCleanup(Queue->Partition->EventQ, &Queue->Partition->ShutdownSqe); } } xsk_socket__delete(Queue->XskInfo->Xsk); @@ -799,14 +802,13 @@ CxPlatDpRawInitialize( Partition->Ec.NextTimeUs = UINT64_MAX; Partition->Ec.Callback = CxPlatXdpExecute; Partition->Ec.Context = &Xdp->Partitions[i]; - Partition->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_XDP_SHUTDOWN; CxPlatRefIncrement(&Xdp->RefCount); CxPlatRundownAcquire(&Xdp->Rundown); Partition->EventQ = CxPlatWorkerPoolGetEventQ(WorkerPool, (uint16_t)i); if (!CxPlatSqeInitialize( Partition->EventQ, - &Partition->ShutdownSqe.Sqe, + CxPlatPartitionShutdownEventComplete, &Partition->ShutdownSqe)) { Status = QUIC_STATUS_INTERNAL_ERROR; goto Error; @@ -817,22 +819,20 @@ CxPlatDpRawInitialize( while (Queue) { if (!CxPlatSqeInitialize( Partition->EventQ, - &Queue->RxIoSqe.Sqe, + CxPlatQueueRxIoEventComplete, &Queue->RxIoSqe)) { Status = QUIC_STATUS_INTERNAL_ERROR; goto Error; } - Queue->RxIoSqe.CqeType = CXPLAT_CQE_TYPE_XDP_IO; XdpSocketContextSetEvents(Queue, EPOLL_CTL_ADD, EPOLLIN); if (!CxPlatSqeInitialize( Partition->EventQ, - &Queue->FlushTxSqe.Sqe, + CxPlatQueueTxIoEventComplete, &Queue->FlushTxSqe)) { Status = QUIC_STATUS_INTERNAL_ERROR; goto Error; } - Queue->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_XDP_FLUSH_TX; ++QueueCount; Queue = Queue->Next; @@ -1201,7 +1201,7 @@ CxPlatXdpExecute( XdpPartitionShutdown, "[ xdp][%p] XDP partition shutdown", Partition); - CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe.Sqe, &Partition->ShutdownSqe); + CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe); return FALSE; } @@ -1334,40 +1334,43 @@ CxPlatXdpRx( } void -RawDataPathProcessCqe( +CxPlatPartitionShutdownEventComplete( _In_ CXPLAT_CQE* Cqe ) { - switch (CxPlatCqeType(Cqe)) { - case CXPLAT_CQE_TYPE_XDP_SHUTDOWN: { - XDP_PARTITION* Partition = - CXPLAT_CONTAINING_RECORD(CxPlatCqeUserData(Cqe), XDP_PARTITION, ShutdownSqe); - QuicTraceLogVerbose( - XdpPartitionShutdownComplete, - "[ xdp][%p] XDP partition shutdown complete", - Partition); - CxPlatDpRawRelease((XDP_DATAPATH*)Partition->Xdp); - break; - } - case CXPLAT_CQE_TYPE_XDP_IO: { - // TODO: use DATAPATH_IO_SQE to distinguish Tx/RX - DATAPATH_SQE* Sqe = (DATAPATH_SQE*)CxPlatCqeUserData(Cqe); - XDP_QUEUE* Queue; - Queue = CXPLAT_CONTAINING_RECORD(Sqe, XDP_QUEUE, RxIoSqe); - QuicTraceLogVerbose( - XdpQueueAsyncIoRxComplete, - "[ xdp][%p] XDP async IO complete (RX)", - Queue); - if (EPOLLOUT & Cqe->events) { - KickTx(Queue, TRUE); - } else { - Queue->RxQueued = FALSE; - Queue->Partition->Ec.Ready = TRUE; - } - break; - } - case CXPLAT_CQE_TYPE_XDP_FLUSH_TX: { + XDP_PARTITION* Partition = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), XDP_PARTITION, ShutdownSqe); + QuicTraceLogVerbose( + XdpPartitionShutdownComplete, + "[ xdp][%p] XDP partition shutdown complete", + Partition); + CxPlatDpRawRelease((XDP_DATAPATH*)Partition->Xdp); +} +void +CxPlatQueueRxIoEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + // TODO: use CQE to distinguish Tx/RX + XDP_QUEUE* Queue = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), XDP_QUEUE, RxIoSqe); + QuicTraceLogVerbose( + XdpQueueAsyncIoRxComplete, + "[ xdp][%p] XDP async IO complete (RX)", + Queue); + if (EPOLLOUT & Cqe->events) { + KickTx(Queue, TRUE); + } else { + Queue->RxQueued = FALSE; + Queue->Partition->Ec.Ready = TRUE; } - } +} + +void +CxPlatQueueTxIoEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + UNREFERENCED_PARAMETER(Cqe); // TODO - Use this? } diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 370b53a529..c2178bfaf9 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -62,13 +62,13 @@ typedef struct XDP_QUEUE { uint16_t RssProcessor; uint8_t* RxBuffers; HANDLE RxXsk; - DATAPATH_XDP_IO_SQE RxIoSqe; + CXPLAT_SQE RxIoSqe; XSK_RING RxFillRing; XSK_RING RxRing; HANDLE RxProgram; uint8_t* TxBuffers; HANDLE TxXsk; - DATAPATH_XDP_IO_SQE TxIoSqe; + CXPLAT_SQE TxIoSqe; XSK_RING TxRing; XSK_RING TxCompletionRing; @@ -103,6 +103,10 @@ typedef struct DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) XDP_TX_PACKET { uint8_t FrameBuffer[MAX_ETH_FRAME_SIZE]; } XDP_TX_PACKET; +CXPLAT_EVENT_COMPLETION CxPlatIoXdpWaitRxEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoXdpWaitTxEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoXdpShutdownEventComplete; + _IRQL_requires_max_(PASSIVE_LEVEL) BOOLEAN CxPlatXdpExecute( @@ -459,10 +463,6 @@ CxPlatDpRawInterfaceInitialize( CxPlatLockInitialize(&Queue->TxLock); CxPlatListInitializeHead(&Queue->TxQueue); CxPlatListInitializeHead(&Queue->PartitionTxQueue); - CxPlatDatapathSqeInitialize(&Queue->RxIoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); - Queue->RxIoSqe.IoType = DATAPATH_XDP_IO_RECV; - CxPlatDatapathSqeInitialize(&Queue->TxIoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); - Queue->TxIoSqe.IoType = DATAPATH_XDP_IO_SEND; // // RX datapath. @@ -1111,7 +1111,7 @@ CxPlatDpRawInitialize( Partition->Ec.NextTimeUs = UINT64_MAX; Partition->Ec.Callback = CxPlatXdpExecute; Partition->Ec.Context = &Xdp->Partitions[i]; - Partition->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; + CxPlatSqeInitializeEx(CxPlatIoXdpShutdownEventComplete, &Partition->ShutdownSqe); CxPlatRefIncrement(&Xdp->RefCount); Partition->EventQ = CxPlatWorkerPoolGetEventQ(WorkerPool, (uint16_t)i); @@ -1753,7 +1753,7 @@ CxPlatXdpExecute( Queue->TxXsk = NULL; Queue = Queue->Next; } - CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe.Sqe, &Partition->ShutdownSqe); + CxPlatEventQEnqueue(Partition->EventQ, &Partition->ShutdownSqe); return FALSE; } @@ -1781,13 +1781,13 @@ CxPlatXdpExecute( XdpQueueAsyncIoRx, "[ xdp][%p] XDP async IO start (RX)", Queue); - CxPlatZeroMemory( - &Queue->RxIoSqe.DatapathSqe.Sqe.Overlapped, - sizeof(Queue->RxIoSqe.DatapathSqe.Sqe.Overlapped)); + CxPlatSqeInitializeEx( + CxPlatIoXdpWaitRxEventComplete, + &Queue->RxIoSqe); HRESULT hr = Xdp->XdpApi->XskNotifyAsync( Queue->RxXsk, XSK_NOTIFY_FLAG_WAIT_RX, - &Queue->RxIoSqe.DatapathSqe.Sqe.Overlapped); + &Queue->RxIoSqe.Overlapped); if (hr == HRESULT_FROM_WIN32(ERROR_IO_PENDING)) { Queue->RxQueued = TRUE; } else if (hr == S_OK) { @@ -1805,13 +1805,13 @@ CxPlatXdpExecute( XdpQueueAsyncIoTx, "[ xdp][%p] XDP async IO start (TX)", Queue); - CxPlatZeroMemory( - &Queue->TxIoSqe.DatapathSqe.Sqe.Overlapped, - sizeof(Queue->TxIoSqe.DatapathSqe.Sqe.Overlapped)); + CxPlatSqeInitializeEx( + CxPlatIoXdpWaitTxEventComplete, + &Queue->TxIoSqe); HRESULT hr = Xdp->XdpApi->XskNotifyAsync( Queue->TxXsk, XSK_NOTIFY_FLAG_WAIT_TX, - &Queue->TxIoSqe.DatapathSqe.Sqe.Overlapped); + &Queue->TxIoSqe.Overlapped); if (hr == HRESULT_FROM_WIN32(ERROR_IO_PENDING)) { Queue->TxQueued = TRUE; } else if (hr == S_OK) { @@ -1831,40 +1831,50 @@ CxPlatXdpExecute( return TRUE; } +_IRQL_requires_max_(PASSIVE_LEVEL) void -RawDataPathProcessCqe( +CxPlatIoXdpWaitRxEventComplete( _In_ CXPLAT_CQE* Cqe ) { - if (CxPlatCqeType(Cqe) == CXPLAT_CQE_TYPE_SOCKET_IO) { - DATAPATH_XDP_IO_SQE* Sqe = - CONTAINING_RECORD(CxPlatCqeUserData(Cqe), DATAPATH_XDP_IO_SQE, DatapathSqe); - XDP_QUEUE* Queue; + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + XDP_QUEUE* Queue = CONTAINING_RECORD(Sqe, XDP_QUEUE, RxIoSqe); + QuicTraceLogVerbose( + XdpQueueAsyncIoRxComplete, + "[ xdp][%p] XDP async IO complete (RX)", + Queue); + Queue->RxQueued = FALSE; + Queue->Partition->Ec.Ready = TRUE; +} - if (Sqe->IoType == DATAPATH_XDP_IO_RECV) { - Queue = CONTAINING_RECORD(Sqe, XDP_QUEUE, RxIoSqe); - QuicTraceLogVerbose( - XdpQueueAsyncIoRxComplete, - "[ xdp][%p] XDP async IO complete (RX)", - Queue); - Queue->RxQueued = FALSE; - } else { - CXPLAT_DBG_ASSERT(Sqe->IoType == DATAPATH_XDP_IO_SEND); - Queue = CONTAINING_RECORD(Sqe, XDP_QUEUE, TxIoSqe); - QuicTraceLogVerbose( - XdpQueueAsyncIoTxComplete, - "[ xdp][%p] XDP async IO complete (TX)", - Queue); - Queue->TxQueued = FALSE; - } - Queue->Partition->Ec.Ready = TRUE; - } else if (CxPlatCqeType(Cqe) == CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN) { - XDP_PARTITION* Partition = - CONTAINING_RECORD(CxPlatCqeUserData(Cqe), XDP_PARTITION, ShutdownSqe); - QuicTraceLogVerbose( - XdpPartitionShutdownComplete, - "[ xdp][%p] XDP partition shutdown complete", - Partition); - CxPlatDpRawRelease((XDP_DATAPATH*)Partition->Xdp); - } +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoXdpWaitTxEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + XDP_QUEUE* Queue = CONTAINING_RECORD(Sqe, XDP_QUEUE, TxIoSqe); + QuicTraceLogVerbose( + XdpQueueAsyncIoTxComplete, + "[ xdp][%p] XDP async IO complete (TX)", + Queue); + Queue->TxQueued = FALSE; + Queue->Partition->Ec.Ready = TRUE; +} + +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoXdpShutdownEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + XDP_PARTITION* Partition = + CONTAINING_RECORD(Sqe, XDP_PARTITION, ShutdownSqe); + QuicTraceLogVerbose( + XdpPartitionShutdownComplete, + "[ xdp][%p] XDP partition shutdown complete", + Partition); + CxPlatDpRawRelease((XDP_DATAPATH*)Partition->Xdp); } diff --git a/src/platform/datapath_win.c b/src/platform/datapath_win.c index 6512dc98ab..6c07715f97 100644 --- a/src/platform/datapath_win.c +++ b/src/platform/datapath_win.c @@ -31,30 +31,6 @@ CxPlatSocketUpdateQeo( return QUIC_STATUS_NOT_SUPPORTED; } -void -CxPlatDataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ) -{ - switch (CxPlatCqeType(Cqe)) { - case CXPLAT_CQE_TYPE_SOCKET_IO: { - DATAPATH_IO_SQE* Sqe = - CONTAINING_RECORD(CxPlatCqeUserData(Cqe), DATAPATH_IO_SQE, DatapathSqe); - if (Sqe->IoType == DATAPATH_XDP_IO_RECV || Sqe->IoType == DATAPATH_XDP_IO_SEND) { - RawDataPathProcessCqe(Cqe); - } else { - DataPathProcessCqe(Cqe); - } - break; - } - case CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN: { - RawDataPathProcessCqe(Cqe); - break; - } - default: CXPLAT_DBG_ASSERT(FALSE); break; - } -} - _IRQL_requires_max_(PASSIVE_LEVEL) void CxPlatUpdateRoute( diff --git a/src/platform/datapath_winuser.c b/src/platform/datapath_winuser.c index c7d40bf83a..2d935ab68f 100644 --- a/src/platform/datapath_winuser.c +++ b/src/platform/datapath_winuser.c @@ -104,12 +104,22 @@ CXPLAT_STATIC_ASSERT( ErrorCode == WSAECONNRESET \ ) +typedef enum RIO_IO_TYPE { + RIO_IO_RECV, + RIO_IO_SEND, + RIO_IO_RECV_FAILURE, +} RIO_IO_TYPE; // // Contains all the info for a single RX IO operation. Multiple RX packets may // come from a single IO operation. // typedef struct DATAPATH_RX_IO_BLOCK { + // + // The IO type. + // + RIO_IO_TYPE IoType; + // // The owning datagram pool. // @@ -138,7 +148,7 @@ typedef struct DATAPATH_RX_IO_BLOCK { // // The receive SQE. // - DATAPATH_IO_SQE Sqe; + CXPLAT_SQE Sqe; // // Contains the input and output message data. @@ -179,7 +189,7 @@ typedef struct DECLSPEC_ALIGN(MEMORY_ALLOCATION_ALIGNMENT) CXPLAT_RIO_SEND_BUFFE // // The IO type. // - DATAPATH_IO_TYPE IoType; + RIO_IO_TYPE IoType; // // The RIO buffer ID. @@ -210,7 +220,7 @@ typedef struct CXPLAT_SEND_DATA { // // The submission queue entry for the send completion. // - DATAPATH_IO_SQE Sqe; + CXPLAT_SQE Sqe; // // The owning processor context. @@ -284,6 +294,14 @@ void SocketDelete( _In_ CXPLAT_SOCKET* Socket ); + +CXPLAT_EVENT_COMPLETION CxPlatIoRecvEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoRecvFailureEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoSendEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoQueueSendEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoAcceptExEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoConnectExEventComplete; +CXPLAT_EVENT_COMPLETION CxPlatIoRioNotifyEventComplete; #ifdef DEBUG #ifndef AllocOffset @@ -350,66 +368,23 @@ CxPlatSocketContextRelease( VOID CxPlatStartDatapathIo( _In_ CXPLAT_SOCKET_PROC* SocketProc, - _Inout_ DATAPATH_IO_SQE* Sqe, - _In_ DATAPATH_IO_TYPE IoType + _Inout_ CXPLAT_SQE* Sqe, + _In_ CXPLAT_EVENT_COMPLETION Completion ) { - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.CqeType == CXPLAT_CQE_TYPE_SOCKET_IO); - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.Sqe.UserData == &Sqe->DatapathSqe); - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.Sqe.Overlapped.Internal != 0x103); // STATUS_PENDING - CXPLAT_DBG_ASSERT(Sqe->IoType == 0); - - Sqe->IoType = IoType; - CxPlatZeroMemory(&Sqe->DatapathSqe.Sqe.Overlapped, sizeof(Sqe->DatapathSqe.Sqe.Overlapped)); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CxPlatSqeInitializeEx(Completion, Sqe); CxPlatRefIncrement(&SocketProc->RefCount); } VOID CxPlatCancelDatapathIo( - _In_ CXPLAT_SOCKET_PROC* SocketProc, - _Inout_ DATAPATH_IO_SQE* Sqe + _In_ CXPLAT_SOCKET_PROC* SocketProc ) { - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.CqeType == CXPLAT_CQE_TYPE_SOCKET_IO); - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.Sqe.UserData == &Sqe->DatapathSqe); - CXPLAT_DBG_ASSERT(Sqe->IoType > DATAPATH_IO_SIGNATURE && Sqe->IoType < DATAPATH_IO_MAX); - DBG_UNREFERENCED_PARAMETER(Sqe); -#if DEBUG - Sqe->IoType = 0; -#endif CxPlatSocketContextRelease(SocketProc); } -VOID -CxPlatStopDatapathIo( - _Inout_ DATAPATH_IO_SQE* Sqe - ) -{ - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.CqeType == CXPLAT_CQE_TYPE_SOCKET_IO); - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.Sqe.UserData == &Sqe->DatapathSqe); - CXPLAT_DBG_ASSERT(Sqe->DatapathSqe.Sqe.Overlapped.Internal != 0x103); // STATUS_PENDING - CXPLAT_DBG_ASSERT(Sqe->IoType > DATAPATH_IO_SIGNATURE && Sqe->IoType < DATAPATH_IO_MAX); - DBG_UNREFERENCED_PARAMETER(Sqe); -#if DEBUG - Sqe->IoType = 0; -#endif -} - -VOID -CxPlatStopInlineDatapathIo( - _Inout_ DATAPATH_IO_SQE* Sqe - ) -{ - // - // We want to assert the overlapped result is not pending below, but Winsock - // and the Windows kernel may leave the overlapped struct in the pending - // state if an IO completes inline. Ignore the overlapped result in this - // case. - // - Sqe->DatapathSqe.Sqe.Overlapped.Internal = 0; - CxPlatStopDatapathIo(Sqe); -} - void CxPlatDataPathStartReceiveAsync( _In_ CXPLAT_SOCKET_PROC* SocketProc @@ -1429,7 +1404,10 @@ CxPlatSocketArmRioNotify( { if (!SocketProc->RioNotifyArmed) { SocketProc->RioNotifyArmed = TRUE; - CxPlatStartDatapathIo(SocketProc, &SocketProc->RioSqe, DATAPATH_IO_RIO_NOTIFY); + CxPlatStartDatapathIo( + SocketProc, + &SocketProc->RioSqe, + CxPlatIoRioNotifyEventComplete); ULONG NotifyResult = SocketProc->DatapathProc->Datapath-> RioDispatch.RIONotify(SocketProc->RioCq); CXPLAT_TEL_ASSERT(NotifyResult == ERROR_SUCCESS); @@ -1440,7 +1418,7 @@ CxPlatSocketArmRioNotify( QUIC_STATUS CxPlatSocketEnqueueSqe( _In_ CXPLAT_SOCKET_PROC* SocketProc, - _In_ DATAPATH_IO_SQE* Sqe, + _In_ CXPLAT_SQE* Sqe, _In_ uint32_t NumBytes ) { @@ -1448,16 +1426,15 @@ CxPlatSocketEnqueueSqe( CXPLAT_DBG_ASSERT(!SocketProc->Freed); if (!CxPlatEventQEnqueueEx( SocketProc->DatapathProc->EventQ, - &Sqe->DatapathSqe.Sqe, - NumBytes, - &Sqe->DatapathSqe)) { + Sqe, + NumBytes)) { const DWORD LastError = GetLastError(); QuicTraceEvent( DatapathErrorStatus, "[data][%p] ERROR, %u, %s.", SocketProc->Parent, LastError, - "CxPlatSocketEnqueueSqe"); + "CxPlatEventQEnqueueEx"); return HRESULT_FROM_WIN32(LastError); } return QUIC_STATUS_SUCCESS; @@ -1536,8 +1513,6 @@ SocketCreateUdp( CxPlatRefInitialize(&Socket->PerProcSockets[i].RefCount); Socket->PerProcSockets[i].Parent = Socket; Socket->PerProcSockets[i].Socket = INVALID_SOCKET; - CxPlatDatapathSqeInitialize( - &Socket->PerProcSockets[i].IoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); CxPlatRundownInitialize(&Socket->PerProcSockets[i].RundownRef); Socket->PerProcSockets[i].RioCq = RIO_INVALID_CQ; Socket->PerProcSockets[i].RioRq = RIO_INVALID_RQ; @@ -1875,11 +1850,7 @@ SocketCreateUdp( NotificationCompletion.Iocp.IocpHandle = *SocketProc->DatapathProc->EventQ; NotificationCompletion.Iocp.Overlapped = - &SocketProc->RioSqe.DatapathSqe.Sqe.Overlapped; - - CxPlatDatapathSqeInitialize( - &SocketProc->RioSqe.DatapathSqe, - CXPLAT_CQE_TYPE_SOCKET_IO); + &SocketProc->RioSqe.Overlapped; SocketProc->RioCq = Datapath->RioDispatch.RIOCreateCompletionQueue( @@ -2189,7 +2160,6 @@ CxPlatSocketCreateTcpInternal( CxPlatRefInitialize(&SocketProc->RefCount); SocketProc->Parent = Socket; SocketProc->Socket = INVALID_SOCKET; - CxPlatDatapathSqeInitialize(&SocketProc->IoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); CxPlatRundownInitialize(&SocketProc->RundownRef); SocketProc->RioCq = RIO_INVALID_CQ; SocketProc->RioRq = RIO_INVALID_RQ; @@ -2295,7 +2265,10 @@ CxPlatSocketCreateTcpInternal( SOCKADDR_INET MappedRemoteAddress = { 0 }; CxPlatConvertToMappedV6(RemoteAddress, &MappedRemoteAddress); - CxPlatStartDatapathIo(SocketProc, &SocketProc->IoSqe, DATAPATH_IO_CONNECTEX); + CxPlatStartDatapathIo( + SocketProc, + &SocketProc->IoSqe, + CxPlatIoConnectExEventComplete); Result = Datapath->ConnectEx( @@ -2305,7 +2278,7 @@ CxPlatSocketCreateTcpInternal( NULL, 0, &BytesReturned, - &SocketProc->IoSqe.DatapathSqe.Sqe.Overlapped); + &SocketProc->IoSqe.Overlapped); if (Result == FALSE) { int WsaError = WSAGetLastError(); if (WsaError != WSA_IO_PENDING) { @@ -2316,7 +2289,7 @@ CxPlatSocketCreateTcpInternal( WsaError, "ConnectEx"); Status = HRESULT_FROM_WIN32(WsaError); - CxPlatCancelDatapathIo(SocketProc, &SocketProc->IoSqe); + CxPlatCancelDatapathIo(SocketProc); goto Error; } } else { @@ -2325,7 +2298,7 @@ CxPlatSocketCreateTcpInternal( // Status = CxPlatSocketEnqueueSqe(SocketProc, &SocketProc->IoSqe, BytesReturned); if (QUIC_FAILED(Status)) { - CxPlatCancelDatapathIo(SocketProc, &SocketProc->IoSqe); + CxPlatCancelDatapathIo(SocketProc); goto Error; } } @@ -2461,7 +2434,6 @@ SocketCreateTcpListener( CxPlatRefInitialize(&SocketProc->RefCount); SocketProc->Parent = Socket; SocketProc->Socket = INVALID_SOCKET; - CxPlatDatapathSqeInitialize(&SocketProc->IoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); CxPlatRundownInitialize(&SocketProc->RundownRef); SocketProc->RioCq = RIO_INVALID_CQ; SocketProc->RioRq = RIO_INVALID_RQ; @@ -2868,9 +2840,6 @@ CxPlatSocketAllocRxIoBlock( IoBlock->OwningPool = OwningPool; IoBlock->ReferenceCount = 0; IoBlock->SocketProc = SocketProc; -#if DEBUG - IoBlock->Sqe.IoType = 0; -#endif } return IoBlock; @@ -2911,7 +2880,10 @@ CxPlatSocketStartAccept( } } - CxPlatStartDatapathIo(ListenerSocketProc, &ListenerSocketProc->IoSqe, DATAPATH_IO_ACCEPTEX); + CxPlatStartDatapathIo( + ListenerSocketProc, + &ListenerSocketProc->IoSqe, + CxPlatIoAcceptExEventComplete); Result = Datapath->AcceptEx( @@ -2922,7 +2894,7 @@ CxPlatSocketStartAccept( sizeof(SOCKADDR_INET)+16, // dwLocalAddressLength sizeof(SOCKADDR_INET)+16, // dwRemoteAddressLength &BytesRecv, - &ListenerSocketProc->IoSqe.DatapathSqe.Sqe.Overlapped); + &ListenerSocketProc->IoSqe.Overlapped); if (Result == FALSE) { int WsaError = WSAGetLastError(); if (WsaError != WSA_IO_PENDING) { @@ -2933,7 +2905,7 @@ CxPlatSocketStartAccept( WsaError, "AcceptEx"); Status = HRESULT_FROM_WIN32(WsaError); - CxPlatCancelDatapathIo(ListenerSocketProc, &ListenerSocketProc->IoSqe); + CxPlatCancelDatapathIo(ListenerSocketProc); goto Error; } } else { @@ -2942,7 +2914,7 @@ CxPlatSocketStartAccept( // Status = CxPlatSocketEnqueueSqe(ListenerSocketProc, &ListenerSocketProc->IoSqe, BytesRecv); if (QUIC_FAILED(Status)) { - CxPlatCancelDatapathIo(ListenerSocketProc, &ListenerSocketProc->IoSqe); + CxPlatCancelDatapathIo(ListenerSocketProc); goto Error; } } @@ -2956,12 +2928,10 @@ CxPlatSocketStartAccept( void CxPlatDataPathSocketProcessAcceptCompletion( - _In_ DATAPATH_IO_SQE* Sqe, - _In_ CXPLAT_CQE* Cqe + _In_ CXPLAT_SOCKET_PROC* ListenerSocketProc, + _In_ ULONG IoResult ) { - CXPLAT_SOCKET_PROC* ListenerSocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); - ULONG IoResult = RtlNtStatusToDosError((NTSTATUS)Cqe->Internal); CXPLAT_SOCKET_PROC* AcceptSocketProc = NULL; if (IoResult == WSAENOTSOCK || IoResult == WSA_OPERATION_ABORTED) { @@ -3105,13 +3075,10 @@ CxPlatDataPathSocketProcessAcceptCompletion( void CxPlatDataPathSocketProcessConnectCompletion( - _In_ DATAPATH_IO_SQE* Sqe, - _In_ CXPLAT_CQE* Cqe + _In_ CXPLAT_SOCKET_PROC* SocketProc, + _In_ ULONG IoResult ) { - CXPLAT_SOCKET_PROC* SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); - ULONG IoResult = RtlNtStatusToDosError((NTSTATUS)Cqe->Internal); - if (IoResult == WSAENOTSOCK || IoResult == WSA_OPERATION_ABORTED) { // // Error from shutdown, silently ignore. Return immediately so the @@ -3201,11 +3168,10 @@ CxPlatSocketStartRioReceives( Control.BufferId = IoBlock->RioBufferId; Control.Offset = FIELD_OFFSET(DATAPATH_RX_IO_BLOCK, ControlBuf); Control.Length = sizeof(IoBlock->ControlBuf); - IoBlock->Sqe.IoType = DATAPATH_IO_RIO_RECV; if (!Datapath->RioDispatch.RIOReceiveEx( SocketProc->RioRq, &Data, 1, NULL, &RemoteAddr, - &Control, NULL, RioFlags, &IoBlock->Sqe.IoType)) { + &Control, NULL, RioFlags, &IoBlock->Sqe)) { int WsaError = WSAGetLastError(); QuicTraceEvent( DatapathErrorStatus, @@ -3295,8 +3261,10 @@ CxPlatSocketStartWinsockReceive( // held by the socket until it completes. // - CxPlatDatapathSqeInitialize(&IoBlock->Sqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); - CxPlatStartDatapathIo(SocketProc, &IoBlock->Sqe, DATAPATH_IO_RECV); + CxPlatStartDatapathIo( + SocketProc, + &IoBlock->Sqe, + CxPlatIoRecvEventComplete); IoBlock->WsaControlBuf.buf = ((CHAR*)IoBlock) + Datapath->RecvPayloadOffset; IoBlock->WsaControlBuf.len = SocketProc->Parent->RecvBufLen; @@ -3325,7 +3293,7 @@ CxPlatSocketStartWinsockReceive( SocketProc->Socket, &IoBlock->WsaMsgHdr, &BytesRecv, - &IoBlock->Sqe.DatapathSqe.Sqe.Overlapped, + &IoBlock->Sqe.Overlapped, NULL); } else { Result = @@ -3335,7 +3303,7 @@ CxPlatSocketStartWinsockReceive( 1, &BytesRecv, &IoBlock->WsaMsgHdr.dwFlags, - &IoBlock->Sqe.DatapathSqe.Sqe.Overlapped, + &IoBlock->Sqe.Overlapped, NULL); } @@ -3350,7 +3318,7 @@ CxPlatSocketStartWinsockReceive( // Update the SQE to indicate the failure. // if (SyncBytesReceived == NULL) { - IoBlock->Sqe.IoType = DATAPATH_IO_RECV_FAILURE; + IoBlock->Sqe.Completion = CxPlatIoRecvFailureEventComplete; BytesRecv = (DWORD)WsaError; } } @@ -3360,8 +3328,14 @@ CxPlatSocketStartWinsockReceive( // The receive completed inline (success or failure), and the caller is // prepared to handle it synchronously. // - CxPlatStopInlineDatapathIo(&IoBlock->Sqe); CXPLAT_DBG_ASSERT(BytesRecv < UINT16_MAX); + // + // We want to assert the overlapped result is not pending below, but Winsock + // and the Windows kernel may leave the overlapped struct in the pending + // state if an IO completes inline. Ignore the overlapped result in this + // case. + // + IoBlock->Sqe.Overlapped.Internal = 0; *SyncBytesReceived = (uint16_t)BytesRecv; *SyncIoResult = WsaError; *SyncIoBlock = IoBlock; @@ -3380,7 +3354,7 @@ CxPlatSocketStartWinsockReceive( // and this likely should simply be treated as a fatal error. // CXPLAT_DBG_ASSERT(FALSE); // We don't expect tests to hit this. - CxPlatCancelDatapathIo(SocketProc, &IoBlock->Sqe); + CxPlatCancelDatapathIo(SocketProc); CxPlatSocketFreeRxIoBlock(IoBlock); return Status; } @@ -3695,12 +3669,9 @@ CxPlatDataPathStartReceiveAsync( void CxPlatDataPathSocketProcessRioCompletion( - _In_ DATAPATH_IO_SQE* Sqe, - _In_ CXPLAT_CQE* Cqe + _In_ CXPLAT_SOCKET_PROC* SocketProc ) { - UNREFERENCED_PARAMETER(Cqe); - CXPLAT_SOCKET_PROC* SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, RioSqe); CXPLAT_DATAPATH* Datapath = SocketProc->DatapathProc->Datapath; ULONG ResultCount; BOOLEAN UpcallAcquired; @@ -3721,14 +3692,14 @@ CxPlatDataPathSocketProcessRioCompletion( CXPLAT_FRE_ASSERT(ResultCount != RIO_CORRUPT_CQ); for (ULONG i = 0; i < ResultCount; i++) { - DATAPATH_IO_TYPE* IoType = - (DATAPATH_IO_TYPE*)(ULONG_PTR)Results[i].RequestContext; + RIO_IO_TYPE* IoType = + (RIO_IO_TYPE*)(ULONG_PTR)Results[i].RequestContext; switch (*IoType) { - case DATAPATH_IO_RIO_RECV: + case RIO_IO_RECV: CXPLAT_DBG_ASSERT(Results[i].BytesTransferred <= UINT16_MAX); DATAPATH_RX_IO_BLOCK* IoBlock = - CONTAINING_RECORD(IoType, DATAPATH_RX_IO_BLOCK, Sqe.IoType); + CONTAINING_RECORD(IoType, DATAPATH_RX_IO_BLOCK, IoType); if (UpcallAcquired) { NeedReceive = @@ -3744,7 +3715,7 @@ CxPlatDataPathSocketProcessRioCompletion( SocketProc->RioRecvCount--; break; - case DATAPATH_IO_RIO_SEND: + case RIO_IO_SEND: CXPLAT_RIO_SEND_BUFFER_HEADER* SendHeader = CONTAINING_RECORD(IoType, CXPLAT_RIO_SEND_BUFFER_HEADER, IoType); CxPlatSendDataComplete(SendHeader->SendData, Results[i].Status); @@ -4075,9 +4046,6 @@ SendDataAlloc( SendData->ClientBuffer.len = 0; SendData->ClientBuffer.buf = NULL; SendData->DatapathType = Config->Route->DatapathType = CXPLAT_DATAPATH_TYPE_NORMAL; -#if DEBUG - SendData->Sqe.IoType = 0; -#endif if (Socket->UseRio) { SendData->BufferPool = @@ -4469,7 +4437,7 @@ CxPlatSocketSendWithRio( Data.BufferId = SendHeader->RioBufferId; Data.Length = SendData->WsaBuffers[i].len; - SendHeader->IoType = DATAPATH_IO_RIO_SEND; + SendHeader->IoType = RIO_IO_SEND; SendHeader->SendData = SendData; if (!Datapath->RioDispatch.RIOSendEx( @@ -4595,8 +4563,10 @@ CxPlatSocketSendInline( // // Start the async send. // - CxPlatDatapathSqeInitialize(&SendData->Sqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); - CxPlatStartDatapathIo(SocketProc, &SendData->Sqe, DATAPATH_IO_SEND); + CxPlatStartDatapathIo( + SocketProc, + &SendData->Sqe, + CxPlatIoSendEventComplete); if (Socket->Type == CXPLAT_SOCKET_UDP) { Result = @@ -4605,7 +4575,7 @@ CxPlatSocketSendInline( &WSAMhdr, 0, &BytesSent, - &SendData->Sqe.DatapathSqe.Sqe.Overlapped, + &SendData->Sqe.Overlapped, NULL); } else { Result = @@ -4615,7 +4585,7 @@ CxPlatSocketSendInline( SendData->WsaBufferCount, &BytesSent, 0, - &SendData->Sqe.DatapathSqe.Sqe.Overlapped, + &SendData->Sqe.Overlapped, NULL); } @@ -4630,7 +4600,7 @@ CxPlatSocketSendInline( // // Completed synchronously, so process the completion inline. // - CxPlatCancelDatapathIo(SocketProc, &SendData->Sqe); + CxPlatCancelDatapathIo(SocketProc); CxPlatSendDataComplete(SendData, WsaError); } @@ -4641,11 +4611,17 @@ CxPlatSocketSendEnqueue( ) { SendData->LocalAddress = Route->LocalAddress; - CxPlatDatapathSqeInitialize(&SendData->Sqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); - CxPlatStartDatapathIo(SendData->SocketProc, &SendData->Sqe, DATAPATH_IO_QUEUE_SEND); - QUIC_STATUS Status = CxPlatSocketEnqueueSqe(SendData->SocketProc, &SendData->Sqe, 0); + CxPlatStartDatapathIo( + SendData->SocketProc, + &SendData->Sqe, + CxPlatIoQueueSendEventComplete); + QUIC_STATUS Status = + CxPlatSocketEnqueueSqe( + SendData->SocketProc, + &SendData->Sqe, + 0); if (QUIC_FAILED(Status)) { - CxPlatCancelDatapathIo(SendData->SocketProc, &SendData->Sqe); + CxPlatCancelDatapathIo(SendData->SocketProc); } } @@ -4690,12 +4666,9 @@ SocketSend( void CxPlatDataPathSocketProcessQueuedSend( - _In_ DATAPATH_IO_SQE* Sqe, - _In_ CXPLAT_CQE* Cqe + _In_ CXPLAT_SEND_DATA* SendData ) { - UNREFERENCED_PARAMETER(Cqe); - CXPLAT_SEND_DATA* SendData = CONTAINING_RECORD(Sqe, CXPLAT_SEND_DATA, Sqe); CXPLAT_SOCKET_PROC* SocketProc = SendData->SocketProc; if (CxPlatRundownAcquire(&SocketProc->RundownRef)) { @@ -4795,81 +4768,103 @@ CxPlatSocketGetTcpStatistics( #endif } +_IRQL_requires_max_(PASSIVE_LEVEL) void -DataPathProcessCqe( +CxPlatIoRecvEventComplete( _In_ CXPLAT_CQE* Cqe ) { - switch (CxPlatCqeType(Cqe)) { - case CXPLAT_CQE_TYPE_SOCKET_IO: { - DATAPATH_IO_SQE* Sqe = - CONTAINING_RECORD(CxPlatCqeUserData(Cqe), DATAPATH_IO_SQE, DatapathSqe); - DATAPATH_IO_TYPE IoType = Sqe->IoType; - CXPLAT_SOCKET_PROC* SocketProc = NULL; - - CxPlatStopDatapathIo(Sqe); - - switch (IoType) { - case DATAPATH_IO_RECV: - // - // N.B. We don't set SocketProc here because receive completions are - // special (they loop internally). - // - CXPLAT_DBG_ASSERT(Cqe->dwNumberOfBytesTransferred <= UINT16_MAX); - CxPlatDataPathSocketProcessReceive( - CONTAINING_RECORD(Sqe, DATAPATH_RX_IO_BLOCK, Sqe), - (uint16_t)Cqe->dwNumberOfBytesTransferred, - RtlNtStatusToDosError((NTSTATUS)Cqe->Internal)); - break; - - case DATAPATH_IO_SEND: - SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SEND_DATA, Sqe)->SocketProc; - CxPlatSendDataComplete( - CONTAINING_RECORD(Sqe, CXPLAT_SEND_DATA, Sqe), - RtlNtStatusToDosError((NTSTATUS)Cqe->Internal)); - break; - - case DATAPATH_IO_QUEUE_SEND: - SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SEND_DATA, Sqe)->SocketProc; - CxPlatDataPathSocketProcessQueuedSend(Sqe, Cqe); - break; + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_DBG_ASSERT(Cqe->dwNumberOfBytesTransferred <= UINT16_MAX); + CxPlatDataPathSocketProcessReceive( + CONTAINING_RECORD(Sqe, DATAPATH_RX_IO_BLOCK, Sqe), + (uint16_t)Cqe->dwNumberOfBytesTransferred, + RtlNtStatusToDosError((NTSTATUS)Cqe->Internal)); +} - case DATAPATH_IO_ACCEPTEX: - SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); - CxPlatDataPathSocketProcessAcceptCompletion(Sqe, Cqe); - break; +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoRecvFailureEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_DBG_ASSERT(Cqe->dwNumberOfBytesTransferred <= UINT16_MAX); + CxPlatDataPathSocketProcessReceive( + CONTAINING_RECORD(Sqe, DATAPATH_RX_IO_BLOCK, Sqe), + 0, + (ULONG)Cqe->dwNumberOfBytesTransferred); +} - case DATAPATH_IO_CONNECTEX: - SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); - CxPlatDataPathSocketProcessConnectCompletion(Sqe, Cqe); - break; +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoSendEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_SEND_DATA* SendData = CONTAINING_RECORD(Sqe, CXPLAT_SEND_DATA, Sqe); + CXPLAT_SOCKET_PROC* SocketProc = SendData->SocketProc; + CxPlatSendDataComplete( + SendData, + RtlNtStatusToDosError((NTSTATUS)Cqe->Internal)); + CxPlatSocketContextRelease(SocketProc); +} - case DATAPATH_IO_RIO_NOTIFY: - SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, RioSqe); - CxPlatDataPathSocketProcessRioCompletion(Sqe, Cqe); - break; +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoQueueSendEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_SEND_DATA* SendData = CONTAINING_RECORD(Sqe, CXPLAT_SEND_DATA, Sqe); + CXPLAT_SOCKET_PROC* SocketProc = SendData->SocketProc; + CxPlatDataPathSocketProcessQueuedSend(SendData); + CxPlatSocketContextRelease(SocketProc); +} - case DATAPATH_IO_RECV_FAILURE: - // - // N.B. We don't set SocketProc here because receive completions are - // special (they loop internally). - // - CxPlatDataPathSocketProcessReceive( - CONTAINING_RECORD(Sqe, DATAPATH_RX_IO_BLOCK, Sqe), - 0, - (ULONG)Cqe->dwNumberOfBytesTransferred); - break; +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoAcceptExEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_SOCKET_PROC* SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); + ULONG IoResult = RtlNtStatusToDosError((NTSTATUS)Cqe->Internal); + CxPlatDataPathSocketProcessAcceptCompletion(SocketProc, IoResult); + CxPlatSocketContextRelease(SocketProc); +} - default: - CXPLAT_DBG_ASSERT(FALSE); - break; - } +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoConnectExEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_SOCKET_PROC* SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); + ULONG IoResult = RtlNtStatusToDosError((NTSTATUS)Cqe->Internal); + CxPlatDataPathSocketProcessConnectCompletion(SocketProc, IoResult); + CxPlatSocketContextRelease(SocketProc); +} - if (SocketProc) { - CxPlatSocketContextRelease(SocketProc); - } - break; - } - default: CXPLAT_DBG_ASSERT(FALSE); break; - } +_IRQL_requires_max_(PASSIVE_LEVEL) +void +CxPlatIoRioNotifyEventComplete( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + CXPLAT_DBG_ASSERT(Sqe->Overlapped.Internal != 0x103); // STATUS_PENDING + CXPLAT_SOCKET_PROC* SocketProc = CONTAINING_RECORD(Sqe, CXPLAT_SOCKET_PROC, IoSqe); + CxPlatDataPathSocketProcessRioCompletion(SocketProc); + CxPlatSocketContextRelease(SocketProc); } diff --git a/src/platform/inline.c b/src/platform/inline.c index 6b4d075bda..5ab290945b 100644 --- a/src/platform/inline.c +++ b/src/platform/inline.c @@ -411,20 +411,19 @@ CxPlatEventQCleanup( _In_ CXPLAT_EVENTQ* queue ); -#ifdef CXPLAT_SQE BOOLEAN CxPlatEventQEnqueue( _In_ CXPLAT_EVENTQ* queue, - _In_ CXPLAT_SQE* sqe, - _In_opt_ void* user_data + _In_ CXPLAT_SQE* sqe ); -#else + BOOLEAN -_CxPlatEventQEnqueue( +CxPlatEventQEnqueueEx( _In_ CXPLAT_EVENTQ* queue, - _In_opt_ void* user_data + _In_ CXPLAT_SQE* sqe, + _In_ short filter, + _In_ unsigned short flags ); -#endif uint32_t CxPlatEventQDequeue( @@ -440,12 +439,19 @@ CxPlatEventQReturn( _In_ uint32_t count ); -#ifdef CXPLAT_SQE_INIT BOOLEAN CxPlatSqeInitialize( _In_ CXPLAT_EVENTQ* queue, - _Out_ CXPLAT_SQE* sqe, - _In_ void* user_data + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe + ); + +void +CxPlatSqeInitializeEx( + _In_ CXPLAT_EVENTQ* queue, + _In_ uintptr_t handle, + _In_ CXPLAT_EVENT_COMPLETION completion, + _Out_ CXPLAT_SQE* sqe ); void @@ -453,10 +459,9 @@ CxPlatSqeCleanup( _In_ CXPLAT_EVENTQ* queue, _In_ CXPLAT_SQE* sqe ); -#endif // CXPLAT_SQE_INIT -void* -CxPlatCqeUserData( +CXPLAT_SQE* +CxPlatCqeGetSqe( _In_ const CXPLAT_CQE* cqe ); diff --git a/src/platform/platform_internal.h b/src/platform/platform_internal.h index 11d064fc4a..d9e83583ee 100644 --- a/src/platform/platform_internal.h +++ b/src/platform/platform_internal.h @@ -39,13 +39,6 @@ #endif -typedef struct DATAPATH_SQE { - uint32_t CqeType; -#ifdef CXPLAT_SQE - CXPLAT_SQE Sqe; -#endif -} DATAPATH_SQE; - typedef struct CXPLAT_DATAPATH_COMMON { // // The UDP callback function pointers. @@ -122,40 +115,6 @@ typedef enum CXPLAT_DATAPATH_TYPE { CXPLAT_DATAPATH_TYPE_RAW, // currently raw == xdp } CXPLAT_DATAPATH_TYPE; -// -// Type of IO. -// -typedef enum DATAPATH_IO_TYPE { - DATAPATH_IO_SIGNATURE = 'WINU', - DATAPATH_IO_RECV = DATAPATH_IO_SIGNATURE + 1, - DATAPATH_IO_SEND = DATAPATH_IO_SIGNATURE + 2, - DATAPATH_IO_QUEUE_SEND = DATAPATH_IO_SIGNATURE + 3, - DATAPATH_IO_ACCEPTEX = DATAPATH_IO_SIGNATURE + 4, - DATAPATH_IO_CONNECTEX = DATAPATH_IO_SIGNATURE + 5, - DATAPATH_IO_RIO_NOTIFY = DATAPATH_IO_SIGNATURE + 6, - DATAPATH_IO_RIO_RECV = DATAPATH_IO_SIGNATURE + 7, - DATAPATH_IO_RIO_SEND = DATAPATH_IO_SIGNATURE + 8, - DATAPATH_IO_RECV_FAILURE = DATAPATH_IO_SIGNATURE + 9, - DATAPATH_IO_MAX -} DATAPATH_IO_TYPE; - -// -// Type of IO for XDP. -// -typedef enum DATAPATH_XDP_IO_TYPE { - DATAPATH_XDP_IO_SIGNATURE = 'XDPD', - DATAPATH_XDP_IO_RECV = DATAPATH_XDP_IO_SIGNATURE + 1, - DATAPATH_XDP_IO_SEND = DATAPATH_XDP_IO_SIGNATURE + 2 -} DATAPATH_XDP_IO_TYPE; - -// -// IO header for SQE->CQE based completions. -// -typedef struct DATAPATH_IO_SQE { - DATAPATH_IO_TYPE IoType; - DATAPATH_SQE DatapathSqe; -} DATAPATH_IO_SQE; - typedef enum CXPLAT_SOCKET_TYPE { CXPLAT_SOCKET_UDP = 0, CXPLAT_SOCKET_TCP_LISTENER = 1, @@ -450,12 +409,12 @@ typedef struct QUIC_CACHEALIGN CXPLAT_SOCKET_PROC { // // Submission queue event for IO completion // - DATAPATH_IO_SQE IoSqe; + CXPLAT_SQE IoSqe; // // Submission queue event for RIO IO completion // - DATAPATH_IO_SQE RioSqe; + CXPLAT_SQE RioSqe; // // The datapath per-processor context. @@ -802,11 +761,6 @@ CxPlatWorkerPoolGetEventQ( _In_ uint16_t Index // Into the config processor array ); -void -CxPlatDataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ); - BOOLEAN // Returns FALSE no work was done. CxPlatDataPathPoll( _In_ void* Context, @@ -823,15 +777,6 @@ CxPlatDpRawGetDatapathSize( _In_opt_ const QUIC_EXECUTION_CONFIG* Config ); -#define CXPLAT_CQE_TYPE_WORKER_WAKE CXPLAT_CQE_TYPE_QUIC_BASE + 1 -#define CXPLAT_CQE_TYPE_WORKER_UPDATE_POLL CXPLAT_CQE_TYPE_QUIC_BASE + 2 -#define CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN CXPLAT_CQE_TYPE_QUIC_BASE + 3 -#define CXPLAT_CQE_TYPE_SOCKET_IO CXPLAT_CQE_TYPE_QUIC_BASE + 4 -#define CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX CXPLAT_CQE_TYPE_QUIC_BASE + 5 -#define CXPLAT_CQE_TYPE_XDP_SHUTDOWN CXPLAT_CQE_TYPE_QUIC_BASE + 6 -#define CXPLAT_CQE_TYPE_XDP_IO CXPLAT_CQE_TYPE_QUIC_BASE + 7 -#define CXPLAT_CQE_TYPE_XDP_FLUSH_TX CXPLAT_CQE_TYPE_QUIC_BASE + 8 - #if defined(CX_PLATFORM_LINUX) typedef struct CXPLAT_DATAPATH_PARTITION CXPLAT_DATAPATH_PARTITION; @@ -859,17 +804,17 @@ typedef struct QUIC_CACHEALIGN CXPLAT_SOCKET_CONTEXT { // // The submission queue event for shutdown. // - DATAPATH_SQE ShutdownSqe; + CXPLAT_SQE ShutdownSqe; // // The submission queue event for IO. // - DATAPATH_SQE IoSqe; + CXPLAT_SQE IoSqe; // // The submission queue event for flushing the send queue. // - DATAPATH_SQE FlushTxSqe; + CXPLAT_SQE FlushTxSqe; // // The head of list containg all pending sends on this socket. @@ -1197,11 +1142,6 @@ SocketSend( _In_ CXPLAT_SEND_DATA* SendData ); -void -DataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ); - CXPLAT_SOCKET* CxPlatRawToSocket( _In_ CXPLAT_SOCKET_RAW* Socket @@ -1346,11 +1286,6 @@ RawResolveRoute( _In_ CXPLAT_ROUTE_RESOLUTION_CALLBACK_HANDLER Callback ); -void -RawDataPathProcessCqe( - _In_ CXPLAT_CQE* Cqe - ); - _IRQL_requires_max_(PASSIVE_LEVEL) void RawUpdateRoute( diff --git a/src/platform/platform_winuser.c b/src/platform/platform_winuser.c index 46e87aa35b..11897048d2 100644 --- a/src/platform/platform_winuser.c +++ b/src/platform/platform_winuser.c @@ -582,17 +582,6 @@ CxPlatGetProcessorGroupInfo( return QUIC_STATUS_SUCCESS; } -void -CxPlatDatapathSqeInitialize( - _Out_ DATAPATH_SQE* DatapathSqe, - _In_ uint32_t CqeType - ) -{ - RtlZeroMemory(DatapathSqe, sizeof(*DatapathSqe)); - DatapathSqe->CqeType = CqeType; - DatapathSqe->Sqe.UserData = DatapathSqe; -} - #ifdef DEBUG void CxPlatSetAllocFailDenominator( diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index ffe4c73d22..78a139c098 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -15,9 +15,6 @@ #include "platform_worker.c.clog.h" #endif -const uint32_t WorkerWakeEventPayload = CXPLAT_CQE_TYPE_WORKER_WAKE; -const uint32_t WorkerUpdatePollEventPayload = CXPLAT_CQE_TYPE_WORKER_UPDATE_POLL; - typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // @@ -30,7 +27,6 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // CXPLAT_EVENTQ EventQ; -#ifdef CXPLAT_SQE // // Submission queue entry for shutting down the worker thread. // @@ -45,7 +41,6 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // Submission queue entry for update the polling set. // CXPLAT_SQE UpdatePollSqe; -#endif // // Serializes access to the execution contexts. @@ -83,14 +78,13 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // Flags to indicate what has been initialized. // BOOLEAN InitializedEventQ : 1; -#ifdef CXPLAT_SQE_INIT BOOLEAN InitializedShutdownSqe : 1; BOOLEAN InitializedWakeSqe : 1; BOOLEAN InitializedUpdatePollSqe : 1; -#endif BOOLEAN InitializedThread : 1; BOOLEAN InitializedECLock : 1; BOOLEAN StoppingThread : 1; + BOOLEAN StoppedThread : 1; BOOLEAN DestroyedThread : 1; #if DEBUG // Debug flags - Must not be in the bitfield. BOOLEAN ThreadStarted; @@ -106,6 +100,42 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context); +static void +ShutdownCompletion( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_WORKER* Worker = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_WORKER, ShutdownSqe); + Worker->StoppedThread = TRUE; +} + +static void +WakeCompletion( + _In_ CXPLAT_CQE* Cqe + ) +{ + // + // No-op as the goal is simply to wake the event queue thread + // + UNREFERENCED_PARAMETER(Cqe); +} + +void +CxPlatUpdateExecutionContexts( + _In_ CXPLAT_WORKER* Worker + ); + +static void +UpdatePollCompletion( + _In_ CXPLAT_CQE* Cqe + ) +{ + CXPLAT_WORKER* Worker = + CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_WORKER, UpdatePollSqe); + CxPlatUpdateExecutionContexts(Worker); +} + void CxPlatWorkerPoolInit( _In_ CXPLAT_WORKER_POOL* WorkerPool @@ -177,55 +207,51 @@ CxPlatWorkerPoolLazyStart( CxPlatZeroMemory(WorkerPool->Workers, WorkersSize); for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { - CxPlatLockInitialize(&WorkerPool->Workers[i].ECLock); - CxPlatListInitializeHead(&WorkerPool->Workers[i].DynamicPoolList); - WorkerPool->Workers[i].InitializedECLock = TRUE; - WorkerPool->Workers[i].IdealProcessor = ProcessorList ? ProcessorList[i] : (uint16_t)i; - CXPLAT_DBG_ASSERT(WorkerPool->Workers[i].IdealProcessor < CxPlatProcCount()); - ThreadConfig.IdealProcessor = WorkerPool->Workers[i].IdealProcessor; + CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; + CxPlatLockInitialize(&Worker->ECLock); + CxPlatListInitializeHead(&Worker->DynamicPoolList); + Worker->InitializedECLock = TRUE; + Worker->IdealProcessor = ProcessorList ? ProcessorList[i] : (uint16_t)i; + CXPLAT_DBG_ASSERT(Worker->IdealProcessor < CxPlatProcCount()); + ThreadConfig.IdealProcessor = Worker->IdealProcessor; ThreadConfig.Context = &WorkerPool->Workers[i]; - if (!CxPlatEventQInitialize(&WorkerPool->Workers[i].EventQ)) { + if (!CxPlatEventQInitialize(&Worker->EventQ)) { QuicTraceEvent( LibraryError, "[ lib] ERROR, %s.", "CxPlatEventQInitialize"); goto Error; } - WorkerPool->Workers[i].InitializedEventQ = TRUE; -#ifdef CXPLAT_SQE_INIT - WorkerPool->Workers[i].ShutdownSqe = (CXPLAT_SQE)WorkerPool->Workers[i].EventQ; - if (!CxPlatSqeInitialize(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].ShutdownSqe, NULL)) { + Worker->InitializedEventQ = TRUE; + if (!CxPlatSqeInitialize(&Worker->EventQ, ShutdownCompletion, &Worker->ShutdownSqe)) { QuicTraceEvent( LibraryError, "[ lib] ERROR, %s.", "CxPlatSqeInitialize(shutdown)"); goto Error; } - WorkerPool->Workers[i].InitializedShutdownSqe = TRUE; - WorkerPool->Workers[i].WakeSqe = (CXPLAT_SQE)WorkerWakeEventPayload; - if (!CxPlatSqeInitialize(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].WakeSqe, (void*)&WorkerWakeEventPayload)) { + Worker->InitializedShutdownSqe = TRUE; + if (!CxPlatSqeInitialize(&Worker->EventQ, WakeCompletion, &Worker->WakeSqe)) { QuicTraceEvent( LibraryError, "[ lib] ERROR, %s.", "CxPlatSqeInitialize(wake)"); goto Error; } - WorkerPool->Workers[i].InitializedWakeSqe = TRUE; - WorkerPool->Workers[i].UpdatePollSqe = (CXPLAT_SQE)WorkerUpdatePollEventPayload; - if (!CxPlatSqeInitialize(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].UpdatePollSqe, (void*)&WorkerUpdatePollEventPayload)) { + Worker->InitializedWakeSqe = TRUE; + if (!CxPlatSqeInitialize(&Worker->EventQ, UpdatePollCompletion, &Worker->UpdatePollSqe)) { QuicTraceEvent( LibraryError, "[ lib] ERROR, %s.", "CxPlatSqeInitialize(updatepoll)"); goto Error; } - WorkerPool->Workers[i].InitializedUpdatePollSqe = TRUE; -#endif + Worker->InitializedUpdatePollSqe = TRUE; if (QUIC_FAILED( - CxPlatThreadCreate(&ThreadConfig, &WorkerPool->Workers[i].Thread))) { + CxPlatThreadCreate(&ThreadConfig, &Worker->Thread))) { goto Error; } - WorkerPool->Workers[i].InitializedThread = TRUE; + Worker->InitializedThread = TRUE; } CxPlatRundownInitialize(&WorkerPool->Rundown); @@ -238,36 +264,32 @@ CxPlatWorkerPoolLazyStart( if (WorkerPool->Workers) { for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { - if (WorkerPool->Workers[i].InitializedThread) { - WorkerPool->Workers[i].StoppingThread = TRUE; - CxPlatEventQEnqueue( - &WorkerPool->Workers[i].EventQ, - &WorkerPool->Workers[i].ShutdownSqe, - NULL); - CxPlatThreadWait(&WorkerPool->Workers[i].Thread); - CxPlatThreadDelete(&WorkerPool->Workers[i].Thread); + CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; + if (Worker->InitializedThread) { + Worker->StoppingThread = TRUE; + CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe); + CxPlatThreadWait(&Worker->Thread); + CxPlatThreadDelete(&Worker->Thread); #if DEBUG - CXPLAT_DBG_ASSERT(WorkerPool->Workers[i].ThreadStarted); - CXPLAT_DBG_ASSERT(WorkerPool->Workers[i].ThreadFinished); + CXPLAT_DBG_ASSERT(Worker->ThreadStarted); + CXPLAT_DBG_ASSERT(Worker->ThreadFinished); #endif - WorkerPool->Workers[i].DestroyedThread = TRUE; + Worker->DestroyedThread = TRUE; } -#ifdef CXPLAT_SQE_INIT - if (WorkerPool->Workers[i].InitializedUpdatePollSqe) { - CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].UpdatePollSqe); + if (Worker->InitializedUpdatePollSqe) { + CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe); } - if (WorkerPool->Workers[i].InitializedWakeSqe) { - CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].WakeSqe); + if (Worker->InitializedWakeSqe) { + CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe); } - if (WorkerPool->Workers[i].InitializedShutdownSqe) { - CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].ShutdownSqe); + if (Worker->InitializedShutdownSqe) { + CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe); } -#endif // CXPLAT_SQE_INIT - if (WorkerPool->Workers[i].InitializedEventQ) { - CxPlatEventQCleanup(&WorkerPool->Workers[i].EventQ); + if (Worker->InitializedEventQ) { + CxPlatEventQCleanup(&Worker->EventQ); } - if (WorkerPool->Workers[i].InitializedECLock) { - CxPlatLockUninitialize(&WorkerPool->Workers[i].ECLock); + if (Worker->InitializedECLock) { + CxPlatLockUninitialize(&Worker->ECLock); } } @@ -290,26 +312,22 @@ CxPlatWorkerPoolUninit( CxPlatRundownReleaseAndWait(&WorkerPool->Rundown); for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { - WorkerPool->Workers[i].StoppingThread = TRUE; - CxPlatEventQEnqueue( - &WorkerPool->Workers[i].EventQ, - &WorkerPool->Workers[i].ShutdownSqe, - NULL); - CxPlatThreadWait(&WorkerPool->Workers[i].Thread); - CxPlatThreadDelete(&WorkerPool->Workers[i].Thread); + CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; + Worker->StoppingThread = TRUE; + CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe); + CxPlatThreadWait(&Worker->Thread); + CxPlatThreadDelete(&Worker->Thread); #if DEBUG - CXPLAT_DBG_ASSERT(WorkerPool->Workers[i].ThreadStarted); - CXPLAT_DBG_ASSERT(WorkerPool->Workers[i].ThreadFinished); + CXPLAT_DBG_ASSERT(Worker->ThreadStarted); + CXPLAT_DBG_ASSERT(Worker->ThreadFinished); #endif - WorkerPool->Workers[i].DestroyedThread = TRUE; -#ifdef CXPLAT_SQE_INIT - CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].UpdatePollSqe); - CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].WakeSqe); - CxPlatSqeCleanup(&WorkerPool->Workers[i].EventQ, &WorkerPool->Workers[i].ShutdownSqe); -#endif // CXPLAT_SQE_INIT - CxPlatEventQCleanup(&WorkerPool->Workers[i].EventQ); - CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&WorkerPool->Workers[i].DynamicPoolList)); - CxPlatLockUninitialize(&WorkerPool->Workers[i].ECLock); + Worker->DestroyedThread = TRUE; + CxPlatSqeCleanup(&Worker->EventQ, &Worker->UpdatePollSqe); + CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe); + CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe); + CxPlatEventQCleanup(&Worker->EventQ); + CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&Worker->DynamicPoolList)); + CxPlatLockUninitialize(&Worker->ECLock); } CXPLAT_FREE(WorkerPool->Workers, QUIC_POOL_PLATFORM_WORKER); @@ -413,10 +431,7 @@ CxPlatAddExecutionContext( CxPlatLockRelease(&Worker->ECLock); if (QueueEvent) { - CxPlatEventQEnqueue( - &Worker->EventQ, - &Worker->UpdatePollSqe, - (void*)&WorkerUpdatePollEventPayload); + CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe); } } @@ -427,7 +442,7 @@ CxPlatWakeExecutionContext( { CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Context->CxPlatContext; if (!InterlockedFetchAndSetBoolean(&Worker->Running)) { - CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe, (void*)&WorkerWakeEventPayload); + CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe); } } @@ -509,7 +524,7 @@ CxPlatRunExecutionContexts( } } -BOOLEAN +void CxPlatProcessEvents( _In_ CXPLAT_WORKER* Worker, _Inout_ CXPLAT_EXECUTION_STATE* State @@ -524,26 +539,11 @@ CxPlatProcessEvents( #endif State->NoWorkCount = 0; for (uint32_t i = 0; i < CqeCount; ++i) { - if (CxPlatCqeUserData(&Cqes[i]) == NULL) { -#if DEBUG - CXPLAT_DBG_ASSERT(Worker->StoppingThread); -#endif - return TRUE; // NULL user data means shutdown. - } - switch (CxPlatCqeType(&Cqes[i])) { - case CXPLAT_CQE_TYPE_WORKER_WAKE: - break; // No-op, just wake up to do polling stuff. - case CXPLAT_CQE_TYPE_WORKER_UPDATE_POLL: - CxPlatUpdateExecutionContexts(Worker); - break; - default: // Pass the rest to the datapath - CxPlatDataPathProcessCqe(&Cqes[i]); - break; - } + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(&Cqes[i]); + Sqe->Completion(&Cqes[i]); } CxPlatEventQReturn(&Worker->EventQ, CqeCount); } - return FALSE; } // @@ -568,7 +568,7 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) Worker->Running = TRUE; - while (TRUE) { + while (!Worker->StoppedThread) { ++State.NoWorkCount; #if DEBUG // Debug statistics @@ -582,9 +582,7 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) CxPlatRunExecutionContexts(Worker, &State); // Run once more to handle race conditions } - if (CxPlatProcessEvents(Worker, &State)) { - goto Shutdown; - } + CxPlatProcessEvents(Worker, &State); if (State.NoWorkCount == 0) { State.LastWorkTime = State.TimeNow; @@ -599,8 +597,6 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) } } -Shutdown: - Worker->Running = FALSE; #if DEBUG diff --git a/src/platform/unittest/PlatformTest.cpp b/src/platform/unittest/PlatformTest.cpp index 126e90f10a..0b35b0c65a 100644 --- a/src/platform/unittest/PlatformTest.cpp +++ b/src/platform/unittest/PlatformTest.cpp @@ -53,8 +53,22 @@ TEST(PlatformTest, QuicAddrParsing) TEST(PlatformTest, EventQueue) { - uint32_t user_data1 = 0x1234, user_data2 = 0x5678, user_data3 = 0x90; - + struct my_sqe : public CXPLAT_SQE { + uint32_t data; + static void my_completion_1(CXPLAT_CQE* Cqe) { + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + ASSERT_TRUE(((my_sqe*)Sqe)->data == 0x1234); + } + static void my_completion_2(CXPLAT_CQE* Cqe) { + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + ASSERT_TRUE(((my_sqe*)Sqe)->data == 0x5678); + } + static void my_completion_3(CXPLAT_CQE* Cqe) { + CXPLAT_SQE* Sqe = CxPlatCqeGetSqe(Cqe); + ASSERT_TRUE(((my_sqe*)Sqe)->data == 0x90); + } + }; + CXPLAT_EVENTQ queue; ASSERT_TRUE(CxPlatEventQInitialize(&queue)); @@ -63,127 +77,128 @@ TEST(PlatformTest, EventQueue) ASSERT_EQ(0u, CxPlatEventQDequeue(&queue, events, 2, 0)); ASSERT_EQ(0u, CxPlatEventQDequeue(&queue, events, 2, 100)); -#ifdef CXPLAT_SQE - CXPLAT_SQE sqe1 = CXPLAT_SQE_DEFAULT; - CXPLAT_SQE sqe2 = CXPLAT_SQE_DEFAULT; - CXPLAT_SQE sqe3 = CXPLAT_SQE_DEFAULT; -#ifdef CXPLAT_SQE_INIT - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &sqe1, &user_data1)); - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &sqe2, &user_data2)); - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &sqe3, &user_data3)); -#endif // CXPLAT_SQE_INIT -#endif // CXPLAT_SQE + my_sqe sqe1, sqe2, sqe3; + sqe1.data = 0x1234; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, my_sqe::my_completion_1, &sqe1)); + sqe2.data = 0x5678; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, my_sqe::my_completion_2, &sqe2)); + sqe3.data = 0x90; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, my_sqe::my_completion_3, &sqe3)); // Single queue/dequeue tests - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1, &user_data1)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1)); ASSERT_EQ(1u, CxPlatEventQDequeue(&queue, events, 2, 0)); - ASSERT_EQ((void*)&user_data1, CxPlatCqeUserData(&events[0])); + ASSERT_EQ(&sqe1, (my_sqe*)CxPlatCqeGetSqe(&events[0])); // Multiple queue/dequeue tests - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1, &user_data1)); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe2, &user_data2)); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe3, &user_data3)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe2)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe3)); ASSERT_EQ(2u, CxPlatEventQDequeue(&queue, events, 2, 100)); ASSERT_EQ(1u, CxPlatEventQDequeue(&queue, events, 2, 0)); ASSERT_EQ(0u, CxPlatEventQDequeue(&queue, events, 2, 0)); struct EventQueueContext { CXPLAT_EVENTQ* queue; -#ifdef CXPLAT_SQE CXPLAT_SQE* sqe; -#endif - void* user_data; static CXPLAT_THREAD_CALLBACK(EventQueueCallback, Context) { auto ctx = (EventQueueContext*)Context; CxPlatSleep(100); - CxPlatEventQEnqueue(ctx->queue, ctx->sqe, ctx->user_data); + CxPlatEventQEnqueue(ctx->queue, ctx->sqe); CXPLAT_THREAD_RETURN(0); } }; // Async queue/dequeue tests -#ifdef CXPLAT_SQE - EventQueueContext context = { &queue, &sqe1, &user_data1 }; -#else - EventQueueContext context = { &queue, &user_data1 }; -#endif + EventQueueContext context = { &queue, &sqe1 }; CXPLAT_THREAD_CONFIG config = { 0, 0, NULL, EventQueueContext::EventQueueCallback, &context }; CXPLAT_THREAD thread; ASSERT_TRUE(QUIC_SUCCEEDED(CxPlatThreadCreate(&config, &thread))); ASSERT_EQ(1u, CxPlatEventQDequeue(&queue, events, 2, 1000)); - ASSERT_EQ((void*)&user_data1, CxPlatCqeUserData(&events[0])); + ASSERT_EQ(&sqe1, (my_sqe*)CxPlatCqeGetSqe(&events[0])); CxPlatThreadWait(&thread); CxPlatThreadDelete(&thread); -#ifdef CXPLAT_SQE_INIT CxPlatSqeCleanup(&queue, &sqe1); CxPlatSqeCleanup(&queue, &sqe2); CxPlatSqeCleanup(&queue, &sqe3); -#endif // CXPLAT_SQE_INIT CxPlatEventQCleanup(&queue); } TEST(PlatformTest, EventQueueWorker) { + typedef struct EventQueueContext EventQueueContext; + + struct my_sqe : public CXPLAT_SQE { + EventQueueContext* context; + uint32_t data; + }; + struct EventQueueContext { CXPLAT_EVENTQ* queue; uint32_t counts[3]; + bool running; static CXPLAT_THREAD_CALLBACK(EventQueueCallback, Context) { auto ctx = (EventQueueContext*)Context; CXPLAT_CQE events[4]; - while (true) { + while (ctx->running) { uint32_t count = CxPlatEventQDequeue(ctx->queue, events, ARRAYSIZE(events), UINT32_MAX); for (uint32_t i = 0; i < count; i++) { - if (CxPlatCqeUserData(&events[i]) == NULL) goto Exit; - ctx->counts[CxPlatCqeType(events + i)]++; + auto sqe = CxPlatCqeGetSqe(&events[i]); + sqe->Completion(&events[i]); } } - Exit: CXPLAT_THREAD_RETURN(0); } + static void shutdown_completion(CXPLAT_CQE* Cqe) { + auto Sqe = (my_sqe*)CxPlatCqeGetSqe(Cqe); + Sqe->context->running = false; + } + static void my_completion(CXPLAT_CQE* Cqe) { + auto Sqe = (my_sqe*)CxPlatCqeGetSqe(Cqe); + Sqe->context->counts[Sqe->data]++; + } }; - uint32_t user_data1 = 0, user_data2 = 1, user_data3 = 2; - CXPLAT_EVENTQ queue; ASSERT_TRUE(CxPlatEventQInitialize(&queue)); - EventQueueContext context = { &queue, {0} }; + EventQueueContext context = { &queue, {0}, true }; CXPLAT_THREAD_CONFIG config = { 0, 0, NULL, EventQueueContext::EventQueueCallback, &context }; CXPLAT_THREAD thread; ASSERT_TRUE(QUIC_SUCCEEDED(CxPlatThreadCreate(&config, &thread))); - -#ifdef CXPLAT_SQE - CXPLAT_SQE shutdown = CXPLAT_SQE_DEFAULT; - CXPLAT_SQE sqe1 = CXPLAT_SQE_DEFAULT; - CXPLAT_SQE sqe2 = CXPLAT_SQE_DEFAULT; - CXPLAT_SQE sqe3 = CXPLAT_SQE_DEFAULT; -#ifdef CXPLAT_SQE_INIT - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &shutdown, nullptr)); - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &sqe1, &user_data1)); - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &sqe2, &user_data2)); - ASSERT_TRUE(CxPlatSqeInitialize(&queue, &sqe3, &user_data3)); -#endif // CXPLAT_SQE_INIT -#endif // CXPLAT_SQE - - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1, &user_data1)); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe2, &user_data2)); + + my_sqe shutdown, sqe1, sqe2, sqe3; + shutdown.context = &context; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, EventQueueContext::shutdown_completion, &shutdown)); + sqe1.context = &context; + sqe1.data = 0; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, EventQueueContext::my_completion, &sqe1)); + sqe2.context = &context; + sqe2.data = 1; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, EventQueueContext::my_completion, &sqe2)); + sqe3.context = &context; + sqe3.data = 2; + ASSERT_TRUE(CxPlatSqeInitialize(&queue, EventQueueContext::my_completion, &sqe3)); + + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe2)); CxPlatSleep(100); ASSERT_TRUE(context.counts[0] == 1u); ASSERT_TRUE(context.counts[1] == 1u); ASSERT_TRUE(context.counts[2] == 0u); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1, &user_data1)); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe2, &user_data2)); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe3, &user_data3)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe1)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe2)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe3)); CxPlatSleep(100); ASSERT_TRUE(context.counts[0] == 2u); ASSERT_TRUE(context.counts[1] == 2u); ASSERT_TRUE(context.counts[2] == 1u); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe3, &user_data3)); - ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &shutdown, nullptr)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &sqe3)); + ASSERT_TRUE(CxPlatEventQEnqueue(&queue, &shutdown)); CxPlatThreadWait(&thread); CxPlatThreadDelete(&thread); @@ -192,12 +207,10 @@ TEST(PlatformTest, EventQueueWorker) ASSERT_TRUE(context.counts[1] == 2u); ASSERT_TRUE(context.counts[2] == 2u); -#ifdef CXPLAT_SQE_INIT CxPlatSqeCleanup(&queue, &shutdown); CxPlatSqeCleanup(&queue, &sqe1); CxPlatSqeCleanup(&queue, &sqe2); CxPlatSqeCleanup(&queue, &sqe3); -#endif // CXPLAT_SQE_INIT CxPlatEventQCleanup(&queue); }