Skip to content

Commit

Permalink
feature: support socket in some block phase
Browse files Browse the repository at this point in the history
  • Loading branch information
rainingmaster committed Nov 18, 2020
1 parent fdf752d commit 7b2ce35
Show file tree
Hide file tree
Showing 8 changed files with 729 additions and 5 deletions.
13 changes: 13 additions & 0 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ HTTP_LUA_SRCS=" \
$ngx_addon_dir/src/ngx_http_lua_pipe.c \
"

if [ $EVENT_MODULES =~ "ngx_epoll_module" ]; then
HTTP_LUA_SRCS="$HTTP_LUA_SRCS $ngx_addon_dir/src/event/ngx_http_lua_poll.c"
fi

if [ $EVENT_MODULES =~ "ngx_poll_module" ]; then
HTTP_LUA_SRCS="$HTTP_LUA_SRCS $ngx_addon_dir/src/event/ngx_http_lua_poll.c"
fi

if [ $EVENT_MODULES =~ "ngx_kqueue_module" ]; then
HTTP_LUA_SRCS="$HTTP_LUA_SRCS $ngx_addon_dir/src/event/ngx_http_lua_kqueue.c"
fi

HTTP_LUA_DEPS=" \
$ngx_addon_dir/src/ddebug.h \
$ngx_addon_dir/src/ngx_http_lua_autoconf.h \
Expand Down Expand Up @@ -355,6 +367,7 @@ HTTP_LUA_DEPS=" \
$ngx_addon_dir/src/ngx_http_lua_log_ringbuf.h \
$ngx_addon_dir/src/ngx_http_lua_input_filters.h \
$ngx_addon_dir/src/ngx_http_lua_pipe.h \
$ngx_addon_dir/src/ngx_http_lua_event.h \
"

# ----------------------------------------
Expand Down
180 changes: 180 additions & 0 deletions src/event/ngx_http_lua_epoll.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@

/*
* Copyright (C) Yichun Zhang (agentzh)
*/


#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_http.h>
#include "../ngx_http_lua_event.h"


static ngx_int_t ngx_http_lua_epoll_init_event(ngx_conf_t *cf);

static void ngx_http_lua_epoll_set_event(ngx_event_t *ev, ngx_int_t event);

static ngx_int_t ngx_http_lua_epoll_process_events(ngx_http_request_t *r,
ngx_msec_t timer);

static int ep = -1;
static struct epoll_event event_list[1];

ngx_http_lua_event_actions_t ngx_http_lua_epoll = {
ngx_http_lua_epoll_init_event,
ngx_http_lua_epoll_set_event,
ngx_http_lua_epoll_process_events,
};


static ngx_int_t
ngx_http_lua_epoll_init_event(ngx_conf_t *cf)
{
ep = epoll_create(cycle->connection_n / 2);

if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}

return NGX_OK;
}


static void
ngx_http_lua_epoll_set_event(ngx_event_t *ev, ngx_int_t event)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;

c = ev->data;

events = (uint32_t) event;

if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif

} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}

if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;

} else {
op = EPOLL_CTL_ADD;
}

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
if (flags & NGX_EXCLUSIVE_EVENT) {
events &= ~EPOLLRDHUP;
}
#endif

ee.events = events | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}

ev->active = 1;

return NGX_OK;
}


static ngx_int_t
ngx_http_lua_epoll_process_events(ngx_http_request_t *r, ngx_msec_t timer)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;

events = epoll_wait(ep, event_list, (int) nevents, timer);

err = (events == -1) ? ngx_errno : 0;

if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}

if (err) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, err,
"epoll_wait() failed");

return NGX_ERROR;
}

if (events == 0) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"epoll_wait() returned no events without timeout");

return NGX_ERROR;
}

c = event_list[0].data.ptr;

instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

revents = event_list[0].events;

if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);

/*
* if the error events were returned, add EPOLLIN and EPOLLOUT
* to handle the events at least in one active handler
*/

revents |= EPOLLIN|EPOLLOUT;
}

rev = c->read;

if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
#endif

rev->ready = 1;
rev->available = -1;
}

wev = c->write;

if ((revents & EPOLLOUT) && wev->active) {
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
}

return NGX_OK;
}
110 changes: 110 additions & 0 deletions src/event/ngx_http_lua_kqueue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@

/*
* Copyright (C) Yichun Zhang (agentzh)
*/


#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_http.h>
#include "../ngx_http_lua_event.h"


static ngx_int_t ngx_http_lua_kqueue_init_event(ngx_conf_t *cf);

static void ngx_http_lua_kqueue_set_event(ngx_event_t *ev, ngx_int_t event);

static ngx_int_t ngx_http_lua_kqueue_process_events(ngx_http_request_t *r,
ngx_msec_t timer);

int ngx_lua_kqueue = -1;
static struct kevent change_list[1];
static struct kevent event_list[1];

ngx_http_lua_event_actions_t ngx_http_lua_kqueue = {
ngx_http_lua_kqueue_init_event,
ngx_http_lua_kqueue_set_event,
ngx_http_lua_kqueue_process_events,
};


static ngx_int_t
ngx_http_lua_kqueue_init_event(ngx_conf_t *cf)
{
if (ngx_lua_kqueue == -1) {
ngx_lua_kqueue = kqueue();

if (ngx_lua_kqueue == -1) {
ngx_conf_log_error(NGX_LOG_ALERT, cf, 0, "kqueue() failed");

return NGX_ERROR;
}
}

return NGX_OK;
}


static void
ngx_http_lua_kqueue_set_event(ngx_event_t *ev, ngx_int_t event)
{
struct kevent *kev;
ngx_connection_t *c;

c = ev->data;

ev->active = 1;

kev = &change_list[0];

kev->ident = c->fd;
kev->filter = (short) event;
kev->flags = EV_ADD|EV_ENABLE;
kev->udata = NGX_KQUEUE_UDATA_T ((uintptr_t) ev | ev->instance);
}


static ngx_int_t
ngx_http_lua_kqueue_process_events(ngx_http_request_t *r, ngx_msec_t timer)
{
int events;
struct timespec ts;
ngx_event_t *ev;
ngx_int_t instance;
ngx_err_t err;

ts.tv_sec = timer / 1000;
ts.tv_nsec = (timer % 1000) * 1000000;

events = kevent(ngx_lua_kqueue, change_list, 1, event_list, 1, &ts);

err = (events == -1) ? ngx_errno : 0;

if (err) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, err,
"kevent() failed");

return NGX_ERROR;
}

if (events == 0) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"kevent() returned no events without timeout");

return NGX_ERROR;
}

ev = (ngx_event_t *) event_list[0].udata;
instance = (uintptr_t) ev & 1;
ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1);

ev->available = event_list[0].data;

if (event_list[0].flags & EV_EOF) {
ev->pending_eof = 1;
}

ev->ready = 1;

return NGX_OK;
}
Loading

0 comments on commit 7b2ce35

Please sign in to comment.