From 51820671f0e4316f9fd3220adfc2ee664bc7d37a Mon Sep 17 00:00:00 2001 From: Elliot Gunton Date: Fri, 10 Jan 2025 13:00:46 +0000 Subject: [PATCH] Add `schedules`, `stop_strategy` and `when` to `CronWorkflow` (#1294) Manual changes required for #1215 --------- Signed-off-by: Elliot Gunton --- docs/examples/workflows-examples.md | 2 - .../misc/cron_workflow_stop_strategy.md | 67 +++++++++++++++++++ docs/examples/workflows/upstream/cron_when.md | 54 +++++++++++++++ .../cron_workflow_multiple_schedules.md | 66 ++++++++++++++++++ .../misc/cron-workflow-stop-strategy.yaml | 26 +++++++ .../misc/cron_workflow_stop_strategy.py | 24 +++++++ examples/workflows/upstream/cron-when.yaml | 19 ++++++ .../cron-workflow-multiple-schedules.yaml | 24 +++++++ examples/workflows/upstream/cron_when.py | 15 +++++ .../cron_workflow_multiple_schedules.py | 22 ++++++ src/hera/workflows/cron_workflow.py | 11 +-- tests/test_remaining_examples.py | 8 --- 12 files changed, 324 insertions(+), 14 deletions(-) create mode 100644 docs/examples/workflows/misc/cron_workflow_stop_strategy.md create mode 100644 docs/examples/workflows/upstream/cron_when.md create mode 100644 docs/examples/workflows/upstream/cron_workflow_multiple_schedules.md create mode 100644 examples/workflows/misc/cron-workflow-stop-strategy.yaml create mode 100644 examples/workflows/misc/cron_workflow_stop_strategy.py create mode 100644 examples/workflows/upstream/cron-when.yaml create mode 100644 examples/workflows/upstream/cron-workflow-multiple-schedules.yaml create mode 100644 examples/workflows/upstream/cron_when.py create mode 100644 examples/workflows/upstream/cron_workflow_multiple_schedules.py diff --git a/docs/examples/workflows-examples.md b/docs/examples/workflows-examples.md index 186aff102..996449e12 100644 --- a/docs/examples/workflows-examples.md +++ b/docs/examples/workflows-examples.md @@ -32,8 +32,6 @@ Explore the examples through the side bar! | [conditionals-complex](https://github.com/argoproj/argo-workflows/blob/main/examples/conditionals-complex.yaml) | | [configmaps/simple-parameters-configmap](https://github.com/argoproj/argo-workflows/blob/main/examples/configmaps/simple-parameters-configmap.yaml) | | [cron-backfill](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-backfill.yaml) | -| [cron-when](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-when.yaml) | -| [cron-workflow-multiple-schedules](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-workflow-multiple-schedules.yaml) | | [daemon-step](https://github.com/argoproj/argo-workflows/blob/main/examples/daemon-step.yaml) | | [daemoned-stateful-set-with-service](https://github.com/argoproj/argo-workflows/blob/main/examples/daemoned-stateful-set-with-service.yaml) | | [dag-coinflip](https://github.com/argoproj/argo-workflows/blob/main/examples/dag-coinflip.yaml) | diff --git a/docs/examples/workflows/misc/cron_workflow_stop_strategy.md b/docs/examples/workflows/misc/cron_workflow_stop_strategy.md new file mode 100644 index 000000000..ba8aebfad --- /dev/null +++ b/docs/examples/workflows/misc/cron_workflow_stop_strategy.md @@ -0,0 +1,67 @@ +# Cron Workflow Stop Strategy + + + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import Container, CronWorkflow + from hera.workflows.models import StopStrategy + + with CronWorkflow( + name="hello-world-multiple-schedules", + entrypoint="whalesay", + schedules=[ + "*/3 * * * *", + "*/2 * * * *", + ], + stop_strategy=StopStrategy(expression="cronworkflow.failed >= 3"), + timezone="America/Los_Angeles", + starting_deadline_seconds=0, + concurrency_policy="Replace", + successful_jobs_history_limit=4, + failed_jobs_history_limit=4, + cron_suspend=False, + ) as w: + Container( + name="whalesay", + image="docker/whalesay:latest", + command=["cowsay"], + args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"], + ) + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: CronWorkflow + metadata: + name: hello-world-multiple-schedules + spec: + concurrencyPolicy: Replace + failedJobsHistoryLimit: 4 + schedules: + - '*/3 * * * *' + - '*/2 * * * *' + startingDeadlineSeconds: 0 + stopStrategy: + expression: cronworkflow.failed >= 3 + successfulJobsHistoryLimit: 4 + suspend: false + timezone: America/Los_Angeles + workflowSpec: + entrypoint: whalesay + templates: + - container: + args: + - "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}" + command: + - cowsay + image: docker/whalesay:latest + name: whalesay + ``` + diff --git a/docs/examples/workflows/upstream/cron_when.md b/docs/examples/workflows/upstream/cron_when.md new file mode 100644 index 000000000..0698e164d --- /dev/null +++ b/docs/examples/workflows/upstream/cron_when.md @@ -0,0 +1,54 @@ +# Cron When + +## Note + +This example is a replication of an Argo Workflow example in Hera. +The upstream example can be [found here](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-when.yaml). + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import Container, CronWorkflow + + with CronWorkflow( + name="sleep-when", + entrypoint="sleep-busybox", + schedule="* * * * *", + concurrency_policy="Allow", + when="{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 360 }}", + ) as w: + print_message = Container( + name="sleep-busybox", + image="busybox", + command=["sleep"], + args=["10"], + ) + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: CronWorkflow + metadata: + name: sleep-when + spec: + concurrencyPolicy: Allow + schedule: '* * * * *' + when: '{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() + > 360 }}' + workflowSpec: + entrypoint: sleep-busybox + templates: + - container: + args: + - '10' + command: + - sleep + image: busybox + name: sleep-busybox + ``` + diff --git a/docs/examples/workflows/upstream/cron_workflow_multiple_schedules.md b/docs/examples/workflows/upstream/cron_workflow_multiple_schedules.md new file mode 100644 index 000000000..f4e057637 --- /dev/null +++ b/docs/examples/workflows/upstream/cron_workflow_multiple_schedules.md @@ -0,0 +1,66 @@ +# Cron Workflow Multiple Schedules + +## Note + +This example is a replication of an Argo Workflow example in Hera. +The upstream example can be [found here](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-workflow-multiple-schedules.yaml). + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import Container, CronWorkflow + + with CronWorkflow( + name="hello-world-multiple-schedules", + entrypoint="whalesay", + schedules=[ + "*/3 * * * *", + "*/2 * * * *", + ], + timezone="America/Los_Angeles", + starting_deadline_seconds=0, + concurrency_policy="Replace", + successful_jobs_history_limit=4, + failed_jobs_history_limit=4, + cron_suspend=False, + ) as w: + Container( + name="whalesay", + image="docker/whalesay:latest", + command=["cowsay"], + args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"], + ) + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: CronWorkflow + metadata: + name: hello-world-multiple-schedules + spec: + concurrencyPolicy: Replace + failedJobsHistoryLimit: 4 + schedules: + - '*/3 * * * *' + - '*/2 * * * *' + startingDeadlineSeconds: 0 + successfulJobsHistoryLimit: 4 + suspend: false + timezone: America/Los_Angeles + workflowSpec: + entrypoint: whalesay + templates: + - container: + args: + - "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}" + command: + - cowsay + image: docker/whalesay:latest + name: whalesay + ``` + diff --git a/examples/workflows/misc/cron-workflow-stop-strategy.yaml b/examples/workflows/misc/cron-workflow-stop-strategy.yaml new file mode 100644 index 000000000..d12596e3e --- /dev/null +++ b/examples/workflows/misc/cron-workflow-stop-strategy.yaml @@ -0,0 +1,26 @@ +apiVersion: argoproj.io/v1alpha1 +kind: CronWorkflow +metadata: + name: hello-world-multiple-schedules +spec: + concurrencyPolicy: Replace + failedJobsHistoryLimit: 4 + schedules: + - '*/3 * * * *' + - '*/2 * * * *' + startingDeadlineSeconds: 0 + stopStrategy: + expression: cronworkflow.failed >= 3 + successfulJobsHistoryLimit: 4 + suspend: false + timezone: America/Los_Angeles + workflowSpec: + entrypoint: whalesay + templates: + - container: + args: + - "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}" + command: + - cowsay + image: docker/whalesay:latest + name: whalesay diff --git a/examples/workflows/misc/cron_workflow_stop_strategy.py b/examples/workflows/misc/cron_workflow_stop_strategy.py new file mode 100644 index 000000000..643f74870 --- /dev/null +++ b/examples/workflows/misc/cron_workflow_stop_strategy.py @@ -0,0 +1,24 @@ +from hera.workflows import Container, CronWorkflow +from hera.workflows.models import StopStrategy + +with CronWorkflow( + name="hello-world-multiple-schedules", + entrypoint="whalesay", + schedules=[ + "*/3 * * * *", + "*/2 * * * *", + ], + stop_strategy=StopStrategy(expression="cronworkflow.failed >= 3"), + timezone="America/Los_Angeles", + starting_deadline_seconds=0, + concurrency_policy="Replace", + successful_jobs_history_limit=4, + failed_jobs_history_limit=4, + cron_suspend=False, +) as w: + Container( + name="whalesay", + image="docker/whalesay:latest", + command=["cowsay"], + args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"], + ) diff --git a/examples/workflows/upstream/cron-when.yaml b/examples/workflows/upstream/cron-when.yaml new file mode 100644 index 000000000..28a9f9b6c --- /dev/null +++ b/examples/workflows/upstream/cron-when.yaml @@ -0,0 +1,19 @@ +apiVersion: argoproj.io/v1alpha1 +kind: CronWorkflow +metadata: + name: sleep-when +spec: + concurrencyPolicy: Allow + schedule: '* * * * *' + when: '{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() + > 360 }}' + workflowSpec: + entrypoint: sleep-busybox + templates: + - container: + args: + - '10' + command: + - sleep + image: busybox + name: sleep-busybox diff --git a/examples/workflows/upstream/cron-workflow-multiple-schedules.yaml b/examples/workflows/upstream/cron-workflow-multiple-schedules.yaml new file mode 100644 index 000000000..ca66b4f14 --- /dev/null +++ b/examples/workflows/upstream/cron-workflow-multiple-schedules.yaml @@ -0,0 +1,24 @@ +apiVersion: argoproj.io/v1alpha1 +kind: CronWorkflow +metadata: + name: hello-world-multiple-schedules +spec: + concurrencyPolicy: Replace + failedJobsHistoryLimit: 4 + schedules: + - '*/3 * * * *' + - '*/2 * * * *' + startingDeadlineSeconds: 0 + successfulJobsHistoryLimit: 4 + suspend: false + timezone: America/Los_Angeles + workflowSpec: + entrypoint: whalesay + templates: + - container: + args: + - "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}" + command: + - cowsay + image: docker/whalesay:latest + name: whalesay diff --git a/examples/workflows/upstream/cron_when.py b/examples/workflows/upstream/cron_when.py new file mode 100644 index 000000000..f51fbf30c --- /dev/null +++ b/examples/workflows/upstream/cron_when.py @@ -0,0 +1,15 @@ +from hera.workflows import Container, CronWorkflow + +with CronWorkflow( + name="sleep-when", + entrypoint="sleep-busybox", + schedule="* * * * *", + concurrency_policy="Allow", + when="{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 360 }}", +) as w: + print_message = Container( + name="sleep-busybox", + image="busybox", + command=["sleep"], + args=["10"], + ) diff --git a/examples/workflows/upstream/cron_workflow_multiple_schedules.py b/examples/workflows/upstream/cron_workflow_multiple_schedules.py new file mode 100644 index 000000000..22f7b0d1c --- /dev/null +++ b/examples/workflows/upstream/cron_workflow_multiple_schedules.py @@ -0,0 +1,22 @@ +from hera.workflows import Container, CronWorkflow + +with CronWorkflow( + name="hello-world-multiple-schedules", + entrypoint="whalesay", + schedules=[ + "*/3 * * * *", + "*/2 * * * *", + ], + timezone="America/Los_Angeles", + starting_deadline_seconds=0, + concurrency_policy="Replace", + successful_jobs_history_limit=4, + failed_jobs_history_limit=4, + cron_suspend=False, +) as w: + Container( + name="whalesay", + image="docker/whalesay:latest", + command=["cowsay"], + args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"], + ) diff --git a/src/hera/workflows/cron_workflow.py b/src/hera/workflows/cron_workflow.py index 4c189c500..a29bf5293 100644 --- a/src/hera/workflows/cron_workflow.py +++ b/src/hera/workflows/cron_workflow.py @@ -5,7 +5,7 @@ """ from pathlib import Path -from typing import Annotated, Dict, Optional, Type, Union, cast +from typing import Annotated, Dict, List, Optional, Type, Union, cast from hera.exceptions import NotFound from hera.shared._pydantic import BaseModel @@ -21,6 +21,7 @@ CronWorkflowSpec, CronWorkflowStatus, LintCronWorkflowRequest, + StopStrategy, UpdateCronWorkflowRequest, Workflow as _ModelWorkflow, ) @@ -76,15 +77,18 @@ class CronWorkflow(Workflow): failed_jobs_history_limit: Annotated[Optional[int], _CronWorkflowModelMapper("spec.failed_jobs_history_limit")] = ( None ) - schedule: Annotated[str, _CronWorkflowModelMapper("spec.schedule")] + schedule: Annotated[Optional[str], _CronWorkflowModelMapper("spec.schedule")] = None + schedules: Annotated[Optional[List[str]], _CronWorkflowModelMapper("spec.schedules")] = None starting_deadline_seconds: Annotated[Optional[int], _CronWorkflowModelMapper("spec.starting_deadline_seconds")] = ( None ) + stop_strategy: Annotated[Optional[StopStrategy], _CronWorkflowModelMapper("spec.stop_strategy")] successful_jobs_history_limit: Annotated[ Optional[int], _CronWorkflowModelMapper("spec.successful_jobs_history_limit") ] = None cron_suspend: Annotated[Optional[bool], _CronWorkflowModelMapper("spec.suspend")] = None timezone: Annotated[Optional[str], _CronWorkflowModelMapper("spec.timezone")] = None + when: Annotated[Optional[str], _CronWorkflowModelMapper("spec.when")] = None cron_status: Annotated[Optional[CronWorkflowStatus], _CronWorkflowModelMapper("status")] = None def create(self) -> TWorkflow: # type: ignore @@ -149,7 +153,6 @@ def build(self) -> TWorkflow: model_cron_workflow = _ModelCronWorkflow( metadata=model_workflow.metadata, spec=CronWorkflowSpec( - schedule=self.schedule, workflow_spec=model_workflow.spec, ), ) @@ -160,7 +163,7 @@ def build(self) -> TWorkflow: def _from_model(cls, model: BaseModel) -> ModelMapperMixin: """Parse from given model to cls's type.""" assert isinstance(model, _ModelCronWorkflow) - hera_cron_workflow = cls(schedule="") + hera_cron_workflow = cls() for attr, annotation in cls._get_all_annotations().items(): if mappers := get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper): diff --git a/tests/test_remaining_examples.py b/tests/test_remaining_examples.py index 1c4bd0735..c4972189f 100644 --- a/tests/test_remaining_examples.py +++ b/tests/test_remaining_examples.py @@ -20,16 +20,8 @@ "cluster-workflow-template__clustertemplates.upstream.yaml", "cron-backfill.upstream.yaml", "memoize-simple.upstream.yaml", - "pod-gc-strategy-with-label-selector.upstream.yaml", - "pod-gc-strategy.upstream.yaml", "webhdfs-input-output-artifacts.upstream.yaml", "workflow-template__templates.upstream.yaml", - "synchronization-wf-level.upstream.yaml", - "synchronization-mutex-tmpl-level.upstream.yaml", - "synchronization-mutex-wf-level.upstream.yaml", - "synchronization-tmpl-level.upstream.yaml", - "cron-when.upstream.yaml", - "cron-workflow-multiple-schedules.upstream.yaml", ]