diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index afbc7db3..8ab6dcee 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -683,7 +683,7 @@ def _get_function_name(self, name: FunctionBuilder, name = name._function._name return name else: - if(trigger_type == OrchestrationTrigger): + if (trigger_type == OrchestrationTrigger): trigger_type = "OrchestrationTrigger" else: trigger_type = "ActivityTrigger" diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 22973625..22faac45 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -56,6 +56,14 @@ def __init__(self, id_: Union[int, str], actions: Union[List[Action], Action]): self.result: Any = None self.action_repr: Union[List[Action], Action] = actions self.is_played = False + self._is_scheduled_flag = False + + @property + def _is_scheduled(self) -> bool: + return self._is_scheduled_flag + + def _set_is_scheduled(self, is_scheduled: bool): + self._is_scheduled_flag = is_scheduled @property def is_completed(self) -> bool: @@ -158,7 +166,8 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None): if isinstance(action_repr, list): child_actions.extend(action_repr) else: - child_actions.append(action_repr) + if not task._is_scheduled: + child_actions.append(action_repr) if compound_action_constructor is None: self.action_repr = child_actions else: # replay_schema is ReplaySchema.V2 @@ -176,6 +185,10 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None): if not (child.state is TaskState.RUNNING): self.handle_completion(child) + @property + def _is_scheduled(self) -> bool: + return all([child._is_scheduled for child in self.children]) + def handle_completion(self, child: TaskBase): """Manage sub-task completion events. diff --git a/azure/durable_functions/models/TaskOrchestrationExecutor.py b/azure/durable_functions/models/TaskOrchestrationExecutor.py index 6eb2add9..25551c78 100644 --- a/azure/durable_functions/models/TaskOrchestrationExecutor.py +++ b/azure/durable_functions/models/TaskOrchestrationExecutor.py @@ -1,4 +1,4 @@ -from azure.durable_functions.models.Task import TaskBase, TaskState, AtomicTask +from azure.durable_functions.models.Task import TaskBase, TaskState, AtomicTask, CompoundTask from azure.durable_functions.models.OrchestratorState import OrchestratorState from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext from typing import Any, List, Optional, Union @@ -229,7 +229,8 @@ def resume_user_code(self): task_succeeded = current_task.state is TaskState.SUCCEEDED new_task = self.generator.send( task_value) if task_succeeded else self.generator.throw(task_value) - self.context._add_to_open_tasks(new_task) + if isinstance(new_task, TaskBase) and not (new_task._is_scheduled): + self.context._add_to_open_tasks(new_task) except StopIteration as stop_exception: # the orchestration returned, # flag it as such and capture its output @@ -245,9 +246,17 @@ def resume_user_code(self): # user yielded the same task multiple times, continue executing code # until a new/not-previously-yielded task is encountered self.resume_user_code() - else: + elif not (self.current_task._is_scheduled): # new task is received. it needs to be resolved to a value self.context._add_to_actions(self.current_task.action_repr) + self._mark_as_scheduled(self.current_task) + + def _mark_as_scheduled(self, task: TaskBase): + if isinstance(task, CompoundTask): + for task in task.children: + self._mark_as_scheduled(task) + else: + task._set_is_scheduled(True) def get_orchestrator_state_str(self) -> str: """Obtain a JSON-formatted string representing the orchestration's state. diff --git a/samples-v2/blueprint/.funcignore b/samples-v2/blueprint/.funcignore new file mode 100644 index 00000000..9966315f --- /dev/null +++ b/samples-v2/blueprint/.funcignore @@ -0,0 +1,8 @@ +.git* +.vscode +__azurite_db*__.json +__blobstorage__ +__queuestorage__ +local.settings.json +test +.venv \ No newline at end of file diff --git a/samples-v2/blueprint/.gitignore b/samples-v2/blueprint/.gitignore new file mode 100644 index 00000000..7685fc4a --- /dev/null +++ b/samples-v2/blueprint/.gitignore @@ -0,0 +1,135 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don’t work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Azure Functions artifacts +bin +obj +appsettings.json +local.settings.json + +# Azurite artifacts +__blobstorage__ +__queuestorage__ +__azurite_db*__.json +.python_packages \ No newline at end of file diff --git a/samples-v2/blueprint/durable_blueprints.py b/samples-v2/blueprint/durable_blueprints.py new file mode 100644 index 00000000..ec1a443a --- /dev/null +++ b/samples-v2/blueprint/durable_blueprints.py @@ -0,0 +1,31 @@ +import logging +import azure.functions as func +import azure.durable_functions as df + +# To learn more about blueprints in the Python prog model V2, +# see: https://learn.microsoft.com/en-us/azure/azure-functions/functions-reference-python?tabs=asgi%2Capplication-level&pivots=python-mode-decorators#blueprints + +# Note, the `func` namespace does not contain Durable Functions triggers and bindings, so to register blueprints of +# DF we need to use the `df` package's version of blueprints. +bp = df.Blueprint() + +# We define a standard function-chaining DF pattern + +@bp.route(route="startOrchestrator") +@bp.durable_client_input(client_name="client") +async def start_orchestrator(req: func.HttpRequest, client): + instance_id = await client.start_new("my_orchestrator") + + logging.info(f"Started orchestration with ID = '{instance_id}'.") + return client.create_check_status_response(req, instance_id) + +@bp.orchestration_trigger(context_name="context") +def my_orchestrator(context: df.DurableOrchestrationContext): + result1 = yield context.call_activity('say_hello', "Tokyo") + result2 = yield context.call_activity('say_hello', "Seattle") + result3 = yield context.call_activity('say_hello', "London") + return [result1, result2, result3] + +@bp.activity_trigger(input_name="city") +def say_hello(city: str) -> str: + return f"Hello {city}!" \ No newline at end of file diff --git a/samples-v2/blueprint/function_app.py b/samples-v2/blueprint/function_app.py new file mode 100644 index 00000000..65ba510f --- /dev/null +++ b/samples-v2/blueprint/function_app.py @@ -0,0 +1,30 @@ +import azure.functions as func +import logging + +from durable_blueprints import bp + +app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION) +app.register_functions(bp) # register the DF functions + +# Define a simple HTTP trigger function, to show that you can also +# register functions via the `app` object +@app.route(route="HttpTrigger") +def HttpTrigger(req: func.HttpRequest) -> func.HttpResponse: + logging.info('Python HTTP trigger function processed a request.') + + name = req.params.get('name') + if not name: + try: + req_body = req.get_json() + except ValueError: + pass + else: + name = req_body.get('name') + + if name: + return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.") + else: + return func.HttpResponse( + "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.", + status_code=200 + ) \ No newline at end of file diff --git a/samples-v2/blueprint/host.json b/samples-v2/blueprint/host.json new file mode 100644 index 00000000..9df91361 --- /dev/null +++ b/samples-v2/blueprint/host.json @@ -0,0 +1,15 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples-v2/blueprint/requirements.txt b/samples-v2/blueprint/requirements.txt new file mode 100644 index 00000000..e1734eda --- /dev/null +++ b/samples-v2/blueprint/requirements.txt @@ -0,0 +1,6 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azure-functions-durable>=1.2.4 \ No newline at end of file diff --git a/samples-v2/fan_in_fan_out/extensions.csproj b/samples-v2/fan_in_fan_out/extensions.csproj deleted file mode 100644 index 1a58b47a..00000000 --- a/samples-v2/fan_in_fan_out/extensions.csproj +++ /dev/null @@ -1,11 +0,0 @@ - - - netcoreapp3.1 - - ** - - - - - - \ No newline at end of file diff --git a/samples-v2/fan_in_fan_out/host.json b/samples-v2/fan_in_fan_out/host.json index 278b52cd..9df91361 100644 --- a/samples-v2/fan_in_fan_out/host.json +++ b/samples-v2/fan_in_fan_out/host.json @@ -7,5 +7,9 @@ "excludedTypes": "Request" } } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" } } \ No newline at end of file diff --git a/samples-v2/function_chaining/extensions.csproj b/samples-v2/function_chaining/extensions.csproj deleted file mode 100644 index 1a58b47a..00000000 --- a/samples-v2/function_chaining/extensions.csproj +++ /dev/null @@ -1,11 +0,0 @@ - - - netcoreapp3.1 - - ** - - - - - - \ No newline at end of file diff --git a/samples-v2/function_chaining/function_app.py b/samples-v2/function_chaining/function_app.py index d92c83ac..d3a0cc0b 100644 --- a/samples-v2/function_chaining/function_app.py +++ b/samples-v2/function_chaining/function_app.py @@ -6,7 +6,7 @@ @myApp.route(route="orchestrators/{functionName}") @myApp.durable_client_input(client_name="client") -async def HttpStart(req: func.HttpRequest, client): +async def http_start(req: func.HttpRequest, client): function_name = req.route_params.get('functionName') instance_id = await client.start_new(function_name) @@ -14,12 +14,12 @@ async def HttpStart(req: func.HttpRequest, client): return client.create_check_status_response(req, instance_id) @myApp.orchestration_trigger(context_name="context") -def E1_SayHello(context: df.DurableOrchestrationContext): +def my_orchestrator(context: df.DurableOrchestrationContext): result1 = yield context.call_activity('say_hello', "Tokyo") result2 = yield context.call_activity('say_hello', "Seattle") result3 = yield context.call_activity('say_hello', "London") return [result1, result2, result3] @myApp.activity_trigger(input_name="city") -def E1_SayHello(city: str) -> str: +def say_hello(city: str) -> str: return f"Hello {city}!" \ No newline at end of file diff --git a/samples-v2/function_chaining/host.json b/samples-v2/function_chaining/host.json index 278b52cd..15324357 100644 --- a/samples-v2/function_chaining/host.json +++ b/samples-v2/function_chaining/host.json @@ -1,4 +1,18 @@ { + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +}{ "version": "2.0", "logging": { "applicationInsights": { diff --git a/tests/orchestrator/test_sequential_orchestrator.py b/tests/orchestrator/test_sequential_orchestrator.py index 9ced2980..4cac7334 100644 --- a/tests/orchestrator/test_sequential_orchestrator.py +++ b/tests/orchestrator/test_sequential_orchestrator.py @@ -77,6 +77,24 @@ def generator_function_duplicate_yield(context): return "" +def generator_function_reducing_when_all(context): + task1 = context.call_activity("Hello", "Tokyo") + task2 = context.call_activity("Hello", "Seattle") + pending_tasks = [task1, task2] + + # Yield until first task is completed + finished_task1 = yield context.task_any(pending_tasks) + + # Remove completed task from pending tasks + pending_tasks.remove(finished_task1) + + # Yield remaining task + yield context.task_any(pending_tasks) + + # Ensure we can still schedule new tasks + yield context.call_activity("Hello", "London") + return "" + def generator_function_compound_tasks(context): yield context.call_activity("Hello", "Tokyo") @@ -689,6 +707,30 @@ def test_duplicate_yields_do_not_add_duplicate_actions(): assert_valid_schema(result) assert_orchestration_state_equals(expected, result) +def test_reducing_when_any_pattern(): + """Tests that a user can call when_any on a progressively smaller list of already scheduled tasks""" + context_builder = ContextBuilder('test_reducing_when_any', replay_schema=ReplaySchema.V2) + add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"") + add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"") + add_hello_completed_events(context_builder, 2, "\"Hello London!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_reducing_when_all) + + # this scenario is only supported for V2 replay + expected_state = base_expected_state("",replay_schema=ReplaySchema.V2) + expected_state._actions = [ + [WhenAnyAction( + [CallActivityAction("Hello", "Seattle"), CallActivityAction("Hello", "Tokyo")]), + CallActivityAction("Hello", "London") + ] + ] + + expected_state._is_done = True + expected = expected_state.to_json() + + assert_orchestration_state_equals(expected, result) + def test_compound_tasks_return_single_action_in_V2(): """Tests that compound tasks, in the v2 replay schema, are represented as a single "deep" action""" context_builder = ContextBuilder('test_v2_replay_schema', replay_schema=ReplaySchema.V2)