Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncio #157

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions actionpack/action.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
import asyncio
from functools import partialmethod
from oslash import Left
from oslash import Right
Expand Down Expand Up @@ -110,6 +111,13 @@ def perform(
) -> Result[Outcome]:
return self._perform(should_raise, timestamp_provider)

async def aperform(
self,
should_raise: bool = False,
timestamp_provider: Callable[[], int] = microsecond_timestamp
) -> Result[Outcome]:
return self._perform(should_raise, timestamp_provider)

def validate(self):
return self

Expand Down
48 changes: 48 additions & 0 deletions actionpack/procedure.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio
import functools
import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from functools import reduce
Expand All @@ -13,6 +16,8 @@
from actionpack.action import Result
from actionpack import Action

logger = logging.getLogger(__name__)


class Procedure(Generic[Name, Outcome]):

Expand All @@ -31,6 +36,25 @@ def validate(self):
raise Procedure.NotAnAction(msg)
return self

async def aio_gen(
self,
should_raise: bool = False
) -> Iterator[Result[Outcome]]:
actions = []
# We only create coroutines here -- we're not running them until the `gather`.
for action in self.actions:
actions.append(action.aperform(should_raise=should_raise))

return await asyncio.gather(*actions, return_exceptions=not(should_raise))

async def aio_execute(
self,
should_raise: bool = False
) -> Iterator[Result[Outcome]]:
val = await self.aio_gen(should_raise)
logger.debug(f"aio_execute {val}")
return val

def execute(
self,
max_workers: int = 5,
Expand All @@ -42,6 +66,10 @@ def execute(
if synchronously:
for action in actions:
yield action.perform(should_raise=should_raise) if should_raise else action.perform()
elif max_workers <= 0:
logger.debug("running asyncio for Procedure")
for t in asyncio.run(self.aio_execute(should_raise)):
yield t
else:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(action._perform, should_raise=should_raise): str(action) for action in actions}
Expand Down Expand Up @@ -99,6 +127,22 @@ def validate(self):
raise KeyedProcedure.UnnamedAction(msg)
return self

async def aio_gen(
self,
should_raise: bool = False
) -> Iterator[Result[Outcome]]:
for action in self.actions:
ret = await action.aperform(should_raise=should_raise)
yield (action.name, ret)

async def aio_execute(
self,
should_raise: bool = False
) -> Iterator[Result[Outcome]]:
val = [a async for a in self.aio_gen(should_raise)]
logger.debug(f"aio_execute {val}")
return val

def execute(
self,
max_workers: int = 5,
Expand All @@ -109,6 +153,10 @@ def execute(
for action in self:
yield (action.name, action.perform(should_raise=should_raise)) \
if should_raise else (action.name, action.perform())
elif max_workers <= 0:
logger.debug("running asyncio for KeyedProcedure")
for t in asyncio.run(self.aio_execute(should_raise)):
yield t
else:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(action._perform, should_raise=should_raise): action for action in self}
Expand Down
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'3.8',
'3.9',
'3.10',
'3.11',
]

nox.options.default_venv_backend = 'none' if not USEVENV else USEVENV
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
coverage==5.5
marshmallow==3.10.0
requests==2.25.1
asyncio==3.4.3

27 changes: 27 additions & 0 deletions tests/actionpack/test_procedure.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,25 @@ def test_can_execute_Procedure_asynchronously(self):
# NOTE: the wellwish precedes since the question took longer
self.assertEqual(file.read(), wellwish + question)

def test_can_execute_Procedure_asyncio(self):
file = FakeFile()

question = b' How are you?'
wellwish = b' I hope you\'re well.'

action1 = FakeWrite[str, int](file, question, delay=0.2)
action2 = FakeWrite[str, int](file, wellwish, delay=0.1)

procedure = Procedure[str, int]((action1, action2))
results = procedure.execute(max_workers=0, should_raise=True, synchronously=False)

assertIsIterable(results)
self.assertIsInstance(next(results), Result)
self.assertIsInstance(next(results), Result)

# NOTE: when running with asyncio the question preceeds the wellwish despite the question taking longer
self.assertEqual(file.read(), question + wellwish)


class KeyedProcedureTest(TestCase):

Expand Down Expand Up @@ -149,6 +168,14 @@ def test_can_execute_asynchronously(self):
self.assertIn('success', results.keys())
self.assertIn('failure', results.keys())

def test_can_execute_asyncio(self):
results = KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False)

assertIsIterable(results)
results = dict(results)
self.assertIn('success', results.keys())
self.assertIn('failure', results.keys())

def test_can_create_KeyedProcedure_from_Actions_named_using_any_scriptable_type(self):
action1 = FakeAction[int, str]()
action2 = FakeAction[bool, str](instruction_provider=raise_failure)
Expand Down