Skip to content

Commit

Permalink
Copy a42b74ae8139738a14148f94543c659ec2d5b92b
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner committed Jun 24, 2024
1 parent de3ad98 commit 84908fa
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 20 deletions.
51 changes: 42 additions & 9 deletions packages/bun-usockets/src/eventing/epoll_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,28 @@ struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int f
}

cb->machport_buf = us_malloc(MACHPORT_BUF_LEN);
kern_return_t kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &cb->port);
mach_port_t self = mach_task_self();
kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &cb->port);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

// Insert a send right into the port since we also use this to send
kr = mach_port_insert_right(self, cb->port, cb->port, MACH_MSG_TYPE_MAKE_SEND);
if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

// Modify the port queue size to be 1 because we are only
// using it for notifications and not for any other purpose.
mach_port_limits_t limits = { .mpl_qlimit = 1 };
kr = mach_port_set_attributes(self, cb->port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

return (struct us_internal_async *) cb;
}

Expand Down Expand Up @@ -605,15 +621,32 @@ void us_internal_async_wakeup(struct us_internal_async *a) {
mach_msg_empty_send_t message;
memset(&message, 0, sizeof(message));
message.header.msgh_size = sizeof(message);
message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
// We use COPY_SEND which will not increment any send ref
// counts because it'll reuse the existing send right.
message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
message.header.msgh_remote_port = internal_cb->port;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If us_internal_async_wakeup is being called by other threads faster
// than the pump can dispatch work, the kernel message queue for the wakeup
// port can fill The kernel does return a SEND_ONCE right in the case of
// failure, which must be destroyed to avoid leaking.
mach_msg_destroy(&message.header);
message.header.msgh_local_port = MACH_PORT_NULL;
mach_msg_return_t kr = mach_msg_send(&message.header);

switch (kr) {
case KERN_SUCCESS: {
break;
}

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
case MACH_SEND_TIMED_OUT: {
break;
}

// No space means it will wake up.
case MACH_SEND_NO_BUFFER: {
break;
}

default: {
mach_msg_destroy(&message.header);
}
}
}
#endif
Expand Down
39 changes: 28 additions & 11 deletions src/io/io_darwin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,36 @@ extern "C" bool getaddrinfo_send_reply(mach_port_t port,
}

extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker) {
mach_msg_empty_send_t message{};
mach_msg_empty_send_t message;
memset(&message, 0, sizeof(message));
message.header.msgh_size = sizeof(message);
message.header.msgh_bits =
MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
// We use COPY_SEND which will not increment any send ref
// counts because it'll reuse the existing send right.
message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
message.header.msgh_remote_port = waker;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If io_darwin_schedule_wakeup() is being called by other threads faster
// than the pump can dispatch work, the kernel message queue for the wakeup
// port can fill The kernel does return a SEND_ONCE right in the case of
// failure, which must be destroyed to avoid leaking.
mach_msg_destroy(&message.header);
return false;
message.header.msgh_local_port = MACH_PORT_NULL;
mach_msg_return_t kr = mach_msg_send(&message.header);

switch (kr) {
case KERN_SUCCESS: {
break;
}

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
case MACH_SEND_TIMED_OUT: {
break;
}

// No space means it will wake up.
case MACH_SEND_NO_BUFFER: {
break;
}

default: {
mach_msg_destroy(&message.header);
return false;
}
}

return true;
Expand Down

0 comments on commit 84908fa

Please sign in to comment.