Skip to content

Commit

Permalink
Fix deprecated pattern in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Apr 5, 2024
1 parent a2dcc51 commit bf796ee
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 59 deletions.
9 changes: 7 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@


@pytest.fixture
def http_client_mock(event_loop: TimeForwardLoop) -> HttpClientMock:
return HttpClientMock(loop=event_loop)
def http_client_mock(running_event_loop: TimeForwardLoop) -> HttpClientMock:
return HttpClientMock(loop=running_event_loop)


FreezeTime = t.Union[FrozenDateTimeFactory, StepTickTimeFactory]
Expand Down Expand Up @@ -81,6 +81,11 @@ def event_loop(
loop.close()


@pytest.fixture
async def running_event_loop() -> TimeForwardLoop:
return t.cast(TimeForwardLoop, asyncio.get_running_loop())


def pipeline() -> None: ...


Expand Down
4 changes: 2 additions & 2 deletions tests/worker/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ def fake_resources_provider_class() -> str:


@pytest.fixture
async def rabbitmq_url(event_loop: TimeForwardLoop, config: Config) -> str:
event_loop.forward_time = False
async def rabbitmq_url(running_event_loop: TimeForwardLoop, config: Config) -> str:
running_event_loop.forward_time = False
url = config.c.rabbitmq.url
try:
connection = await aio_pika.connect(url)
Expand Down
14 changes: 7 additions & 7 deletions tests/worker/inventories/test_joined_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ async def test_join_sub_inventories() -> None:


async def test_concurrent_join_inventories(
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
) -> None:
inventory = JoinInventory.from_options(
{
Expand All @@ -300,7 +300,7 @@ async def test_concurrent_join_inventories(
async with alib.scoped_iter(inventory.run()) as run, TasksGroup() as group:
items = []
while True:
async with event_loop.until_idle():
async with running_event_loop.until_idle():
task = group.create_task(alib.anext(run))
if task.done():
items.append(task.result())
Expand All @@ -320,7 +320,7 @@ async def test_concurrent_join_inventories(
async with sub_items.pop(1):
pass

await event_loop.wait_idle()
await running_event_loop.wait_idle()
assert not task.done()

assert json.loads(inventory.cursor) == {
Expand Down Expand Up @@ -357,7 +357,7 @@ async def test_concurrent_join_inventories(


async def test_nested_join_inventory(
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
) -> None:
inventory = JoinInventory.from_options(
{
Expand Down Expand Up @@ -395,7 +395,7 @@ async def test_nested_join_inventory(
# root_concurrency.
items = []
while True:
async with event_loop.until_idle():
async with running_event_loop.until_idle():
task = group.create_task(alib.anext(run))
if task.done():
items.append(task.result())
Expand All @@ -413,10 +413,10 @@ async def test_nested_join_inventory(
async with item:
pass

await event_loop.wait_idle()
await running_event_loop.wait_idle()
items.append(await task)
while True:
async with event_loop.until_idle():
async with running_event_loop.until_idle():
task = group.create_task(alib.anext(run))
if task.done():
items.append(task.result())
Expand Down
6 changes: 3 additions & 3 deletions tests/worker/inventories/test_loop_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ async def test_loop_inventory_max_iterations() -> None:
assert json.loads(c) == {"v": 1, "a": "19"}


async def test_loop_concurrency(event_loop: TimeForwardLoop) -> None:
async def test_loop_concurrency(running_event_loop: TimeForwardLoop) -> None:
inventory = LoopInventory(options=LoopInventory.Options(max_iterations=20))

async with alib.scoped_iter(inventory.run()) as run, TasksGroup() as group:
item = await alib.anext(run)
async with event_loop.until_idle():
async with running_event_loop.until_idle():
next_item = group.create_task(alib.anext(run))
assert not next_item.done()

async with event_loop.until_idle():
async with running_event_loop.until_idle():
async with item:
pass
assert next_item.done()
4 changes: 2 additions & 2 deletions tests/worker/inventories/test_periodic_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@pytest.mark.asyncio
async def test_periodic_inventory(
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
frozen_time: FreezeTime,
) -> None:
frozen_time.move_to(datetime.datetime.fromisoformat("1970-01-01T00:00:00+00:00"))
Expand Down Expand Up @@ -54,7 +54,7 @@ async def test_periodic_inventory(
@pytest.mark.asyncio
async def test_periodic_end_date(
frozen_time: FreezeTime,
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
) -> None:
frozen_time.move_to(datetime.datetime.fromisoformat("1970-01-01T00:00:00+00:00"))
start_date = utcnow()
Expand Down
12 changes: 6 additions & 6 deletions tests/worker/test_executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_config_override(

async def test_executable_close(
executable_queue_maker: t.Callable[..., ExecutableQueue],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
) -> None:
output_topic = MemoryTopic(MemoryTopic.Options(name="output_topic"))
output_topic.close = AsyncMock() # type: ignore[assignment]
Expand All @@ -118,7 +118,7 @@ async def test_executable_close(

xmsg_ctx = await alib.anext(xmsg_iter)
async with xmsg_ctx._context:
async with event_loop.until_idle():
async with running_event_loop.until_idle():
close_task = asyncio.create_task(executable_queue.close())
input_topic.close.assert_awaited()
output_topic.close.assert_not_called()
Expand Down Expand Up @@ -152,7 +152,7 @@ async def bad_context_1(self) -> t.AsyncIterator[TopicMessage]:
async def test_execurablt_concurrency(
executable_queue_maker: t.Callable[..., ExecutableQueue],
fake_queue_item: QueueItemWithState,
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
) -> None:
topic = StaticTopic(
options=StaticTopic.Options(
Expand All @@ -166,15 +166,15 @@ async def test_execurablt_concurrency(
async with alib.scoped_iter(xqueue.run()) as xrun, AsyncExitStack() as stack:
msg1 = await xrun.__anext__()
msg2 = await xrun.__anext__()
async with event_loop.until_idle():
async with running_event_loop.until_idle():
next_task = asyncio.create_task(alib.anext(xrun))
assert not next_task.done()

async with event_loop.until_idle():
async with running_event_loop.until_idle():
await stack.enter_async_context(msg1._context)
assert not next_task.done()

async with event_loop.until_idle():
async with running_event_loop.until_idle():
async with stack:
pass
assert next_task.done()
Expand Down
44 changes: 22 additions & 22 deletions tests/worker/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ async def process_message(self, message: ExecutableMessage) -> PipelineResults:
@pytest.mark.asyncio
async def test_base_executor(
executable_maker: Callable[[], ExecutableMessage],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
executor_queue_maker: Callable[..., ExecutorQueue],
) -> None:
executor = FakeExecutor()
executor.concurrency = 5
executor_manager = executor_queue_maker(executor=executor)

async with event_loop.until_idle():
async with running_event_loop.until_idle():
for _ in range(10):
asyncio.create_task(executor_manager.submit(executable_maker()))

assert executor.processing == 5
assert executor.processed == 0

async with event_loop.until_idle():
async with running_event_loop.until_idle():
for _ in range(10):
executor.execute_semaphore.release()
assert executor.processed == 10
Expand All @@ -100,7 +100,7 @@ def pipeline(resource: FakeResource) -> None: ...
@pytest.mark.asyncio
async def test_executor_wait_resources_and_queue(
executable_maker: Callable[..., ExecutableMessage],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
executor_queue_maker: Callable[..., ExecutorQueue],
) -> None:
executor = FakeExecutor()
Expand All @@ -121,42 +121,42 @@ async def test_executor_wait_resources_and_queue(
# Set up a scenario where there's 2 resource and 1 executor slot.
# Queuing 3 items should have 1 waiting on the executor and 1 waiting on
# the resources.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
for _ in range(2):
await executor_manager.submit(executable_maker())

assert executor.processing == 1
assert not parker.locked()

# Submit another task, stuck locking a resource, park the processable.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
await executor_manager.submit(executable_maker())

assert executor.processing == 1
assert parker.locked()

# Process the task pending in the executor and release the resource.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
executor.execute_semaphore.release()
assert executor.processed == 1
assert executor.processing == 2
assert not parker.locked()

# Process the other task, release the resource.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
executor.execute_semaphore.release()
assert executor.processed == 2
assert executor.processing == 3
assert not parker.locked()

async with event_loop.until_idle():
async with running_event_loop.until_idle():
executor.execute_semaphore.release()


@pytest.mark.asyncio
async def test_executor_wait_pusblish_and_queue(
executable_maker: Callable[..., ExecutableMessage],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
executor_queue_maker: Callable[..., ExecutorQueue],
) -> None:
executor = FakeExecutor()
Expand All @@ -180,7 +180,7 @@ async def test_executor_wait_pusblish_and_queue(
# Set up a scenario where there's 2 task, 1 executor slot and 1 publish slot.
# Queuing 2 items should have 1 waiting on the executor and 1 waiting on publish
# the resources.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
for _ in range(2):
await executor_manager.submit(executable_maker())

Expand All @@ -190,7 +190,7 @@ async def test_executor_wait_pusblish_and_queue(
assert not parker.locked()

# Process one task, take publish slot.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
executor.execute_semaphore.release()

assert executor.processing == 2
Expand All @@ -199,7 +199,7 @@ async def test_executor_wait_pusblish_and_queue(
assert not parker.locked()

# Process the other task, get stuck on publishing
async with event_loop.until_idle():
async with running_event_loop.until_idle():
executor.execute_semaphore.release()

assert executor.processing == 2
Expand All @@ -208,7 +208,7 @@ async def test_executor_wait_pusblish_and_queue(
assert parker.locked()

# Pop the item in the publish queue, leaving room for the next item.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
assert output_queue.get_nowait().args == {"n": 1}

assert executor.processing == 2
Expand All @@ -217,7 +217,7 @@ async def test_executor_wait_pusblish_and_queue(
assert not parker.locked()

# Pop the other item in the publish queue, clearing the queue.
async with event_loop.until_idle():
async with running_event_loop.until_idle():
assert output_queue.get_nowait().args == {"n": 2}

assert executor.processing == 2
Expand All @@ -229,7 +229,7 @@ async def test_executor_wait_pusblish_and_queue(
@pytest.mark.asyncio
async def test_executor_error_handler(
fake_executable_maker_with_output: Callable[..., ExecutableMessage],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
executor_queue_maker: Callable[..., ExecutorQueue],
) -> None:
executor = FakeFailingExecutor()
Expand All @@ -255,7 +255,7 @@ async def collect_exit(*args: t.Any) -> None:
xmsg._executing_context.push_async_exit(collect_exit)

# Execute our failing message
async with event_loop.until_idle():
async with running_event_loop.until_idle():
asyncio.create_task(executor_manager.submit(xmsg))

# Our pipeline should cause a test exception and publish it in its channel
Expand Down Expand Up @@ -287,7 +287,7 @@ async def collect_exit(*args: t.Any) -> None:
@pytest.mark.asyncio
async def test_executor_error_handler_unhandled(
fake_executable_maker_with_output: Callable[..., ExecutableMessage],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
executor_queue_maker: Callable[..., ExecutorQueue],
) -> None:
executor = FakeFailingExecutor()
Expand All @@ -314,7 +314,7 @@ async def test_executor_error_handler_unhandled(
xmsg._context.push_async_exit(mock.context)

# Execute our failing message
async with event_loop.until_idle():
async with running_event_loop.until_idle():
asyncio.create_task(executor_manager.submit(xmsg))

# A message is outputed to error topic.
Expand All @@ -334,7 +334,7 @@ async def test_executor_error_handler_unhandled(
@pytest.mark.asyncio
async def test_executor_error_handler_republish(
fake_executable_maker_with_output: Callable[..., ExecutableMessage],
event_loop: TimeForwardLoop,
running_event_loop: TimeForwardLoop,
executor_queue_maker: Callable[..., ExecutorQueue],
) -> None:
executor = FakeFailingExecutor()
Expand Down Expand Up @@ -373,7 +373,7 @@ async def collect_exit(*args: t.Any) -> None:
xmsg._executing_context.push_async_exit(collect_exit)

# Execute our failing message
async with event_loop.until_idle():
async with running_event_loop.until_idle():
asyncio.create_task(executor_manager.submit(xmsg))

# The error should be republished to the `retry` channel.
Expand All @@ -391,7 +391,7 @@ async def collect_exit(*args: t.Any) -> None:
output=output_topics,
)
xmsg._executing_context.push_async_exit(collect_exit)
async with event_loop.until_idle():
async with running_event_loop.until_idle():
asyncio.create_task(executor_manager.submit(xmsg))

# The error should reach max retry and not republish.
Expand Down
Loading

0 comments on commit bf796ee

Please sign in to comment.