Skip to content

Commit

Permalink
Enable to send/signal different messages to each task. (#1649)
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim authored Nov 6, 2024
1 parent 48c71d7 commit fabd556
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
50 changes: 33 additions & 17 deletions src/factory/WFTaskFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ static class __NamedMailboxMap
mailbox_callback_t&& cb);
WFMailboxTask *create(const std::string& name, mailbox_callback_t&& cb);

int send(const std::string& name, void *msg, size_t max);
int send(const std::string& name, void *const msg[], size_t max, int inc);
void send(MailboxList *mailboxes, struct __mailbox_node *node, void *msg);

void remove(MailboxList *mailboxes, struct __mailbox_node *node)
Expand All @@ -552,7 +552,7 @@ static class __NamedMailboxMap
}

private:
bool send_max_locked(MailboxList *mailboxes, void *msg, size_t max,
bool send_max_locked(MailboxList *mailboxes, size_t max,
struct list_head *task_list);
struct rb_root root_;
std::mutex mutex_;
Expand Down Expand Up @@ -622,8 +622,7 @@ WFMailboxTask *__NamedMailboxMap::create(const std::string& name,
return task;
}

bool __NamedMailboxMap::send_max_locked(MailboxList *mailboxes,
void *msg, size_t max,
bool __NamedMailboxMap::send_max_locked(MailboxList *mailboxes, size_t max,
struct list_head *task_list)
{
if (max == (size_t)-1)
Expand All @@ -644,7 +643,8 @@ bool __NamedMailboxMap::send_max_locked(MailboxList *mailboxes,
return true;
}

int __NamedMailboxMap::send(const std::string& name, void *msg, size_t max)
int __NamedMailboxMap::send(const std::string& name, void *const msg[],
size_t max, int inc)
{
LIST_HEAD(task_list);
struct __mailbox_node *node;
Expand All @@ -655,7 +655,7 @@ int __NamedMailboxMap::send(const std::string& name, void *msg, size_t max)
mutex_.lock();
mailboxes = __get_object_list<MailboxList>(name, &root_, false);
if (mailboxes)
erased = send_max_locked(mailboxes, msg, max, &task_list);
erased = send_max_locked(mailboxes, max, &task_list);

mutex_.unlock();
if (erased)
Expand All @@ -665,7 +665,8 @@ int __NamedMailboxMap::send(const std::string& name, void *msg, size_t max)
{
node = list_entry(task_list.next, struct __mailbox_node, list);
list_del(&node->list);
node->task->WFMailboxTask::send(msg);
node->task->WFMailboxTask::send(*msg);
msg += inc;
ret++;
}

Expand Down Expand Up @@ -700,9 +701,16 @@ WFMailboxTask *WFTaskFactory::create_mailbox_task(const std::string& name,
return __mailbox_map.create(name, std::move(callback));
}

int WFTaskFactory::send_by_name(const std::string& name, void *msg, size_t max)
int WFTaskFactory::send_by_name(const std::string& name, void *msg,
size_t max)
{
return __mailbox_map.send(name, msg, max);
return __mailbox_map.send(name, &msg, max, 0);
}

int WFTaskFactory::send_by_name(const std::string& name, void *const msg[],
size_t max)
{
return __mailbox_map.send(name, msg, max, 1);
}

/****************** Named Conditional ******************/
Expand All @@ -725,7 +733,7 @@ static class __NamedConditionalMap
void **msgbuf);
WFConditional *create(const std::string& name, SubTask *task);

int signal(const std::string& name, void *msg, size_t max);
int signal(const std::string& name, void *const msg[], size_t max, int inc);
void signal(ConditionalList *conds, struct __conditional_node *node,
void *msg);

Expand All @@ -741,7 +749,7 @@ static class __NamedConditionalMap
}

private:
bool signal_max_locked(ConditionalList *conds, void *msg, size_t max,
bool signal_max_locked(ConditionalList *conds, size_t max,
struct list_head *cond_list);
struct rb_root root_;
std::mutex mutex_;
Expand Down Expand Up @@ -811,7 +819,7 @@ WFConditional *__NamedConditionalMap::create(const std::string& name,
}

bool __NamedConditionalMap::signal_max_locked(ConditionalList *conds,
void *msg, size_t max,
size_t max,
struct list_head *cond_list)
{
if (max == (size_t)-1)
Expand All @@ -832,7 +840,8 @@ bool __NamedConditionalMap::signal_max_locked(ConditionalList *conds,
return true;
}

int __NamedConditionalMap::signal(const std::string& name, void *msg, size_t max)
int __NamedConditionalMap::signal(const std::string& name, void *const msg[],
size_t max, int inc)
{
LIST_HEAD(cond_list);
struct __conditional_node *node;
Expand All @@ -843,7 +852,7 @@ int __NamedConditionalMap::signal(const std::string& name, void *msg, size_t max
mutex_.lock();
conds = __get_object_list<ConditionalList>(name, &root_, false);
if (conds)
erased = signal_max_locked(conds, msg, max, &cond_list);
erased = signal_max_locked(conds, max, &cond_list);

mutex_.unlock();
if (erased)
Expand All @@ -853,7 +862,8 @@ int __NamedConditionalMap::signal(const std::string& name, void *msg, size_t max
{
node = list_entry(cond_list.next, struct __conditional_node, list);
list_del(&node->list);
node->cond->WFConditional::signal(msg);
node->cond->WFConditional::signal(*msg);
msg += inc;
ret++;
}

Expand Down Expand Up @@ -888,9 +898,15 @@ WFConditional *WFTaskFactory::create_conditional(const std::string& name,
}

int WFTaskFactory::signal_by_name(const std::string& name, void *msg,
size_t max)
size_t max)
{
return __conditional_map.signal(name, &msg, max, 0);
}

int WFTaskFactory::signal_by_name(const std::string& name, void *const msg[],
size_t max)
{
return __conditional_map.signal(name, msg, max);
return __conditional_map.signal(name, msg, max, 1);
}

/****************** Named Guard ******************/
Expand Down
6 changes: 6 additions & 0 deletions src/factory/WFTaskFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ class WFTaskFactory
static int send_by_name(const std::string& mailbox_name, void *msg,
size_t max);

static int send_by_name(const std::string& mailbox_name, void *const msg[],
size_t max);

public:
static WFSelectorTask *create_selector_task(size_t candidates,
selector_callback_t callback)
Expand Down Expand Up @@ -330,6 +333,9 @@ class WFTaskFactory
static int signal_by_name(const std::string& cond_name, void *msg,
size_t max);

static int signal_by_name(const std::string& cond_name, void *const msg[],
size_t max);

public:
static WFConditional *create_guard(const std::string& resource_name,
SubTask *task);
Expand Down

0 comments on commit fabd556

Please sign in to comment.