From 3c9826059c43b3eb556128dcdbd72aa0c513c72f Mon Sep 17 00:00:00 2001 From: SimonThordal Date: Mon, 2 Sep 2024 12:13:14 +0200 Subject: [PATCH] Fail on any error --- yente/provider/base.py | 4 +--- yente/provider/elastic.py | 13 ++++--------- yente/provider/opensearch.py | 12 +++--------- yente/search/indexer.py | 2 +- 4 files changed, 9 insertions(+), 22 deletions(-) diff --git a/yente/provider/base.py b/yente/provider/base.py index ae1c68e7..b4c213be 100644 --- a/yente/provider/base.py +++ b/yente/provider/base.py @@ -60,8 +60,6 @@ async def search( """Search for entities in the index.""" raise NotImplementedError - async def bulk_index( - self, entities: AsyncIterator[Dict[str, Any]] - ) -> Tuple[int, int]: + async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> int: """Index a list of entities into the search index.""" raise NotImplementedError diff --git a/yente/provider/elastic.py b/yente/provider/elastic.py index bf7fef6f..543f9c5b 100644 --- a/yente/provider/elastic.py +++ b/yente/provider/elastic.py @@ -229,28 +229,23 @@ async def search( ) raise YenteIndexError(f"Could not search index: {ae}") from ae - async def bulk_index( - self, entities: AsyncIterator[Dict[str, Any]] - ) -> Tuple[int, int]: + async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> int: """Index a list of entities into the search index. Args: entities: An async iterator of entities to index. Returns: - A tuple of the number of entities indexed and the number of errors. + he number of entities indexed """ try: - n, errors = await async_bulk( + n, _ = await async_bulk( self.client(), entities, chunk_size=1000, raise_on_error=False, ) - errors = cast(List[Any], errors) - for error in errors: - log.error("Bulk index error", error=error) - return n, len(errors) + return n except BulkIndexError as exc: raise YenteIndexError(f"Could not index entities: {exc}") from exc diff --git a/yente/provider/opensearch.py b/yente/provider/opensearch.py index 760ecd7a..64ffebf2 100644 --- a/yente/provider/opensearch.py +++ b/yente/provider/opensearch.py @@ -219,22 +219,16 @@ async def search( ) raise YenteIndexError(f"Could not search index: {ae}") from ae - async def bulk_index( - self, entities: AsyncIterator[Dict[str, Any]] - ) -> Tuple[int, int]: + async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> int: """Index a list of entities into the search index.""" try: - n, errors = await async_bulk( + n, _ = await async_bulk( self.client, entities, chunk_size=1000, max_retries=3, initial_backoff=2, - raise_on_error=False, ) - errors = cast(List[Any], errors) - for error in errors: - log.error("Bulk index error", error=error) - return n, len(errors) + return n except BulkIndexError as exc: raise YenteIndexError(f"Could not index entities: {exc}") from exc diff --git a/yente/search/indexer.py b/yente/search/indexer.py index bf8b7a3c..4c73f4c6 100644 --- a/yente/search/indexer.py +++ b/yente/search/indexer.py @@ -141,7 +141,7 @@ async def index_entities( await provider.create_index(next_index) try: docs = iter_entity_docs(updater, next_index) - n_changed, _ = await provider.bulk_index(docs) + n_changed = await provider.bulk_index(docs) except ( YenteIndexError, KeyboardInterrupt,