Skip to content

Commit

Permalink
re-work error handling in indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
pudo committed Sep 13, 2024
1 parent 257fec4 commit 02313b2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 31 deletions.
9 changes: 9 additions & 0 deletions yente/provider/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ async def search(
query=json.dumps(query),
)
raise YenteIndexError(f"Could not search index: {ae}") from ae
except (
KeyboardInterrupt,
OSError,
Exception,
asyncio.TimeoutError,
asyncio.CancelledError,
) as exc:
msg = f"Error during search: {str(exc)}"
raise YenteIndexError(msg, status=500) from exc

async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
"""Index a list of entities into the search index."""
Expand Down
9 changes: 9 additions & 0 deletions yente/provider/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ async def search(
query=json.dumps(query),
)
raise YenteIndexError(f"Could not search index: {ae}") from ae
except (
KeyboardInterrupt,
OSError,
Exception,
asyncio.TimeoutError,
asyncio.CancelledError,
) as exc:
msg = f"Error during search: {str(exc)}"
raise YenteIndexError(msg, status=500) from exc

async def bulk_index(self, entities: AsyncIterator[Dict[str, Any]]) -> None:
"""Index a list of entities into the search index."""
Expand Down
44 changes: 13 additions & 31 deletions yente/search/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,17 @@ async def get_index_version(provider: SearchProvider, dataset: Dataset) -> str |
return min(versions)


async def index_entities_rate_limit(
    provider: SearchProvider, dataset: Dataset, force: bool
) -> bool:
    """Run ``index_entities`` for one dataset unless an update is already running.

    Returns False immediately when another update holds the module-level lock;
    otherwise acquires the lock and returns the result of ``index_entities``.
    """
    if lock.locked():
        log.info("Index is already being updated", dataset=dataset.name, force=force)
        return False
    # NOTE(review): plain `with` implies a threading-style lock — confirm `lock`
    # is not an asyncio.Lock, which would require `async with` here.
    with lock:
        return await index_entities(provider, dataset, force=force)


async def index_entities(
provider: SearchProvider, dataset: Dataset, force: bool
) -> bool:
) -> None:
"""Index entities in a particular dataset, with versioning of the index."""
alias = settings.ENTITY_INDEX
base_version = await get_index_version(provider, dataset)
updater = await DatasetUpdater.build(dataset, base_version, force_full=force)
if not updater.needs_update():
return False
if updater.dataset.load:
log.info("No update needed", dataset=dataset.name)
return
log.info(
"Indexing entities",
dataset=dataset.name,
Expand All @@ -133,7 +125,7 @@ async def index_entities(
next_index = construct_index_name(dataset.name, updater.target_version)
if not force and await provider.exists_index_alias(alias, next_index):
log.info("Index is up to date.", index=next_index)
return False
return

# await es.indices.delete(index=next_index)
if updater.is_incremental and not force:
Expand All @@ -145,24 +137,17 @@ async def index_entities(
try:
docs = iter_entity_docs(updater, next_index)
await provider.bulk_index(docs)
except (
YenteIndexError,
KeyboardInterrupt,
OSError,
Exception,
asyncio.TimeoutError,
asyncio.CancelledError,
) as exc:
except YenteIndexError as exc:
log.exception(
"Indexing error: %r" % exc,
"Indexing error: %s" % exc.detail,
dataset=dataset.name,
index=next_index,
)
aliases = await provider.get_alias_indices(alias)
if next_index not in aliases:
log.warn("Deleting partial index", index=next_index)
await provider.delete_index(next_index)
return False
raise exc

await provider.refresh(index=next_index)
dataset_prefix = construct_index_name(dataset.name)
Expand All @@ -173,7 +158,6 @@ async def index_entities(
prefix=dataset_prefix,
)
log.info("Index is now aliased to: %s" % alias, index=next_index)
return True


async def delete_old_indices(provider: SearchProvider, catalog: Catalog) -> None:
Expand All @@ -198,20 +182,18 @@ async def delete_old_indices(provider: SearchProvider, catalog: Catalog) -> None
await provider.delete_index(index)


async def update_index(force: bool = False) -> None:
    """Reindex all datasets if there is a new version of their data content available,
    or create an initial version of the index from scratch.

    Args:
        force: Re-index every dataset even if its index appears up to date.
    """
    async with with_provider() as provider:
        catalog = await get_catalog()
        log.info("Index update check")
        for dataset in catalog.datasets:
            # Serialize dataset indexing runs; NOTE(review): plain `with` implies a
            # threading-style lock (this may run in a worker thread via
            # update_index_threaded) — confirm `lock` is not an asyncio.Lock,
            # which would require `async with`.
            with lock:
                await index_entities(provider, dataset, force=force)

        # Prune index versions no longer referenced by any dataset in the catalog.
        await delete_old_indices(provider, catalog)
        log.info("Index update complete.")


def update_index_threaded(force: bool = False) -> None:
Expand Down

0 comments on commit 02313b2

Please sign in to comment.