diff --git a/connectors/es/sink.py b/connectors/es/sink.py index b54cf977e..8be41dec1 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -189,10 +189,13 @@ async def run(self): except asyncio.CancelledError: self._logger.info("Task is canceled, stop Sink...") raise - except ForceCanceledError: - self._logger.error( - f"Sink did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task." - ) + except Exception as e: + if isinstance(e, ForceCanceledError) or self._canceled: + self._logger.warning( + f"Sink did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task." + ) + return + raise async def _run(self): """Creates batches of bulk calls given a queue of items. @@ -369,11 +372,13 @@ async def run(self, generator, job_type): except asyncio.CancelledError: self._logger.info("Task is canceled, stop Extractor...") raise - except ForceCanceledError: - self._logger.error( - f"Extractor did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task." - ) except Exception as e: + if isinstance(e, ForceCanceledError) or self._canceled: + self._logger.warning( + f"Extractor did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task." + ) + return + self._logger.critical("Document extractor failed", exc_info=True) await self.put_doc("FETCH_ERROR") self.fetch_error = e diff --git a/tests/test_sink.py b/tests/test_sink.py index 979abdb97..01ede3c4e 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -1168,6 +1168,25 @@ async def test_force_canceled_extractor_put_doc(): queue.put.assert_not_awaited() +@pytest.mark.asyncio +async def test_force_canceled_extractor_with_other_errors(patch_logger): + queue = Mock() + queue.put = AsyncMock() + extractor = Extractor( + None, + queue, + INDEX, + ) + generator = AsyncMock(side_effect=Exception("a non-ForceCanceledError")) + + extractor.force_cancel() + # no error thrown + await extractor.run(generator, JobType.FULL) + patch_logger.assert_present( + "Extractor did not stop within 5 seconds of cancelation, force-canceling the task." + ) + + @pytest.mark.asyncio async def test_sink_fetch_doc(): expected_doc = {"id": 123} @@ -1209,6 +1228,28 @@ async def test_force_canceled_sink_fetch_doc(): queue.get.assert_not_awaited() +@pytest.mark.asyncio +async def test_force_canceled_sink_with_other_errors(patch_logger): + queue = Mock() + queue.get = AsyncMock(side_effect=Exception("a non-ForceCanceledError")) + sink = Sink( + None, + queue, + chunk_size=0, + pipeline={"name": "pipeline"}, + chunk_mem_size=0, + max_concurrency=0, + max_retries=3, + ) + + sink.force_cancel() + # no error thrown + await sink.run() + patch_logger.assert_present( + "Sink did not stop within 5 seconds of cancelation, force-canceling the task." + ) + + @pytest.mark.parametrize( "extractor_task_done, sink_task_done, force_cancel", [