Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add Robust Error Checking to Library and Worker's Communications #3974

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions poncho/src/poncho/library_network_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,18 @@ def main():
if context_vars:
(load_variable_from_library.__globals__).update(context_vars)

# set execution mode of functions in this library
global exec_method
exec_method = library_info['exec_mode']

# send configuration of library, just its name for now
config = {
"name": library_info['library_name']
"name": library_info['library_name'],
"taskid": args.task_id,
"exec_mode": exec_method,
}
send_configuration(config, out_pipe_fd, args.worker_pid)

# set execution mode of functions in this library
global exec_method
exec_method = library_info['exec_mode']

# register sigchld handler to turn a sigchld signal into an I/O event
signal.signal(signal.SIGCHLD, sigchld_handler)

Expand Down
7 changes: 5 additions & 2 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1461,9 +1461,10 @@ static vine_msg_code_t handle_taskvine(struct vine_manager *q, struct vine_worke
{
char items[4][VINE_LINE_MAX];
int worker_protocol;
int worker_library_protocol;

int n = sscanf(line, "taskvine %d %s %s %s %s", &worker_protocol, items[0], items[1], items[2], items[3]);
if (n != 5)
int n = sscanf(line, "taskvine %d %d %s %s %s %s", &worker_protocol, &worker_library_protocol, items[0], items[1], items[2], items[3]);
if (n != 6)
return VINE_MSG_FAILURE;

if (worker_protocol != VINE_PROTOCOL_VERSION) {
Expand All @@ -1472,6 +1473,8 @@ static vine_msg_code_t handle_taskvine(struct vine_manager *q, struct vine_worke
return VINE_MSG_FAILURE;
}

w->library_protocol_version = worker_library_protocol;

if (w->hostname)
free(w->hostname);
if (w->os)
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ vine_result_code_t vine_manager_put_task(
if (t->provides_library) {
vine_manager_send(q, w, "provides_library %s\n", t->provides_library);
vine_manager_send(q, w, "function_slots %d\n", t->function_slots_total);
vine_manager_send(q, w, "func_exec_mode %d\n", t->func_exec_mode);
}

vine_manager_send(q, w, "category %s\n", t->category);
Expand Down
4 changes: 3 additions & 1 deletion taskvine/src/manager/vine_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ worker, and catalog, but should not be visible to the public user API.
#ifndef VINE_PROTOCOL_H
#define VINE_PROTOCOL_H

#define VINE_PROTOCOL_VERSION 11
#define VINE_PROTOCOL_VERSION 12

#define VINE_LIBRARY_PROTOCOL_VERSION 1

#define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */

Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ See the file COPYING for details.
#include "vine_file.h"
#include "vine_file_replica.h"
#include "vine_mount.h"
#include "vine_protocol.h"

#include "debug.h"
#include "hash_table.h"
Expand Down Expand Up @@ -203,6 +204,12 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
return 0;
}

/* Check if the library and the worker have the same library protocol version. */
if (t->provides_library && (w->library_protocol_version != VINE_LIBRARY_PROTOCOL_VERSION)) {
debug(D_VINE, "Worker %s can't run library with id %d due to mismatched library protocol version.", w->workerid, t->task_id);
return 0;
}

/* Compute the resources to allocate to this task. */
struct rmsummary *l = vine_manager_choose_resources_for_task(q, w, t);

Expand Down
30 changes: 25 additions & 5 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,7 @@ void vine_task_set_function_exec_mode(struct vine_task *t, vine_task_func_exec_m
void vine_task_set_function_exec_mode_from_string(struct vine_task *t, const char *exec_mode)
{
if (exec_mode && t->provides_library) {
if (!strncmp(exec_mode, "fork", strlen("fork"))) {
t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_FORK;
} else {
t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_DIRECT;
}
t->func_exec_mode = vine_task_func_exec_mode_from_string(exec_mode);
}
}

Expand Down Expand Up @@ -1005,3 +1001,27 @@ char *vine_task_to_json(struct vine_task *t)
buffer_free(&b);
return json;
}

/* Converts an int64_t to a valid vine_task_func_exec_mode_t.
* Returns VINE_TASK_FUNC_EXEC_MODE_INVALID if there's no valid mode for the integer. */
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_int64_t(int64_t n) {
if (n == 1) {
return VINE_TASK_FUNC_EXEC_MODE_DIRECT;
}
if (n == 2) {
return VINE_TASK_FUNC_EXEC_MODE_FORK;
}
return VINE_TASK_FUNC_EXEC_MODE_INVALID;
}

/* Converts a string to a valid vine_task_func_exec_mode_t.
* Returns VINE_TASK_FUNC_EXEC_MODE_INVALID if there's no valid mode for the string. */
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode) {
if (!strncmp(exec_mode, "direct", strlen("direct"))) {
return VINE_TASK_FUNC_EXEC_MODE_DIRECT;
}
if (!strncmp(exec_mode, "fork", strlen("fork"))) {
return VINE_TASK_FUNC_EXEC_MODE_FORK;
}
return VINE_TASK_FUNC_EXEC_MODE_INVALID;
}
3 changes: 3 additions & 0 deletions taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ const char *vine_task_state_to_string( vine_task_state_t task_state );
struct jx * vine_task_to_jx( struct vine_manager *q, struct vine_task *t );
char * vine_task_to_json(struct vine_task *t);

vine_task_func_exec_mode_t vine_task_func_exec_mode_from_int64_t(int64_t n);
vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode);


/** Attach an input or outputs to tasks without declaring files to manager.
* Only really useful at the worker where tasks are created without a manager. */
Expand Down
3 changes: 3 additions & 0 deletions taskvine/src/manager/vine_worker_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ struct vine_worker_info {
/* Connection to the worker or other client. */
struct link *link;

/* Library protocol version of this worker. */
int library_protocol_version;

/* Static properties reported by worker when it connects. */
char *hostname;
char *os;
Expand Down
35 changes: 29 additions & 6 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,9 @@ static void report_worker_ready(struct link *manager)
}

send_async_message(manager,
"taskvine %d %s %s %s %d.%d.%d\n",
"taskvine %d %d %s %s %s %d.%d.%d\n",
VINE_PROTOCOL_VERSION,
VINE_LIBRARY_PROTOCOL_VERSION,
hostname,
options->os_name,
options->arch_name,
Expand Down Expand Up @@ -865,6 +866,14 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
task->function_slots_requested = n;
/* Also set the total number determined by the manager. */
task->function_slots_total = n;
} else if (sscanf(line, "func_exec_mode %" PRId64, &n) == 1) {
func_exec_mode = vine_task_func_exec_mode_from_int64_t(n);
if (func_exec_mode == VINE_TASK_FUNC_EXEC_MODE_INVALID) {
debug(D_VINE | D_NOTICE, "invalid func_exec_mode from manager: %s", line);
vine_task_delete(task);
return 0;
}
task->func_exec_mode = func_exec_mode;
} else if (sscanf(line, "infile %s %s %d", localname, taskname_encoded, &flags)) {
url_decode(taskname_encoded, taskname, VINE_LINE_MAX);
vine_hack_do_not_compute_cached_name = 1;
Expand Down Expand Up @@ -1546,10 +1555,25 @@ static int check_library_startup(struct vine_process *p)
struct jx *response = jx_parse_string(buffer);

const char *name = jx_lookup_string(response, "name");
const char *taskid = jx_lookup_string(response, "taskid");
const char *exec_mode = jx_lookup_string(response, "exec_mode");

int ok = 1;

if (!name || !taskid || !exec_mode) {
ok = 0;
}

int ok = 0;
if (!strcmp(name, p->task->provides_library)) {
ok = 1;
vine_task_func_exec_mode_t converted_exec_mode = vine_task_func_exec_mode_from_string(exec_mode);

if (p->task->provides_library && strcmp(name, p->task->provides_library)) {
ok = 0;
}
if (taskid != p->task->task_id) {
ok = 0;
}
if (p->task->exec_mode && converted_exec_mode != p->task->exec_mode) {
ok = 0;
}
if (response) {
jx_delete(response);
Expand Down Expand Up @@ -1585,8 +1609,7 @@ static void check_libraries_ready(struct link *manager)
debug(D_VINE, "Library %s reports ready to execute functions.", library_process->task->provides_library);
library_process->library_ready = 1;
} else {
/* Kill library if the name reported back doesn't match its name or
* if there's any problem. */
/* Kill library if it fails the startup check. */
debug(D_VINE,
"Library %s task id %" PRIu64 " verification failed (unexpected response). Killing it.",
library_process->task->provides_library,
Expand Down
Loading