Skip to content

Commit

Permalink
Promote dev to main for 1.1.1 release (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid authored Sep 14, 2021
1 parent 5a5ba25 commit 3b5d5e3
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 12 deletions.
39 changes: 32 additions & 7 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
self.completed_tasks: List[TaskBase] = []
self.children = tasks

if len(self.children) == 0:
self.state = TaskState.SUCCEEDED

def handle_completion(self, child: TaskBase):
"""Manage sub-task completion events.
Expand Down Expand Up @@ -238,7 +241,7 @@ def try_set_value(self, child: TaskBase):
# A WhenAll Task only completes when it has no pending tasks
# i.e _when all_ of its children have completed
if len(self.pending_tasks) == 0:
results = list(map(lambda x: x.result, self.completed_tasks))
results = list(map(lambda x: x.result, self.children))
self.set_value(is_error=False, value=results)
else: # child.state is TaskState.FAILED:
# a single error is sufficient to fail this task
Expand Down Expand Up @@ -287,14 +290,28 @@ class RetryAbleTask(WhenAllTask):
"""

def __init__(self, child: TaskBase, retry_options: RetryOptions, context):
self.id_ = str(child.id) + "_retryable_proxy"
tasks = [child]
super().__init__(tasks, context._replay_schema)

self.retry_options = retry_options
self.num_attempts = 1
self.context = context
self.actions = child.action_repr
self.is_waiting_on_timer = False

@property
def id_(self):
"""Obtain the task's ID.
Since this is an internal-only abstraction, the task ID is represented
by the ID of its inner/wrapped task _plus_ a suffix: "_retryable_proxy"
Returns
-------
[type]
[description]
"""
return str(list(map(lambda x: x.id, self.children))) + "_retryable_proxy"

def try_set_value(self, child: TaskBase):
"""Transition a Retryable Task to a terminal state and set its value.
Expand All @@ -304,6 +321,14 @@ def try_set_value(self, child: TaskBase):
child : TaskBase
A sub-task that just completed
"""
if self.is_waiting_on_timer:
# timer fired, re-scheduling original task
self.is_waiting_on_timer = False
rescheduled_task = self.context._generate_task(
action=NoOpAction("rescheduled task"), parent=self)
self.pending_tasks.add(rescheduled_task)
self.context._add_to_open_tasks(rescheduled_task)
return
if child.state is TaskState.SUCCEEDED:
if len(self.pending_tasks) == 0:
# if all pending tasks have completed,
Expand All @@ -318,11 +343,11 @@ def try_set_value(self, child: TaskBase):
else:
# still have some retries left.
# increase size of pending tasks by adding a timer task
# and then re-scheduling the current task after that
timer_task = self.context._generate_task(action=NoOpAction(), parent=self)
# when it completes, we'll retry the original task
timer_task = self.context._generate_task(
action=NoOpAction("-WithRetry timer"), parent=self)
self.pending_tasks.add(timer_task)
self.context._add_to_open_tasks(timer_task)
rescheduled_task = self.context._generate_task(action=NoOpAction(), parent=self)
self.pending_tasks.add(rescheduled_task)
self.context._add_to_open_tasks(rescheduled_task)
self.is_waiting_on_timer = True

self.num_attempts += 1
17 changes: 16 additions & 1 deletion azure/durable_functions/models/actions/NoOpAction.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
from azure.durable_functions.models.actions.Action import Action
from typing import Any, Dict
from typing import Any, Dict, Optional


class NoOpAction(Action):
"""A no-op action, for anonymous tasks only."""

def __init__(self, metadata: Optional[str] = None):
"""Create a NoOpAction object.
This is an internal-only action class used to represent cases when intermediate
tasks are used to implement some API. For example, in -WithRetry APIs, intermediate
timers are created. We create this NoOp action to track those the backing actions
of those tasks, which is necessary because we mimic the DF-internal replay algorithm.
Parameters
----------
metadata : Optional[str]
Used for internal debugging: metadata about the action being represented.
"""
self.metadata = metadata

def action_type(self) -> int:
"""Get the type of action this class represents."""
raise Exception("Attempted to get action type of an anonymous Action")
Expand Down
211 changes: 207 additions & 4 deletions tests/orchestrator/test_sequential_orchestrator_with_retry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import List, Union
from azure.durable_functions.models.ReplaySchema import ReplaySchema
from .orchestrator_test_utils \
import get_orchestration_state_result, assert_orchestration_state_equals, assert_valid_schema
Expand Down Expand Up @@ -28,16 +29,63 @@ def generator_function(context):

return outputs

def generator_function_concurrent_retries(context):
outputs = []

retry_options = RETRY_OPTIONS
task1 = context.call_activity_with_retry(
"Hello", retry_options, "Tokyo")
task2 = context.call_activity_with_retry(
"Hello", retry_options, "Seattle")
task3 = context.call_activity_with_retry(
"Hello", retry_options, "London")

outputs = yield context.task_all([task1, task2, task3])

return outputs

def generator_function_two_concurrent_retries_when_all(context):
outputs = []

retry_options = RETRY_OPTIONS
task1 = context.call_activity_with_retry(
"Hello", retry_options, "Tokyo")
task2 = context.call_activity_with_retry(
"Hello", retry_options, "Seattle")

outputs = yield context.task_all([task1, task2])

return outputs

def generator_function_two_concurrent_retries_when_any(context):
outputs = []

retry_options = RETRY_OPTIONS
task1 = context.call_activity_with_retry(
"Hello", retry_options, "Tokyo")
task2 = context.call_activity_with_retry(
"Hello", retry_options, "Seattle")

outputs = yield context.task_any([task1, task2])

return outputs.result


def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value)


def add_hello_action(state: OrchestratorState, input_: str):
def add_hello_action(state: OrchestratorState, input_: Union[List[str], str]):
retry_options = RETRY_OPTIONS
action = CallActivityWithRetryAction(
function_name='Hello', retry_options=retry_options, input_=input_)
state._actions.append([action])
actions = []
inputs = input_
if not isinstance(input_, list):
inputs = [input_]
for input_ in inputs:
action = CallActivityWithRetryAction(
function_name='Hello', retry_options=retry_options, input_=input_)
actions.append(action)
state._actions.append(actions)


def add_hello_failed_events(
Expand All @@ -63,6 +111,45 @@ def add_retry_timer_events(context_builder: ContextBuilder, id_: int):
context_builder.add_orchestrator_started_event()
context_builder.add_timer_fired_event(id_=id_, fire_at=fire_at)

def add_two_retriable_events_completing_out_of_order(context_builder: ContextBuilder,
failed_reason, failed_details):
## Schedule tasks
context_builder.add_task_scheduled_event(name='Hello', id_=0) # Tokyo task
context_builder.add_task_scheduled_event(name='Hello', id_=1) # Seattle task

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()

## Task failures and timer-scheduling

# tasks fail "out of order"
context_builder.add_task_failed_event(
id_=1, reason=failed_reason, details=failed_details) # Seattle task
fire_at_1 = context_builder.add_timer_created_event(2) # Seattle timer

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()

context_builder.add_task_failed_event(
id_=0, reason=failed_reason, details=failed_details) # Tokyo task
fire_at_2 = context_builder.add_timer_created_event(3) # Tokyo timer

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()

## fire timers
context_builder.add_timer_fired_event(id_=2, fire_at=fire_at_1) # Seattle timer
context_builder.add_timer_fired_event(id_=3, fire_at=fire_at_2) # Tokyo timer

## Complete events
context_builder.add_task_scheduled_event(name='Hello', id_=4) # Seattle task
context_builder.add_task_scheduled_event(name='Hello', id_=5) # Tokyo task

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()
context_builder.add_task_completed_event(id_=4, result="\"Hello Seattle!\"")
context_builder.add_task_completed_event(id_=5, result="\"Hello Tokyo!\"")


def test_initial_orchestration_state():
context_builder = ContextBuilder('test_simple_function')
Expand Down Expand Up @@ -217,3 +304,119 @@ def test_failed_tokyo_hit_max_attempts():

expected_error_str = f"{error_msg}{error_label}{state_str}"
assert expected_error_str == error_str

def test_concurrent_retriable_results():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable')
add_hello_failed_events(context_builder, 0, failed_reason, failed_details)
add_hello_failed_events(context_builder, 1, failed_reason, failed_details)
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
add_retry_timer_events(context_builder, 3)
add_retry_timer_events(context_builder, 4)
add_retry_timer_events(context_builder, 5)
add_hello_completed_events(context_builder, 6, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 7, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 8, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_concurrent_retries)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_unordered_arrival():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')
add_hello_failed_events(context_builder, 0, failed_reason, failed_details)
add_hello_failed_events(context_builder, 1, failed_reason, failed_details)
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
add_retry_timer_events(context_builder, 3)
add_retry_timer_events(context_builder, 4)
add_retry_timer_events(context_builder, 5)
# events arrive in non-sequential different order
add_hello_completed_events(context_builder, 8, "\"Hello London!\"")
add_hello_completed_events(context_builder, 6, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 7, "\"Hello Seattle!\"")

result = get_orchestration_state_result(
context_builder, generator_function_concurrent_retries)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_mixed_arrival():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')
# one task succeeds, the other two fail at first, and succeed on retry
add_hello_failed_events(context_builder, 1, failed_reason, failed_details)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
add_retry_timer_events(context_builder, 3)
add_retry_timer_events(context_builder, 4)
add_hello_completed_events(context_builder, 6, "\"Hello London!\"")
add_hello_completed_events(context_builder, 5, "\"Hello Seattle!\"")

result = get_orchestration_state_result(
context_builder, generator_function_concurrent_retries)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_alternating_taskIDs_when_all():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')

add_two_retriable_events_completing_out_of_order(context_builder, failed_reason, failed_details)

result = get_orchestration_state_result(
context_builder, generator_function_two_concurrent_retries_when_all)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_alternating_taskIDs_when_any():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')

add_two_retriable_events_completing_out_of_order(context_builder, failed_reason, failed_details)

result = get_orchestration_state_result(
context_builder, generator_function_two_concurrent_retries_when_any)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle'])
expected_state._output = "Hello Seattle!"
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)
13 changes: 13 additions & 0 deletions tests/orchestrator/test_task_any.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ def generator_function(context):
except:
return "exception"

def generator_function_no_activity(context):
yield context.task_any([])
return "Done!"

def test_continues_on_zero_inner_tasks():
context_builder = ContextBuilder()
result = get_orchestration_state_result(
context_builder, generator_function_no_activity)
expected_state = base_expected_state("Done!")
expected_state._is_done = True
expected = expected_state.to_json()
assert_orchestration_state_equals(expected, result)

def test_continues_on_zero_results():
context_builder = ContextBuilder()
result = get_orchestration_state_result(
Expand Down

0 comments on commit 3b5d5e3

Please sign in to comment.