From dfe4488e88c148fcb950f4814e3299146119cd3c Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Wed, 6 Nov 2024 16:13:13 -0500 Subject: [PATCH 1/2] scheduling with library protocol version checked --- taskvine/src/manager/vine_manager.c | 7 +++++-- taskvine/src/manager/vine_protocol.h | 4 +++- taskvine/src/manager/vine_schedule.c | 7 +++++++ taskvine/src/manager/vine_worker_info.h | 3 +++ taskvine/src/worker/vine_worker.c | 3 ++- 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 946d1132f7..190e6c9639 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -1461,9 +1461,10 @@ static vine_msg_code_t handle_taskvine(struct vine_manager *q, struct vine_worke { char items[4][VINE_LINE_MAX]; int worker_protocol; + int worker_library_protocol; - int n = sscanf(line, "taskvine %d %s %s %s %s", &worker_protocol, items[0], items[1], items[2], items[3]); - if (n != 5) + int n = sscanf(line, "taskvine %d %d %s %s %s %s", &worker_protocol, &worker_library_protocol, items[0], items[1], items[2], items[3]); + if (n != 6) return VINE_MSG_FAILURE; if (worker_protocol != VINE_PROTOCOL_VERSION) { @@ -1472,6 +1473,8 @@ static vine_msg_code_t handle_taskvine(struct vine_manager *q, struct vine_worke return VINE_MSG_FAILURE; } + w->library_protocol_version = worker_library_protocol; + if (w->hostname) free(w->hostname); if (w->os) diff --git a/taskvine/src/manager/vine_protocol.h b/taskvine/src/manager/vine_protocol.h index 7a3dca9fcd..d32c495d1f 100644 --- a/taskvine/src/manager/vine_protocol.h +++ b/taskvine/src/manager/vine_protocol.h @@ -13,7 +13,9 @@ worker, and catalog, but should not be visible to the public user API. #ifndef VINE_PROTOCOL_H #define VINE_PROTOCOL_H -#define VINE_PROTOCOL_VERSION 11 +#define VINE_PROTOCOL_VERSION 12 + +#define VINE_LIBRARY_PROTOCOL_VERSION 1 #define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */ diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c index 1127e1e636..7975e8964a 100644 --- a/taskvine/src/manager/vine_schedule.c +++ b/taskvine/src/manager/vine_schedule.c @@ -10,6 +10,7 @@ See the file COPYING for details. #include "vine_file.h" #include "vine_file_replica.h" #include "vine_mount.h" +#include "vine_protocol.h" #include "debug.h" #include "hash_table.h" @@ -203,6 +204,12 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w return 0; } + /* Check if the library and the worker have the same library protocol version. */ + if (t->provides_library && (w->library_protocol_version != VINE_LIBRARY_PROTOCOL_VERSION)) { + debug(D_VINE, "Worker %s can't run library with id %d due to mismatched library protocol version.", w->workerid, t->task_id); + return 0; + } + /* Compute the resources to allocate to this task. */ struct rmsummary *l = vine_manager_choose_resources_for_task(q, w, t); diff --git a/taskvine/src/manager/vine_worker_info.h b/taskvine/src/manager/vine_worker_info.h index c52fa91103..d5c9dc7c7d 100644 --- a/taskvine/src/manager/vine_worker_info.h +++ b/taskvine/src/manager/vine_worker_info.h @@ -28,6 +28,9 @@ struct vine_worker_info { /* Connection to the worker or other client. */ struct link *link; + /* Library protocol version of this worker. */ + int library_protocol_version; + /* Static properties reported by worker when it connects. */ char *hostname; char *os; diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index 84f3f3994f..5767c58608 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -564,8 +564,9 @@ static void report_worker_ready(struct link *manager) } send_async_message(manager, - "taskvine %d %s %s %s %d.%d.%d\n", + "taskvine %d %d %s %s %s %d.%d.%d\n", VINE_PROTOCOL_VERSION, + VINE_LIBRARY_PROTOCOL_VERSION, hostname, options->os_name, options->arch_name, From 3eca9b827ed9470744247d3aabd5c0030edc7fc7 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Wed, 6 Nov 2024 16:54:01 -0500 Subject: [PATCH 2/2] add check exec_mode of library to worker --- poncho/src/poncho/library_network_code.py | 12 +++++---- taskvine/src/manager/vine_manager_put.c | 1 + taskvine/src/manager/vine_task.c | 30 +++++++++++++++++---- taskvine/src/manager/vine_task.h | 3 +++ taskvine/src/worker/vine_worker.c | 32 +++++++++++++++++++---- 5 files changed, 63 insertions(+), 15 deletions(-) diff --git a/poncho/src/poncho/library_network_code.py b/poncho/src/poncho/library_network_code.py index 159d16dd45..29a5ae9238 100755 --- a/poncho/src/poncho/library_network_code.py +++ b/poncho/src/poncho/library_network_code.py @@ -379,16 +379,18 @@ def main(): if context_vars: (load_variable_from_library.__globals__).update(context_vars) + # set execution mode of functions in this library + global exec_method + exec_method = library_info['exec_mode'] + # send configuration of library, just its name for now config = { - "name": library_info['library_name'] + "name": library_info['library_name'], + "taskid": args.task_id, + "exec_mode": exec_method, } send_configuration(config, out_pipe_fd, args.worker_pid) - # set execution mode of functions in this library - global exec_method - exec_method = library_info['exec_mode'] - # register sigchld handler to turn a sigchld signal into an I/O event signal.signal(signal.SIGCHLD, sigchld_handler) diff --git a/taskvine/src/manager/vine_manager_put.c b/taskvine/src/manager/vine_manager_put.c index 83dbaeb64f..e8e22b22bf 100644 --- a/taskvine/src/manager/vine_manager_put.c +++ b/taskvine/src/manager/vine_manager_put.c @@ -509,6 +509,7 @@ vine_result_code_t vine_manager_put_task( if (t->provides_library) { vine_manager_send(q, w, "provides_library %s\n", t->provides_library); vine_manager_send(q, w, "function_slots %d\n", t->function_slots_total); + vine_manager_send(q, w, "func_exec_mode %d\n", t->func_exec_mode); } vine_manager_send(q, w, "category %s\n", t->category); diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index c0ea564b27..d5f10230ed 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -328,11 +328,7 @@ void vine_task_set_function_exec_mode(struct vine_task *t, vine_task_func_exec_m void vine_task_set_function_exec_mode_from_string(struct vine_task *t, const char *exec_mode) { if (exec_mode && t->provides_library) { - if (!strncmp(exec_mode, "fork", strlen("fork"))) { - t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_FORK; - } else { - t->func_exec_mode = VINE_TASK_FUNC_EXEC_MODE_DIRECT; - } + t->func_exec_mode = vine_task_func_exec_mode_from_string(exec_mode); } } @@ -1005,3 +1001,27 @@ char *vine_task_to_json(struct vine_task *t) buffer_free(&b); return json; } + +/* Converts an int64_t to a valid vine_task_func_exec_mode_t. + * Returns VINE_TASK_FUNC_EXEC_MODE_INVALID if there's no valid mode for the integer. */ +vine_task_func_exec_mode_t vine_task_func_exec_mode_from_int64_t(int64_t n) { + if (n == 1) { + return VINE_TASK_FUNC_EXEC_MODE_DIRECT; + } + if (n == 2) { + return VINE_TASK_FUNC_EXEC_MODE_FORK; + } + return VINE_TASK_FUNC_EXEC_MODE_INVALID; +} + +/* Converts a string to a valid vine_task_func_exec_mode_t. + * Returns VINE_TASK_FUNC_EXEC_MODE_INVALID if there's no valid mode for the string. */ +vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode) { + if (!strncmp(exec_mode, "direct", strlen("direct"))) { + return VINE_TASK_FUNC_EXEC_MODE_DIRECT; + } + if (!strncmp(exec_mode, "fork", strlen("fork"))) { + return VINE_TASK_FUNC_EXEC_MODE_FORK; + } + return VINE_TASK_FUNC_EXEC_MODE_INVALID; +} diff --git a/taskvine/src/manager/vine_task.h b/taskvine/src/manager/vine_task.h index 4c53ece509..718f46d383 100644 --- a/taskvine/src/manager/vine_task.h +++ b/taskvine/src/manager/vine_task.h @@ -164,6 +164,9 @@ const char *vine_task_state_to_string( vine_task_state_t task_state ); struct jx * vine_task_to_jx( struct vine_manager *q, struct vine_task *t ); char * vine_task_to_json(struct vine_task *t); +vine_task_func_exec_mode_t vine_task_func_exec_mode_from_int64_t(int64_t n); +vine_task_func_exec_mode_t vine_task_func_exec_mode_from_string(const char *exec_mode); + /** Attach an input or outputs to tasks without declaring files to manager. * Only really useful at the worker where tasks are created without a manager. */ diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index 5767c58608..8843664607 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -866,6 +866,14 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t task->function_slots_requested = n; /* Also set the total number determined by the manager. */ task->function_slots_total = n; + } else if (sscanf(line, "func_exec_mode %" PRId64, &n) == 1) { + func_exec_mode = vine_task_func_exec_mode_from_int64_t(n); + if (func_exec_mode == VINE_TASK_FUNC_EXEC_MODE_INVALID) { + debug(D_VINE | D_NOTICE, "invalid func_exec_mode from manager: %s", line); + vine_task_delete(task); + return 0; + } + task->func_exec_mode = func_exec_mode; } else if (sscanf(line, "infile %s %s %d", localname, taskname_encoded, &flags)) { url_decode(taskname_encoded, taskname, VINE_LINE_MAX); vine_hack_do_not_compute_cached_name = 1; @@ -1547,10 +1555,25 @@ static int check_library_startup(struct vine_process *p) struct jx *response = jx_parse_string(buffer); const char *name = jx_lookup_string(response, "name"); + const char *taskid = jx_lookup_string(response, "taskid"); + const char *exec_mode = jx_lookup_string(response, "exec_mode"); + + int ok = 1; + + if (!name || !taskid || !exec_mode) { + ok = 0; + } - int ok = 0; - if (!strcmp(name, p->task->provides_library)) { - ok = 1; + vine_task_func_exec_mode_t converted_exec_mode = vine_task_func_exec_mode_from_string(exec_mode); + + if (p->task->provides_library && strcmp(name, p->task->provides_library)) { + ok = 0; + } + if (taskid != p->task->task_id) { + ok = 0; + } + if (p->task->exec_mode && converted_exec_mode != p->task->exec_mode) { + ok = 0; } if (response) { jx_delete(response); @@ -1586,8 +1609,7 @@ static void check_libraries_ready(struct link *manager) debug(D_VINE, "Library %s reports ready to execute functions.", library_process->task->provides_library); library_process->library_ready = 1; } else { - /* Kill library if the name reported back doesn't match its name or - * if there's any problem. */ + /* Kill library if it fails the startup check. */ debug(D_VINE, "Library %s task id %" PRIu64 " verification failed (unexpected response). Killing it.", library_process->task->provides_library,