Skip to content

Commit

Permalink
vine: limit worker transfer processes (#3961)
Browse files Browse the repository at this point in the history
* limit worker transfer processes

* return pending and poll in main loop

* format

* add define

* add configurable value

* format
  • Loading branch information
colinthomas-z80 authored Oct 28, 2024
1 parent 78e0276 commit 84022d4
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 3 deletions.
17 changes: 16 additions & 1 deletion taskvine/src/worker/vine_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ See the file COPYING for details.
struct vine_cache {
struct hash_table *table;
char *cache_dir;
int max_transfer_procs;
};

static void vine_cache_wait_for_file(struct vine_cache *c, struct vine_cache_file *f, const char *cachename, struct link *manager);
Expand All @@ -47,11 +48,12 @@ static void vine_cache_wait_for_file(struct vine_cache *c, struct vine_cache_fil
Create the cache manager structure for a given cache directory.
*/

struct vine_cache *vine_cache_create(const char *cache_dir)
struct vine_cache *vine_cache_create(const char *cache_dir, int max_procs)
{
struct vine_cache *c = malloc(sizeof(*c));
c->cache_dir = strdup(cache_dir);
c->table = hash_table_create(0, 0);
c->max_transfer_procs = max_procs;
return c;
}

Expand Down Expand Up @@ -644,6 +646,19 @@ vine_cache_status_t vine_cache_ensure(struct vine_cache *c, const char *cachenam
f->process = p;
}

int num_processing = 0;
char *table_cachename;
struct vine_cache_file *table_f;
HASH_TABLE_ITERATE(c->table, table_cachename, table_f)
{
if (table_f->status == VINE_CACHE_STATUS_PROCESSING) {
num_processing++;
}
}
if (num_processing > c->max_transfer_procs) {
return VINE_CACHE_STATUS_PENDING;
}

f->pid = fork();

if (f->pid < 0) {
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/worker/vine_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ typedef enum {
VINE_CACHE_STATUS_UNKNOWN, /**< File is not known at all to the cache manager. */
} vine_cache_status_t;

struct vine_cache * vine_cache_create( const char *cachedir );
struct vine_cache * vine_cache_create( const char *cachedir, int max_procs );
void vine_cache_delete( struct vine_cache *c );
void vine_cache_load( struct vine_cache *c );
void vine_cache_scan( struct vine_cache *c, struct link *manager );
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,7 @@ static int vine_worker_serve_manager_by_hostport(const char *host, int port, con
vine_workspace_prepare(workspace);

/* Start the cache manager and scan for existing files. */
cache_manager = vine_cache_create(workspace->cache_dir);
cache_manager = vine_cache_create(workspace->cache_dir, options->max_transfer_procs);
vine_cache_load(cache_manager);

/* Start the transfer server, which serves up the cache directory. */
Expand Down
8 changes: 8 additions & 0 deletions taskvine/src/worker/vine_worker_options.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ struct vine_worker_options *vine_worker_options_create()
self->transfer_port_min = 0;
self->transfer_port_max = 0;

self->max_transfer_procs = 5;

self->reported_transfer_host = 0;

return self;
Expand Down Expand Up @@ -140,6 +142,7 @@ void vine_worker_options_show_help(const char *cmd, struct vine_worker_options *
printf(" %-30s Single-shot mode -- quit immediately after disconnection.\n", "--single-shot");
printf(" %-30s Listening port for worker-worker transfers. Either port or port_min:port_max (default: any)\n", "--transfer-port");
printf(" %-30s Explicit contact host:port for worker-worker transfers, e.g., when routing is used. (default: :<transfer_port>)\n", "--contact-hostport");
printf(" %-30s Maximum number of concurrent worker transfer requests (default=%d)\n", "--max-transfer-procs", options->max_transfer_procs);

printf(" %-30s Enable tls connection to manager (manager should support it).\n", "--ssl");
printf(" %-30s SNI domain name if different from manager hostname. Implies --ssl.\n", "--tls-sni=<domain name>");
Expand Down Expand Up @@ -170,6 +173,7 @@ enum {
LONG_OPT_CONTACT_HOSTPORT,
LONG_OPT_WORKSPACE,
LONG_OPT_KEEP_WORKSPACE,
LONG_OPT_MAX_TRANSFER_PROCS,
};

static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'},
Expand Down Expand Up @@ -210,6 +214,7 @@ static const struct option long_options[] = {{"advertise", no_argument, 0, 'a'},
{"tls-sni", required_argument, 0, LONG_OPT_TLS_SNI},
{"from-factory", required_argument, 0, LONG_OPT_FROM_FACTORY},
{"transfer-port", required_argument, 0, LONG_OPT_TRANSFER_PORT},
{"max-transfer-procs", required_argument, 0, LONG_OPT_MAX_TRANSFER_PROCS},
{"contact-hostport", required_argument, 0, LONG_OPT_CONTACT_HOSTPORT},
{0, 0, 0, 0}};

Expand Down Expand Up @@ -458,6 +463,9 @@ void vine_worker_options_get(struct vine_worker_options *options, int argc, char
case LONG_OPT_CONTACT_HOSTPORT:
set_transfer_host(options, optarg);
break;
case LONG_OPT_MAX_TRANSFER_PROCS:
options->max_transfer_procs = atoi(optarg);
break;
default:
vine_worker_options_show_help(argv[0], options);
exit(1);
Expand Down
3 changes: 3 additions & 0 deletions taskvine/src/worker/vine_worker_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ struct vine_worker_options {
int transfer_port_min;
int transfer_port_max;

/* Maximum number of concurrent worker transfer requests made by worker */
int max_transfer_procs;

/* Explicit contact host (address or hostname) for transfers bewteen workers. */
char *reported_transfer_host;
int reported_transfer_port;
Expand Down

0 comments on commit 84022d4

Please sign in to comment.