Skip to content

Commit

Permalink
Ignore errors when Sink or Extractor is forcibly canceled (#1992)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangch079 authored and elastic committed Dec 19, 2023
1 parent a2b4a59 commit 27f7b20
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
21 changes: 13 additions & 8 deletions connectors/es/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,13 @@ async def run(self):
except asyncio.CancelledError:
self._logger.info("Task is canceled, stop Sink...")
raise
except ForceCanceledError:
self._logger.error(
f"Sink did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task."
)
except Exception as e:
if isinstance(e, ForceCanceledError) or self._canceled:
self._logger.warning(
f"Sink did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task."
)
return
raise

async def _run(self):
"""Creates batches of bulk calls given a queue of items.
Expand Down Expand Up @@ -369,11 +372,13 @@ async def run(self, generator, job_type):
except asyncio.CancelledError:
self._logger.info("Task is canceled, stop Extractor...")
raise
except ForceCanceledError:
self._logger.error(
f"Extractor did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task."
)
except Exception as e:
if isinstance(e, ForceCanceledError) or self._canceled:
self._logger.warning(
f"Extractor did not stop within {CANCELATION_TIMEOUT} seconds of cancelation, force-canceling the task."
)
return

self._logger.critical("Document extractor failed", exc_info=True)
await self.put_doc("FETCH_ERROR")
self.fetch_error = e
Expand Down
41 changes: 41 additions & 0 deletions tests/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,25 @@ async def test_force_canceled_extractor_put_doc():
queue.put.assert_not_awaited()


@pytest.mark.asyncio
async def test_force_canceled_extractor_with_other_errors(patch_logger):
queue = Mock()
queue.put = AsyncMock()
extractor = Extractor(
None,
queue,
INDEX,
)
generator = AsyncMock(side_effect=Exception("a non-ForceCanceledError"))

extractor.force_cancel()
# no error thrown
await extractor.run(generator, JobType.FULL)
patch_logger.assert_present(
"Extractor did not stop within 5 seconds of cancelation, force-canceling the task."
)


@pytest.mark.asyncio
async def test_sink_fetch_doc():
expected_doc = {"id": 123}
Expand Down Expand Up @@ -1209,6 +1228,28 @@ async def test_force_canceled_sink_fetch_doc():
queue.get.assert_not_awaited()


@pytest.mark.asyncio
async def test_force_canceled_sink_with_other_errors(patch_logger):
queue = Mock()
queue.get = AsyncMock(side_effect=Exception("a non-ForceCanceledError"))
sink = Sink(
None,
queue,
chunk_size=0,
pipeline={"name": "pipeline"},
chunk_mem_size=0,
max_concurrency=0,
max_retries=3,
)

sink.force_cancel()
# no error thrown
await sink.run()
patch_logger.assert_present(
"Sink did not stop within 5 seconds of cancelation, force-canceling the task."
)


@pytest.mark.parametrize(
"extractor_task_done, sink_task_done, force_cancel",
[
Expand Down

0 comments on commit 27f7b20

Please sign in to comment.