Skip to content

Commit

Permalink
Add schedules, stop_strategy and when to CronWorkflow (#1294)
Browse files Browse the repository at this point in the history
Manual changes required for #1215

---------

Signed-off-by: Elliot Gunton <[email protected]>
  • Loading branch information
elliotgunton authored Jan 10, 2025
1 parent 88e7b68 commit 5182067
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 14 deletions.
2 changes: 0 additions & 2 deletions docs/examples/workflows-examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ Explore the examples through the side bar!
| [conditionals-complex](https://github.com/argoproj/argo-workflows/blob/main/examples/conditionals-complex.yaml) |
| [configmaps/simple-parameters-configmap](https://github.com/argoproj/argo-workflows/blob/main/examples/configmaps/simple-parameters-configmap.yaml) |
| [cron-backfill](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-backfill.yaml) |
| [cron-when](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-when.yaml) |
| [cron-workflow-multiple-schedules](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-workflow-multiple-schedules.yaml) |
| [daemon-step](https://github.com/argoproj/argo-workflows/blob/main/examples/daemon-step.yaml) |
| [daemoned-stateful-set-with-service](https://github.com/argoproj/argo-workflows/blob/main/examples/daemoned-stateful-set-with-service.yaml) |
| [dag-coinflip](https://github.com/argoproj/argo-workflows/blob/main/examples/dag-coinflip.yaml) |
Expand Down
67 changes: 67 additions & 0 deletions docs/examples/workflows/misc/cron_workflow_stop_strategy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Cron Workflow Stop Strategy






=== "Hera"

```python linenums="1"
from hera.workflows import Container, CronWorkflow
from hera.workflows.models import StopStrategy

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
stop_strategy=StopStrategy(expression="cronworkflow.failed >= 3"),
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
stopStrategy:
expression: cronworkflow.failed >= 3
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
```

54 changes: 54 additions & 0 deletions docs/examples/workflows/upstream/cron_when.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Cron When

## Note

This example is a replication of an Argo Workflow example in Hera.
The upstream example can be [found here](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-when.yaml).




=== "Hera"

```python linenums="1"
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="sleep-when",
entrypoint="sleep-busybox",
schedule="* * * * *",
concurrency_policy="Allow",
when="{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 360 }}",
) as w:
print_message = Container(
name="sleep-busybox",
image="busybox",
command=["sleep"],
args=["10"],
)
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: sleep-when
spec:
concurrencyPolicy: Allow
schedule: '* * * * *'
when: '{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds()
> 360 }}'
workflowSpec:
entrypoint: sleep-busybox
templates:
- container:
args:
- '10'
command:
- sleep
image: busybox
name: sleep-busybox
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Cron Workflow Multiple Schedules

## Note

This example is a replication of an Argo Workflow example in Hera.
The upstream example can be [found here](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-workflow-multiple-schedules.yaml).




=== "Hera"

```python linenums="1"
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
```

26 changes: 26 additions & 0 deletions examples/workflows/misc/cron-workflow-stop-strategy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
stopStrategy:
expression: cronworkflow.failed >= 3
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
24 changes: 24 additions & 0 deletions examples/workflows/misc/cron_workflow_stop_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from hera.workflows import Container, CronWorkflow
from hera.workflows.models import StopStrategy

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
stop_strategy=StopStrategy(expression="cronworkflow.failed >= 3"),
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
19 changes: 19 additions & 0 deletions examples/workflows/upstream/cron-when.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: sleep-when
spec:
concurrencyPolicy: Allow
schedule: '* * * * *'
when: '{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds()
> 360 }}'
workflowSpec:
entrypoint: sleep-busybox
templates:
- container:
args:
- '10'
command:
- sleep
image: busybox
name: sleep-busybox
24 changes: 24 additions & 0 deletions examples/workflows/upstream/cron-workflow-multiple-schedules.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
15 changes: 15 additions & 0 deletions examples/workflows/upstream/cron_when.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="sleep-when",
entrypoint="sleep-busybox",
schedule="* * * * *",
concurrency_policy="Allow",
when="{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 360 }}",
) as w:
print_message = Container(
name="sleep-busybox",
image="busybox",
command=["sleep"],
args=["10"],
)
22 changes: 22 additions & 0 deletions examples/workflows/upstream/cron_workflow_multiple_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
11 changes: 7 additions & 4 deletions src/hera/workflows/cron_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

from pathlib import Path
from typing import Annotated, Dict, Optional, Type, Union, cast
from typing import Annotated, Dict, List, Optional, Type, Union, cast

from hera.exceptions import NotFound
from hera.shared._pydantic import BaseModel
Expand All @@ -21,6 +21,7 @@
CronWorkflowSpec,
CronWorkflowStatus,
LintCronWorkflowRequest,
StopStrategy,
UpdateCronWorkflowRequest,
Workflow as _ModelWorkflow,
)
Expand Down Expand Up @@ -76,15 +77,18 @@ class CronWorkflow(Workflow):
failed_jobs_history_limit: Annotated[Optional[int], _CronWorkflowModelMapper("spec.failed_jobs_history_limit")] = (
None
)
schedule: Annotated[str, _CronWorkflowModelMapper("spec.schedule")]
schedule: Annotated[Optional[str], _CronWorkflowModelMapper("spec.schedule")] = None
schedules: Annotated[Optional[List[str]], _CronWorkflowModelMapper("spec.schedules")] = None
starting_deadline_seconds: Annotated[Optional[int], _CronWorkflowModelMapper("spec.starting_deadline_seconds")] = (
None
)
stop_strategy: Annotated[Optional[StopStrategy], _CronWorkflowModelMapper("spec.stop_strategy")]
successful_jobs_history_limit: Annotated[
Optional[int], _CronWorkflowModelMapper("spec.successful_jobs_history_limit")
] = None
cron_suspend: Annotated[Optional[bool], _CronWorkflowModelMapper("spec.suspend")] = None
timezone: Annotated[Optional[str], _CronWorkflowModelMapper("spec.timezone")] = None
when: Annotated[Optional[str], _CronWorkflowModelMapper("spec.when")] = None
cron_status: Annotated[Optional[CronWorkflowStatus], _CronWorkflowModelMapper("status")] = None

def create(self) -> TWorkflow: # type: ignore
Expand Down Expand Up @@ -149,7 +153,6 @@ def build(self) -> TWorkflow:
model_cron_workflow = _ModelCronWorkflow(
metadata=model_workflow.metadata,
spec=CronWorkflowSpec(
schedule=self.schedule,
workflow_spec=model_workflow.spec,
),
)
Expand All @@ -160,7 +163,7 @@ def build(self) -> TWorkflow:
def _from_model(cls, model: BaseModel) -> ModelMapperMixin:
"""Parse from given model to cls's type."""
assert isinstance(model, _ModelCronWorkflow)
hera_cron_workflow = cls(schedule="")
hera_cron_workflow = cls()

for attr, annotation in cls._get_all_annotations().items():
if mappers := get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper):
Expand Down
Loading

0 comments on commit 5182067

Please sign in to comment.