Merge pull request #77 from NASA-PDS/big-docs-fix
ensure repairkit compatibility with large-doc nodes
alexdunnjpl authored Sep 19, 2023
2 parents 3801664 + cbfd48d commit 60a4f66
Showing 3 changed files with 50 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/pds/registrysweepers/ancestry/generation.py
@@ -176,7 +176,7 @@ def get_nonaggregate_ancestry_records(
'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
doc.get("_index"),
doc.get("_id"),
type(err),
type(err).__name__,
err,
)
continue
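The only change in this file swaps type(err) for type(err).__name__ in the parse-failure log call, so the message carries the bare exception class name rather than its repr. A minimal sketch of the difference, using a hypothetical error:

# Minimal sketch (hypothetical error value): what each form renders as in a log message.
try:
    raise ValueError("could not split LIDVID string")
except Exception as err:
    print("before: %s" % type(err))           # before: <class 'ValueError'>
    print("after:  %s" % type(err).__name__)  # after:  ValueError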
18 changes: 13 additions & 5 deletions src/pds/registrysweepers/repairkit/__init__.py
@@ -57,6 +57,8 @@ def generate_updates(
docs: Iterable[Dict], repairkit_version_metadata_key: str, repairkit_version: int
) -> Iterable[Update]:
"""Lazily generate necessary Update objects for a collection of db documents"""
repair_already_logged_to_error = False

for document in docs:
id = document["_id"]
src = document["_source"]
@@ -68,9 +70,13 @@
for func in funcs:
repairs.update(func(src, fieldname))

if repairs:
log.debug(f"Writing repairs to document: {id}")
yield Update(id=id, content=repairs)
document_needed_fixing = len(set(repairs).difference({repairkit_version_metadata_key})) > 0
if document_needed_fixing and not repair_already_logged_to_error:
log.error(
"repairkit sweeper detects documents in need of repair - please ~harass~ *request* node user to update their harvest version"
)
repair_already_logged_to_error = True
yield Update(id=id, content=repairs)


def run(
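The added check above distinguishes documents that only need the sweeper-version stamp written from documents that need a genuine repair, logging a single error per run in the latter case while still yielding an Update for every document. A standalone sketch of that check, with an assumed metadata key name and hypothetical repair dicts:

# Standalone sketch of the "needed fixing" test (assumed key name, hypothetical repair contents).
repairkit_version_metadata_key = "ops:Provenance/ops:repairkit_version"  # assumed name, for illustration only

def document_needed_fixing(repairs: dict) -> bool:
    # True iff at least one repaired field is something other than the version stamp itself
    return len(set(repairs).difference({repairkit_version_metadata_key})) > 0

version_stamp_only = {repairkit_version_metadata_key: 1}
genuine_repair = {repairkit_version_metadata_key: 1, "lidvid": "urn:nasa:pds:bundle:collection::1.0"}

print(document_needed_fixing(version_stamp_only))  # False - only the version stamp would be written
print(document_needed_fixing(genuine_repair))      # True  - triggers the one-time error log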
@@ -89,10 +95,12 @@ def run(
}
}

all_docs = query_registry_db(client, unprocessed_docs_query, {})
# page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
# i.e. ATM and GEO
all_docs = query_registry_db(client, unprocessed_docs_query, {}, page_size=1000)
updates = generate_updates(all_docs, repairkit_version_metadata_key, SWEEPERS_REPAIRKIT_VERSION)
ensure_index_mapping(client, "registry", repairkit_version_metadata_key, "integer")
write_updated_docs(client, updates)
write_updated_docs(client, updates, bulk_chunk_max_update_count=20000)

log.info("Repairkit sweeper processing complete!")

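Per the new comment, the page_size and bulk_chunk_max_update_count arguments keep memory and request sizes bounded on nodes with very large documents: query_registry_db scrolls 1000 documents per page, generate_updates yields one Update per document, and write_updated_docs drains that generator in capped chunks. A toy sketch of the same lazy pipeline shape, using stand-in functions and hypothetical sizes:

# Toy sketch of the lazy pipeline wired up in run() above (stand-in functions, hypothetical sizes).
from typing import Iterable, Iterator

def toy_query(page_size: int) -> Iterator[dict]:
    # Stand-in for query_registry_db: documents stream downstream one page at a time.
    total_docs = 3500
    for start in range(0, total_docs, page_size):
        for i in range(start, min(start + page_size, total_docs)):
            yield {"_id": f"doc-{i}", "_source": {}}

def toy_generate_updates(docs: Iterable[dict]) -> Iterator[dict]:
    # Stand-in for generate_updates: one update per document.
    for doc in docs:
        yield {"id": doc["_id"], "content": {"repairkit_version": 1}}

def toy_write(updates: Iterable[dict], bulk_chunk_max_update_count: int) -> None:
    # Stand-in for write_updated_docs: flush on the count cap (the real code also flushes at a 30MB size cap).
    buffer: list = []
    for update in updates:
        buffer.append(update)
        if len(buffer) >= bulk_chunk_max_update_count:
            print(f"bulk-writing {len(buffer)} updates")
            buffer.clear()
    if buffer:
        print(f"bulk-writing {len(buffer)} remaining updates")

toy_write(toy_generate_updates(toy_query(page_size=1000)), bulk_chunk_max_update_count=2000)
# prints "bulk-writing 2000 updates", then "bulk-writing 1500 remaining updates"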
46 changes: 36 additions & 10 deletions src/pds/registrysweepers/utils/db/__init__.py
@@ -35,7 +35,7 @@ def query_registry_db(
scroll_keepalive = f"{scroll_keepalive_minutes}m"
request_timeout = 20
query_id = get_random_hex_id() # This is just used to differentiate queries during logging
log.info(f"Initiating query with id {query_id}: {query}")
log.info(f"Initiating query with id {query_id}: {json.dumps(query)}")

served_hits = 0

@@ -73,6 +73,8 @@ def fetch_func(_scroll_id: str = scroll_id):
scroll_id = results.get("_scroll_id")

total_hits = results["hits"]["total"]["value"]
if served_hits == 0:
log.info(f"Query {query_id} returns {total_hits} total hits")
log.debug(
f" paging query {query_id} ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})"
)
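Two logging tweaks appear in this file so far: the query is now logged via json.dumps(query) rather than the dict's repr, and the total hit count is logged once when the first page arrives. The json.dumps form matters because the dict repr uses single quotes and Python literals, so it cannot be pasted back into a query console. A quick sketch with a hypothetical query body:

# Quick sketch: dict repr vs json.dumps for the query-logging change above (hypothetical query body).
import json

query = {"bool": {"must_not": [{"term": {"ops:flag": True}}]}}
print(str(query))         # {'bool': {'must_not': [{'term': {'ops:flag': True}}]}} - Python repr, not valid JSON
print(json.dumps(query))  # {"bool": {"must_not": [{"term": {"ops:flag": true}}]}} - valid JSON, paste-able into a query console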
@@ -131,18 +133,32 @@ def mock_wrapper(
return query_registry_db


def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name: str = "registry"):
def write_updated_docs(
client: OpenSearch,
updates: Iterable[Update],
index_name: str = "registry",
bulk_chunk_max_update_count: int | None = None,
):
log.info("Updating a lazily-generated collection of product documents...")
updated_doc_count = 0

bulk_buffer_max_size_mb = 30.0
bulk_buffer_size_mb = 0.0
bulk_updates_buffer: List[str] = []
for update in updates:
if bulk_buffer_size_mb > bulk_buffer_max_size_mb:
pending_product_count = int(len(bulk_updates_buffer) / 2)
buffered_updates_count = len(bulk_updates_buffer) // 2
buffer_at_size_threshold = bulk_buffer_size_mb >= bulk_buffer_max_size_mb
buffer_at_update_count_threshold = (
bulk_chunk_max_update_count is not None and buffered_updates_count >= bulk_chunk_max_update_count
)
flush_threshold_reached = buffer_at_size_threshold or buffer_at_update_count_threshold
threshold_log_str = (
f"{bulk_buffer_max_size_mb}MB" if buffer_at_size_threshold else f"{bulk_chunk_max_update_count}docs"
)

if flush_threshold_reached:
log.info(
f"Bulk update buffer has reached {bulk_buffer_max_size_mb}MB threshold - writing {pending_product_count} document updates to db..."
f"Bulk update buffer has reached {threshold_log_str} threshold - writing {buffered_updates_count} document updates to db..."
)
_write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)
bulk_updates_buffer = []
@@ -156,10 +172,8 @@ def write_updated_docs(client: OpenSearch, updates: Iterable[Update], index_name
bulk_updates_buffer.extend(update_statement_strs)
updated_doc_count += 1

remaining_products_to_write_count = int(len(bulk_updates_buffer) / 2)

if len(bulk_updates_buffer) > 0:
log.info(f"Writing documents updates for {remaining_products_to_write_count} remaining products to db...")
log.info(f"Writing documents updates for {buffered_updates_count} remaining products to db...")
_write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)

log.info(f"Updated documents for {updated_doc_count} total products!")
@@ -183,15 +197,24 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour
items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]

def get_ids_list_str(ids: List[str]) -> str:
max_display_ids = 50
ids_count = len(ids)
if ids_count <= max_display_ids or log.isEnabledFor(logging.DEBUG):
return str(ids)
else:
return f"{str(ids[:max_display_ids])} <list of {ids_count} ids truncated - enable DEBUG logging for full list>"

if log.isEnabledFor(logging.WARNING):
items_with_warnings = [
item for item in items_with_problems if item["update"]["error"]["type"] in warn_types
]
warning_aggregates = aggregate_update_error_types(items_with_warnings)
for error_type, reason_aggregate in warning_aggregates.items():
for error_reason, ids in reason_aggregate.items():
ids_str = get_ids_list_str(ids)
log.warning(
f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}"
f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids_str}"
)

if log.isEnabledFor(logging.ERROR):
@@ -201,9 +224,12 @@ def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates:
error_aggregates = aggregate_update_error_types(items_with_errors)
for error_type, reason_aggregate in error_aggregates.items():
for error_reason, ids in reason_aggregate.items():
ids_str = get_ids_list_str(ids)
log.error(
f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids}"
f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids_str}"
)
else:
log.info("Successfully wrote bulk update chunk")


def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]:
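The new get_ids_list_str helper keeps these failure logs readable when a bulk chunk reports thousands of problem documents: at or below 50 ids, or with DEBUG logging enabled, the whole list is printed; otherwise it is truncated with a pointer to DEBUG logging. A standalone sketch of the two behaviours, with the logging check replaced by a plain boolean and hypothetical ids:

# Standalone sketch of the truncation behaviour (hypothetical ids; the DEBUG check is replaced by a parameter).
from typing import List

MAX_DISPLAY_IDS = 50

def get_ids_list_str(ids: List[str], debug_enabled: bool = False) -> str:
    if len(ids) <= MAX_DISPLAY_IDS or debug_enabled:
        return str(ids)
    return f"{str(ids[:MAX_DISPLAY_IDS])} <list of {len(ids)} ids truncated - enable DEBUG logging for full list>"

few_ids = [f"urn:nasa:pds:example:product::{i}.0" for i in range(3)]
many_ids = [f"urn:nasa:pds:example:product::{i}.0" for i in range(5000)]

print(get_ids_list_str(few_ids))   # prints the full 3-element list
print(get_ids_list_str(many_ids))  # prints the first 50 ids, then the truncation note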
