From 9c8ab16b5cc4734b7d07156aeb90553eca2a4075 Mon Sep 17 00:00:00 2001
From: Sergey Vasilyev
Date: Sat, 25 Dec 2021 12:49:08 +0100
Subject: [PATCH] Refactor time-based tests to use fake and sharp loop time
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Utilise a new library — https://github.com/nolar/looptime — to get fake
time in event loops. The library does many things, but the most important
one is sharp loop time that advances in predictable steps, with **no code
overhead included**: such overhead is typically random, which is why the
old real-time assertions had to use ranges instead of exact values.
Everything else comes as a side effect: e.g. fast execution of such tests
(zero real time is spent for any fake duration of loop time), the
convenient `looptime` fixture for assertions, etc.

Signed-off-by: Sergey Vasilyev
---
 pytest.ini | 1 +
 requirements.txt | 1 +
 tests/apis/test_api_requests.py | 60 ++++++-----
 tests/conftest.py | 70 ++----------
 .../daemons/test_daemon_rematching.py | 7 +-
 .../daemons/test_daemon_termination.py | 21 ++--
 .../indexing/test_blocking_until_indexed.py | 72 ++++++-------
 tests/k8s/test_watching_with_freezes.py | 45 ++++----
 .../test_processing_of_namespaces.py | 28 ++---
 .../test_processing_of_resources.py | 63 +++++------
 tests/posting/test_threadsafety.py | 69 ++++++------
 tests/primitives/test_conditions.py | 16 +--
 tests/primitives/test_containers.py | 59 +++++-----
 tests/primitives/test_flags.py | 26 ++---
 tests/primitives/test_toggles.py | 30 +++---
 tests/primitives/test_togglesets.py | 28 ++---
 tests/reactor/test_queueing.py | 55 +++++-----
 tests/references/test_backbone.py | 21 ++--
 tests/test_async.py | 14 +--
 tests/timing/test_sleeping.py | 60 +++++------
 tests/utilities/aiotasks/test_scheduler.py | 102 ++++++++----------
 .../utilities/aiotasks/test_task_selection.py | 2 +-
 .../utilities/aiotasks/test_task_stopping.py | 37 ++++---
 tests/utilities/aiotasks/test_task_waiting.py | 8 +-
 24 files changed, 401 insertions(+), 494 deletions(-)

diff --git a/pytest.ini b/pytest.ini
index a68aafec..2060b24d 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,3 +1,4 @@
 [pytest]
 addopts =
     --strict-markers
+    --looptime
diff --git a/requirements.txt b/requirements.txt
index c1fdac47..2fc3a1eb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,6 +12,7 @@ coveralls
 freezegun
 import-linter
 isort>=5.5.0
+looptime
 lxml
 # Mypy requires typed-ast, which is broken on PyPy 3.7 (could work in PyPy 3.8).
 mypy==0.930; implementation_name == "cpython"
diff --git a/tests/apis/test_api_requests.py b/tests/apis/test_api_requests.py
index ea2aa9de..fab9f075 100644
--- a/tests/apis/test_api_requests.py
+++ b/tests/apis/test_api_requests.py
@@ -138,23 +138,23 @@ async def test_parsing_in_streams(
     (delete, 'delete'),
 ])
 async def test_direct_timeout_in_requests(
-        resp_mocker, aresponses, hostname, fn, method, settings, logger, timer):
+        resp_mocker, aresponses, hostname, fn, method, settings, logger, looptime):
 
     async def serve_slowly():
-        await asyncio.sleep(1.0)
+        await asyncio.sleep(10)
         return aiohttp.web.json_response({})
 
     mock = resp_mocker(side_effect=serve_slowly)
     aresponses.add(hostname, '/url', method, mock)
 
-    with timer, pytest.raises(asyncio.TimeoutError):
-        timeout = aiohttp.ClientTimeout(total=0.1)
+    with pytest.raises(asyncio.TimeoutError):
+        timeout = aiohttp.ClientTimeout(total=1.23)
         # aiohttp raises an asyncio.TimeoutError which is automatically retried.
         # To reduce the test duration we disable retries for this test.
settings.networking.error_backoffs = None await fn('/url', timeout=timeout, settings=settings, logger=logger) - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 @pytest.mark.parametrize('fn, method', [ @@ -164,84 +164,85 @@ async def serve_slowly(): (delete, 'delete'), ]) async def test_settings_timeout_in_requests( - resp_mocker, aresponses, hostname, fn, method, settings, logger, timer): + resp_mocker, aresponses, hostname, fn, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return aiohttp.web.json_response({}) mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - settings.networking.request_timeout = 0.1 + with pytest.raises(asyncio.TimeoutError): + settings.networking.request_timeout = 1.23 # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. settings.networking.error_backoffs = None await fn('/url', settings=settings, logger=logger) - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 @pytest.mark.parametrize('method', ['get']) # the only supported method at the moment async def test_direct_timeout_in_streams( - resp_mocker, aresponses, hostname, method, settings, logger, timer): + resp_mocker, aresponses, hostname, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return "{}" mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - timeout = aiohttp.ClientTimeout(total=0.1) + with pytest.raises(asyncio.TimeoutError): + timeout = aiohttp.ClientTimeout(total=1.23) # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. settings.networking.error_backoffs = None async for _ in stream('/url', timeout=timeout, settings=settings, logger=logger): pass - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 @pytest.mark.parametrize('method', ['get']) # the only supported method at the moment async def test_settings_timeout_in_streams( - resp_mocker, aresponses, hostname, method, settings, logger, timer): + resp_mocker, aresponses, hostname, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return "{}" mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - settings.networking.request_timeout = 0.1 + with pytest.raises(asyncio.TimeoutError): + settings.networking.request_timeout = 1.23 # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. 
        settings.networking.error_backoffs = None
        async for _ in stream('/url', settings=settings, logger=logger):
            pass
 
-    assert 0.1 < timer.seconds < 0.2
+    assert looptime == 1.23
 
 
-@pytest.mark.parametrize('delay, expected', [
-    pytest.param(0.0, [], id='instant-none'),
-    pytest.param(0.1, [{'fake': 'result1'}], id='fast-single'),
-    pytest.param(9.9, [{'fake': 'result1'}, {'fake': 'result2'}], id='inf-double'),
+@pytest.mark.parametrize('delay, expected_times, expected_items', [
+    pytest.param(0, [], [], id='instant-none'),
+    pytest.param(2, [1], [{'fake': 'result1'}], id='fast-single'),
+    pytest.param(9, [1, 4], [{'fake': 'result1'}, {'fake': 'result2'}], id='inf-double'),
 ])
 @pytest.mark.parametrize('method', ['get'])  # the only supported method at the moment
 async def test_stopper_in_streams(
-        resp_mocker, aresponses, hostname, method, delay, expected, settings, logger):
+        resp_mocker, aresponses, hostname, method, delay, settings, logger, looptime,
+        expected_items, expected_times):
 
     async def stream_slowly(request: aiohttp.ClientRequest):
         response = aiohttp.web.StreamResponse()
         await response.prepare(request)
-        await asyncio.sleep(0.05)
+        await asyncio.sleep(1)
         await response.write(b'{"fake": "result1"}\n')
-        await asyncio.sleep(0.15)
+        await asyncio.sleep(3)
         await response.write(b'{"fake": "result2"}\n')
         await response.write_eof()
         return response
@@ -252,7 +253,10 @@ async def stream_slowly(request: aiohttp.ClientRequest):
     asyncio.get_running_loop().call_later(delay, stopper.set_result, None)
 
     items = []
+    times = []
     async for item in stream('/url', stopper=stopper, settings=settings, logger=logger):
         items.append(item)
+        times.append(float(looptime))
 
-    assert items == expected
+    assert items == expected_items
+    assert times == expected_times
diff --git a/tests/conftest.py b/tests/conftest.py
index 341bfd0b..03c53edc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -6,7 +6,6 @@
 import logging
 import re
 import sys
-import time
 from unittest.mock import Mock
 
 import aiohttp.web
@@ -552,69 +551,6 @@ def no_certvalidator():
     yield from _with_module_absent('certvalidator')
 
 
-#
-# Helpers for the timing checks.
-#
-
-@pytest.fixture()
-def timer():
-    return Timer()
-
-
-class Timer(object):
-    """
-    A helper context manager to measure the time of the code-blocks.
-    Also, supports direct comparison with time-deltas and the numbers of seconds.
-
-    Usage:
-
-        with Timer() as timer:
-            do_something()
-            print(f"Executing for {timer.seconds}s already.")
-            do_something_else()
-
-        print(f"Executed in {timer.seconds}s.")
-        assert timer < 5.0
-    """
-
-    def __init__(self):
-        super().__init__()
-        self._ts = None
-        self._te = None
-
-    @property
-    def seconds(self):
-        if self._ts is None:
-            return None
-        elif self._te is None:
-            return time.perf_counter() - self._ts
-        else:
-            return self._te - self._ts
-
-    def __repr__(self):
-        status = 'new' if self._ts is None else 'running' if self._te is None else 'finished'
-        return f'<Timer: {self.seconds}s ({status})>'
-
-    def __enter__(self):
-        self._ts = time.perf_counter()
-        self._te = None
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self._te = time.perf_counter()
-
-    async def __aenter__(self):
-        return self.__enter__()
-
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        return self.__exit__(exc_type, exc_val, exc_tb)
-
-    def __int__(self):
-        return int(self.seconds)
-
-    def __float__(self):
-        return float(self.seconds)
-
 
 #
 # Helpers for the logging checks.
# @@ -725,3 +661,9 @@ def _no_asyncio_pending_tasks(event_loop): remains = after - before if remains: pytest.fail(f"Unattended asyncio tasks detected: {remains!r}") + + +@pytest.fixture() +def loop(event_loop): + """Sync aiohttp's server-side timeline with kopf's client-side timeline.""" + return event_loop diff --git a/tests/handling/daemons/test_daemon_rematching.py b/tests/handling/daemons/test_daemon_rematching.py index 3190c032..4442f743 100644 --- a/tests/handling/daemons/test_daemon_rematching.py +++ b/tests/handling/daemons/test_daemon_rematching.py @@ -5,7 +5,7 @@ async def test_running_daemon_is_stopped_when_mismatches( - resource, dummy, timer, mocker, caplog, assert_logs, k8s_mocked, simulate_cycle): + resource, dummy, looptime, mocker, caplog, assert_logs, k8s_mocked, simulate_cycle): caplog.set_level(logging.DEBUG) @kopf.daemon(*resource, id='fn', when=lambda **_: is_matching) @@ -26,9 +26,8 @@ async def fn(**kwargs): mocker.resetall() is_matching = False await simulate_cycle({}) - with timer: - await dummy.wait_for_daemon_done() + await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly + assert looptime == 0 stopped = dummy.kwargs['stopped'] assert DaemonStoppingReason.FILTERS_MISMATCH in stopped.reason diff --git a/tests/handling/daemons/test_daemon_termination.py b/tests/handling/daemons/test_daemon_termination.py index ac0f8f3f..3a0e97e1 100644 --- a/tests/handling/daemons/test_daemon_termination.py +++ b/tests/handling/daemons/test_daemon_termination.py @@ -9,7 +9,7 @@ async def test_daemon_exits_gracefully_and_instantly_on_resource_deletion( settings, resource, dummy, simulate_cycle, - caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer): + caplog, assert_logs, k8s_mocked, frozen_time, mocker, looptime): caplog.set_level(logging.DEBUG) # A daemon-under-test. @@ -31,10 +31,9 @@ async def fn(**kwargs): await simulate_cycle(event_object) # Check that the daemon has exited near-instantly, with no delays. - with timer: - await dummy.wait_for_daemon_done() + await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly + assert looptime == 0 assert k8s_mocked.sleep.call_count == 0 assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == [] @@ -42,7 +41,7 @@ async def fn(**kwargs): async def test_daemon_exits_gracefully_and_instantly_on_operator_exiting( settings, resource, dummy, simulate_cycle, background_daemon_killer, - caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer): + caplog, assert_logs, k8s_mocked, frozen_time, mocker, looptime): caplog.set_level(logging.DEBUG) # A daemon-under-test. @@ -63,10 +62,9 @@ async def fn(**kwargs): background_daemon_killer.cancel() # Check that the daemon has exited near-instantly, with no delays. - with timer: - await dummy.wait_for_daemon_done() + await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly + assert looptime == 0 assert k8s_mocked.sleep.call_count == 0 assert k8s_mocked.patch.call_count == 0 @@ -78,7 +76,7 @@ async def fn(**kwargs): @pytest.mark.usefixtures('background_daemon_killer') async def test_daemon_exits_gracefully_and_instantly_on_operator_pausing( settings, memories, resource, dummy, simulate_cycle, conflicts_found, - caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer): + caplog, assert_logs, k8s_mocked, frozen_time, mocker, looptime): caplog.set_level(logging.DEBUG) # A daemon-under-test. 
@@ -99,9 +97,8 @@ async def fn(**kwargs): await conflicts_found.turn_to(True) # Check that the daemon has exited near-instantly, with no delays. - with timer: - await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly + await dummy.wait_for_daemon_done() + assert looptime == 0 # There is no way to test for re-spawning here: it is done by watch-events, # which are tested by the paused operators elsewhere (test_daemon_spawning.py). diff --git a/tests/handling/indexing/test_blocking_until_indexed.py b/tests/handling/indexing/test_blocking_until_indexed.py index 8d8c58a3..f15fb5ec 100644 --- a/tests/handling/indexing/test_blocking_until_indexed.py +++ b/tests/handling/indexing/test_blocking_until_indexed.py @@ -16,26 +16,25 @@ @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_reporting_on_resource_readiness( - resource, settings, registry, indexers, caplog, event_type, handlers, timer): + resource, settings, registry, indexers, caplog, event_type, handlers, looptime): caplog.set_level(logging.DEBUG) operator_indexed = ToggleSet(all) resource_indexed = await operator_indexed.make_toggle() - with timer: - await process_resource_event( - lifecycle=all_at_once, - registry=registry, - settings=settings, - resource=resource, - indexers=indexers, - memories=ResourceMemories(), - memobase=Memo(), - raw_event={'type': event_type, 'object': {}}, - event_queue=asyncio.Queue(), - operator_indexed=operator_indexed, - resource_indexed=resource_indexed, - ) - assert timer.seconds < 0.2 # asap, nowait + await process_resource_event( + lifecycle=all_at_once, + registry=registry, + settings=settings, + resource=resource, + indexers=indexers, + memories=ResourceMemories(), + memobase=Memo(), + raw_event={'type': event_type, 'object': {}}, + event_queue=asyncio.Queue(), + operator_indexed=operator_indexed, + resource_indexed=resource_indexed, + ) + assert looptime == 0 assert operator_indexed.is_on() assert set(operator_indexed) == set() # save RAM assert handlers.event_mock.called @@ -43,13 +42,13 @@ async def test_reporting_on_resource_readiness( @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_blocking_when_operator_is_not_ready( - resource, settings, registry, indexers, caplog, event_type, handlers, timer): + resource, settings, registry, indexers, caplog, event_type, handlers, looptime): caplog.set_level(logging.DEBUG) operator_indexed = ToggleSet(all) resource_listed = await operator_indexed.make_toggle() resource_indexed = await operator_indexed.make_toggle() - with pytest.raises(asyncio.TimeoutError), timer: + with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(process_resource_event( lifecycle=all_at_once, registry=registry, @@ -62,8 +61,8 @@ async def test_blocking_when_operator_is_not_ready( event_queue=asyncio.Queue(), operator_indexed=operator_indexed, resource_indexed=resource_indexed, - ), timeout=0.2) - assert 0.2 < timer.seconds < 0.4 + ), timeout=1.23) + assert looptime == 1.23 assert operator_indexed.is_off() assert set(operator_indexed) == {resource_listed} assert not handlers.event_mock.called @@ -71,7 +70,7 @@ async def test_blocking_when_operator_is_not_ready( @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_unblocking_once_operator_is_ready( - resource, settings, registry, indexers, caplog, event_type, handlers, timer): + resource, settings, registry, indexers, caplog, event_type, handlers, looptime): caplog.set_level(logging.DEBUG) async def 
delayed_readiness(delay: float): @@ -81,22 +80,21 @@ async def delayed_readiness(delay: float): operator_indexed = ToggleSet(all) resource_listed = await operator_indexed.make_toggle() resource_indexed = await operator_indexed.make_toggle() - with timer: - asyncio.create_task(delayed_readiness(0.2)) - await process_resource_event( - lifecycle=all_at_once, - registry=registry, - settings=settings, - resource=resource, - indexers=indexers, - memories=ResourceMemories(), - memobase=Memo(), - raw_event={'type': event_type, 'object': {}}, - event_queue=asyncio.Queue(), - operator_indexed=operator_indexed, - resource_indexed=resource_indexed, - ) - assert 0.2 < timer.seconds < 0.4 + asyncio.create_task(delayed_readiness(1.23)) + await process_resource_event( + lifecycle=all_at_once, + registry=registry, + settings=settings, + resource=resource, + indexers=indexers, + memories=ResourceMemories(), + memobase=Memo(), + raw_event={'type': event_type, 'object': {}}, + event_queue=asyncio.Queue(), + operator_indexed=operator_indexed, + resource_indexed=resource_indexed, + ) + assert looptime == 1.23 assert operator_indexed.is_on() assert set(operator_indexed) == {resource_listed} assert handlers.event_mock.called diff --git a/tests/k8s/test_watching_with_freezes.py b/tests/k8s/test_watching_with_freezes.py index c1512da8..efecbbc1 100644 --- a/tests/k8s/test_watching_with_freezes.py +++ b/tests/k8s/test_watching_with_freezes.py @@ -8,21 +8,20 @@ async def test_pausing_is_ignored_if_turned_off( - resource, namespace, timer, caplog, assert_logs): + resource, namespace, looptime, caplog, assert_logs): caplog.set_level(logging.DEBUG) operator_paused = ToggleSet(any) await operator_paused.make_toggle(False) - with timer: - async with streaming_block( - resource=resource, - namespace=namespace, - operator_paused=operator_paused, - ): - pass + async with streaming_block( + resource=resource, + namespace=namespace, + operator_paused=operator_paused, + ): + pass - assert timer.seconds < 0.2 # no waits, exits as soon as possible + assert looptime == 0 assert_logs([], prohibited=[ r"Pausing the watch-stream for", r"Resuming the watch-stream for", @@ -30,7 +29,7 @@ async def test_pausing_is_ignored_if_turned_off( async def test_pausing_waits_forever_if_not_resumed( - resource, namespace, timer, caplog, assert_logs): + resource, namespace, looptime, caplog, assert_logs): caplog.set_level(logging.DEBUG) operator_paused = ToggleSet(any) @@ -44,10 +43,10 @@ async def do_it(): ): pass - with pytest.raises(asyncio.TimeoutError), timer: - await asyncio.wait_for(do_it(), timeout=0.5) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(do_it(), timeout=1.23) - assert timer.seconds >= 0.5 + assert looptime == 1.23 assert_logs([ r"Pausing the watch-stream for", ], prohibited=[ @@ -56,7 +55,7 @@ async def do_it(): async def test_pausing_waits_until_resumed( - resource, namespace, timer, caplog, assert_logs): + resource, namespace, looptime, caplog, assert_logs): caplog.set_level(logging.DEBUG) operator_paused = ToggleSet(any) @@ -66,17 +65,15 @@ async def delayed_resuming(delay: float): await asyncio.sleep(delay) await conflicts_found.turn_to(False) - with timer: - asyncio.create_task(delayed_resuming(0.2)) - async with streaming_block( - resource=resource, - namespace=namespace, - operator_paused=operator_paused, - ): - pass + asyncio.create_task(delayed_resuming(1.23)) + async with streaming_block( + resource=resource, + namespace=namespace, + operator_paused=operator_paused, + ): + pass - assert 
timer.seconds >= 0.2 - assert timer.seconds <= 0.5 + assert looptime == 1.23 assert_logs([ r"Pausing the watch-stream for", r"Resuming the watch-stream for", diff --git a/tests/observation/test_processing_of_namespaces.py b/tests/observation/test_processing_of_namespaces.py index a8c6ba4c..2fbe1a5f 100644 --- a/tests/observation/test_processing_of_namespaces.py +++ b/tests/observation/test_processing_of_namespaces.py @@ -7,7 +7,7 @@ from kopf._core.reactor.observation import process_discovered_namespace_event -async def test_initial_listing_is_ignored(): +async def test_initial_listing_is_ignored(looptime): insights = Insights() e1 = RawEvent(type=None, object=RawBody(metadata={'name': 'ns1'})) @@ -19,13 +19,15 @@ async def delayed_injection(delay: float): task = asyncio.create_task(delayed_injection(0)) with pytest.raises(asyncio.TimeoutError): async with insights.revised: - await asyncio.wait_for(insights.revised.wait(), timeout=0.1) + await asyncio.wait_for(insights.revised.wait(), timeout=1.23) await task + + assert looptime == 1.23 assert not insights.namespaces @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED']) -async def test_followups_for_addition(timer, etype): +async def test_followups_for_addition(looptime, etype): insights = Insights() e1 = RawEvent(type=etype, object=RawBody(metadata={'name': 'ns1'})) @@ -34,17 +36,16 @@ async def delayed_injection(delay: float): await process_discovered_namespace_event( insights=insights, raw_event=e1, namespaces=['ns*']) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(9)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 0.11 + assert looptime == 9 assert insights.namespaces == {'ns1'} @pytest.mark.parametrize('etype', ['DELETED']) -async def test_followups_for_deletion(timer, etype): +async def test_followups_for_deletion(looptime, etype): insights = Insights() insights.namespaces.add('ns1') e1 = RawEvent(type=etype, object=RawBody(metadata={'name': 'ns1'})) @@ -54,10 +55,9 @@ async def delayed_injection(delay: float): await process_discovered_namespace_event( insights=insights, raw_event=e1, namespaces=['ns*']) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(9)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 0.11 + assert looptime == 9 assert not insights.namespaces diff --git a/tests/observation/test_processing_of_resources.py b/tests/observation/test_processing_of_resources.py index 9347b369..2b82b697 100644 --- a/tests/observation/test_processing_of_resources.py +++ b/tests/observation/test_processing_of_resources.py @@ -112,7 +112,7 @@ def fn(**_): ... @pytest.mark.parametrize('decorator', [kopf.on.validate, kopf.on.mutate]) @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED']) async def test_nonwatchable_resources_are_ignored( - settings, registry, apis_mock, group1_mock, timer, etype, decorator, insights): + settings, registry, apis_mock, group1_mock, looptime, etype, decorator, insights): @decorator('group1', 'version1', 'plural1') def fn(**_): ... 
@@ -124,19 +124,18 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + assert looptime == 1.23 assert not insights.watched_resources assert apis_mock.called assert group1_mock.called async def test_initial_listing_is_ignored( - settings, registry, apis_mock, group1_mock, insights): + settings, registry, apis_mock, group1_mock, looptime, insights): e1 = RawEvent(type=None, object=RawBody(spec={'group': 'group1'})) @@ -148,8 +147,10 @@ async def delayed_injection(delay: float): task = asyncio.create_task(delayed_injection(0)) with pytest.raises(asyncio.TimeoutError): async with insights.revised: - await asyncio.wait_for(insights.revised.wait(), timeout=0.1) + await asyncio.wait_for(insights.revised.wait(), timeout=1.23) await task + + assert looptime == 1.23 assert not insights.indexed_resources assert not insights.watched_resources assert not insights.webhook_resources @@ -159,7 +160,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED']) async def test_followups_for_addition( - settings, registry, apis_mock, group1_mock, timer, etype, insights, insights_resources): + settings, registry, apis_mock, group1_mock, looptime, etype, insights, insights_resources): e1 = RawEvent(type=etype, object=RawBody(spec={'group': 'group1'})) r1 = Resource(group='group1', version='version1', plural='plural1') @@ -169,12 +170,12 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert insights_resources == {r1} assert apis_mock.called assert group1_mock.called @@ -182,7 +183,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED', 'DELETED']) async def test_followups_for_deletion_of_resource( - settings, registry, apis_mock, group1_empty_mock, timer, etype, + settings, registry, apis_mock, group1_empty_mock, looptime, etype, insights, insights_resources): e1 = RawEvent(type=etype, object=RawBody(spec={'group': 'group1'})) @@ -194,12 +195,12 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert not insights_resources assert apis_mock.called assert group1_empty_mock.called @@ -207,7 +208,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED', 'DELETED']) async def test_followups_for_deletion_of_group( - settings, registry, 
apis_mock, group1_404mock, timer, etype, insights, insights_resources): + settings, registry, apis_mock, group1_404mock, looptime, etype, insights, insights_resources): e1 = RawEvent(type=etype, object=RawBody(spec={'group': 'group1'})) r1 = Resource(group='group1', version='version1', plural='plural1') @@ -218,12 +219,12 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert not insights_resources assert apis_mock.called assert group1_404mock.called @@ -231,7 +232,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['DELETED']) async def test_backbone_is_filled( - settings, registry, core_mock, corev1_mock, timer, etype, insights): + settings, registry, core_mock, corev1_mock, looptime, etype, insights): e1 = RawEvent(type=etype, object=RawBody(spec={'group': ''})) @@ -240,11 +241,11 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - await insights.backbone.wait_for(NAMESPACES) + task = asyncio.create_task(delayed_injection(1.23)) + await insights.backbone.wait_for(NAMESPACES) await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert NAMESPACES in insights.backbone assert core_mock.called assert corev1_mock.called diff --git a/tests/posting/test_threadsafety.py b/tests/posting/test_threadsafety.py index 65914ed9..0ebfc0aa 100644 --- a/tests/posting/test_threadsafety.py +++ b/tests/posting/test_threadsafety.py @@ -33,13 +33,13 @@ If thread safety is not ensured, the operators get sporadic errors regarding thread-unsafe calls, which are difficult to catch and reproduce. 
""" - import asyncio import contextvars import functools import threading import time +import looptime import pytest from kopf import event @@ -48,24 +48,6 @@ 'metadata': {'uid': 'uid1', 'name': 'name1', 'namespace': 'ns1'}} -@pytest.fixture() -def awakener(event_loop): - handles = [] - - def noop(): - pass - - def awaken_fn(delay, fn=noop): - handle = event_loop.call_later(delay, fn) - handles.append(handle) - - try: - yield awaken_fn - finally: - for handle in handles: - handle.cancel() - - @pytest.fixture() def threader(): threads = [] @@ -87,44 +69,59 @@ def thread_fn(): thread.join() -async def test_nonthreadsafe_indeed_fails(timer, awakener, threader, event_queue, event_queue_loop): +@pytest.mark.looptime(False) +async def test_nonthreadsafe_indeed_fails(chronometer, threader, event_queue, event_loop): + thread_was_called = threading.Event() def thread_fn(): + thread_was_called.set() event_queue.put_nowait(object()) - awakener(0.7) - threader(0.3, thread_fn) + threader(0.5, lambda: event_loop.call_soon_threadsafe(lambda: None)) + threader(0.2, thread_fn) - with timer: + with chronometer, looptime.Chronometer(event_loop.time) as loopometer: await event_queue.get() - assert 0.6 <= timer.seconds <= 0.8 + assert 0.5 <= chronometer.seconds < 0.6 + assert 0.5 <= loopometer.seconds < 0.6 + assert thread_was_called.is_set() -async def test_threadsafe_indeed_works(timer, awakener, threader, event_queue, event_queue_loop): +@pytest.mark.looptime(False) +async def test_threadsafe_indeed_works(chronometer, threader, event_queue, event_loop): + thread_was_called = threading.Event() def thread_fn(): - asyncio.run_coroutine_threadsafe(event_queue.put(object()), loop=event_queue_loop) + thread_was_called.set() + asyncio.run_coroutine_threadsafe(event_queue.put(object()), loop=event_loop) - awakener(0.7) - threader(0.3, thread_fn) + threader(0.5, lambda: event_loop.call_soon_threadsafe(lambda: None)) + threader(0.2, thread_fn) - with timer: + with chronometer, looptime.Chronometer(event_loop.time) as loopometer: await event_queue.get() - assert 0.2 <= timer.seconds <= 0.4 + assert 0.2 <= chronometer.seconds < 0.3 + assert 0.2 <= loopometer.seconds < 0.3 + assert thread_was_called.is_set() -async def test_queueing_is_threadsafe(timer, awakener, threader, event_queue, event_queue_loop, - settings_via_contextvar): +@pytest.mark.looptime(False) +@pytest.mark.usefixtures('event_queue_loop', 'settings_via_contextvar') +async def test_queueing_is_threadsafe(chronometer, threader, event_queue, event_loop): + thread_was_called = threading.Event() def thread_fn(): + thread_was_called.set() event(OBJ1, type='type1', reason='reason1', message='message1') - awakener(0.7) - threader(0.3, thread_fn) + threader(0.5, lambda: event_loop.call_soon_threadsafe(lambda: None)) + threader(0.2, thread_fn) - with timer: + with chronometer, looptime.Chronometer(event_loop.time) as loopometer: await event_queue.get() - assert 0.2 <= timer.seconds <= 0.4 + assert 0.2 <= chronometer.seconds < 0.3 + assert 0.2 <= loopometer.seconds < 0.3 + assert thread_was_called.is_set() diff --git a/tests/primitives/test_conditions.py b/tests/primitives/test_conditions.py index 75919bd8..d69d232f 100644 --- a/tests/primitives/test_conditions.py +++ b/tests/primitives/test_conditions.py @@ -5,20 +5,21 @@ from kopf._cogs.aiokits.aiobindings import condition_chain -async def test_no_triggering(): +async def test_no_triggering(looptime): source = asyncio.Condition() target = asyncio.Condition() task = asyncio.create_task(condition_chain(source, 
target)) try: with pytest.raises(asyncio.TimeoutError): async with target: - await asyncio.wait_for(target.wait(), timeout=0.1) + await asyncio.wait_for(target.wait(), timeout=1.23) + assert looptime == 1.23 finally: task.cancel() await asyncio.wait([task]) -async def test_triggering(event_loop, timer): +async def test_triggering(event_loop, looptime): source = asyncio.Condition() target = asyncio.Condition() task = asyncio.create_task(condition_chain(source, target)) @@ -28,13 +29,12 @@ async def delayed_trigger(): async with source: source.notify_all() - event_loop.call_later(0.1, asyncio.create_task, delayed_trigger()) + event_loop.call_later(1.23, asyncio.create_task, delayed_trigger()) - with timer: - async with target: - await target.wait() + async with target: + await target.wait() - assert 0.1 <= timer.seconds <= 0.2 + assert looptime == 1.23 finally: task.cancel() diff --git a/tests/primitives/test_containers.py b/tests/primitives/test_containers.py index d289d129..6a9691a7 100644 --- a/tests/primitives/test_containers.py +++ b/tests/primitives/test_containers.py @@ -5,79 +5,74 @@ from kopf._cogs.aiokits.aiovalues import Container -async def test_empty_by_default(): +async def test_empty_by_default(looptime): container = Container() with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(container.wait(), timeout=0.1) + await asyncio.wait_for(container.wait(), timeout=9) + assert looptime == 9 -async def test_does_not_wake_up_when_reset(event_loop, timer): +async def test_does_not_wake_up_when_reset(event_loop, looptime): container = Container() async def reset_it(): await container.reset() - event_loop.call_later(0.05, asyncio.create_task, reset_it()) - + event_loop.call_later(1, asyncio.create_task, reset_it()) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(container.wait(), timeout=0.1) + await asyncio.wait_for(container.wait(), timeout=9) + assert looptime == 9 -async def test_wakes_up_when_preset(event_loop, timer): +async def test_wakes_up_when_preset(looptime): container = Container() await container.set(123) - with timer: - result = await container.wait() - - assert timer.seconds <= 0.1 + result = await container.wait() + assert looptime == 0 assert result == 123 -async def test_wakes_up_when_set(event_loop, timer): +async def test_wakes_up_when_set(event_loop, looptime): container = Container() async def set_it(): await container.set(123) - event_loop.call_later(0.1, asyncio.create_task, set_it()) - - with timer: - result = await container.wait() + event_loop.call_later(9, asyncio.create_task, set_it()) - assert 0.1 <= timer.seconds <= 0.2 + result = await container.wait() + assert looptime == 9 assert result == 123 -async def test_iterates_when_set(event_loop, timer): +async def test_iterates_when_set(event_loop, looptime): container = Container() async def set_it(v): await container.set(v) - event_loop.call_later(0.1, asyncio.create_task, set_it(123)) - event_loop.call_later(0.2, asyncio.create_task, set_it(234)) + event_loop.call_later(6, asyncio.create_task, set_it(123)) + event_loop.call_later(9, asyncio.create_task, set_it(234)) values = [] - with timer: - async for value in container.as_changed(): - values.append(value) - if value == 234: - break + async for value in container.as_changed(): + values.append(value) + if value == 234: + break - assert 0.2 <= timer.seconds <= 0.3 + assert looptime == 9 assert values == [123, 234] -async def test_iterates_when_preset(event_loop, timer): +async def test_iterates_when_preset(looptime): 
container = Container() await container.set(123) values = [] - with timer: - async for value in container.as_changed(): - values.append(value) - break + async for value in container.as_changed(): + values.append(value) + break - assert timer.seconds <= 0.1 + assert looptime == 0 assert values == [123] diff --git a/tests/primitives/test_flags.py b/tests/primitives/test_flags.py index f18826b4..008c5d14 100644 --- a/tests/primitives/test_flags.py +++ b/tests/primitives/test_flags.py @@ -133,26 +133,26 @@ async def test_waiting_of_none_does_nothing(): await wait_flag(None) -async def test_waiting_for_unraised_times_out(flag, timer): - with pytest.raises(asyncio.TimeoutError), timer: - await asyncio.wait_for(wait_flag(flag), timeout=0.1) - assert timer.seconds >= 0.1 +async def test_waiting_for_unraised_times_out(flag, looptime): + # Beware: sync primitives consume the real time. + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(wait_flag(flag), timeout=0.123) + assert looptime == 0.123 -async def test_waiting_for_preraised_is_instant(flag, timer): +async def test_waiting_for_preraised_is_instant(flag, looptime): await raise_flag(flag) # tested separately above - with timer: - await wait_flag(flag) - assert timer.seconds < 0.5 # near-instant, plus code overhead + await wait_flag(flag) + assert looptime == 0 -async def test_waiting_for_raised_during_the_wait(flag, timer): +async def test_waiting_for_raised_during_the_wait(flag, looptime): async def raise_delayed(delay: float) -> None: await asyncio.sleep(delay) await raise_flag(flag) # tested separately above - asyncio.create_task(raise_delayed(0.2)) - with timer: - await wait_flag(flag) - assert 0.2 <= timer.seconds < 0.5 # near-instant once raised + # Beware: sync primitives consume the real time. + asyncio.create_task(raise_delayed(0.123)) + await wait_flag(flag) + assert looptime == 0.123 diff --git a/tests/primitives/test_toggles.py b/tests/primitives/test_toggles.py index d670bbd9..ca852904 100644 --- a/tests/primitives/test_toggles.py +++ b/tests/primitives/test_toggles.py @@ -37,50 +37,48 @@ async def test_turning_off(): assert toggle.is_off() -async def test_waiting_until_on_fails_when_not_turned_on(): +async def test_waiting_until_on_fails_when_not_turned_on(looptime): toggle = Toggle(False) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggle.wait_for(True), timeout=0.1) - + await asyncio.wait_for(toggle.wait_for(True), timeout=1.23) assert toggle.is_off() + assert looptime == 1.23 -async def test_waiting_until_off_fails_when_not_turned_off(): +async def test_waiting_until_off_fails_when_not_turned_off(looptime): toggle = Toggle(True) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggle.wait_for(False), timeout=0.1) - + await asyncio.wait_for(toggle.wait_for(False), timeout=1.23) assert toggle.is_on() + assert looptime == 1.23 -async def test_waiting_until_on_wakes_when_turned_on(timer): +async def test_waiting_until_on_wakes_when_turned_on(looptime): toggle = Toggle(False) async def delayed_turning_on(delay: float): await asyncio.sleep(delay) await toggle.turn_to(True) - with timer: - asyncio.create_task(delayed_turning_on(0.05)) - await toggle.wait_for(True) + asyncio.create_task(delayed_turning_on(9)) + await toggle.wait_for(True) assert toggle.is_on() - assert timer.seconds < 0.5 # approx. 
0.05 plus some code overhead + assert looptime == 9 -async def test_waiting_until_off_wakes_when_turned_off(timer): +async def test_waiting_until_off_wakes_when_turned_off(looptime): toggle = Toggle(True) async def delayed_turning_off(delay: float): await asyncio.sleep(delay) await toggle.turn_to(False) - with timer: - asyncio.create_task(delayed_turning_off(0.05)) - await toggle.wait_for(False) + asyncio.create_task(delayed_turning_off(9)) + await toggle.wait_for(False) assert toggle.is_off() - assert timer.seconds < 0.5 # approx. 0.05 plus some code overhead + assert looptime == 9 async def test_secures_against_usage_as_a_boolean(): diff --git a/tests/primitives/test_togglesets.py b/tests/primitives/test_togglesets.py index 8cb71324..6b8ea6b3 100644 --- a/tests/primitives/test_togglesets.py +++ b/tests/primitives/test_togglesets.py @@ -195,25 +195,27 @@ async def test_all_toggles_must_be_off_for_anytoggleset_to_be_off(fn): @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_on_fails_when_not_turned_on(fn): +async def test_waiting_until_on_fails_when_not_turned_on(fn, looptime): toggleset = ToggleSet(fn) await toggleset.make_toggle(False) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggleset.wait_for(True), timeout=0.1) + await asyncio.wait_for(toggleset.wait_for(True), timeout=1.23) assert toggleset.is_off() + assert looptime == 1.23 @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_off_fails_when_not_turned_off(fn): +async def test_waiting_until_off_fails_when_not_turned_off(fn, looptime): toggleset = ToggleSet(fn) await toggleset.make_toggle(True) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggleset.wait_for(False), timeout=0.1) + await asyncio.wait_for(toggleset.wait_for(False), timeout=1.23) assert toggleset.is_on() + assert looptime == 1.23 @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_on_wakes_when_turned_on(fn, timer): +async def test_waiting_until_on_wakes_when_turned_on(fn, looptime): toggleset = ToggleSet(fn) toggle = await toggleset.make_toggle(False) @@ -221,16 +223,15 @@ async def delayed_turning_on(delay: float): await asyncio.sleep(delay) await toggle.turn_to(True) - with timer: - asyncio.create_task(delayed_turning_on(0.05)) - await asyncio.wait_for(toggleset.wait_for(True), timeout=1.0) + asyncio.create_task(delayed_turning_on(9)) + await toggleset.wait_for(True) assert toggleset.is_on() - assert timer.seconds < 0.5 # approx. 0.05 plus some code overhead + assert looptime == 9 @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_off_wakes_when_turned_off(fn, timer): +async def test_waiting_until_off_wakes_when_turned_off(fn, looptime): toggleset = ToggleSet(fn) toggle = await toggleset.make_toggle(True) @@ -238,12 +239,11 @@ async def delayed_turning_off(delay: float): await asyncio.sleep(delay) await toggle.turn_to(False) - with timer: - asyncio.create_task(delayed_turning_off(0.05)) - await asyncio.wait_for(toggleset.wait_for(False), timeout=1.0) + asyncio.create_task(delayed_turning_off(9)) + await toggleset.wait_for(False) assert toggleset.is_off() - assert timer.seconds < 0.5 # approx. 
0.05 plus some code overhead + assert looptime == 9 @pytest.mark.parametrize('fn', [all, any]) diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index f41a35d6..06a68cba 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -46,7 +46,7 @@ ]) @pytest.mark.usefixtures('watcher_limited') -async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor, +async def test_watchevent_demultiplexing(worker_mock, looptime, resource, processor, settings, stream, events, uids, cnts): """ Verify that every unique uid goes into its own queue+worker, which are never shared. """ @@ -60,17 +60,16 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor stream.close() # Run the watcher (near-instantly and test-blocking). - with timer: - await watcher( - namespace=None, - resource=resource, - settings=settings, - processor=processor, - ) + await watcher( + namespace=None, + resource=resource, + settings=settings, + processor=processor, + ) # Extra-check: verify that the real workers were not involved: # they would do batching, which is absent in the mocked workers. - assert timer.seconds < settings.batching.batch_window + assert looptime == 0 # The processor must not be called by the watcher, only by the worker. # But the worker (even if mocked) must be called & awaited by the watcher. @@ -122,32 +121,30 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor ]) @pytest.mark.usefixtures('watcher_limited') -async def test_watchevent_batching(settings, resource, processor, timer, - stream, events, uids, vals, event_loop): +async def test_watchevent_batching(settings, resource, processor, + stream, events, uids, vals, looptime): """ Verify that only the last event per uid is actually handled. """ # Override the default timeouts to make the tests faster. - settings.batching.idle_timeout = 100 # should not be involved, fail if it is - settings.batching.exit_timeout = 100 # should exit instantly, fail if it didn't - settings.batching.batch_window = 0.3 # the time period being tested (make bigger than overhead) + settings.batching.idle_timeout = 999 # should not be involved, fail if it is + settings.batching.exit_timeout = 999 # should exit instantly, fail if it didn't + settings.batching.batch_window = 123 # the time period being tested # Inject the events of unique objects - to produce few streams/workers. stream.feed(events) stream.close() # Run the watcher (near-instantly and test-blocking). - with timer: - await watcher( - namespace=None, - resource=resource, - settings=settings, - processor=processor, - ) + await watcher( + namespace=None, + resource=resource, + settings=settings, + processor=processor, + ) # Should be batched strictly once (never twice). Note: multiple uids run concurrently, # so they all are batched in parallel, and the timing remains the same. - assert timer.seconds > settings.batching.batch_window * 1 - assert timer.seconds < settings.batching.batch_window * 2 + assert looptime == 123 # Was the processor called at all? Awaited as needed for async fns? assert processor.awaited @@ -182,10 +179,10 @@ async def test_watchevent_batching(settings, resource, processor, timer, async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy): # Override the default timeouts to make the tests faster. 
-    settings.batching.exit_timeout = 100  # should exit instantly, fail if it didn't
-    settings.batching.idle_timeout = .05  # finish workers faster, but not as fast as batching
-    settings.batching.batch_window = .01  # minimize the effects of batching (not our interest)
-    settings.watching.reconnect_backoff = 1.0  # to prevent src depletion
+    settings.batching.exit_timeout = 999  # should exit instantly, fail if it didn't
+    settings.batching.idle_timeout = 5  # finish workers faster, but not as fast as batching
+    settings.batching.batch_window = 1  # minimize the effects of batching (not our interest)
+    settings.watching.reconnect_backoff = 100  # to prevent src depletion
 
     # Inject the events of unique objects - to produce few streams/workers.
     stream.feed(events)
     stream.close()
 
     # Give it a moment to populate the streams and spawn all the workers.
     # Intercept and remember _any_ seen dict of streams for further checks.
     while worker_spy.call_count < unique:
-        await asyncio.sleep(0.001)  # give control to the loop
+        await asyncio.sleep(0)  # give control to the loop
     streams = worker_spy.call_args_list[-1][1]['streams']
     signaller: asyncio.Condition = worker_spy.call_args_list[0][1]['signaller']
@@ -221,7 +218,7 @@ async def test_garbage_collection_of_streams(settings, stream, events, unique, w
 
     # Let the workers actually exit and gc their local scopes with variables.
     # The jobs can take a tiny moment more, but this is noticeable in the tests.
-    await asyncio.sleep(0.1)
+    await asyncio.sleep(0)
 
     # For PyPy: force the gc! (GC can be delayed in PyPy, unlike in CPython.)
     # https://doc.pypy.org/en/latest/cpython_differences.html#differences-related-to-garbage-collection-strategies
diff --git a/tests/references/test_backbone.py b/tests/references/test_backbone.py
index 767c352d..2060a3b9 100644
--- a/tests/references/test_backbone.py
+++ b/tests/references/test_backbone.py
@@ -47,23 +47,23 @@ async def test_refill_is_cumulative_ie_does_not_reset():
     assert set(backbone) == {NAMESPACES, EVENTS}
 
 
-async def test_waiting_for_absent_resources_never_ends(timer):
+async def test_waiting_for_absent_resources_never_ends(looptime):
     backbone = Backbone()
     with pytest.raises(asyncio.TimeoutError):
-        await asyncio.wait_for(backbone.wait_for(NAMESPACES), timeout=0.1)
+        await asyncio.wait_for(backbone.wait_for(NAMESPACES), timeout=1.23)
+    assert looptime == 1.23
 
 
-async def test_waiting_for_preexisting_resources_ends_instantly(timer):
+async def test_waiting_for_preexisting_resources_ends_instantly(looptime):
     resource = Resource('', 'v1', 'namespaces')
     backbone = Backbone()
     await backbone.fill(resources=[resource])
-    with timer:
-        found_resource = await backbone.wait_for(NAMESPACES)
-    assert timer.seconds < 0.1
+    found_resource = await backbone.wait_for(NAMESPACES)
+    assert looptime == 0
     assert found_resource == resource
 
 
-async def test_waiting_for_delayed_resources_ends_once_delivered(timer):
+async def test_waiting_for_delayed_resources_ends_once_delivered(looptime):
     resource = Resource('', 'v1', 'namespaces')
     backbone = Backbone()
 
@@ -71,9 +71,8 @@ async def delayed_injection(delay: float):
         await asyncio.sleep(delay)
         await backbone.fill(resources=[resource])
 
-    task = asyncio.create_task(delayed_injection(0.1))
-    with timer:
-        found_resource = await backbone.wait_for(NAMESPACES)
+    task = asyncio.create_task(delayed_injection(123))
+    found_resource = await backbone.wait_for(NAMESPACES)
     await task
-    assert 0.1 < timer.seconds < 0.11
+    assert
looptime == 123 assert found_resource == resource diff --git a/tests/test_async.py b/tests/test_async.py index 03957f4e..542de9f0 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -6,24 +6,18 @@ _async_was_executed = False -async def test_async_tests_are_enabled(timer): +async def test_async_tests_are_enabled(): global _async_was_executed _async_was_executed = True # asserted in a sync-test below. - with timer as t: - await asyncio.sleep(0.5) - assert t.seconds > 0.5 # real sleep - - -async def test_async_mocks_are_enabled(timer, mocker): +async def test_async_mocks_are_enabled(mocker, looptime): p = mocker.patch('asyncio.sleep') - with timer as t: - await asyncio.sleep(1.0) + await asyncio.sleep(1.0) assert p.called assert p.awaited - assert t.seconds < 0.01 # mocked sleep + assert looptime == 0 def test_async_test_was_executed_and_awaited(): diff --git a/tests/timing/test_sleeping.py b/tests/timing/test_sleeping.py index dfc8d77e..4bfc8fcb 100644 --- a/tests/timing/test_sleeping.py +++ b/tests/timing/test_sleeping.py @@ -5,24 +5,21 @@ from kopf._cogs.aiokits.aiotime import sleep -async def test_the_only_delay_is_awaited(timer): - with timer: - unslept = await sleep(0.10) - assert 0.10 <= timer.seconds < 0.11 +async def test_the_only_delay_is_awaited(looptime): + unslept = await sleep(123) + assert looptime == 123 assert unslept is None -async def test_the_shortest_delay_is_awaited(timer): - with timer: - unslept = await sleep([0.10, 0.20]) - assert 0.10 <= timer.seconds < 0.11 +async def test_the_shortest_delay_is_awaited(looptime): + unslept = await sleep([123, 456]) + assert looptime == 123 assert unslept is None -async def test_specific_delays_only_are_awaited(timer): - with timer: - unslept = await sleep([0.10, None]) - assert 0.10 <= timer.seconds < 0.11 +async def test_specific_delays_only_are_awaited(looptime): + unslept = await sleep([123, None]) + assert looptime == 123 assert unslept is None @@ -31,10 +28,9 @@ async def test_specific_delays_only_are_awaited(timer): pytest.param([-100, -10], id='all-negative'), pytest.param(-10, id='alone'), ]) -async def test_negative_delays_skip_sleeping(timer, delays): - with timer: - unslept = await sleep(delays) - assert timer.seconds < 0.01 +async def test_negative_delays_skip_sleeping(looptime, delays): + unslept = await sleep(delays) + assert looptime == 0 assert unslept is None @@ -42,36 +38,32 @@ async def test_negative_delays_skip_sleeping(timer, delays): pytest.param([], id='empty-list'), pytest.param([None], id='list-of-none'), ]) -async def test_no_delays_skip_sleeping(timer, delays): - with timer: - unslept = await sleep(delays) - assert timer.seconds < 0.01 +async def test_no_delays_skip_sleeping(looptime, delays): + unslept = await sleep(delays) + assert looptime == 0 assert unslept is None -async def test_by_event_set_before_time_comes(timer): +async def test_by_event_set_before_time_comes(looptime): event = asyncio.Event() - asyncio.get_running_loop().call_later(0.07, event.set) - with timer: - unslept = await sleep(0.10, event) + asyncio.get_running_loop().call_later(7, event.set) + unslept = await sleep(10, event) assert unslept is not None - assert 0.02 <= unslept <= 0.04 - assert 0.06 <= timer.seconds <= 0.08 + assert unslept == 3 + assert looptime == 7 -async def test_with_zero_time_and_event_initially_cleared(timer): +async def test_with_zero_time_and_event_initially_cleared(looptime): event = asyncio.Event() event.clear() - with timer: - unslept = await sleep(0, event) - assert timer.seconds <= 0.01 + 
unslept = await sleep(0, event) + assert looptime == 0 assert unslept is None -async def test_with_zero_time_and_event_initially_set(timer): +async def test_with_zero_time_and_event_initially_set(looptime): event = asyncio.Event() event.set() - with timer: - unslept = await sleep(0, event) - assert timer.seconds <= 0.01 + unslept = await sleep(0, event) + assert looptime == 0 assert not unslept # 0/None; undefined for such case: both goals reached. diff --git a/tests/utilities/aiotasks/test_scheduler.py b/tests/utilities/aiotasks/test_scheduler.py index 2670748c..3b809ea7 100644 --- a/tests/utilities/aiotasks/test_scheduler.py +++ b/tests/utilities/aiotasks/test_scheduler.py @@ -5,8 +5,6 @@ from kopf._cogs.aiokits.aiotasks import Scheduler -CODE_OVERHEAD = 0.01 - async def f(mock, *args): try: @@ -14,7 +12,7 @@ async def f(mock, *args): for arg in args: if isinstance(arg, asyncio.Event): arg.set() - elif isinstance(arg, float): + elif isinstance(arg, (int, float)): await asyncio.sleep(arg) elif callable(arg): arg() @@ -24,70 +22,62 @@ async def f(mock, *args): mock('finished') -async def test_empty_scheduler_lifecycle(timer): - with timer: - scheduler = Scheduler() - assert scheduler.empty() - await scheduler.wait() - assert scheduler.empty() - await scheduler.close() - assert scheduler.empty() - assert timer.seconds < CODE_OVERHEAD +async def test_empty_scheduler_lifecycle(looptime): + scheduler = Scheduler() + assert scheduler.empty() + await scheduler.wait() + assert scheduler.empty() + await scheduler.close() + assert scheduler.empty() + assert looptime == 0 -async def test_task_spawning_and_graceful_finishing(timer): +async def test_task_spawning_and_graceful_finishing(looptime): mock = Mock() flag1 = asyncio.Event() flag2 = asyncio.Event() scheduler = Scheduler() - result = await scheduler.spawn(f(mock, flag1, 0.1, flag2)) + result = await scheduler.spawn(f(mock, flag1, 123, flag2)) assert result is None - with timer: - await flag1.wait() - assert timer.seconds < CODE_OVERHEAD - assert mock.call_args[0][0] == 'started' + await flag1.wait() + assert looptime == 0 + assert mock.call_args_list[0][0][0] == 'started' - with timer: - await flag2.wait() - assert timer.seconds > 0.1 - assert timer.seconds < 0.1 + CODE_OVERHEAD - assert mock.call_args[0][0] == 'finished' + await flag2.wait() + assert looptime == 123 + assert mock.call_args_list[1][0][0] == 'finished' await scheduler.close() -async def test_task_spawning_and_cancellation(timer): +async def test_task_spawning_and_cancellation(looptime): mock = Mock() flag1 = asyncio.Event() flag2 = asyncio.Event() scheduler = Scheduler() - result = await scheduler.spawn(f(mock, flag1, 1.0, flag2)) + result = await scheduler.spawn(f(mock, flag1, 123, flag2)) assert result is None - with timer: - await flag1.wait() - assert timer.seconds < CODE_OVERHEAD - assert mock.call_args[0][0] == 'started' + await flag1.wait() + assert looptime == 0 + assert mock.call_args_list[0][0][0] == 'started' - with timer: - await scheduler.close() - assert timer.seconds < CODE_OVERHEAD # near-instant - assert mock.call_args[0][0] == 'cancelled' + await scheduler.close() + assert looptime == 0 + assert mock.call_args_list[1][0][0] == 'cancelled' async def test_no_tasks_are_accepted_after_closing(): scheduler = Scheduler() await scheduler.close() - assert scheduler._closed assert scheduler._spawning_task.done() assert scheduler._cleaning_task.done() - with pytest.raises(RuntimeError, match=r"Cannot add new coroutines"): - await scheduler.spawn(f(Mock(), 1.0)) + await 
scheduler.spawn(f(Mock(), 123))
 
 
 async def test_successes_are_not_reported():
@@ -121,7 +111,7 @@ async def test_exceptions_are_reported():
     assert exception_handler.call_args[0][0] is exception
 
 
-async def test_tasks_are_parallel_if_limit_is_not_reached(timer):
+async def test_tasks_are_parallel_if_limit_is_not_reached(looptime):
     """
     time:  ////////----------------------0.1s------------------0.2s--///
     task1: ->spawn->start->sleep->finish->|
@@ -133,23 +123,19 @@
     task2_finished = asyncio.Event()
     scheduler = Scheduler(limit=2)
 
-    with timer:
-        await scheduler.spawn(f(Mock(), task1_started, 0.1, task1_finished))
-        await scheduler.spawn(f(Mock(), task2_started, 0.1, task2_finished))
-        assert timer.seconds < CODE_OVERHEAD  # i.e. spawning is not not blocking
+    await scheduler.spawn(f(Mock(), task1_started, 9, task1_finished))
+    await scheduler.spawn(f(Mock(), task2_started, 9, task2_finished))
+    assert looptime == 0  # i.e. spawning is not blocking
 
-    with timer:
-        await task1_finished.wait()
-        assert task2_started.is_set()
-        await task2_finished.wait()
-
-    assert timer.seconds > 0.1
-    assert timer.seconds < 0.1 + CODE_OVERHEAD
+    await task1_finished.wait()
+    assert task2_started.is_set()
+    await task2_finished.wait()
+    assert looptime == 9
 
     await scheduler.close()
 
 
-async def test_tasks_are_pending_if_limit_is_reached(timer):
+async def test_tasks_are_pending_if_limit_is_reached(looptime):
     """
     time:  ////////----------------------0.1s------------------0.2s--///
     task1: ->spawn->start->sleep->finish->|
@@ -161,17 +147,13 @@
     task2_finished = asyncio.Event()
     scheduler = Scheduler(limit=1)
 
-    with timer:
-        await scheduler.spawn(f(Mock(), task1_started, 0.1, task1_finished))
-        await scheduler.spawn(f(Mock(), task2_started, 0.1, task2_finished))
-        assert timer.seconds < CODE_OVERHEAD  # i.e. spawning is not not blocking
-
-    with timer:
-        await task1_finished.wait()
-        assert not task2_started.is_set()
-        await task2_finished.wait()
+    await scheduler.spawn(f(Mock(), task1_started, 9, task1_finished))
+    await scheduler.spawn(f(Mock(), task2_started, 9, task2_finished))
+    assert looptime == 0  # i.e.
spawning is not blocking
 
+    await task1_finished.wait()
+    assert not task2_started.is_set()
+    await task2_finished.wait()
+    assert looptime == 18
 
     await scheduler.close()
diff --git a/tests/utilities/aiotasks/test_task_selection.py b/tests/utilities/aiotasks/test_task_selection.py
index a3bf17b8..3754ec56 100644
--- a/tests/utilities/aiotasks/test_task_selection.py
+++ b/tests/utilities/aiotasks/test_task_selection.py
@@ -7,7 +7,7 @@ async def test_alltasks_exclusion():
     flag = asyncio.Event()
     task1 = create_task(flag.wait())
     task2 = create_task(flag.wait())
-    done, pending = await asyncio.wait([task1, task2], timeout=0.01)
+    done, pending = await asyncio.wait([task1, task2], timeout=0.01)  # let them start
     assert not done
 
     tasks = await all_tasks(ignored=[task2])
diff --git a/tests/utilities/aiotasks/test_task_stopping.py b/tests/utilities/aiotasks/test_task_stopping.py
index 3717bd46..5ee96bd8 100644
--- a/tests/utilities/aiotasks/test_task_stopping.py
+++ b/tests/utilities/aiotasks/test_task_stopping.py
@@ -17,25 +17,27 @@ async def stuck() -> None:
         await asyncio.Event().wait()
 
 
-async def test_stop_with_no_tasks(assert_logs, caplog):
+async def test_stop_with_no_tasks(assert_logs, caplog, looptime):
     logger = logging.getLogger()
     caplog.set_level(0)
     done, pending = await stop([], title='sample', logger=logger)
     assert not done
     assert not pending
     assert_logs(["Sample tasks stopping is skipped: no tasks given."])
+    assert looptime == 0
 
 
-async def test_stop_with_no_tasks_when_quiet(assert_logs, caplog):
+async def test_stop_with_no_tasks_when_quiet(assert_logs, caplog, looptime):
     logger = logging.getLogger()
     caplog.set_level(0)
     done, pending = await stop([], title='sample', logger=logger, quiet=True)
     assert not done
     assert not pending
     assert not caplog.messages
+    assert looptime == 0
 
 
-async def test_stop_immediately_with_finishing(assert_logs, caplog):
+async def test_stop_immediately_with_finishing(assert_logs, caplog, looptime):
     logger = logging.getLogger()
     caplog.set_level(0)
     task1 = create_task(simple())
@@ -46,9 +48,10 @@ async def test_stop_immediately_with_finishing(assert_logs, caplog):
     assert_logs(["Sample tasks are stopped: finishing normally"])
     assert task1.cancelled()
     assert task2.cancelled()
+    assert looptime == 0
 
 
-async def test_stop_immediately_with_cancelling(assert_logs, caplog):
+async def test_stop_immediately_with_cancelling(assert_logs, caplog, looptime):
     logger = logging.getLogger()
     caplog.set_level(0)
     task1 = create_task(simple())
@@ -59,27 +62,31 @@ async def test_stop_immediately_with_cancelling(assert_logs, caplog):
     assert_logs(["Sample tasks are stopped: cancelling normally"])
     assert task1.cancelled()
     assert task2.cancelled()
+    assert looptime == 0
 
 
 @pytest.mark.parametrize('cancelled', [False, True])
-async def test_stop_iteratively(assert_logs, caplog, cancelled):
+async def test_stop_iteratively(assert_logs, caplog, cancelled, looptime):
     logger = logging.getLogger()
     caplog.set_level(0)
     task1 = create_task(simple())
     task2 = create_task(stuck())
-    stask = create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled))
+    stask = create_task(stop([task1, task2], title='sample', logger=logger, interval=1, cancelled=cancelled))
+    assert looptime == 0
 
-    done, pending = await asyncio.wait({stask}, timeout=0.011)
+    done, pending = await asyncio.wait({stask}, timeout=10)
     assert not done
     assert task1.done()
     assert not task2.done()
+    assert looptime == 10
task2.cancel() - done, pending = await asyncio.wait({stask}, timeout=0.011) + done, pending = await asyncio.wait({stask}, timeout=10) assert done assert task1.done() assert task2.done() + assert looptime == 10 # not 20! assert_logs([ r"Sample tasks are not stopped: (finishing|cancelling) normally; tasks left: \{
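
The pattern applied throughout this patch is uniform: real sleeps and wall-clock `timer` measurements are replaced with fake loop time and exact `looptime` assertions. As an illustrative sketch only (not an excerpt from the patch; the test name is hypothetical), assuming an async-capable pytest setup like this test suite's and the looptime plugin enabled via the `--looptime` option added to pytest.ini above, a minimal test in this style reads:

    import asyncio

    async def test_sleep_spends_fake_loop_time_only(looptime):
        # Under looptime, this returns in near-zero real time: the event
        # loop's clock is faked and jumps forward in sharp, exact steps.
        await asyncio.sleep(123)
        # No random code overhead is included in the fake clock, so strict
        # equality is safe where real-time tests needed ranges (e.g. 0.1..0.2).
        assert looptime == 123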