From 84022d4236fea599571f46b427169393251cfe87 Mon Sep 17 00:00:00 2001 From: Colin Thomas <33940547+colinthomas-z80@users.noreply.github.com> Date: Mon, 28 Oct 2024 16:13:26 -0400 Subject: [PATCH] vine: limit worker transfer processes (#3961) * limit worker transfer processes * return pending and poll in main loop * format * add define * add configurable value * format --- taskvine/src/worker/vine_cache.c | 17 ++++++++++++++++- taskvine/src/worker/vine_cache.h | 2 +- taskvine/src/worker/vine_worker.c | 2 +- taskvine/src/worker/vine_worker_options.c | 8 ++++++++ taskvine/src/worker/vine_worker_options.h | 3 +++ 5 files changed, 29 insertions(+), 3 deletions(-) diff --git a/taskvine/src/worker/vine_cache.c b/taskvine/src/worker/vine_cache.c index ba9b8f46f1..e421ca606a 100644 --- a/taskvine/src/worker/vine_cache.c +++ b/taskvine/src/worker/vine_cache.c @@ -39,6 +39,7 @@ See the file COPYING for details. struct vine_cache { struct hash_table *table; char *cache_dir; + int max_transfer_procs; }; static void vine_cache_wait_for_file(struct vine_cache *c, struct vine_cache_file *f, const char *cachename, struct link *manager); @@ -47,11 +48,12 @@ static void vine_cache_wait_for_file(struct vine_cache *c, struct vine_cache_fil Create the cache manager structure for a given cache directory. */ -struct vine_cache *vine_cache_create(const char *cache_dir) +struct vine_cache *vine_cache_create(const char *cache_dir, int max_procs) { struct vine_cache *c = malloc(sizeof(*c)); c->cache_dir = strdup(cache_dir); c->table = hash_table_create(0, 0); + c->max_transfer_procs = max_procs; return c; } @@ -644,6 +646,19 @@ vine_cache_status_t vine_cache_ensure(struct vine_cache *c, const char *cachenam f->process = p; } + int num_processing = 0; + char *table_cachename; + struct vine_cache_file *table_f; + HASH_TABLE_ITERATE(c->table, table_cachename, table_f) + { + if (table_f->status == VINE_CACHE_STATUS_PROCESSING) { + num_processing++; + } + } + if (num_processing > c->max_transfer_procs) { + return VINE_CACHE_STATUS_PENDING; + } + f->pid = fork(); if (f->pid < 0) { diff --git a/taskvine/src/worker/vine_cache.h b/taskvine/src/worker/vine_cache.h index 689d721dd9..ff2c1bf173 100644 --- a/taskvine/src/worker/vine_cache.h +++ b/taskvine/src/worker/vine_cache.h @@ -44,7 +44,7 @@ typedef enum { VINE_CACHE_STATUS_UNKNOWN, /**< File is not known at all to the cache manager. */ } vine_cache_status_t; -struct vine_cache * vine_cache_create( const char *cachedir ); +struct vine_cache * vine_cache_create( const char *cachedir, int max_procs ); void vine_cache_delete( struct vine_cache *c ); void vine_cache_load( struct vine_cache *c ); void vine_cache_scan( struct vine_cache *c, struct link *manager ); diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index 1c0975fbc4..84f3f3994f 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -1830,7 +1830,7 @@ static int vine_worker_serve_manager_by_hostport(const char *host, int port, con vine_workspace_prepare(workspace); /* Start the cache manager and scan for existing files. */ - cache_manager = vine_cache_create(workspace->cache_dir); + cache_manager = vine_cache_create(workspace->cache_dir, options->max_transfer_procs); vine_cache_load(cache_manager); /* Start the transfer server, which serves up the cache directory. */ diff --git a/taskvine/src/worker/vine_worker_options.c b/taskvine/src/worker/vine_worker_options.c index 66f64d8278..c02ad3fa73 100644 --- a/taskvine/src/worker/vine_worker_options.c +++ b/taskvine/src/worker/vine_worker_options.c @@ -56,6 +56,8 @@ struct vine_worker_options *vine_worker_options_create() self->transfer_port_min = 0; self->transfer_port_max = 0; + self->max_transfer_procs = 5; + self->reported_transfer_host = 0; return self; @@ -140,6 +142,7 @@ void vine_worker_options_show_help(const char *cmd, struct vine_worker_options * printf(" %-30s Single-shot mode -- quit immediately after disconnection.\n", "--single-shot"); printf(" %-30s Listening port for worker-worker transfers. Either port or port_min:port_max (default: any)\n", "--transfer-port"); printf(" %-30s Explicit contact host:port for worker-worker transfers, e.g., when routing is used. (default: :)\n", "--contact-hostport"); + printf(" %-30s Maximum number of concurrent worker transfer requests (default=%d)\n", "--max-transfer-procs", options->max_transfer_procs); printf(" %-30s Enable tls connection to manager (manager should support it).\n", "--ssl"); printf(" %-30s SNI domain name if different from manager hostname. Implies --ssl.\n", "--tls-sni="); @@ -170,6 +173,7 @@ enum { LONG_OPT_CONTACT_HOSTPORT, LONG_OPT_WORKSPACE, LONG_OPT_KEEP_WORKSPACE, + LONG_OPT_MAX_TRANSFER_PROCS, }; static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'}, @@ -210,6 +214,7 @@ static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'}, {"tls-sni", required_argument, 0, LONG_OPT_TLS_SNI}, {"from-factory", required_argument, 0, LONG_OPT_FROM_FACTORY}, {"transfer-port", required_argument, 0, LONG_OPT_TRANSFER_PORT}, + {"max-transfer-procs", required_argument, 0, LONG_OPT_MAX_TRANSFER_PROCS}, {"contact-hostport", required_argument, 0, LONG_OPT_CONTACT_HOSTPORT}, {0, 0, 0, 0}}; @@ -458,6 +463,9 @@ void vine_worker_options_get(struct vine_worker_options *options, int argc, char case LONG_OPT_CONTACT_HOSTPORT: set_transfer_host(options, optarg); break; + case LONG_OPT_MAX_TRANSFER_PROCS: + options->max_transfer_procs = atoi(optarg); + break; default: vine_worker_options_show_help(argv[0], options); exit(1); diff --git a/taskvine/src/worker/vine_worker_options.h b/taskvine/src/worker/vine_worker_options.h index a96254b390..368ff6f498 100644 --- a/taskvine/src/worker/vine_worker_options.h +++ b/taskvine/src/worker/vine_worker_options.h @@ -105,6 +105,9 @@ struct vine_worker_options { int transfer_port_min; int transfer_port_max; + /* Maximum number of concurrent worker transfer requests made by worker */ + int max_transfer_procs; + /* Explicit contact host (address or hostname) for transfers bewteen workers. */ char *reported_transfer_host; int reported_transfer_port;