Skip to content

Commit

Permalink
Simplify service session's state.
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed Jan 1, 2025
1 parent 75c662c commit b7a8c6a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 26 deletions.
44 changes: 19 additions & 25 deletions src/kernel/Communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ int CommService::init(const struct sockaddr *bind_addr, socklen_t addrlen,
this->addrlen = addrlen;
this->listen_timeout = listen_timeout;
this->response_timeout = response_timeout;
INIT_LIST_HEAD(&this->alive_list);
INIT_LIST_HEAD(&this->keep_alive_list);

this->ssl_ctx = NULL;
this->ssl_accept_timeout = 0;
Expand Down Expand Up @@ -253,9 +253,9 @@ int CommService::drain(int max)

errno_bak = errno;
pthread_mutex_lock(&this->mutex);
while (cnt != max && !list_empty(&this->alive_list))
while (cnt != max && !list_empty(&this->keep_alive_list))
{
pos = this->alive_list.next;
pos = this->keep_alive_list.prev;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
cnt++;
Expand Down Expand Up @@ -309,13 +309,6 @@ class CommServiceTarget : public CommTarget
private:
CommService *service;

private:
virtual int create_connect_fd()
{
errno = EPERM;
return -1;
}

friend class Communicator;
};

Expand Down Expand Up @@ -359,7 +352,7 @@ CommSession::~CommSession()
return;

target = (CommServiceTarget *)this->target;
if (this->passive == 2)
if (!this->out && target->has_idle_conn())
target->shutdown();

target->decref();
Expand Down Expand Up @@ -485,7 +478,7 @@ int Communicator::send_message_sync(struct iovec vectors[], int cnt,
if (service->listen_fd >= 0)
{
entry->state = CONN_STATE_KEEPALIVE;
list_add_tail(&entry->list, &service->alive_list);
list_add(&entry->list, &service->keep_alive_list);
entry = NULL;
}

Expand Down Expand Up @@ -607,7 +600,6 @@ void Communicator::handle_incoming_request(struct poller_result *res)
{
case PR_ST_SUCCESS:
session = entry->session;
session->passive = 2;
state = CS_STATE_TOREPLY;
pthread_mutex_lock(&target->mutex);
if (entry->state == CONN_STATE_SUCCESS)
Expand Down Expand Up @@ -649,7 +641,6 @@ void Communicator::handle_incoming_request(struct poller_result *res)
state = CS_STATE_ERROR;
case CONN_STATE_RECEIVING:
session = entry->session;
session->passive = 3;
break;

case CONN_STATE_SUCCESS:
Expand Down Expand Up @@ -795,7 +786,7 @@ void Communicator::handle_reply_result(struct poller_result *res)
if (!this->stop_flag && service->listen_fd >= 0)
{
entry->state = CONN_STATE_KEEPALIVE;
list_add_tail(&entry->list, &service->alive_list);
list_add(&entry->list, &service->keep_alive_list);
}
else
{
Expand Down Expand Up @@ -1083,15 +1074,13 @@ void Communicator::handle_recvfrom_result(struct poller_result *res)
target = entry->target;
if (entry->state == CONN_STATE_SUCCESS)
{
session->passive = 2;
state = CS_STATE_TOREPLY;
error = 0;
entry->state = CONN_STATE_IDLE;
list_add(&entry->list, &target->idle_list);
}
else
{
session->passive = 3;
state = CS_STATE_ERROR;
if (entry->state == CONN_STATE_ERROR)
error = entry->error;
Expand Down Expand Up @@ -1974,14 +1963,19 @@ int Communicator::reply(CommSession *session)
int errno_bak;
int ret;

if (session->passive != 2)
if (!session->passive)
{
errno = EINVAL;
return -1;
}

if (session->out)
{
errno = ENOENT;
return -1;
}

errno_bak = errno;
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (target->service->reliable)
ret = this->reply_reliable(session, target);
Expand Down Expand Up @@ -2023,9 +2017,8 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
mutex = &in->entry->mutex;

pthread_mutex_lock(mutex);
if ((session->passive == 2 && !list_empty(&session->target->idle_list)) ||
(!session->passive && in->entry->session == session) ||
session->passive == 1)
if ((!session->passive || session->target->has_idle_conn()) &&
in->entry->session == session)
{
ret = in->inner()->feedback(buf, size);
}
Expand All @@ -2043,15 +2036,14 @@ int Communicator::shutdown(CommSession *session)
{
CommServiceTarget *target;

if (session->passive != 2)
if (!session->passive)
{
errno = EINVAL;
return -1;
}

session->passive = 3;
target = (CommServiceTarget *)session->target;
if (!target->shutdown())
if (session->out || !target->shutdown())
{
errno = ENOENT;
return -1;
Expand All @@ -2068,7 +2060,9 @@ int Communicator::sleep(SleepSession *session)
{
if (mpoller_add_timer(&value, session, &session->timer, &session->index,
this->mpoller) >= 0)
{
return 0;
}
}

return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/kernel/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class CommService
int ref;

private:
struct list_head alive_list;
struct list_head keep_alive_list;
pthread_mutex_t mutex;

public:
Expand Down

0 comments on commit b7a8c6a

Please sign in to comment.