Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schedules, stop_strategy and when to CronWorkflow #1294

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/examples/workflows-examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ Explore the examples through the side bar!
| [conditionals-complex](https://github.com/argoproj/argo-workflows/blob/main/examples/conditionals-complex.yaml) |
| [configmaps/simple-parameters-configmap](https://github.com/argoproj/argo-workflows/blob/main/examples/configmaps/simple-parameters-configmap.yaml) |
| [cron-backfill](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-backfill.yaml) |
| [cron-when](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-when.yaml) |
| [cron-workflow-multiple-schedules](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-workflow-multiple-schedules.yaml) |
| [daemon-step](https://github.com/argoproj/argo-workflows/blob/main/examples/daemon-step.yaml) |
| [daemoned-stateful-set-with-service](https://github.com/argoproj/argo-workflows/blob/main/examples/daemoned-stateful-set-with-service.yaml) |
| [dag-coinflip](https://github.com/argoproj/argo-workflows/blob/main/examples/dag-coinflip.yaml) |
Expand Down
67 changes: 67 additions & 0 deletions docs/examples/workflows/misc/cron_workflow_stop_strategy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Cron Workflow Stop Strategy






=== "Hera"

```python linenums="1"
from hera.workflows import Container, CronWorkflow
from hera.workflows.models import StopStrategy

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
stop_strategy=StopStrategy(expression="cronworkflow.failed >= 3"),
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
stopStrategy:
expression: cronworkflow.failed >= 3
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
```

54 changes: 54 additions & 0 deletions docs/examples/workflows/upstream/cron_when.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Cron When

## Note

This example is a replication of an Argo Workflow example in Hera.
The upstream example can be [found here](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-when.yaml).




=== "Hera"

```python linenums="1"
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="sleep-when",
entrypoint="sleep-busybox",
schedule="* * * * *",
concurrency_policy="Allow",
when="{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 360 }}",
) as w:
print_message = Container(
name="sleep-busybox",
image="busybox",
command=["sleep"],
args=["10"],
)
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: sleep-when
spec:
concurrencyPolicy: Allow
schedule: '* * * * *'
when: '{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds()
> 360 }}'
workflowSpec:
entrypoint: sleep-busybox
templates:
- container:
args:
- '10'
command:
- sleep
image: busybox
name: sleep-busybox
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Cron Workflow Multiple Schedules

## Note

This example is a replication of an Argo Workflow example in Hera.
The upstream example can be [found here](https://github.com/argoproj/argo-workflows/blob/main/examples/cron-workflow-multiple-schedules.yaml).




=== "Hera"

```python linenums="1"
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
```

26 changes: 26 additions & 0 deletions examples/workflows/misc/cron-workflow-stop-strategy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
stopStrategy:
expression: cronworkflow.failed >= 3
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
24 changes: 24 additions & 0 deletions examples/workflows/misc/cron_workflow_stop_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from hera.workflows import Container, CronWorkflow
from hera.workflows.models import StopStrategy

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
stop_strategy=StopStrategy(expression="cronworkflow.failed >= 3"),
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
19 changes: 19 additions & 0 deletions examples/workflows/upstream/cron-when.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: sleep-when
spec:
concurrencyPolicy: Allow
schedule: '* * * * *'
when: '{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds()
> 360 }}'
workflowSpec:
entrypoint: sleep-busybox
templates:
- container:
args:
- '10'
command:
- sleep
image: busybox
name: sleep-busybox
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: hello-world-multiple-schedules
spec:
concurrencyPolicy: Replace
failedJobsHistoryLimit: 4
schedules:
- '*/3 * * * *'
- '*/2 * * * *'
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
suspend: false
timezone: America/Los_Angeles
workflowSpec:
entrypoint: whalesay
templates:
- container:
args:
- "\U0001F553 hello world. Scheduled on: {{workflow.scheduledTime}}"
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
15 changes: 15 additions & 0 deletions examples/workflows/upstream/cron_when.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="sleep-when",
entrypoint="sleep-busybox",
schedule="* * * * *",
concurrency_policy="Allow",
when="{{= cronworkflow.lastScheduledTime == nil || (now() - cronworkflow.lastScheduledTime).Seconds() > 360 }}",
) as w:
print_message = Container(
name="sleep-busybox",
image="busybox",
command=["sleep"],
args=["10"],
)
22 changes: 22 additions & 0 deletions examples/workflows/upstream/cron_workflow_multiple_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from hera.workflows import Container, CronWorkflow

with CronWorkflow(
name="hello-world-multiple-schedules",
entrypoint="whalesay",
schedules=[
"*/3 * * * *",
"*/2 * * * *",
],
timezone="America/Los_Angeles",
starting_deadline_seconds=0,
concurrency_policy="Replace",
successful_jobs_history_limit=4,
failed_jobs_history_limit=4,
cron_suspend=False,
) as w:
Container(
name="whalesay",
image="docker/whalesay:latest",
command=["cowsay"],
args=["🕓 hello world. Scheduled on: {{workflow.scheduledTime}}"],
)
11 changes: 7 additions & 4 deletions src/hera/workflows/cron_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

from pathlib import Path
from typing import Annotated, Dict, Optional, Type, Union, cast
from typing import Annotated, Dict, List, Optional, Type, Union, cast

from hera.exceptions import NotFound
from hera.shared._pydantic import BaseModel
Expand All @@ -21,6 +21,7 @@
CronWorkflowSpec,
CronWorkflowStatus,
LintCronWorkflowRequest,
StopStrategy,
UpdateCronWorkflowRequest,
Workflow as _ModelWorkflow,
)
Expand Down Expand Up @@ -76,15 +77,18 @@ class CronWorkflow(Workflow):
failed_jobs_history_limit: Annotated[Optional[int], _CronWorkflowModelMapper("spec.failed_jobs_history_limit")] = (
None
)
schedule: Annotated[str, _CronWorkflowModelMapper("spec.schedule")]
schedule: Annotated[Optional[str], _CronWorkflowModelMapper("spec.schedule")] = None
schedules: Annotated[Optional[List[str]], _CronWorkflowModelMapper("spec.schedules")] = None
starting_deadline_seconds: Annotated[Optional[int], _CronWorkflowModelMapper("spec.starting_deadline_seconds")] = (
None
)
stop_strategy: Annotated[Optional[StopStrategy], _CronWorkflowModelMapper("spec.stop_strategy")]
successful_jobs_history_limit: Annotated[
Optional[int], _CronWorkflowModelMapper("spec.successful_jobs_history_limit")
] = None
cron_suspend: Annotated[Optional[bool], _CronWorkflowModelMapper("spec.suspend")] = None
timezone: Annotated[Optional[str], _CronWorkflowModelMapper("spec.timezone")] = None
when: Annotated[Optional[str], _CronWorkflowModelMapper("spec.when")] = None
cron_status: Annotated[Optional[CronWorkflowStatus], _CronWorkflowModelMapper("status")] = None

def create(self) -> TWorkflow: # type: ignore
Expand Down Expand Up @@ -149,7 +153,6 @@ def build(self) -> TWorkflow:
model_cron_workflow = _ModelCronWorkflow(
metadata=model_workflow.metadata,
spec=CronWorkflowSpec(
schedule=self.schedule,
workflow_spec=model_workflow.spec,
),
)
Expand All @@ -160,7 +163,7 @@ def build(self) -> TWorkflow:
def _from_model(cls, model: BaseModel) -> ModelMapperMixin:
"""Parse from given model to cls's type."""
assert isinstance(model, _ModelCronWorkflow)
hera_cron_workflow = cls(schedule="")
hera_cron_workflow = cls()

for attr, annotation in cls._get_all_annotations().items():
if mappers := get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper):
Expand Down
Loading
Loading