diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py
index 81b16f0..3b11245 100644
--- a/src/pds/registrysweepers/ancestry/generation.py
+++ b/src/pds/registrysweepers/ancestry/generation.py
@@ -176,7 +176,7 @@ def get_nonaggregate_ancestry_records(
                     'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
                     doc.get("_index"),
                     doc.get("_id"),
-                    type(err),
+                    type(err).__name__,
                     err,
                 )
                 continue
diff --git a/src/pds/registrysweepers/repairkit/__init__.py b/src/pds/registrysweepers/repairkit/__init__.py
index 25a65d4..c81027a 100644
--- a/src/pds/registrysweepers/repairkit/__init__.py
+++ b/src/pds/registrysweepers/repairkit/__init__.py
@@ -57,6 +57,8 @@ def generate_updates(
     docs: Iterable[Dict], repairkit_version_metadata_key: str, repairkit_version: int
 ) -> Iterable[Update]:
     """Lazily generate necessary Update objects for a collection of db documents"""
+    repair_already_logged_to_error = False
+
     for document in docs:
         id = document["_id"]
         src = document["_source"]
@@ -68,9 +70,13 @@ def generate_updates(
             for func in funcs:
                 repairs.update(func(src, fieldname))
 
-        if repairs:
-            log.debug(f"Writing repairs to document: {id}")
-            yield Update(id=id, content=repairs)
+        document_needed_fixing = len(set(repairs).difference({repairkit_version_metadata_key})) > 0
+        if document_needed_fixing and not repair_already_logged_to_error:
+            log.error(
+                "repairkit sweeper detects documents in need of repair - please ~harass~ *request* node user to update their harvest version"
+            )
+            repair_already_logged_to_error = True
+        yield Update(id=id, content=repairs)
 
 
 def run(
@@ -89,10 +95,12 @@ def run(
         }
     }
 
-    all_docs = query_registry_db(client, unprocessed_docs_query, {})
+    # page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
+    # i.e. ATM and GEO
+    all_docs = query_registry_db(client, unprocessed_docs_query, {}, page_size=1000)
     updates = generate_updates(all_docs, repairkit_version_metadata_key, SWEEPERS_REPAIRKIT_VERSION)
     ensure_index_mapping(client, "registry", repairkit_version_metadata_key, "integer")
-    write_updated_docs(client, updates)
+    write_updated_docs(client, updates, bulk_chunk_max_update_count=20000)
 
     log.info("Repairkit sweeper processing complete!")
diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py
index a47bdd3..ce20925 100644
--- a/src/pds/registrysweepers/utils/db/__init__.py
+++ b/src/pds/registrysweepers/utils/db/__init__.py
@@ -35,7 +35,7 @@ def query_registry_db(
     scroll_keepalive = f"{scroll_keepalive_minutes}m"
     request_timeout = 20
     query_id = get_random_hex_id()  # This is just used to differentiate queries during logging
-    log.info(f"Initiating query with id {query_id}: {query}")
+    log.info(f"Initiating query with id {query_id}: {json.dumps(query)}")
 
     served_hits = 0
 
@@ -73,6 +73,8 @@ def fetch_func(_scroll_id: str = scroll_id):
         scroll_id = results.get("_scroll_id")
         total_hits = results["hits"]["total"]["value"]
+        if served_hits == 0:
+            log.info(f"Query {query_id} returns {total_hits} total hits")
         log.debug(
             f" paging query {query_id} ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})"
         )
 
@@ -131,7 +133,12 @@ def mock_wrapper(
     return query_registry_db
 
 
-def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name: str = "registry"):
+def write_updated_docs(
+    client: OpenSearch,
+    updates: Iterable[Update],
+    index_name: str = "registry",
+    bulk_chunk_max_update_count: int | None = None,
+):
     log.info("Updating a lazily-generated collection of product documents...")
 
     updated_doc_count = 0
@@ -139,10 +146,19 @@ def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name
     bulk_buffer_size_mb = 0.0
     bulk_updates_buffer: List[str] = []
     for update in updates:
-        if bulk_buffer_size_mb > bulk_buffer_max_size_mb:
-            pending_product_count = int(len(bulk_updates_buffer) / 2)
+        buffered_updates_count = len(bulk_updates_buffer) // 2
+        buffer_at_size_threshold = bulk_buffer_size_mb >= bulk_buffer_max_size_mb
+        buffer_at_update_count_threshold = (
+            bulk_chunk_max_update_count is not None and buffered_updates_count >= bulk_chunk_max_update_count
+        )
+        flush_threshold_reached = buffer_at_size_threshold or buffer_at_update_count_threshold
+        threshold_log_str = (
+            f"{bulk_buffer_max_size_mb}MB" if buffer_at_size_threshold else f"{bulk_chunk_max_update_count}docs"
+        )
+
+        if flush_threshold_reached:
             log.info(
-                f"Bulk update buffer has reached {bulk_buffer_max_size_mb}MB threshold - writing {pending_product_count} document updates to db..."
+                f"Bulk update buffer has reached {threshold_log_str} threshold - writing {buffered_updates_count} document updates to db..."
             )
             _write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)
             bulk_updates_buffer = []
@@ -156,10 +172,8 @@ def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name
         bulk_updates_buffer.extend(update_statement_strs)
         updated_doc_count += 1
 
-    remaining_products_to_write_count = int(len(bulk_updates_buffer) / 2)
-
     if len(bulk_updates_buffer) > 0:
-        log.info(f"Writing documents updates for {remaining_products_to_write_count} remaining products to db...")
+        log.info(f"Writing documents updates for {buffered_updates_count} remaining products to db...")
         _write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)
 
     log.info(f"Updated documents for {updated_doc_count} total products!")
@@ -183,6 +197,14 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
         warn_types = {"document_missing_exception"}  # these types represent bad data, not bad sweepers behaviour
 
         items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]
+        def get_ids_list_str(ids: List[str]) -> str:
+            max_display_ids = 50
+            ids_count = len(ids)
+            if ids_count <= max_display_ids or log.isEnabledFor(logging.DEBUG):
+                return str(ids)
+            else:
+                return f"{str(ids[:max_display_ids])} "
+
         if log.isEnabledFor(logging.WARNING):
             items_with_warnings = [
                 item for item in items_with_problems if item["update"]["error"]["type"] in warn_types
@@ -190,8 +212,9 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
             warning_aggregates = aggregate_update_error_types(items_with_warnings)
             for error_type, reason_aggregate in warning_aggregates.items():
                 for error_reason, ids in reason_aggregate.items():
+                    ids_str = get_ids_list_str(ids)
                     log.warning(
-                        f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}"
+                        f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids_str}"
                     )
 
         if log.isEnabledFor(logging.ERROR):
@@ -201,9 +224,12 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
             error_aggregates = aggregate_update_error_types(items_with_errors)
            for error_type, reason_aggregate in error_aggregates.items():
                 for error_reason, ids in reason_aggregate.items():
+                    ids_str = get_ids_list_str(ids)
                     log.error(
-                        f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids}"
+                        f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids_str}"
                     )
+    else:
+        log.info("Successfully wrote bulk update chunk")
 
 
 def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]:
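
Reviewer note on the repairkit change above: every document now yields an Update (it always carries at least the version stamp under repairkit_version_metadata_key), and a document only counts as "needing repair" when its repairs dict contains anything beyond that stamp, in which case a single log.error is emitted for the whole run. A minimal sketch of that predicate, using a placeholder key name rather than the module's real metadata key:

from typing import Any, Dict


def document_needed_fixing(repairs: Dict[str, Any], repairkit_version_metadata_key: str) -> bool:
    # True when the repairs change anything other than the sweeper's own version stamp
    return len(set(repairs).difference({repairkit_version_metadata_key})) > 0


version_key = "example_repairkit_version_key"  # placeholder, not the real metadata key
# A version-stamp-only update is not flagged (no log.error)...
assert not document_needed_fixing({version_key: 2}, version_key)
# ...but an update that also rewrites a field is flagged once per run.
assert document_needed_fixing({version_key: 2, "modification_date": ["2024-01-01"]}, version_key)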
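Reviewer note on write_updated_docs: the bulk buffer now flushes on whichever of two thresholds is hit first, the existing size cap in MB or the new optional update-count cap (bulk_chunk_max_update_count, set to 20000 by the repairkit sweeper). A self-contained sketch of just that decision, with an illustrative size cap since the real bulk_buffer_max_size_mb constant is defined outside this diff:

from typing import Optional


def should_flush(
    buffered_updates_count: int,
    bulk_buffer_size_mb: float,
    bulk_buffer_max_size_mb: float = 30.0,  # illustrative value, not taken from the PR
    bulk_chunk_max_update_count: Optional[int] = None,
) -> bool:
    # Flush when the buffer is too large in MB, or (if a count cap is configured) holds too many updates
    buffer_at_size_threshold = bulk_buffer_size_mb >= bulk_buffer_max_size_mb
    buffer_at_update_count_threshold = (
        bulk_chunk_max_update_count is not None and buffered_updates_count >= bulk_chunk_max_update_count
    )
    return buffer_at_size_threshold or buffer_at_update_count_threshold


# With the repairkit settings, many small docs flush on count; a few very large docs flush on size first.
assert should_flush(20000, 5.0, bulk_chunk_max_update_count=20000)
assert should_flush(100, 31.0, bulk_chunk_max_update_count=20000)
assert not should_flush(100, 5.0, bulk_chunk_max_update_count=20000)
assert not should_flush(10**6, 5.0)  # count cap disabled by default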
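Reviewer note on the logging helper: get_ids_list_str keeps warning/error lines readable by printing the full id list only when it is short (50 ids or fewer) or DEBUG logging is enabled, and otherwise only a leading slice; the exact truncation suffix used in the PR is not reproduced here. An approximation for illustration:

import logging
from typing import List

log = logging.getLogger(__name__)


def ids_for_log(ids: List[str], max_display_ids: int = 50) -> str:
    # Full list when short or when DEBUG is on; otherwise just the first max_display_ids entries
    if len(ids) <= max_display_ids or log.isEnabledFor(logging.DEBUG):
        return str(ids)
    return str(ids[:max_display_ids])


print(ids_for_log([f"urn:example:{i}" for i in range(3)]))    # full list
print(ids_for_log([f"urn:example:{i}" for i in range(500)]))  # truncated to the first 50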