Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data store: dont update outputs incrementally #6403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2420,23 +2420,27 @@ def delta_task_output(
objects from the workflow task pool.

"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
outputs = itask.state.outputs
label = outputs.get_trigger(message)
# update task instance
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
output = tp_delta.outputs[label]
output.label = label
output.message = message
output.satisfied = outputs.is_message_complete(message)
output.time = update_time
self.updates_pending = True
# TODO: Restore incremental update when we have a protocol to do so
# https://github.com/cylc/cylc-flow/issues/6307
return self.delta_task_outputs(itask)

# tproxy: Optional[PbTaskProxy]
# tp_id, tproxy = self.store_node_fetcher(itask.tokens)
# if not tproxy:
# return
# outputs = itask.state.outputs
# label = outputs.get_trigger(message)
# # update task instance
# update_time = time()
# tp_delta = self.updated[TASK_PROXIES].setdefault(
# tp_id, PbTaskProxy(id=tp_id))
# tp_delta.stamp = f'{tp_id}@{update_time}'
# output = tp_delta.outputs[label]
# output.label = label
# output.message = message
# output.satisfied = outputs.is_message_complete(message)
# output.time = update_time
# self.updates_pending = True

def delta_task_outputs(self, itask: TaskProxy) -> None:
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
"""Create delta for change in all task proxy outputs.
Expand Down
116 changes: 113 additions & 3 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Iterable, List, cast
from typing import Iterable, List, TYPE_CHECKING, cast

import pytest

Expand All @@ -29,6 +29,12 @@
)
from cylc.flow.id import Tokens
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
Expand All @@ -37,8 +43,13 @@
from cylc.flow.wallclock import get_current_time_string


# NOTE: These tests mutate the data store, so running them in isolation may
# see failures when they actually pass if you run the whole file
if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler


# NOTE: Some of these tests mutate the data store, so running them in
# isolation may see failures when they actually pass if you run the
# whole file


def job_config(schd):
Expand Down Expand Up @@ -392,3 +403,102 @@ async def test_flow_numbers(flow, scheduler, start):
ds_task = schd.data_store_mgr.get_data_elements(TASK_PROXIES).added[1]
assert ds_task.name == 'b'
assert ds_task.flow_nums == '[2]'


async def test_delta_task_outputs(one: 'Scheduler', start):
"""Ensure task outputs are inserted into the store.

Note: Task outputs should *not* be updated incrementally until we have
a protocol for doing so, see https://github.com/cylc/cylc-flow/pull/6403
"""

def get_data_outputs():
"""Return satisfied outputs from the *data* store."""
nonlocal one, itask
return {
output.label
for output in one.data_store_mgr.data[one.id][TASK_PROXIES][
itask.tokens.id
].outputs.values()
if output.satisfied
}

def get_delta_outputs():
"""Return satisfied outputs from the *delta* store.

Or return None if there's nothing there.
"""
nonlocal one, itask
try:
return {
output.label
for output in one.data_store_mgr.updated[TASK_PROXIES][
itask.tokens.id
].outputs.values()
if output.satisfied
}
except KeyError:
return None

def _patch_remove(*args, **kwargs):
"""Prevent the task/workflow from completing."""
pass

async with start(one):
one.pool.remove = _patch_remove

# create a job submission
itask = one.pool.get_tasks()[0]
assert itask
itask.submit_num += 1
one.data_store_mgr.insert_job(
itask.tdef.name, itask.point, itask.state.status, {'submit_num': 1}
)
await one.update_data_structure()

# satisfy the submitted & started outputs
# (note started implies submitted)
one.task_events_mgr.process_message(
itask, 'INFO', TaskEventsManager.EVENT_STARTED
)

# the delta should be populated with the newly satisfied outputs
assert get_data_outputs() == set()
assert get_delta_outputs() == {
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
}

# the delta should be applied to the store
await one.update_data_structure()
assert get_data_outputs() == {
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
}
assert get_delta_outputs() is None

# satisfy the succeeded output
one.task_events_mgr.process_message(
itask, 'INFO', TaskEventsManager.EVENT_SUCCEEDED
)

# the delta should be populated with ALL satisfied outputs
# (not just the newly satisfied output)
assert get_data_outputs() == {
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
}
assert get_delta_outputs() == {
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
}

# the delta should be applied to the store
await one.update_data_structure()
assert get_data_outputs() == {
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
}
assert get_delta_outputs() is None
Loading