Skip to content

Commit

Permalink
Vine: Clean Up Mini-Tasks (#3577)
Browse files Browse the repository at this point in the history
* Clean up definition of mini-tasks to remove various hacks
and inconsistencies.  The "name" field of the mini-task
now represents the file to extract from the sandbox of the
producing task.

* vine_cache_queue_command -> vine_cache_queue_mini_task

* mini-tasks don't have task ids at the manager, so don't send or print them.

* format

* Fix all module makefiles to be links to .module.mk
  • Loading branch information
dthain authored Nov 17, 2023
1 parent d72cd59 commit aa3f212
Show file tree
Hide file tree
Showing 15 changed files with 53 additions and 146 deletions.
5 changes: 5 additions & 0 deletions .module.mk
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ $(LINT_TARGETS):
@$(MAKE) -C $(@:lint-%=%) lint
lint: $(LINT_TARGETS)

FORMAT_TARGETS = $(TARGETS:%=format-%)
$(FORMAT_TARGETS):
@$(MAKE) -C $(@:format-%=%) format
format: $(FORMAT_TARGETS)

INSTALL_TARGETS = $(TARGETS:%=install-%)
$(INSTALL_TARGETS):
@$(MAKE) -C $(@:install-%=%) install
Expand Down
13 changes: 4 additions & 9 deletions doc/manuals/taskvine/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -775,9 +775,8 @@ like the input to be the result of a query to a database.

my_url = m.declare_url("http://somewhere.com/archive.cpio", cache=True)
t.add_input(my_url, "archive.cpio")
t.set_mini_task_output("output_dir")

mini_task = m.declare_mini_task(t)
mini_task = m.declare_mini_task(t,"output_dir")

# regular tasks can use the mini task as input # the output of the mini
# task is mounted in the regular task sandbox
Expand All @@ -797,9 +796,8 @@ like the input to be the result of a query to a database.

struct vine_file *my_url = vine_declare_url("http://somewhere.com/archive.cpio", VINE_CACHE);
vine_task_add_input(my_url, "archive.cpio", 0);
vine_task_set_mini_task_output(t, "output_dir");

struct vine_file *mini_task = m.declare_mini_task(t)
struct vine_file *mini_task = m.declare_mini_task(t,"output_dir")

// regular tasks can use the mini task as input
// the output of the mini task is mounted in the regular task sandbox
Expand Down Expand Up @@ -977,11 +975,8 @@ mini task to construct the environment structure directly on the workers.
mt.add_input(runner, "env_dir/bin/run_in_env")
mt.add_input(image, "env_dir/image.img")

# the output of the mini task is the environment directory
mt.add_output(image, "env_dir")

# tell the manager that this is a mini task.
env = m.declare_mini_task(mt)
# the mini task will extract the environment directory
env = m.declare_mini_task(mt,"env_dir")

# now we define our regular task, and attach the environment to it.
t = Task("/bin/echo from inside apptainer!")
Expand Down
31 changes: 0 additions & 31 deletions dttools/Makefile

This file was deleted.

1 change: 1 addition & 0 deletions dttools/Makefile
30 changes: 0 additions & 30 deletions poncho/Makefile

This file was deleted.

1 change: 1 addition & 0 deletions poncho/Makefile
33 changes: 0 additions & 33 deletions taskvine/Makefile

This file was deleted.

1 change: 1 addition & 0 deletions taskvine/Makefile
5 changes: 3 additions & 2 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1480,20 +1480,21 @@ def declare_buffer(self, buffer=None, cache=False, peer_transfer=True):
#
# @param self The manager to register this file
# @param minitask The task to execute in order to produce a file
# @param source The name of the file to extract from the task's sandbox.
# @param cache If True or 'workflow', cache the file at workers for reuse
# until the end of the workflow. If 'always', the file is cache until the
# end-of-life of the worker. Default is False (file is not cache).
# @param peer_transfer Whether the file can be transfered between workers when
# peer transfers are enabled (see @ref ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
# @return A file object to use in @ref ndcctools.taskvine.task.Task.add_input
def declare_minitask(self, minitask, name="minitask", cache=False, peer_transfer=True):
def declare_minitask(self, minitask, source, cache=False, peer_transfer=True):

# Attaching a task as a mini-task is like submitting it, so we must finalize the details.
minitask.submit_finalize(self)

# Then proceed to attach the task to the mini-task file object.
flags = Task._determine_file_flags(cache, peer_transfer)
f = cvine.vine_declare_mini_task(self._taskvine, minitask._task, name, flags)
f = cvine.vine_declare_mini_task(self._taskvine, minitask._task, source, flags)

# minitasks are freed when the manager frees its related file structure
minitask._manager_will_free = True
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ renaming, or unpacking to be useful. A mini-task should be a short-running
activity with minimal resource consumption.
@param m A manager object
@param mini_task The task which produces the file
@param name A descriptive name for the mini-task.
@param name The name of the file in the task's sandbox to extract.
@param flags Whether to never cache the output of the mini task at the workers (VINE_CACHE_NEVER,
the default), to cache it only for the current manager (VINE_CACHE), or to
cache it for the lifetime of the worker (VINE_CACHE_ALWAYS). Cache flags can be
Expand Down
17 changes: 5 additions & 12 deletions taskvine/src/manager/vine_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ struct vine_file *vine_file_untar(struct vine_file *f, vine_file_flags_t flags)
{
struct vine_task *t = vine_task_create("mkdir output && tar xf input -C output");
vine_task_add_input(t, f, "input", 0);
vine_task_add_output(t, vine_file_local("output", flags), "output", 0);
return vine_file_mini_task(t, "untar", flags);
return vine_file_mini_task(t, "output", flags);
}

struct vine_file *vine_file_poncho(struct vine_file *f, vine_file_flags_t flags)
Expand All @@ -172,16 +171,14 @@ struct vine_file *vine_file_poncho(struct vine_file *f, vine_file_flags_t flags)
free(cmd);

vine_task_add_input(t, f, "package.tar.gz", 0);
vine_task_add_output(t, vine_file_local("output", flags), "output", 0);
return vine_file_mini_task(t, "poncho", flags);
return vine_file_mini_task(t, "output", flags);
}

struct vine_file *vine_file_starch(struct vine_file *f, vine_file_flags_t flags)
{
struct vine_task *t = vine_task_create("SFX_DIR=output SFX_EXTRACT_ONLY=1 ./package.sfx");
vine_task_add_input(t, f, "package.sfx", 0);
vine_task_add_output(t, vine_file_local("output", flags), "output", 0);
return vine_file_mini_task(t, "starch", flags);
return vine_file_mini_task(t, "output", flags);
}

static char *find_x509_proxy()
Expand Down Expand Up @@ -220,8 +217,6 @@ struct vine_file *vine_file_xrootd(
char *command = string_format("xrdcp %s output.root", source);
struct vine_task *t = vine_task_create(command);

vine_task_add_output(t, vine_file_local("output.root", flags), "output.root", 0);

if (proxy) {
vine_task_set_env_var(t, "X509_USER_PROXY", "proxy509");
vine_task_add_input(t, proxy, "proxy509.pem", 0);
Expand All @@ -233,7 +228,7 @@ struct vine_file *vine_file_xrootd(

free(command);

return vine_file_mini_task(t, "xrootd", flags);
return vine_file_mini_task(t, "output.root", flags);
}

struct vine_file *vine_file_chirp(const char *server, const char *source, struct vine_file *ticket,
Expand All @@ -246,8 +241,6 @@ struct vine_file *vine_file_chirp(const char *server, const char *source, struct

struct vine_task *t = vine_task_create(command);

vine_task_add_output(t, vine_file_local("output.chirp", flags), "output.chirp", 0);

if (ticket) {
vine_task_add_input(t, ticket, "ticket.chirp", 0);
}
Expand All @@ -258,7 +251,7 @@ struct vine_file *vine_file_chirp(const char *server, const char *source, struct

free(command);

return vine_file_mini_task(t, "chirp", flags);
return vine_file_mini_task(t, "output.chirp", flags);
}

/* vim: set noexpandtab tabstop=8: */
11 changes: 3 additions & 8 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,7 @@ static vine_result_code_t vine_manager_put_input_file(struct vine_manager *q, st
break;

case VINE_MINI_TASK:
debug(D_VINE,
"%s (%s) will produce %s via mini task %d",
w->hostname,
w->addrport,
m->remote_name,
f->mini_task->task_id);
debug(D_VINE, "%s (%s) will produce %s via mini task", w->hostname, w->addrport, m->remote_name);
result = vine_manager_put_task(q, w, f->mini_task, 0, 0, f);
break;

Expand Down Expand Up @@ -475,8 +470,8 @@ vine_result_code_t vine_manager_put_task(struct vine_manager *q, struct vine_wor
if (target) {
vine_manager_send(q,
w,
"mini_task %lld %s %lld %o\n",
(long long)target->mini_task->task_id,
"mini_task %s %s %lld %o\n",
target->source,
target->cached_name,
(long long)target->size,
0777);
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/manager/vine_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ worker, and catalog, but should not be visible to the public user API.
#ifndef VINE_PROTOCOL_H
#define VINE_PROTOCOL_H

#define VINE_PROTOCOL_VERSION 4
#define VINE_PROTOCOL_VERSION 5

#define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */

Expand Down
22 changes: 17 additions & 5 deletions taskvine/src/worker/vine_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ Queue a mini-task to produce a file.
This entry will be materialized later in vine_cache_ensure.
*/

int vine_cache_queue_command(
struct vine_cache *c, struct vine_task *mini_task, const char *cachename, int64_t size, int mode)
int vine_cache_queue_mini_task(struct vine_cache *c, struct vine_task *mini_task, const char *source,
const char *cachename, int64_t size, int mode)
{
struct vine_cache_file *f = vine_cache_file_create(VINE_CACHE_MINI_TASK, "task", size, mode, mini_task);
struct vine_cache_file *f = vine_cache_file_create(VINE_CACHE_MINI_TASK, source, size, mode, mini_task);
hash_table_insert(c->table, cachename, f);
return 1;
}
Expand Down Expand Up @@ -511,6 +511,7 @@ vine_cache_status_t vine_cache_ensure(struct vine_cache *c, const char *cachenam
break;
}

/* For a mini-task, we must also insure the inputs to the task exist. */
if (f->type == VINE_CACHE_MINI_TASK) {
if (f->mini_task->input_mounts) {
struct vine_mount *m;
Expand Down Expand Up @@ -578,13 +579,24 @@ static void vine_cache_check_outputs(

/* If this was produced by a mini task, first move the output in the cache directory. */
if (f->type == VINE_CACHE_MINI_TASK) {

if (f->status == VINE_CACHE_STATUS_READY) {
if (vine_sandbox_stageout(f->process, c, manager)) {

char *source_path = vine_sandbox_full_path(f->process, f->source);

debug(D_VINE, "cache: extracting %s from mini-task sandbox to %s\n", f->source, cachename);
int result = rename(source_path, cache_path);
if (result == 0) {
f->status = VINE_CACHE_STATUS_READY;
} else {
debug(D_VINE,
"cache: unable to rename %s to %s: %s\n",
source_path,
cache_path,
strerror(errno));
f->status = VINE_CACHE_STATUS_FAILED;
}

free(source_path);
}

/* Clean up the minitask process, but keep the defining task. */
Expand Down
2 changes: 1 addition & 1 deletion taskvine/src/worker/vine_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ char *vine_cache_full_path( struct vine_cache *c, const char *cachename );

int vine_cache_addfile( struct vine_cache *c, int64_t size, int mode, const char *cachename );
int vine_cache_queue_transfer( struct vine_cache *c, const char *source, const char *cachename, int64_t size, int mode );
int vine_cache_queue_command( struct vine_cache *c, struct vine_task *minitask, const char *cachename, int64_t size, int mode );
int vine_cache_queue_mini_task( struct vine_cache *c, struct vine_task *minitask, const char *source, const char *cachename, int64_t size, int mode );
vine_cache_status_t vine_cache_ensure( struct vine_cache *c, const char *cachename);
int vine_cache_remove( struct vine_cache *c, const char *cachename, struct link *manager );
int vine_cache_contains( struct vine_cache *c, const char *cachename );
Expand Down
3 changes: 3 additions & 0 deletions taskvine/src/worker/vine_sandbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ See the file COPYING for details.
#include "vine_cache.h"
#include "link.h"

char *vine_sandbox_full_path(struct vine_process *p, const char *sandbox_name);

vine_cache_status_t vine_sandbox_ensure( struct vine_process *p, struct vine_cache *c, struct link *manager );

int vine_sandbox_stagein( struct vine_process *p, struct vine_cache *c);
int vine_sandbox_stageout( struct vine_process *p, struct vine_cache *c, struct link *manager );

Expand Down
16 changes: 6 additions & 10 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,8 @@ static int do_put_url(const char *cache_name, int64_t size, int mode, const char
}

/*
Accept a mini_task that is executed on demand to produce a specific file.
Accept a mini_task that is executed on demand.
We will then extract the file "source" from the sandbox in order to produce "cache_name".
*/

static int do_put_mini_task(struct link *manager, time_t stoptime, const char *cache_name, int64_t size, int mode,
Expand All @@ -823,13 +824,7 @@ static int do_put_mini_task(struct link *manager, time_t stoptime, const char *c
struct vine_task *mini_task = do_task_body(manager, mini_task_id, stoptime);
if (!mini_task)
return 0;

/* XXX hacky hack -- the single output of the task must have the target cachename */
struct vine_mount *output_mount = list_peek_head(mini_task->output_mounts);
free(output_mount->file->cached_name);
output_mount->file->cached_name = strdup(cache_name);

return vine_cache_queue_command(cache_manager, mini_task, cache_name, size, mode);
return vine_cache_queue_mini_task(cache_manager, mini_task, source, cache_name, size, mode);
}

/*
Expand Down Expand Up @@ -1098,11 +1093,12 @@ static int handle_manager(struct link *manager)
hash_table_insert(current_transfers, strdup(filename), strdup(transfer_id));
debug(D_VINE, "Insert ID-File pair into transfer table : %s :: %s", filename, transfer_id);
} else if (sscanf(line,
"mini_task %" SCNd64 " %s %" SCNd64 " %o",
&task_id,
"mini_task %s %s %" SCNd64 " %o",
source_encoded,
filename_encoded,
&length,
&mode) == 4) {
url_decode(source_encoded, source, sizeof(source));
url_decode(filename_encoded, filename, sizeof(filename));
r = do_put_mini_task(
manager, time(0) + options->active_timeout, filename, length, mode, source);
Expand Down
Loading

0 comments on commit aa3f212

Please sign in to comment.