Skip to content

Commit

Permalink
Merge pull request #248 from Azure/dev
Browse files Browse the repository at this point in the history
Promote dev to main for release
  • Loading branch information
davidmrdavid authored Dec 18, 2020
2 parents e7be548 + d8f2c06 commit 3c1ecad
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 13 deletions.
44 changes: 44 additions & 0 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,50 @@
from .models.DurableEntityContext import DurableEntityContext
from .models.RetryOptions import RetryOptions
from .models.TokenSource import ManagedIdentityTokenSource
import json
from pathlib import Path
import sys


def validate_extension_bundles():
"""Throw an exception if host.json contains bundle-range V1.
Raises
------
Exception: Exception prompting the user to update to bundles V2
"""
# No need to validate if we're running tests
if "pytest" in sys.modules:
return

host_path = "host.json"
bundles_key = "extensionBundle"
version_key = "version"
host_file = Path(host_path)

if not host_file.exists():
# If it doesn't exist, we ignore it
return

with open(host_path) as f:
host_settings = json.loads(f.read())
try:
version_range = host_settings[bundles_key][version_key]
except Exception:
# If bundle info is not available, we ignore it.
# For example: it's possible the user is using a manual extension install
return
# We do a best-effort attempt to detect bundles V1
# This is the string hard-coded into the bundles V1 template in VSCode
if version_range == "[1.*, 2.0.0)":
message = "Durable Functions for Python does not support Bundles V1."\
" Please update to Bundles V2 in your `host.json`."\
" You can set extensionBundles version to be: [2.*, 3.0.0)"
raise Exception(message)


# Validate that users are not in extension bundles V1
validate_extension_bundles()

__all__ = [
'Orchestrator',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self,
self.decision_started_event: HistoryEvent = \
[e_ for e_ in self.histories
if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED][0]
self._current_utc_datetime = \
self._current_utc_datetime: datetime.datetime = \
self.decision_started_event.timestamp
self._new_uuid_counter = 0
self.actions: List[List[Action]] = []
Expand Down
11 changes: 4 additions & 7 deletions azure/durable_functions/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def handle(self, context: DurableOrchestrationContext):
generation_state.exception)
continue

self._reset_timestamp()
self._update_timestamp()
self.durable_context._is_replaying = generation_state._is_played
generation_state = self._generate_next(generation_state)

Expand Down Expand Up @@ -141,16 +141,13 @@ def _add_to_actions(self, generation_state):
and hasattr(generation_state, "actions")):
self.durable_context.actions.append(generation_state.actions)

def _reset_timestamp(self):
def _update_timestamp(self):
last_timestamp = self.durable_context.decision_started_event.timestamp
decision_started_events = [e_ for e_ in self.durable_context.histories
if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED
and e_.timestamp > last_timestamp]
if len(decision_started_events) == 0:
self.durable_context.current_utc_datetime = None
else:
self.durable_context.decision_started_event = \
decision_started_events[0]
if len(decision_started_events) != 0:
self.durable_context.decision_started_event = decision_started_events[0]
self.durable_context.current_utc_datetime = \
self.durable_context.decision_started_event.timestamp

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from setuptools import setup, find_packages
from distutils.command import build

with open("README.md", "r") as fh:
with open("README.md", "r", encoding="utf8") as fh:
long_description = fh.read()

class BuildModule(build.build):
Expand Down
133 changes: 133 additions & 0 deletions tests/orchestrator/test_sequential_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
from .orchestrator_test_utils \
import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema
from tests.test_utils.ContextBuilder import ContextBuilder
Expand All @@ -20,6 +21,49 @@ def generator_function(context):

return outputs

def generator_function_time_is_not_none(context):
outputs = []

now = context.current_utc_datetime
if not now:
raise Exception("No time! 1st attempt")
task1 = yield context.call_activity("Hello", "Tokyo")

now = context.current_utc_datetime
if not now:
raise Exception("No time! 2nd attempt")
task2 = yield context.call_activity("Hello", "Seattle")

now = context.current_utc_datetime
if not now:
raise Exception("No time! 3rd attempt")
task3 = yield context.call_activity("Hello", "London")

now = context.current_utc_datetime
if not now:
raise Exception("No time! 4th attempt")

outputs.append(task1)
outputs.append(task2)
outputs.append(task3)

return outputs

def generator_function_time_gather(context):
outputs = []

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
yield context.call_activity("Hello", "Tokyo")

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
yield context.call_activity("Hello", "Seattle")

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
yield context.call_activity("Hello", "London")

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
return outputs

def generator_function_rasing_ex(context):
outputs = []

Expand Down Expand Up @@ -220,3 +264,92 @@ def test_tokyo_and_seattle_and_london_with_serialization_state():

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_utc_time_is_never_none():
"""Tests an orchestrator that errors out if its current_utc_datetime is ever None.
If we receive all activity results, it means we never error'ed out. Our test has
a history events array with identical timestamps, simulating events arriving
very close to one another."""

# we set `increase_time` to False to make sure the changes are resilient
# to undistinguishable timestamps (events arrive very close to each other)
context_builder = ContextBuilder('test_simple_function', increase_time=False)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_deterministic_utc_time)

expected_state = base_expected_state(
['Hello Tokyo!', 'Hello Seattle!', 'Hello London!'])
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_utc_time_is_never_none():
"""Tests an orchestrator that errors out if its current_utc_datetime is ever None.
If we receive all activity results, it means we never error'ed out. Our test has
a history events array with identical timestamps, simulating events arriving
very close to one another."""

# we set `increase_time` to False to make sure the changes are resilient
# to undistinguishable timestamps (events arrive very close to each other)
context_builder = ContextBuilder('test_simple_function', increase_time=False)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_time_is_not_none)

expected_state = base_expected_state(
['Hello Tokyo!', 'Hello Seattle!', 'Hello London!'])
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_utc_time_updates_correctly():
"""Tests that current_utc_datetime updates correctly"""

now = datetime.utcnow()
# the first orchestrator-started event starts 1 second after `now`
context_builder = ContextBuilder('test_simple_function', starting_time=now)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_time_gather)

# In the expected history, the orchestrator starts again every 4 seconds
# The current_utc_datetime should update to the orchestrator start event timestamp
num_restarts = 3
expected_utc_time = now + timedelta(seconds=1)
outputs = [expected_utc_time.strftime("%m/%d/%Y, %H:%M:%S")]
for _ in range(num_restarts):
expected_utc_time += timedelta(seconds=4)
outputs.append(expected_utc_time.strftime("%m/%d/%Y, %H:%M:%S"))

expected_state = base_expected_state(outputs)
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

14 changes: 10 additions & 4 deletions tests/test_utils/ContextBuilder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional

from .json_utils import add_attrib, convert_history_event_to_json_dict
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
Expand All @@ -13,20 +13,26 @@


class ContextBuilder:
def __init__(self, name: str=""):
def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None):
self.increase_time = increase_time
self.instance_id = uuid.uuid4()
self.is_replaying: bool = False
self.input_ = None
self.parent_instance_id = None
self.history_events: List[HistoryEvent] = []
self.current_datetime: datetime = datetime.now()

if starting_time is None:
starting_time = datetime.now()
self.current_datetime: datetime = starting_time

self.add_orchestrator_started_event()
self.add_execution_started_event(name)

def get_base_event(
self, event_type: HistoryEventType, id_: int = -1,
is_played: bool = False, timestamp=None) -> HistoryEvent:
self.current_datetime = self.current_datetime + timedelta(seconds=1)
if self.increase_time:
self.current_datetime = self.current_datetime + timedelta(seconds=1)
if not timestamp:
timestamp = self.current_datetime
event = HistoryEvent(EventType=event_type, EventId=id_,
Expand Down

0 comments on commit 3c1ecad

Please sign in to comment.