Skip to content

Commit

Permalink
Merge pull request #192 from Azure/dev
Browse files Browse the repository at this point in the history
Promote to Master for PyPi release
  • Loading branch information
davidmrdavid authored Sep 14, 2020
2 parents d53ec10 + 65188f2 commit 1b9d157
Show file tree
Hide file tree
Showing 24 changed files with 651 additions and 117 deletions.
56 changes: 55 additions & 1 deletion azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ async def wait_for_completion_or_create_check_status_response(
lambda: self._create_http_response(500, status.to_json()),
}

result = switch_statement.get(OrchestrationRuntimeStatus(status.runtime_status))
result = switch_statement.get(status.runtime_status)
if result:
return result()

Expand Down Expand Up @@ -546,3 +546,57 @@ def _get_raise_event_url(
request_url += "?" + "&".join(query)

return request_url

async def rewind(self,
instance_id: str,
reason: str,
task_hub_name: Optional[str] = None,
connection_name: Optional[str] = None):
"""Return / "rewind" a failed orchestration instance to a prior "healthy" state.
Parameters
----------
instance_id: str
The ID of the orchestration instance to rewind.
reason: str
The reason for rewinding the orchestration instance.
task_hub_name: Optional[str]
The TaskHub of the orchestration to rewind
connection_name: Optional[str]
Name of the application setting containing the storage
connection string to use.
Raises
------
Exception:
In case of a failure, it reports the reason for the exception
"""
request_url: str = ""
if self._orchestration_bindings.rpc_base_url:
path = f"instances/{instance_id}/rewind?reason={reason}"
query: List[str] = []
if not (task_hub_name is None):
query.append(f"taskHub={task_hub_name}")
if not (connection_name is None):
query.append(f"connection={connection_name}")
if len(query) > 0:
path += "&" + "&".join(query)

request_url = f"{self._orchestration_bindings.rpc_base_url}" + path
else:
raise Exception("The Python SDK only supports RPC endpoints."
+ "Please remove the `localRpcEnabled` setting from host.json")

response = await self._post_async_request(request_url, None)
status: int = response[0]
if status == 200 or status == 202:
return
elif status == 404:
ex_msg = f"No instance with ID {instance_id} found."
raise Exception(ex_msg)
elif status == 410:
ex_msg = "The rewind operation is only supported on failed orchestration instances."
raise Exception(ex_msg)
else:
ex_msg = response[1]
raise Exception(ex_msg)
14 changes: 9 additions & 5 deletions azure/durable_functions/models/DurableOrchestrationStatus.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from dateutil.parser import parse as dt_parse
from typing import Any, List, Dict, Optional, Union

from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
from .utils.json_utils import add_attrib, add_datetime_attrib


Expand All @@ -15,7 +15,8 @@ class DurableOrchestrationStatus:
def __init__(self, name: Optional[str] = None, instanceId: Optional[str] = None,
createdTime: Optional[str] = None, lastUpdatedTime: Optional[str] = None,
input: Optional[Any] = None, output: Optional[Any] = None,
runtimeStatus: Optional[str] = None, customStatus: Optional[Any] = None,
runtimeStatus: Optional[OrchestrationRuntimeStatus] = None,
customStatus: Optional[Any] = None,
history: Optional[List[Any]] = None,
**kwargs):
self._name: Optional[str] = name
Expand All @@ -26,7 +27,9 @@ def __init__(self, name: Optional[str] = None, instanceId: Optional[str] = None,
if lastUpdatedTime is not None else None
self._input: Any = input
self._output: Any = output
self._runtime_status: Optional[str] = runtimeStatus # TODO: GH issue 178
self._runtime_status: Optional[OrchestrationRuntimeStatus] = runtimeStatus
if runtimeStatus is not None:
self._runtime_status = OrchestrationRuntimeStatus(runtimeStatus)
self._custom_status: Any = customStatus
self._history: Optional[List[Any]] = history
if kwargs is not None:
Expand Down Expand Up @@ -82,7 +85,8 @@ def to_json(self) -> Dict[str, Union[int, str]]:
add_datetime_attrib(json, self, 'last_updated_time', 'lastUpdatedTime')
add_attrib(json, self, 'output')
add_attrib(json, self, 'input_', 'input')
add_attrib(json, self, 'runtime_status', 'runtimeStatus')
if self.runtime_status is not None:
json["runtimeStatus"] = self.runtime_status.name
add_attrib(json, self, 'custom_status', 'customStatus')
add_attrib(json, self, 'history')
return json
Expand Down Expand Up @@ -129,7 +133,7 @@ def output(self) -> Any:
return self._output

@property
def runtime_status(self) -> Optional[str]:
def runtime_status(self) -> Optional[OrchestrationRuntimeStatus]:
"""Get the runtime status of the orchestration instance."""
return self._runtime_status

Expand Down
3 changes: 2 additions & 1 deletion azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ class Task:
"""

def __init__(self, is_completed, is_faulted, action,
result=None, timestamp=None, id_=None, exc=None):
result=None, timestamp=None, id_=None, exc=None, is_played=False):
self._is_completed: bool = is_completed
self._is_faulted: bool = is_faulted
self._action: Action = action
self._result = result
self._timestamp: datetime = timestamp
self._id = id_
self._exception = exc
self._is_played = is_played

@property
def is_completed(self) -> bool:
Expand Down
3 changes: 2 additions & 1 deletion azure/durable_functions/models/TaskSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ class TaskSet:
"""

def __init__(self, is_completed, actions, result, is_faulted=False,
timestamp=None, exception=None):
timestamp=None, exception=None, is_played=False):
self._is_completed: bool = is_completed
self._actions: List[Action] = actions
self._result = result
self._is_faulted: bool = is_faulted
self._timestamp: datetime = timestamp
self._exception = exception
self._is_played = is_played

@property
def is_completed(self) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion azure/durable_functions/models/utils/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ async def delete_async_request(url: str) -> List[Union[int, Any]]:
"""
async with aiohttp.ClientSession() as session:
async with session.delete(url) as response:
data = await response.json()
data = await response.json(content_type=None)
return [response.status, data]
1 change: 1 addition & 0 deletions azure/durable_functions/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def handle(self, context: DurableOrchestrationContext):
continue

self._reset_timestamp()
self.durable_context._is_replaying = generation_state._is_played
generation_state = self._generate_next(generation_state)

except StopIteration as sie:
Expand Down
2 changes: 2 additions & 0 deletions azure/durable_functions/tasks/call_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def call_activity_task(
is_completed=True,
is_faulted=False,
action=new_action,
is_played=task_completed._is_played,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)
Expand All @@ -49,6 +50,7 @@ def call_activity_task(
is_completed=True,
is_faulted=True,
action=new_action,
is_played=task_failed._is_played,
result=task_failed.Reason,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
Expand Down
48 changes: 10 additions & 38 deletions azure/durable_functions/tasks/call_activity_with_retry.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from typing import List, Any

from .task_utilities import find_task_scheduled, \
find_task_retry_timer_created, set_processed, parse_history_event, \
find_task_completed, find_task_failed, find_task_retry_timer_fired
from .task_utilities import get_retried_task
from ..models.RetryOptions import RetryOptions
from ..models.Task import (
Task)
from ..models.actions.CallActivityWithRetryAction import \
CallActivityWithRetryAction
from ..models.history import HistoryEvent
from ..models.history import HistoryEvent, HistoryEventType


def call_activity_with_retry_task(
Expand Down Expand Up @@ -37,38 +35,12 @@ def call_activity_with_retry_task(
"""
new_action = CallActivityWithRetryAction(
function_name=name, retry_options=retry_options, input_=input_)
for attempt in range(retry_options.max_number_of_attempts):
task_scheduled = find_task_scheduled(state, name)
task_completed = find_task_completed(state, task_scheduled)
task_failed = find_task_failed(state, task_scheduled)
task_retry_timer = find_task_retry_timer_created(state, task_failed)
task_retry_timer_fired = find_task_retry_timer_fired(
state, task_retry_timer)
set_processed([task_scheduled, task_completed,
task_failed, task_retry_timer, task_retry_timer_fired])

if not task_scheduled:
break

if task_completed:
return Task(
is_completed=True,
is_faulted=False,
action=new_action,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)

if task_failed and task_retry_timer and attempt + 1 >= \
retry_options.max_number_of_attempts:
return Task(
is_completed=True,
is_faulted=True,
action=new_action,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
exc=Exception(
f"{task_failed.Reason} \n {task_failed.Details}")
)

return Task(is_completed=False, is_faulted=False, action=new_action)
return get_retried_task(
state=state,
max_number_of_attempts=retry_options.max_number_of_attempts,
scheduled_type=HistoryEventType.TASK_SCHEDULED,
completed_type=HistoryEventType.TASK_COMPLETED,
failed_type=HistoryEventType.TASK_FAILED,
action=new_action
)
2 changes: 2 additions & 0 deletions azure/durable_functions/tasks/call_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def call_http(state: List[HistoryEvent], method: str, uri: str, content: Optiona
is_completed=True,
is_faulted=False,
action=new_action,
is_played=task_completed._is_played,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)
Expand All @@ -66,6 +67,7 @@ def call_http(state: List[HistoryEvent], method: str, uri: str, content: Optiona
is_completed=True,
is_faulted=True,
action=new_action,
is_played=task_failed._is_played,
result=task_failed.Reason,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
Expand Down
2 changes: 2 additions & 0 deletions azure/durable_functions/tasks/call_suborchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def call_sub_orchestrator_task(
is_completed=True,
is_faulted=False,
action=new_action,
is_played=task_completed._is_played,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)
Expand All @@ -57,6 +58,7 @@ def call_sub_orchestrator_task(
is_completed=True,
is_faulted=True,
action=new_action,
is_played=task_failed._is_played,
result=task_failed.Reason,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
Expand Down
51 changes: 10 additions & 41 deletions azure/durable_functions/tasks/call_suborchestrator_with_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
Task)
from ..models.actions.CallSubOrchestratorWithRetryAction import CallSubOrchestratorWithRetryAction
from ..models.RetryOptions import RetryOptions
from ..models.history import HistoryEvent
from .task_utilities import set_processed, parse_history_event, \
find_sub_orchestration_created, find_sub_orchestration_completed, \
find_sub_orchestration_failed, find_task_retry_timer_fired, find_task_retry_timer_created
from ..models.history import HistoryEvent, HistoryEventType
from .task_utilities import get_retried_task


def call_sub_orchestrator_with_retry_task(
Expand Down Expand Up @@ -40,40 +38,11 @@ def call_sub_orchestrator_with_retry_task(
A Durable Task that completes when the called sub-orchestrator completes or fails.
"""
new_action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
for attempt in range(retry_options.max_number_of_attempts):
task_scheduled = find_sub_orchestration_created(
state, name, context=context, instance_id=instance_id)
task_completed = find_sub_orchestration_completed(state, task_scheduled)
task_failed = find_sub_orchestration_failed(state, task_scheduled)
task_retry_timer = find_task_retry_timer_created(state, task_failed)
task_retry_timer_fired = find_task_retry_timer_fired(
state, task_retry_timer)
set_processed([task_scheduled, task_completed,
task_failed, task_retry_timer, task_retry_timer_fired])

if not task_scheduled:
break

if task_completed is not None:
return Task(
is_completed=True,
is_faulted=False,
action=new_action,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)

if task_failed and task_retry_timer and attempt + 1 >= \
retry_options.max_number_of_attempts:
return Task(
is_completed=True,
is_faulted=True,
action=new_action,
result=task_failed.Reason,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
exc=Exception(
f"{task_failed.Reason} \n {task_failed.Details}")
)

return Task(is_completed=False, is_faulted=False, action=new_action)
return get_retried_task(
state=state,
max_number_of_attempts=retry_options.max_number_of_attempts,
scheduled_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED,
completed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED,
failed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED,
action=new_action
)
3 changes: 2 additions & 1 deletion azure/durable_functions/tasks/create_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def create_timer_task(state: List[HistoryEvent],
return TimerTask(
is_completed=True, action=new_action,
timestamp=timer_fired.timestamp,
id_=timer_fired.event_id)
id_=timer_fired.event_id,
is_played=timer_fired.is_played)
else:
return TimerTask(
is_completed=False, action=new_action,
Expand Down
Loading

0 comments on commit 1b9d157

Please sign in to comment.