diff --git a/src/watcher/async.zig b/src/watcher/async.zig index b28edd0..910470d 100644 --- a/src/watcher/async.zig +++ b/src/watcher/async.zig @@ -135,6 +135,18 @@ fn AsyncMachPort(comptime xev: type) type { /// The error that can come in the wait callback. pub const WaitError = xev.Sys.MachPortError; + /// Missing Mach APIs from Zig stdlib. Data from xnu: osfmk/mach/port.h + const mach_port_flavor_t = c_int; + const mach_port_limits = extern struct { mpl_qlimit: c_uint }; + const MACH_PORT_LIMITS_INFO = 1; + extern "c" fn mach_port_set_attributes( + task: posix.system.ipc_space_t, + name: posix.system.mach_port_name_t, + flavor: mach_port_flavor_t, + info: *anyopaque, + count: posix.system.mach_msg_type_number_t, + ) posix.system.kern_return_t; + /// The mach port port: posix.system.mach_port_name_t, @@ -155,6 +167,31 @@ fn AsyncMachPort(comptime xev: type) type { } errdefer _ = posix.system.mach_port_deallocate(mach_self, mach_port); + // Insert a send right into the port since we also use this to send + switch (posix.system.getKernError(posix.system.mach_port_insert_right( + mach_self, + mach_port, + mach_port, + @intFromEnum(posix.system.MACH_MSG_TYPE.MAKE_SEND), + ))) { + .SUCCESS => {}, // Success + else => return error.MachPortAllocFailed, + } + + // Modify the port queue size to be 1 because we are only + // using it for notifications and not for any other purpose. + var limits: mach_port_limits = .{ .mpl_qlimit = 1 }; + switch (posix.system.getKernError(mach_port_set_attributes( + mach_self, + mach_port, + MACH_PORT_LIMITS_INFO, + &limits, + @sizeOf(@TypeOf(limits)), + ))) { + .SUCCESS => {}, // Success + else => return error.MachPortAllocFailed, + } + return .{ .port = mach_port, }; @@ -266,7 +303,9 @@ fn AsyncMachPort(comptime xev: type) type { pub fn notify(self: Self) !void { // This constructs an empty mach message. It has no data. var msg: posix.system.mach_msg_header_t = .{ - .msgh_bits = @intFromEnum(posix.system.MACH_MSG_TYPE.MAKE_SEND_ONCE), + // We use COPY_SEND which will not increment any send ref + // counts because it'll reuse the existing send right. + .msgh_bits = @intFromEnum(posix.system.MACH_MSG_TYPE.COPY_SEND), .msgh_size = @sizeOf(posix.system.mach_msg_header_t), .msgh_remote_port = self.port, .msgh_local_port = posix.system.MACH_PORT_NULL, @@ -277,11 +316,11 @@ fn AsyncMachPort(comptime xev: type) type { return switch (posix.system.getMachMsgError( posix.system.mach_msg( &msg, - posix.system.MACH_SEND_MSG, + posix.system.MACH_SEND_MSG | posix.system.MACH_SEND_TIMEOUT, msg.msgh_size, 0, posix.system.MACH_PORT_NULL, - posix.system.MACH_MSG_TIMEOUT_NONE, + 0, // Fail instantly if the port is full posix.system.MACH_PORT_NULL, ), )) { @@ -294,6 +333,10 @@ fn AsyncMachPort(comptime xev: type) type { // This is okay because it means that there was no more buffer // space meaning that the port will wake up. .SEND_NO_BUFFER => {}, + + // This means that the send would've blocked because the + // queue is full. We assume success because the port is full. + .SEND_TIMED_OUT => {}, }; }