From ecb261f519d54607a1cbb784e48d3ed4dd3c75f0 Mon Sep 17 00:00:00 2001 From: Alex Dunn Date: Mon, 11 Sep 2023 21:37:16 -0700 Subject: [PATCH] bump up request timeout values if necessary, these can be extracted to runtime arguments --- src/pds/registrysweepers/utils/db/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 7a004ad..a47bdd3 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -33,6 +33,7 @@ def query_registry_db( """ scroll_keepalive = f"{scroll_keepalive_minutes}m" + request_timeout = 20 query_id = get_random_hex_id() # This is just used to differentiate queries during logging log.info(f"Initiating query with id {query_id}: {query}") @@ -51,6 +52,7 @@ def fetch_func(): index=index_name, body=query, scroll=scroll_keepalive, + request_timeout=request_timeout, size=page_size, _source_includes=_source.get("includes", []), # TODO: Break out from the enclosing _source object _source_excludes=_source.get("excludes", []), # TODO: Break out from the enclosing _source object @@ -59,7 +61,7 @@ def fetch_func(): else: def fetch_func(_scroll_id: str = scroll_id): - return client.scroll(scroll_id=_scroll_id, scroll=scroll_keepalive) + return client.scroll(scroll_id=_scroll_id, scroll=scroll_keepalive, request_timeout=request_timeout) results = retry_call( fetch_func, @@ -174,7 +176,8 @@ def update_as_statements(update: Update) -> Iterable[str]: def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]): bulk_data = "\n".join(bulk_updates) + "\n" - response_content = client.bulk(index=index_name, body=bulk_data) + request_timeout = 60 + response_content = client.bulk(index=index_name, body=bulk_data, request_timeout=request_timeout) if response_content.get("errors"): warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour