diff --git a/src/gmx/context.py b/src/gmx/context.py index 114bdcf8ad..e523f075df 100644 --- a/src/gmx/context.py +++ b/src/gmx/context.py @@ -17,6 +17,8 @@ from gmx import exceptions from gmx import logging from gmx import status +import gmx.core as gmxapi + # Module-level logger logger = logging.getLogger(__name__) @@ -71,7 +73,6 @@ def _md(context, element): Returns: A Director that the Context can use in launching the Session. """ - import gmx.core class Builder(object): """Translate md work element to a node in the session's DAG.""" def __init__(self, element): @@ -123,7 +124,7 @@ def launch(rank=None): # altered input. _, temp_filename = tempfile.mkstemp(suffix='.tpr') logger.debug('Updating input. Using temp file {}'.format(temp_filename)) - gmx.core.copy_tprfile(source=infile[rank], + gmxapi.copy_tprfile(source=infile[rank], destination=temp_filename, end_time=self.runtime_params['end_time']) tpr_file = temp_filename @@ -131,9 +132,9 @@ def launch(rank=None): tpr_file = infile[rank] logger.info('Loading TPR file: {}'.format(tpr_file)) - system = gmx.core.from_tpr(tpr_file) + system = gmxapi.from_tpr(tpr_file) dag.nodes[name]['system'] = system - mdargs = gmx.core.MDArgs() + mdargs = gmxapi.MDArgs() mdargs.set(self.runtime_params) # Workaround to give access to plugin potentials used in a context. pycontext = element.workspec._context @@ -664,7 +665,6 @@ def __init__(self, work=None, workdir_list=None, communicator=None): # self.__context_array = list([Context(work_element) for work_element in work]) from gmx.workflow import WorkSpec - import gmx.core # Until better Session abstraction exists at the Python level, a # _session_communicator attribute will be added to and removed from the @@ -723,7 +723,7 @@ def __init__(self, work=None, workdir_list=None, communicator=None): # This setter must be called after the operations map has been populated. self.work = work - self._api_object = gmx.core.Context() + self._api_object = gmxapi.Context() @property def work(self): @@ -996,7 +996,9 @@ def __enter__(self): if not runner is None: runners.append(runner) closers.append(graph.nodes[name]['close']) + # Get a session object to return. It must simply provide a `run()` function. + context = self # Part of workaround for bug gmxapi-214 class Session(object): def __init__(self, runners, closers): self.runners = list(runners) @@ -1012,22 +1014,33 @@ def run(self): to_be_deleted.insert(0, i) for i in to_be_deleted: del self.runners[i] + return True def close(self): for close in self.closers: logger.debug("Closing node: {}".format(close)) close() + # Workaround for bug gmxapi-214 + if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'): + context._api_object = gmxapi.Context() + self._session = Session(runners, closers) else: logger.info("Context rank {} has no work to do".format(self.rank)) + + context = self # Part of workaround for bug gmxapi-214 class NullSession(object): def run(self): logger.info("Running null session on rank {}.".format(self.rank)) return status.Status() def close(self): logger.info("Closing null session.") + # Workaround for bug gmxapi-214 + if not gmxapi.has_feature('0.0.7-bugfix-https://github.com/kassonlab/gmxapi/issues/214'): + context._api_object = gmxapi.Context() return + self._session = NullSession() self._session.rank = self.rank diff --git a/src/gmx/core/core.cpp b/src/gmx/core/core.cpp index 4033b45a45..0cb8d21e3c 100644 --- a/src/gmx/core/core.cpp +++ b/src/gmx/core/core.cpp @@ -10,6 +10,7 @@ #include "tprfile.h" #include "gmxapi/status.h" +#include "gmxapi/version.h" #include "pybind11/pybind11.h" @@ -69,6 +70,9 @@ PYBIND11_MODULE(core, m) { m.doc() = docstring; // Export core bindings + + m.def("has_feature", &gmxapi::Version::hasFeature, "Check the gmxapi library for a named feature."); + py::class_< ::gmxapi::Status > gmx_status(m, "Status", "Holds status for API operations."); diff --git a/src/gmx/test/test_pymd.py b/src/gmx/test/test_pymd.py index 72baed7f56..f10520e1a8 100644 --- a/src/gmx/test/test_pymd.py +++ b/src/gmx/test/test_pymd.py @@ -84,28 +84,99 @@ def test_simpleSimulation(caplog): md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) gmx.run(md) +@pytest.mark.usefixtures("cleandir") +@pytest.mark.filterwarnings("ignore:Using or importing the ABCs from 'collections'") +@pytest.mark.usefixtures("caplog") +def test_idempotence1(caplog): + """Confirm that a work graph can be run repeatedly, even after completed. + + Use gmx.run and avoid extra references held by user code. + """ + md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) + gmx.run(md) + gmx.run(md) + gmx.run(md) + md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) + gmx.run(md) + md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) + gmx.run(md) + +@pytest.mark.usefixtures("cleandir") +@pytest.mark.filterwarnings("ignore:Using or importing the ABCs from 'collections'") +@pytest.mark.usefixtures("caplog") +def test_idempotence2(caplog): + """Confirm that a work graph can be run repeatedly, even after completed. + + Interact with Context more directly. + Check that more unpredictable references held by user are still safe. + """ + md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) + with gmx.get_context(md) as session: + session.run() + + context = gmx.get_context(md) + with context as session: + session.run() + + context = gmx.context.Context() + context.work = md + with context as session: + session.run() + @pytest.mark.usefixtures("cleandir") def test_modifiedInput(caplog): """Load a work specification with a single TPR file and updated params.""" md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1, end_time='0.02') - with gmx.context.ParallelArrayContext(md) as session: + context = gmx.get_context(md) + with context as session: + session.run() + md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1, end_time='0.03') + context.work = md + with context as session: session.run() + md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1, end_time='0.04') + gmx.run(md) @pytest.mark.usefixtures("cleandir") @pytest.mark.usefixtures("caplog") @withmpi_only -def test_array_context(caplog): +def test_plugin_no_ensemble(caplog): + # Test attachment of external code md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) - context = gmx.context.ParallelArrayContext(md) - with context as session: - session.run() + + # Create a WorkElement for the potential + #potential = gmx.core.TestModule() + potential_element = gmx.workflow.WorkElement(namespace="testing", operation="create_test") + potential_element.name = "test_module" + before = md.workspec.elements[md.name] + md.add_dependency(potential_element) + assert potential_element.name in md.workspec.elements + assert potential_element.workspec is md.workspec + after = md.workspec.elements[md.name] + assert not before is after + + # Workaround for https://github.com/kassonlab/gmxapi/issues/42 + # We can't add an operation to a context that doesn't exist yet, but we can't + # add a work graph with an operation that is not defined in a context. + context = gmx.get_context() + context.add_operation(potential_element.namespace, potential_element.operation, my_plugin) + context.work = md + + with warnings.catch_warnings(): + # Swallow warning about wide MPI context + warnings.simplefilter("ignore") + with context as session: + if context.rank == 0: + print(context.work) + session.run() + @pytest.mark.usefixtures("cleandir") @pytest.mark.usefixtures("caplog") @withmpi_only -def test_plugin(caplog): - # Test attachment of external code - md = gmx.workflow.from_tpr(tpr_filename, threads_per_rank=1) +def test_plugin_with_ensemble(caplog): + # Test in ensemble. + md = gmx.workflow.from_tpr([tpr_filename, tpr_filename], threads_per_rank=1) # Create a WorkElement for the potential #potential = gmx.core.TestModule() @@ -118,7 +189,10 @@ def test_plugin(caplog): after = md.workspec.elements[md.name] assert not before is after - context = gmx.context.ParallelArrayContext() + # Workaround for https://github.com/kassonlab/gmxapi/issues/42 + # We can't add an operation to a context that doesn't exist yet, but we can't + # add a work graph with an operation that is not defined in a context. + context = gmx.get_context() context.add_operation(potential_element.namespace, potential_element.operation, my_plugin) context.work = md diff --git a/src/gmx/workflow.py b/src/gmx/workflow.py index 0703f75a4f..693c94e2a1 100644 --- a/src/gmx/workflow.py +++ b/src/gmx/workflow.py @@ -31,6 +31,7 @@ from __future__ import unicode_literals import warnings +import weakref from gmx import exceptions from gmx import logging @@ -124,7 +125,20 @@ class WorkSpec(object): def __init__(self): self.version = workspec_version self.elements = dict() - self._context = None + self.__context_weak_ref = None + + @property + def _context(self): + referent = None + if self.__context_weak_ref is not None: + referent = self.__context_weak_ref() + return referent + + @_context.setter + def _context(self, context): + # We're moving towards having the context own the work, so the work should + # not own the context. + self.__context_weak_ref = weakref.ref(context) def _chase_deps(self, source_set, name_list): """Helper to recursively generate dependencies before dependents.