Skip to content

Commit

Permalink
working example
Browse files Browse the repository at this point in the history
  • Loading branch information
BarrySlyDelgado committed Jul 20, 2023
1 parent 6e532ac commit 334590e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
11 changes: 9 additions & 2 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,6 @@ def result(self, timeout="wait_forever"):
def add_done_callback(self, fn):
self.callback_fns.append(fn)

def retrieve_output(arg):
return arg

class FutureTask(PythonTask):
##
Expand All @@ -1044,14 +1042,19 @@ def __init__(self, manager, rf, func, *args, **kwargs):
self.enable_temp_output()
self._module_manager = manager
self._future = VineFuture(self)
self._envs = []
self._future_resolved = False
self._ran_functions = False
self._retrieval_future = rf
self._retrieval_task = None

def output(self, timeout="wait_forever"):
def retrieve_output(arg):
return arg
if not self._retrieval_future and not self._retrieval_task:
self._retrieval_task = FutureTask(self._module_manager, True, retrieve_output, self._future)
for env in self._envs:
self._retrieval_task.add_environment(env)
self._retrieval_task.disable_temp_output()
self._module_manager.submit(self._retrieval_task)

Expand Down Expand Up @@ -1102,6 +1105,10 @@ def submit_finalize(self, manager):
self._create_wrapper()
self._add_IO_files(manager)

def add_environment(self, f):
self._envs.append(f)
return cvine.vine_task_add_environment(self._task, f._file)

def _create_wrapper(self):
with open(os.path.join(self._tmpdir, self._wrapper), "w") as f:
f.write(
Expand Down
17 changes: 14 additions & 3 deletions taskvine/src/examples/vine_example_future_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import ndcctools.taskvine as vine
import cloudpickle
import poncho
import random

def generate_random_matrix(n):
Expand Down Expand Up @@ -60,36 +61,46 @@ def matrix_multiply(a, b):
return result

futures = []

levels = 3
count = 0
n = 400
matricies = [generate_random_matrix(n) for x in range(2**levels)]
print(len(matricies))
print('Generated Matricies...')

executor = vine.Executor(manager_name='vine_matrix_example', batch_type='condor')
executor.manager.enable_peer_transfers()
executor.set("memory", 8000)
executor.set("disk", 8000)
env_spec = {"conda": {"channels": ["conda-forge"],"packages": ["python","pip","conda","conda-pack","cloudpickle", "ndcctools"]}}
env_tarball = poncho.package_create.dict_to_env(env_spec, cache=True)
env_file = executor.manager.declare_poncho(env_tarball, cache=True)

for level in range(levels):
print(level)
for x in range(2**(levels - level - 1)):
if level == 0:
t = executor.task(matrix_multiply, matricies[x*2], matricies[x*2+1])
t.set_cores(1)
t.add_environment(env_file)
# the future can be created ommiting the task specifications above like so: f = executor.submit(matrix_multiply, matricies[x*2], matricies[x*2+1])
f = executor.submit(t)
futures.append(f)

else:
t = executor.task(matrix_multiply, futures[count], futures[count + 1])
t.set_cores(1)
t.add_environment(env_file)
f = executor.submit(t)
futures.append(f)
count += 2


print("waiting for result")
print("waiting for result...")
f = futures[-1]
print(f.result())
print("RESULT:", f.result())
print("OUT1:", f._task.std_output)
print("OUT2:", f._task._retrieval_task.std_output)



Expand Down

0 comments on commit 334590e

Please sign in to comment.