Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] EventLoop: Socket now closed only after event loop done polling #815

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/adapters/libevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock so
return s;
}

static void
_closeCb(evutil_socket_t fd, short event, void *arg)
{
natsSock socket = (natsSock) fd;

// We have stopped polling for the "READ" event and are now in the
// event loop thread and invoke this so that the NATS C client
// library can proceed with the close of the socket/connection.
natsConnection_ProcessCloseEvent(&socket);
}

/** \brief Start or stop polling on READ events.
*
* This callback is invoked to notify that the event library should start
Expand All @@ -175,7 +186,16 @@ natsLibevent_Read(void *userData, bool add)
if (add)
res = event_add(nle->read, NULL);
else
{
int socket = event_get_fd(nle->read);
res = event_del_noblock(nle->read);
if (res == 0)
{
// This will schedule a one-time event that guarantees that the
// callback `_closeCb` will be invoked from the event loop thread.
res = event_base_once(nle->loop, socket, EV_TIMEOUT, _closeCb, (void*) nle, NULL);
}
}

return (res == 0 ? NATS_OK : NATS_ERR);
}
Expand Down
9 changes: 9 additions & 0 deletions src/adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,16 @@ uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
if (nle->events)
res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
else
{
res = uv_poll_stop(nle->handle);
if (res == 0)
{
// We have stopped polling for events for this socket and are in
// the event loop thread, so we invoke this so that the NATS C
// client library can proceed with closing the socket.
natsConnection_ProcessCloseEvent(&(nle->socket));
}
}

if (res != 0)
return NATS_ERR;
Expand Down
60 changes: 45 additions & 15 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2146,9 +2146,21 @@ _evStopPolling(natsConnection *nc)

nc->sockCtx.useEventLoop = false;
nc->el.writeAdded = false;
s = nc->opts->evCbs.read(nc->el.data, NATS_EVENT_ACTION_REMOVE);
// The "write" event is added and removed as we write, however, we always
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Right, matching the changes in natsLibevent_Read, thx for the explanation comment!

// have the "read" event added to the event loop. Removing it signals that
// the connection is closed and so the event loop adapter can then invoke
// natsConnection_ProcessCloseEvent() when the event loop is done polling
// the event. So we will remove "write" first, then finish with "read".
s = nc->opts->evCbs.write(nc->el.data, NATS_EVENT_ACTION_REMOVE);
if (s == NATS_OK)
s = nc->opts->evCbs.write(nc->el.data, NATS_EVENT_ACTION_REMOVE);
s = nc->opts->evCbs.read(nc->el.data, NATS_EVENT_ACTION_REMOVE);
if (s == NATS_OK)
{
// We can't close the socket here, but we will mark as invalid and
// clear SSL object if applicable.
nc->sockCtx.fd = NATS_SOCK_INVALID;
_clearSSL(nc);
}

return s;
}
Expand Down Expand Up @@ -2187,6 +2199,7 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
SET_WRITE_DEADLINE(nc);
natsConn_bufferFlush(nc);

// Shutdown the socket to stop any read/write operations.
natsSock_Shutdown(nc->sockCtx.fd);
nc->sockCtx.fdActive = false;
}
Expand All @@ -2195,12 +2208,10 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
// on the socket since we are going to reconnect.
if (nc->el.attached)
{
// This will take care of invalidating the socket and clear SSL,
// but the actual socket close will be done from the event loop
// adapter by calling natsConnection_ProcessCloseEvent().
ls = _evStopPolling(nc);
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
}

// Fail pending flush requests.
Expand Down Expand Up @@ -2579,13 +2590,20 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
{
// If event loop attached, stop polling...
if (nc->el.attached)
{
// This will take care of invalidating the socket and clear SSL,
// but the actual socket close will be done from the event loop
// adapter by calling natsConnection_ProcessCloseEvent().
_evStopPolling(nc);
}
else
{
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
// We need to cleanup some things if the connection was SSL.
_clearSSL(nc);
}
}
else
{
Expand Down Expand Up @@ -3411,6 +3429,7 @@ natsConnection_Reconnect(natsConnection *nc)
natsSock_Shutdown(nc->sockCtx.fd);

natsConn_Unlock(nc);

return NATS_OK;
}

Expand Down Expand Up @@ -4098,13 +4117,16 @@ natsConnection_ProcessReadEvent(natsConnection *nc)
buffer = nc->el.buffer;
size = nc->opts->ioBufSize;

natsConn_Unlock(nc);

// Do not try to read again here on success. If more than one connection
// is attached to the same loop, and there is a constant stream of data
// coming for the first connection, this would starve the second connection.
// So return and we will be called back later by the event loop.

// This needs to be protected by the connection lock. We are here because
// there is a read event, so we will gather some data in natsSock_Read()
// but not wait there.
s = natsSock_Read(&(nc->sockCtx), buffer, size, &n);
natsConn_Unlock(nc);
if (s == NATS_OK)
s = natsParser_Parse(nc, buffer, n);

Expand Down Expand Up @@ -4159,8 +4181,16 @@ natsConnection_ProcessWriteEvent(natsConnection *nc)

if (s != NATS_OK)
_processOpError(nc, s, false);
}

void
natsConnection_ProcessCloseEvent(natsSock *socket)
{
if ((socket == NULL) || (*socket == NATS_SOCK_INVALID))
return;

(void) NATS_UPDATE_ERR_STACK(s);
natsSock_Close(*socket);
*socket = NATS_SOCK_INVALID;
}

natsStatus
Expand Down
14 changes: 14 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -4194,6 +4194,20 @@ natsConnection_Reconnect(natsConnection *nc);
NATS_EXTERN void
natsConnection_ProcessReadEvent(natsConnection *nc);

/** \brief Process a socket close event when using external event loop.
*
* When using an external event loop, and the library wants to close
* the connection, the event loop adapter will ensure that the event
* loop library stops polling, and then will invoke this function
* so that the socket can be safely closed.
*
* @param socket the pointer to the #natsSock object.
*
* \warning This API is reserved for external event loop adapters.
*/
NATS_EXTERN void
natsConnection_ProcessCloseEvent(natsSock *socket);

/** \brief Process a write event when using external event loop.
*
* When using an external event loop, and the callback indicating that
Expand Down
Loading