From 03fa5c42da9acfead2ab9e294a96026c17f0aaed Mon Sep 17 00:00:00 2001
From: Alex Dunn
Date: Tue, 19 Sep 2023 10:08:33 -0700
Subject: [PATCH 1/4] improve log message for ease of copy/pasting into postman

---
 src/pds/registrysweepers/utils/db/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index a47bdd3..4a6c6e6 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -35,7 +35,7 @@ def query_registry_db(
     scroll_keepalive = f"{scroll_keepalive_minutes}m"
     request_timeout = 20
     query_id = get_random_hex_id()  # This is just used to differentiate queries during logging
-    log.info(f"Initiating query with id {query_id}: {query}")
+    log.info(f"Initiating query with id {query_id}: {json.dumps(query)}")
 
     served_hits = 0

From 6de54f35555ae1df347854aa0397222e9926d119 Mon Sep 17 00:00:00 2001
From: Alex Dunn
Date: Tue, 19 Sep 2023 15:15:20 -0700
Subject: [PATCH 2/4] implement update-count-based _bulk constraint for repairkit

---
 .../registrysweepers/repairkit/__init__.py    |  6 ++++--
 src/pds/registrysweepers/utils/db/__init__.py | 20 +++++++++++++++++---
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py
index 25a65d4..1419014 100644
--- a/src/pds/registrysweepers/repairkit/__init__.py
+++ b/src/pds/registrysweepers/repairkit/__init__.py
@@ -89,10 +89,12 @@ def run(
         }
     }
 
-    all_docs = query_registry_db(client, unprocessed_docs_query, {})
+    # page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
+    # i.e. ATM and GEO
+    all_docs = query_registry_db(client, unprocessed_docs_query, {}, page_size=1000)
     updates = generate_updates(all_docs, repairkit_version_metadata_key, SWEEPERS_REPAIRKIT_VERSION)
     ensure_index_mapping(client, "registry", repairkit_version_metadata_key, "integer")
-    write_updated_docs(client, updates)
+    write_updated_docs(client, updates, bulk_chunk_max_update_count=20000)
 
     log.info("Repairkit sweeper processing complete!")
 
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index 4a6c6e6..313471f 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -131,7 +131,12 @@ def mock_wrapper(
     return query_registry_db
 
 
-def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name: str = "registry"):
+def write_updated_docs(
+    client: OpenSearch,
+    updates: Iterable[Update],
+    index_name: str = "registry",
+    bulk_chunk_max_update_count: int | None = None,
+):
     log.info("Updating a lazily-generated collection of product documents...")
 
     updated_doc_count = 0
@@ -139,8 +144,17 @@ def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name
     bulk_buffer_size_mb = 0.0
     bulk_updates_buffer: List[str] = []
     for update in updates:
-        if bulk_buffer_size_mb > bulk_buffer_max_size_mb:
-            pending_product_count = int(len(bulk_updates_buffer) / 2)
+        buffered_updates_count = len(bulk_updates_buffer) // 2
+        buffer_at_size_threshold = bulk_buffer_size_mb >= bulk_buffer_max_size_mb
+        buffer_at_update_count_threshold = (
+            bulk_chunk_max_update_count is not None and buffered_updates_count >= bulk_chunk_max_update_count
+        )
+        flush_threshold_reached = buffer_at_size_threshold or buffer_at_update_count_threshold
+        threshold_log_str = (
+            f"{bulk_buffer_max_size_mb}MB" if buffer_at_size_threshold else f"{bulk_chunk_max_update_count}docs"
+        )
+
+        if flush_threshold_reached:
             log.info(
                 f"Bulk update buffer has reached {bulk_buffer_max_size_mb}MB threshold - writing {pending_product_count} document updates to db..."
             )

From f63feaa9c6c753911c00c1a04d784e7a9b856ab6 Mon Sep 17 00:00:00 2001
From: Alex Dunn
Date: Tue, 19 Sep 2023 15:15:31 -0700
Subject: [PATCH 3/4] logging improvements

---
 .../registrysweepers/ancestry/generation.py   |  2 +-
 src/pds/registrysweepers/utils/db/__init__.py | 24 ++++++++++++++++++------
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py
index 81b16f0..3b11245 100644
--- a/src/pds/registrysweepers/ancestry/generation.py
+++ b/src/pds/registrysweepers/ancestry/generation.py
@@ -176,7 +176,7 @@ def get_nonaggregate_ancestry_records(
                     'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
                     doc.get("_index"),
                     doc.get("_id"),
-                    type(err),
+                    type(err).__name__,
                     err,
                 )
                 continue
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index 313471f..ce20925 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -73,6 +73,8 @@ def fetch_func(_scroll_id: str = scroll_id):
 
         scroll_id = results.get("_scroll_id")
         total_hits = results["hits"]["total"]["value"]
+        if served_hits == 0:
+            log.info(f"Query {query_id} returns {total_hits} total hits")
         log.debug(
             f" paging query {query_id} ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})"
         )
@@ -156,7 +158,7 @@ def write_updated_docs(
 
         if flush_threshold_reached:
             log.info(
-                f"Bulk update buffer has reached {bulk_buffer_max_size_mb}MB threshold - writing {pending_product_count} document updates to db..."
+                f"Bulk update buffer has reached {threshold_log_str} threshold - writing {buffered_updates_count} document updates to db..."
             )
             _write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)
             bulk_updates_buffer = []
@@ -170,10 +172,8 @@ def write_updated_docs(
             bulk_updates_buffer.extend(update_statement_strs)
         updated_doc_count += 1
 
-    remaining_products_to_write_count = int(len(bulk_updates_buffer) / 2)
-
     if len(bulk_updates_buffer) > 0:
-        log.info(f"Writing documents updates for {remaining_products_to_write_count} remaining products to db...")
+        log.info(f"Writing documents updates for {buffered_updates_count} remaining products to db...")
         _write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)
 
     log.info(f"Updated documents for {updated_doc_count} total products!")
@@ -197,6 +197,14 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
     warn_types = {"document_missing_exception"}  # these types represent bad data, not bad sweepers behaviour
     items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]
 
+    def get_ids_list_str(ids: List[str]) -> str:
+        max_display_ids = 50
+        ids_count = len(ids)
+        if ids_count <= max_display_ids or log.isEnabledFor(logging.DEBUG):
+            return str(ids)
+        else:
+            return f"{str(ids[:max_display_ids])} "
+
     if log.isEnabledFor(logging.WARNING):
         items_with_warnings = [
             item for item in items_with_problems if item["update"]["error"]["type"] in warn_types
@@ -204,8 +212,9 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
         warning_aggregates = aggregate_update_error_types(items_with_warnings)
         for error_type, reason_aggregate in warning_aggregates.items():
             for error_reason, ids in reason_aggregate.items():
+                ids_str = get_ids_list_str(ids)
                 log.warning(
-                    f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}"
+                    f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids_str}"
                 )
 
     if log.isEnabledFor(logging.ERROR):
@@ -215,9 +224,12 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
         error_aggregates = aggregate_update_error_types(items_with_errors)
         for error_type, reason_aggregate in error_aggregates.items():
             for error_reason, ids in reason_aggregate.items():
+                ids_str = get_ids_list_str(ids)
                 log.error(
-                    f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids}"
+                    f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids_str}"
                 )
+    else:
+        log.info("Successfully wrote bulk update chunk")
 
 
 def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]:

From cbfd48d2b8b35eae12bdf5d18a419def6e2a55c4 Mon Sep 17 00:00:00 2001
From: Alex Dunn
Date: Tue, 19 Sep 2023 15:31:50 -0700
Subject: [PATCH 4/4] log presence of fix-required data as error

---
 src/pds/registrysweepers/repairkit/__init__.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py
index 1419014..c81027a 100644
--- a/src/pds/registrysweepers/repairkit/__init__.py
+++ b/src/pds/registrysweepers/repairkit/__init__.py
@@ -57,6 +57,8 @@ def generate_updates(
     docs: Iterable[Dict], repairkit_version_metadata_key: str, repairkit_version: int
 ) -> Iterable[Update]:
     """Lazily generate necessary Update objects for a collection of db documents"""
+    repair_already_logged_to_error = False
+
     for document in docs:
         id = document["_id"]
         src = document["_source"]
@@ -68,9 +70,13 @@ def generate_updates(
             for func in funcs:
                 repairs.update(func(src, fieldname))
 
-        if repairs:
-            log.debug(f"Writing repairs to document: {id}")
-            yield Update(id=id, content=repairs)
+        document_needed_fixing = len(set(repairs).difference({repairkit_version_metadata_key})) > 0
+        if document_needed_fixing and not repair_already_logged_to_error:
+            log.error(
+                "repairkit sweeper detects documents in need of repair - please ~harass~ *request* node user to update their harvest version"
+            )
+            repair_already_logged_to_error = True
+        yield Update(id=id, content=repairs)
 
 
 def run(
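
Reviewer note, not part of the patch series: a minimal, self-contained sketch of the dual-threshold flush behaviour that PATCH 2 adds to write_updated_docs, for readers who want the buffering logic in isolation. The function name chunk_bulk_updates and the 30MB default are illustrative assumptions; the real code flushes via _write_bulk_updates_chunk and takes its size threshold from utils.db.

# Illustrative sketch only (not from the patches): mirrors the flush conditions
# introduced in PATCH 2. Each update occupies two NDJSON lines (action + doc),
# hence the `// 2` when counting buffered updates.
from typing import Iterable, Iterator, List, Optional, Tuple

def chunk_bulk_updates(
    updates: Iterable[Tuple[str, str]],      # (action_line, document_line) pairs, already serialized
    max_size_mb: float = 30.0,               # assumed size threshold; the real default lives in utils.db
    max_update_count: Optional[int] = None,  # None disables the count constraint, as in the new signature
) -> Iterator[List[str]]:
    """Yield _bulk-ready chunks, flushing when either threshold is crossed."""
    buffer: List[str] = []
    buffer_size_mb = 0.0
    for action_line, doc_line in updates:
        buffered_update_count = len(buffer) // 2
        at_size_threshold = buffer_size_mb >= max_size_mb
        at_count_threshold = max_update_count is not None and buffered_update_count >= max_update_count
        if at_size_threshold or at_count_threshold:
            yield buffer  # caller would POST these lines to the _bulk endpoint
            buffer = []
            buffer_size_mb = 0.0
        buffer.extend((action_line, doc_line))
        buffer_size_mb += (len(action_line) + len(doc_line)) / 2**20
    if buffer:
        yield buffer  # final partial chunk

Calling this with max_update_count=20000 mirrors the constraint repairkit now passes, which keeps each _bulk request bounded even when many small documents fit under the size threshold.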
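
Similarly illustrative rather than from the repo: the check PATCH 4 uses to decide whether a document genuinely needed repair ignores the sweeper's own version-stamp field, so the one-time error is raised only for real data problems. The key name below is a hypothetical placeholder for the repairkit_version_metadata_key value passed in by the caller.

# Hypothetical key name for illustration; the real value is supplied as
# repairkit_version_metadata_key by the calling code.
REPAIRKIT_VERSION_KEY = "repairkit_version"

def document_needed_fixing(repairs: dict) -> bool:
    """True if any repair besides the sweeper's own version stamp was generated."""
    return len(set(repairs).difference({REPAIRKIT_VERSION_KEY})) > 0

# The version stamp alone does not count as a needed fix...
assert document_needed_fixing({REPAIRKIT_VERSION_KEY: 2}) is False
# ...but any other repaired field does.
assert document_needed_fixing({"modification_date": "2023-09-19", REPAIRKIT_VERSION_KEY: 2}) is True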