Skip to content

Commit

Permalink
Merge pull request #440 from Azure/dev
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid authored Jun 13, 2023
2 parents f0fa726 + c529728 commit 1105d8c
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 23 deletions.
4 changes: 2 additions & 2 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def validate_extension_bundles():

try:
# disabling linter on this line because it fails to recognize the conditional export
from .decorators import DFApp, BluePrint # noqa
from .decorators.durable_app import (DFApp, Blueprint) # noqa
__all__.append('DFApp')
__all__.append('BluePrint')
__all__.append('Blueprint')
except ModuleNotFoundError:
pass
6 changes: 0 additions & 6 deletions azure/durable_functions/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""Decorator definitions for Durable Functions."""
from .durable_app import DFApp, BluePrint

__all__ = [
"DFApp",
"BluePrint"
]
6 changes: 3 additions & 3 deletions azure/durable_functions/decorators/durable_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from functools import wraps


class BluePrint(TriggerApi, BindingApi):
"""Durable Functions (DF) blueprint container.
class Blueprint(TriggerApi, BindingApi):
"""Durable Functions (DF) Blueprint container.
It allows functions to be declared via trigger and binding decorators,
but does not automatically index/register these functions.
Expand Down Expand Up @@ -232,7 +232,7 @@ def decorator():
return wrap


class DFApp(BluePrint, FunctionRegister):
class DFApp(Blueprint, FunctionRegister):
"""Durable Functions (DF) app.
Exports the decorators required to declare and index DF Function-types.
Expand Down
97 changes: 86 additions & 11 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import json
import datetime
import inspect
from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union
from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union, Callable
from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID
from datetime import timezone

Expand All @@ -34,6 +34,8 @@
from .utils.entity_utils import EntityId
from azure.functions._durable_functions import _deserialize_custom_object
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
from azure.durable_functions.decorators.metadata import OrchestrationTrigger, ActivityTrigger
from azure.functions.decorators.function_app import FunctionBuilder


class DurableOrchestrationContext:
Expand Down Expand Up @@ -143,13 +145,14 @@ def _set_is_replaying(self, is_replaying: bool):
"""
self._is_replaying = is_replaying

def call_activity(self, name: str, input_: Optional[Any] = None) -> TaskBase:
def call_activity(self, name: Union[str, Callable], input_: Optional[Any] = None) -> TaskBase:
"""Schedule an activity for execution.
Parameters
----------
name: str
The name of the activity function to call.
name: str | Callable
Either the name of the activity function to call, as a string or,
in the Python V2 programming model, the activity function itself.
input_: Optional[Any]
The JSON-serializable input to pass to the activity function.
Expand All @@ -158,19 +161,31 @@ def call_activity(self, name: str, input_: Optional[Any] = None) -> TaskBase:
Task
A Durable Task that completes when the called activity function completes or fails.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)

if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, ActivityTrigger)

action = CallActivityAction(name, input_)
task = self._generate_task(action)
return task

def call_activity_with_retry(self,
name: str, retry_options: RetryOptions,
name: Union[str, Callable], retry_options: RetryOptions,
input_: Optional[Any] = None) -> TaskBase:
"""Schedule an activity for execution with retry options.
Parameters
----------
name: str
The name of the activity function to call.
name: str | Callable
Either the name of the activity function to call, as a string or,
in the Python V2 programming model, the activity function itself.
retry_options: RetryOptions
The retry options for the activity function.
input_: Optional[Any]
Expand All @@ -182,6 +197,17 @@ def call_activity_with_retry(self,
A Durable Task that completes when the called activity function completes or
fails completely.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)

if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, ActivityTrigger)

action = CallActivityWithRetryAction(name, retry_options, input_)
task = self._generate_task(action, retry_options)
return task
Expand Down Expand Up @@ -221,13 +247,13 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None,
return task

def call_sub_orchestrator(self,
name: str, input_: Optional[Any] = None,
name: Union[str, Callable], input_: Optional[Any] = None,
instance_id: Optional[str] = None) -> TaskBase:
"""Schedule sub-orchestration function named `name` for execution.
Parameters
----------
name: str
name: Union[str, Callable]
The name of the orchestrator function to call.
input_: Optional[Any]
The JSON-serializable input to pass to the orchestrator function.
Expand All @@ -239,19 +265,30 @@ def call_sub_orchestrator(self,
Task
A Durable Task that completes when the called sub-orchestrator completes or fails.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)

if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, OrchestrationTrigger)

action = CallSubOrchestratorAction(name, input_, instance_id)
task = self._generate_task(action)
return task

def call_sub_orchestrator_with_retry(self,
name: str, retry_options: RetryOptions,
name: Union[str, Callable], retry_options: RetryOptions,
input_: Optional[Any] = None,
instance_id: Optional[str] = None) -> TaskBase:
"""Schedule sub-orchestration function named `name` for execution, with retry-options.
Parameters
----------
name: str
name: Union[str, Callable]
The name of the activity function to schedule.
retry_options: RetryOptions
The settings for retrying this sub-orchestrator in case of a failure.
Expand All @@ -265,6 +302,17 @@ def call_sub_orchestrator_with_retry(self,
Task
A Durable Task that completes when the called sub-orchestrator completes or fails.
"""
if isinstance(name, Callable) and not isinstance(name, FunctionBuilder):
error_message = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
raise ValueError(error_message)

if isinstance(name, FunctionBuilder):
name = self._get_function_name(name, OrchestrationTrigger)

action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
task = self._generate_task(action, retry_options)
return task
Expand Down Expand Up @@ -627,3 +675,30 @@ def _add_to_open_tasks(self, task: TaskBase):
else:
for child in task.children:
self._add_to_open_tasks(child)

def _get_function_name(self, name: FunctionBuilder,
trigger_type: Union[OrchestrationTrigger, ActivityTrigger]):
try:
if (isinstance(name._function._trigger, trigger_type)):
name = name._function._name
return name
else:
if(trigger_type == OrchestrationTrigger):
trigger_type = "OrchestrationTrigger"
else:
trigger_type = "ActivityTrigger"
error_message = "Received function with Trigger-type `"\
+ name._function._trigger.type\
+ "` but expected `" + trigger_type + "`. Ensure your "\
"function is annotated with the `" + trigger_type +\
"` decorator or directly pass in the name of the "\
"function as a string."
raise ValueError(error_message)
except AttributeError as e:
e.message = "Durable Functions SDK internal error: an "\
"expected attribute is missing from the `FunctionBuilder` "\
"object in the Python V2 programming model. Please report "\
"this bug in the Durable Functions Python SDK repo: "\
"https://github.com/Azure/azure-functions-durable-python.\n"\
"Error trace: " + e.message
raise e
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def run(self, *args, **kwargs):
]),
use_scm_version=True,
setup_requires=['setuptools_scm'],
author="Azure Functions team at Microsoft Corp.",
author_email="[email protected]",
keywords="azure functions azurefunctions python serverless workflows durablefunctions",
url="https://github.com/Azure/azure-functions-durable-python",
description='Durable Functions For Python',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
105 changes: 104 additions & 1 deletion tests/orchestrator/test_sequential_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,42 @@ def generator_function_new_guid(context):
outputs.append(str(output3))
return outputs

def generator_function_call_activity_with_name(context):
"""Simple orchestrator that call activity function with function name"""
outputs = []

task1 = yield context.call_activity(Hello, "Tokyo")
task2 = yield context.call_activity(Hello, "Seattle")
task3 = yield context.call_activity(Hello, "London")

outputs.append(task1)
outputs.append(task2)
outputs.append(task3)

return outputs

def generator_function_call_activity_with_callable(context):
outputs = []

task1 = yield context.call_activity(generator_function, "Tokyo")

outputs.append(task1)

return outputs

def generator_function_call_activity_with_orchestrator(context):
outputs = []

task1 = yield context.call_activity(generator_function_rasing_ex_with_pystein, "Tokyo")

outputs.append(task1)

return outputs

@app.activity_trigger(input_name = "myArg")
def Hello(myArg: str):
return "Hello" + myArg

def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema)

Expand Down Expand Up @@ -272,6 +308,73 @@ def test_failed_tokyo_state():
expected_error_str = f"{error_msg}{error_label}{state_str}"
assert expected_error_str == error_str

def test_call_activity_with_name():
context_builder = ContextBuilder('test_call_activity_with_name')
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")
result = get_orchestration_state_result(
context_builder, generator_function_call_activity_with_name)

expected_state = base_expected_state(
['Hello Tokyo!', 'Hello Seattle!', 'Hello London!'])
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_call_activity_function_callable_exception():
context_builder = ContextBuilder('test_call_activity_by_name_exception')

try:
result = get_orchestration_state_result(
context_builder, generator_function_call_activity_with_callable)
# expected an exception
assert False
except Exception as e:
error_label = "\n\n$OutOfProcData$:"
error_str = str(e)

expected_state = base_expected_state()
error_msg = "The `call_activity` API received a `Callable` without an "\
"associated Azure Functions trigger-type. "\
"Please ensure you're using the Python programming model V2 "\
"and that your activity function is annotated with the `activity_trigger`"\
"decorator. Otherwise, provide in the name of the activity as a string."
expected_state._error = error_msg
state_str = expected_state.to_json_string()

expected_error_str = f"{error_msg}{error_label}{state_str}"
assert expected_error_str == error_str

def test_call_activity_function_with_orchestrator_exception():
context_builder = ContextBuilder('test_call_activity_by_name_exception')

try:
result = get_orchestration_state_result(
context_builder, generator_function_call_activity_with_orchestrator)
# expected an exception
assert False
except Exception as e:
error_label = "\n\n$OutOfProcData$:"
error_str = str(e)

expected_state = base_expected_state()
error_msg = "Received function with Trigger-type `"\
+ generator_function_rasing_ex_with_pystein._function._trigger.type\
+ "` but expected `ActivityTrigger`. Ensure your "\
"function is annotated with the `ActivityTrigger`" \
" decorator or directly pass in the name of the "\
"function as a string."
expected_state._error = error_msg
state_str = expected_state.to_json_string()

expected_error_str = f"{error_msg}{error_label}{state_str}"
assert expected_error_str == error_str

def test_user_code_raises_exception():
context_builder = ContextBuilder('test_simple_function')
Expand Down Expand Up @@ -608,4 +711,4 @@ def test_compound_tasks_return_single_action_in_V2():
expected = expected_state.to_json()

#assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)
assert_orchestration_state_equals(expected, result)
Loading

0 comments on commit 1105d8c

Please sign in to comment.