From ec9b8439446939392153aba113bc8afa7f7c8099 Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Thu, 25 Jul 2024 15:39:44 +0300 Subject: [PATCH 01/10] support python 3.11 and asyncio --- noxfile.py | 1 + requirements.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/noxfile.py b/noxfile.py index 801868c..ca5240a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -24,6 +24,7 @@ '3.8', '3.9', '3.10', + '3.11', ] nox.options.default_venv_backend = 'none' if not USEVENV else USEVENV diff --git a/requirements.txt b/requirements.txt index b675cee..5bbae2a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ coverage==5.5 marshmallow==3.10.0 requests==2.25.1 +asyncio==3.4.3 + From 9b1adf5b0748e2e1e8bec503d896b388fba50dff Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Thu, 25 Jul 2024 15:59:40 +0300 Subject: [PATCH 02/10] create a failing test to iterate on --- actionpack/procedure.py | 7 +++++++ tests/actionpack/test_procedure.py | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 6294801..d823b3e 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -1,3 +1,4 @@ +import asyncio from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from functools import reduce @@ -42,6 +43,9 @@ def execute( if synchronously: for action in actions: yield action.perform(should_raise=should_raise) if should_raise else action.perform() + elif max_workers == 1: + # Put asyncio code here + pass else: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(action._perform, should_raise=should_raise): str(action) for action in actions} @@ -109,6 +113,9 @@ def execute( for action in self: yield (action.name, action.perform(should_raise=should_raise)) \ if should_raise else (action.name, action.perform()) + elif max_workers == 1: + # Put asyncio code here + pass else: with ThreadPoolExecutor(max_workers=max_workers) as 
executor: futures = {executor.submit(action._perform, should_raise=should_raise): action for action in self} diff --git a/tests/actionpack/test_procedure.py b/tests/actionpack/test_procedure.py index 37afb82..2ddd590 100644 --- a/tests/actionpack/test_procedure.py +++ b/tests/actionpack/test_procedure.py @@ -149,6 +149,14 @@ def test_can_execute_asynchronously(self): self.assertIn('success', results.keys()) self.assertIn('failure', results.keys()) + def test_can_execute_asyncio(self): + results = KeyedProcedure((success, failure)).execute(max_workers=1, synchronously=False) + + assertIsIterable(results) + results = dict(results) + self.assertIn('success', results.keys()) + self.assertIn('failure', results.keys()) + def test_can_create_KeyedProcedure_from_Actions_named_using_any_scriptable_type(self): action1 = FakeAction[int, str]() action2 = FakeAction[bool, str](instruction_provider=raise_failure) From dc589b49ef2edabe25ebb345640b837b781fa188 Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Thu, 25 Jul 2024 16:04:02 +0300 Subject: [PATCH 03/10] only run asyncio if max_workers==0 and not synchronously --- actionpack/procedure.py | 4 ++-- tests/actionpack/test_procedure.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index d823b3e..742fe33 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -43,7 +43,7 @@ def execute( if synchronously: for action in actions: yield action.perform(should_raise=should_raise) if should_raise else action.perform() - elif max_workers == 1: + elif max_workers <= 0: # Put asyncio code here pass else: @@ -113,7 +113,7 @@ def execute( for action in self: yield (action.name, action.perform(should_raise=should_raise)) \ if should_raise else (action.name, action.perform()) - elif max_workers == 1: + elif max_workers <= 0: # Put asyncio code here pass else: diff --git a/tests/actionpack/test_procedure.py b/tests/actionpack/test_procedure.py index 
2ddd590..c717817 100644 --- a/tests/actionpack/test_procedure.py +++ b/tests/actionpack/test_procedure.py @@ -150,7 +150,7 @@ def test_can_execute_asynchronously(self): self.assertIn('failure', results.keys()) def test_can_execute_asyncio(self): - results = KeyedProcedure((success, failure)).execute(max_workers=1, synchronously=False) + results = KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False) assertIsIterable(results) results = dict(results) From a61c79c1876c6ca3353a89b8a8ce727113a18bfe Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Thu, 25 Jul 2024 17:04:00 +0300 Subject: [PATCH 04/10] first naive implementation of a keyed generator --- actionpack/procedure.py | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 742fe33..60913fa 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -14,6 +14,21 @@ from actionpack.action import Result from actionpack import Action +import logging + +# TODO(nick.flink) clean this up +# create logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +# create console handler and set level to debug +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +# create formatter +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +# add formatter to ch +ch.setFormatter(formatter) +# add ch to logger +logger.addHandler(ch) class Procedure(Generic[Name, Outcome]): @@ -44,8 +59,7 @@ def execute( for action in actions: yield action.perform(should_raise=should_raise) if should_raise else action.perform() elif max_workers <= 0: - # Put asyncio code here - pass + raise NotImplemented("refine implementation of KeyedProcedure and consider how to share with this") else: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(action._perform, should_raise=should_raise): str(action) for action in 
actions} @@ -103,6 +117,22 @@ def validate(self): raise KeyedProcedure.UnnamedAction(msg) return self + async def aio_gen( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + for action in self.actions: + ret = await action.perform(should_raise=should_raise) if should_raise else action.perform() + yield (action.name, ret) + + async def aio_execute( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + val = [a async for a in self.aio_gen(should_raise)] + logger.debug(f"aio_execute {val}") + return val + def execute( self, max_workers: int = 5, @@ -114,8 +144,9 @@ def execute( yield (action.name, action.perform(should_raise=should_raise)) \ if should_raise else (action.name, action.perform()) elif max_workers <= 0: - # Put asyncio code here - pass + logger.debug("running asyncio for KeyedProcedure") + for t in asyncio.run(self.aio_execute(should_raise)): + yield t else: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(action._perform, should_raise=should_raise): action for action in self} From 8f9f4a20ca3947c0f105e3ea4457f427c34bef51 Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Fri, 26 Jul 2024 10:46:31 +0300 Subject: [PATCH 05/10] all tests passing --- actionpack/action.py | 8 ++++++++ actionpack/procedure.py | 30 ++++++++++++++++++++++++++++-- tests/actionpack/test_procedure.py | 19 +++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/actionpack/action.py b/actionpack/action.py index 83b757e..2d5601e 100644 --- a/actionpack/action.py +++ b/actionpack/action.py @@ -1,4 +1,5 @@ from __future__ import annotations +import asyncio from functools import partialmethod from oslash import Left from oslash import Right @@ -110,6 +111,13 @@ def perform( ) -> Result[Outcome]: return self._perform(should_raise, timestamp_provider) + async def aperform( + self, + should_raise: bool = False, + timestamp_provider: Callable[[], int] = microsecond_timestamp + ) -> 
Result[Outcome]: + return self._perform(should_raise, timestamp_provider) + def validate(self): return self diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 60913fa..8e0c81a 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -1,4 +1,6 @@ import asyncio +import functools +import inspect from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from functools import reduce @@ -47,6 +49,28 @@ def validate(self): raise Procedure.NotAnAction(msg) return self + async def aio_gen( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + for action in self.actions: + logger.debug(f"running action {action}") + if inspect.iscoroutinefunction(action.aperform): + ret = await action.aperform(should_raise=should_raise) if should_raise else await action.aperform() + else: + loop = asyncio.get_running_loop() + ret = await loop.run_in_executor(None, action.perform, { + 'should_raise': should_raise}) + yield ret + + async def aio_execute( + self, + should_raise: bool = False + ) -> Iterator[Result[Outcome]]: + val = [a async for a in self.aio_gen(should_raise)] + logger.debug(f"aio_execute {val}") + return val + def execute( self, max_workers: int = 5, @@ -59,7 +83,9 @@ def execute( for action in actions: yield action.perform(should_raise=should_raise) if should_raise else action.perform() elif max_workers <= 0: - raise NotImplemented("refine implementation of KeyedProcedure and consider how to share with this") + logger.debug("running asyncio for Procedure") + for t in asyncio.run(self.aio_execute(should_raise)): + yield t else: with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(action._perform, should_raise=should_raise): str(action) for action in actions} @@ -122,7 +148,7 @@ async def aio_gen( should_raise: bool = False ) -> Iterator[Result[Outcome]]: for action in self.actions: - ret = await action.perform(should_raise=should_raise) if should_raise 
else action.perform() + ret = await action.aperform(should_raise=should_raise) yield (action.name, ret) async def aio_execute( diff --git a/tests/actionpack/test_procedure.py b/tests/actionpack/test_procedure.py index c717817..65804f0 100644 --- a/tests/actionpack/test_procedure.py +++ b/tests/actionpack/test_procedure.py @@ -113,6 +113,25 @@ def test_can_execute_Procedure_asynchronously(self): # NOTE: the wellwish precedes since the question took longer self.assertEqual(file.read(), wellwish + question) + def test_can_execute_Procedure_asyncio(self): + file = FakeFile() + + question = b' How are you?' + wellwish = b' I hope you\'re well.' + + action1 = FakeWrite[str, int](file, question, delay=0.2) + action2 = FakeWrite[str, int](file, wellwish, delay=0.1) + + procedure = Procedure[str, int]((action1, action2)) + results = procedure.execute(max_workers=0, should_raise=True, synchronously=False) + + assertIsIterable(results) + self.assertIsInstance(next(results), Result) + self.assertIsInstance(next(results), Result) + + # NOTE: when running with asyncio the question precedes the wellwish despite the question taking longer + self.assertEqual(file.read(), question + wellwish) + class KeyedProcedureTest(TestCase): From f30a08bc4c698c9f0aa6ade872f4bf63d121f7ad Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Fri, 26 Jul 2024 10:50:23 +0300 Subject: [PATCH 06/10] no longer need inspect --- actionpack/procedure.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 8e0c81a..85c62d0 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -1,6 +1,5 @@ import asyncio import functools -import inspect from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from functools import reduce @@ -18,8 +17,6 @@ import logging -# TODO(nick.flink) clean this up -# create logger logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # create
console handler and set level to debug @@ -55,12 +52,7 @@ async def aio_gen( ) -> Iterator[Result[Outcome]]: for action in self.actions: logger.debug(f"running action {action}") - if inspect.iscoroutinefunction(action.aperform): - ret = await action.aperform(should_raise=should_raise) if should_raise else await action.aperform() - else: - loop = asyncio.get_running_loop() - ret = await loop.run_in_executor(None, action.perform, { - 'should_raise': should_raise}) + ret = await action.aperform(should_raise=should_raise) yield ret async def aio_execute( From e8264a862ac28e44f3cfa03134ae5bc40c183dfc Mon Sep 17 00:00:00 2001 From: Nicholas Flink Date: Fri, 26 Jul 2024 10:52:13 +0300 Subject: [PATCH 07/10] clean up debug logging --- actionpack/procedure.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 85c62d0..87916a8 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -1,5 +1,6 @@ import asyncio import functools +import logging from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from functools import reduce @@ -15,19 +16,8 @@ from actionpack.action import Result from actionpack import Action -import logging - logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -# create console handler and set level to debug -ch = logging.StreamHandler() -ch.setLevel(logging.DEBUG) -# create formatter -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -# add formatter to ch -ch.setFormatter(formatter) -# add ch to logger -logger.addHandler(ch) + class Procedure(Generic[Name, Outcome]): From a8570ac8a0ed9dc765f7bcdfc3dc7a232eb43c7b Mon Sep 17 00:00:00 2001 From: Aman Luqman Date: Sun, 28 Jul 2024 17:50:15 -0500 Subject: [PATCH 08/10] Schedule coroutines concurrently --- actionpack/procedure.py | 13 +++++++++---- tests/actionpack/test_procedure.py | 26 ++++++++++++++++++++++++++ 2 files 
changed, 35 insertions(+), 4 deletions(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index 87916a8..adbd93e 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -40,16 +40,21 @@ async def aio_gen( self, should_raise: bool = False ) -> Iterator[Result[Outcome]]: + # for action in self.actions: + # logger.debug(f"running action {action}") + # ret = await action.aperform(should_raise=should_raise) + # yield ret + actions = [] for action in self.actions: - logger.debug(f"running action {action}") - ret = await action.aperform(should_raise=should_raise) - yield ret + actions.append(action.aperform(should_raise=should_raise)) + ret = await asyncio.gather(*actions) + return ret async def aio_execute( self, should_raise: bool = False ) -> Iterator[Result[Outcome]]: - val = [a async for a in self.aio_gen(should_raise)] + val = await self.aio_gen(should_raise) logger.debug(f"aio_execute {val}") return val diff --git a/tests/actionpack/test_procedure.py b/tests/actionpack/test_procedure.py index 65804f0..97fa236 100644 --- a/tests/actionpack/test_procedure.py +++ b/tests/actionpack/test_procedure.py @@ -176,6 +176,32 @@ def test_can_execute_asyncio(self): self.assertIn('success', results.keys()) self.assertIn('failure', results.keys()) + def test_benchmark_asyncio_vs_threads(self): + import timeit + import logging + import sys + + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + stream_handler = logging.StreamHandler() + logger.addHandler(stream_handler) + + start_time = timeit.default_timer() + KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False) + async_results = timeit.default_timer() - start_time + + start_time = timeit.default_timer() + KeyedProcedure((success, failure)).execute(synchronously=False) + thread_results = timeit.default_timer() - start_time + + start_time = timeit.default_timer() + KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False) + synch_results 
= timeit.default_timer() - start_time + + logger.info(f"\nAsync results: {async_results}") + logger.info(f"\nThreaded results: {thread_results}") + logger.info(f"\nSynchronous results: {synch_results}") + def test_can_create_KeyedProcedure_from_Actions_named_using_any_scriptable_type(self): action1 = FakeAction[int, str]() action2 = FakeAction[bool, str](instruction_provider=raise_failure) From dccd2e8733a6c6c2e4ddab5c4f764892b567ea94 Mon Sep 17 00:00:00 2001 From: Aman Luqman Date: Sun, 28 Jul 2024 18:02:09 -0500 Subject: [PATCH 09/10] Remove benchmark test and extra code --- actionpack/procedure.py | 9 +++------ tests/actionpack/test_procedure.py | 26 -------------------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py index adbd93e..d4dbd4a 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -40,15 +40,12 @@ async def aio_gen( self, should_raise: bool = False ) -> Iterator[Result[Outcome]]: - # for action in self.actions: - # logger.debug(f"running action {action}") - # ret = await action.aperform(should_raise=should_raise) - # yield ret actions = [] + # We only create coroutines here -- we're not running them until the `gather`. 
for action in self.actions: actions.append(action.aperform(should_raise=should_raise)) - ret = await asyncio.gather(*actions) - return ret + + return await asyncio.gather(*actions) async def aio_execute( self, diff --git a/tests/actionpack/test_procedure.py b/tests/actionpack/test_procedure.py index 97fa236..65804f0 100644 --- a/tests/actionpack/test_procedure.py +++ b/tests/actionpack/test_procedure.py @@ -176,32 +176,6 @@ def test_can_execute_asyncio(self): self.assertIn('success', results.keys()) self.assertIn('failure', results.keys()) - def test_benchmark_asyncio_vs_threads(self): - import timeit - import logging - import sys - - logger = logging.getLogger(__name__) - logger.setLevel(logging.INFO) - stream_handler = logging.StreamHandler() - logger.addHandler(stream_handler) - - start_time = timeit.default_timer() - KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False) - async_results = timeit.default_timer() - start_time - - start_time = timeit.default_timer() - KeyedProcedure((success, failure)).execute(synchronously=False) - thread_results = timeit.default_timer() - start_time - - start_time = timeit.default_timer() - KeyedProcedure((success, failure)).execute(max_workers=0, synchronously=False) - synch_results = timeit.default_timer() - start_time - - logger.info(f"\nAsync results: {async_results}") - logger.info(f"\nThreaded results: {thread_results}") - logger.info(f"\nSynchronous results: {synch_results}") - def test_can_create_KeyedProcedure_from_Actions_named_using_any_scriptable_type(self): action1 = FakeAction[int, str]() action2 = FakeAction[bool, str](instruction_provider=raise_failure) From 6f61ca9145546c4e4e64c3557dabf592d9e1fa05 Mon Sep 17 00:00:00 2001 From: Aman Luqman Date: Sun, 28 Jul 2024 18:36:04 -0500 Subject: [PATCH 10/10] Ensure exceptions are raised in asyncio.gather --- actionpack/procedure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actionpack/procedure.py b/actionpack/procedure.py 
index d4dbd4a..531a86f 100644 --- a/actionpack/procedure.py +++ b/actionpack/procedure.py @@ -45,7 +45,7 @@ async def aio_gen( for action in self.actions: actions.append(action.aperform(should_raise=should_raise)) - return await asyncio.gather(*actions) + return await asyncio.gather(*actions, return_exceptions=not(should_raise)) async def aio_execute( self,