demote some noisy info logs
alexdunnjpl committed Jan 11, 2024
1 parent 2f5fef2 commit 4daedda
Showing 3 changed files with 19 additions and 19 deletions.
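
The messages touched by this commit move from INFO to DEBUG, so they disappear from default output. A minimal sketch of how they could be surfaced again when needed, assuming the package uses the standard-library logging module (as the log.info/log.debug calls below suggest) and that its loggers are created with logging.getLogger(__name__); the "pds.registrysweepers" logger name is an assumption, not a confirmed configuration entry point:

import logging

# Illustrative only: configure root logging, then raise verbosity for the
# registry-sweepers loggers so the messages demoted in this commit still appear.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("pds.registrysweepers").setLevel(logging.DEBUG)
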
4 changes: 2 additions & 2 deletions src/pds/registrysweepers/ancestry/generation.py
@@ -234,7 +234,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
os.makedirs(on_disk_cache_dir, exist_ok=True)
else:
on_disk_cache_dir = tempfile.mkdtemp(prefix="ancestry-merge-dump_")
log.info(f"dumping partial non-aggregate ancestry result-sets to {on_disk_cache_dir}")
log.debug(f"dumping partial non-aggregate ancestry result-sets to {on_disk_cache_dir}")

collection_refs_query_docs = get_nonaggregate_ancestry_records_query(client, registry_db_mock)

@@ -270,7 +270,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
record_dict["parent_collection_lidvids"].add(str(collection_lidvid))

if psutil.virtual_memory().percent >= disk_dump_memory_threshold:
- log.info(
+ log.debug(
f"Memory threshold {disk_dump_memory_threshold:.1f}% reached - dumping serialized history to disk for {len(nonaggregate_ancestry_records_by_lidvid)} products"
)
make_history_serializable(nonaggregate_ancestry_records_by_lidvid)
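
The hunk above demotes the message emitted when partial ancestry results are spilled to disk under memory pressure. A self-contained sketch of that spill pattern, with assumed names (accumulate_with_disk_spill, the record_stream shape, and the 85% threshold are illustrative, not taken from the source):

import json
import os
import tempfile

import psutil


def accumulate_with_disk_spill(record_stream, disk_dump_memory_threshold=85.0):
    """Accumulate key/value records in memory, spilling the accumulator to a
    temp file whenever system memory use crosses the threshold (percent).
    Values are assumed JSON-serializable; partial dumps are merged later."""
    cache_dir = tempfile.mkdtemp(prefix="ancestry-merge-dump_")
    accumulator = {}
    for key, value in record_stream:
        accumulator.setdefault(key, []).append(value)
        if psutil.virtual_memory().percent >= disk_dump_memory_threshold:
            dump_fp = os.path.join(cache_dir, f"chunk-{len(os.listdir(cache_dir))}.json")
            with open(dump_fp, "w") as outfile:
                json.dump(accumulator, outfile)
            accumulator.clear()
    if accumulator:  # dump whatever remains after the stream is exhausted
        dump_fp = os.path.join(cache_dir, f"chunk-{len(os.listdir(cache_dir))}.json")
        with open(dump_fp, "w") as outfile:
            json.dump(accumulator, outfile)
    return cache_dir
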
6 changes: 3 additions & 3 deletions src/pds/registrysweepers/ancestry/utils.py
@@ -28,7 +28,7 @@ def make_history_serializable(history: Dict[str, Dict[str, Union[str, Set[str],
def dump_history_to_disk(parent_dir: str, history: Dict[str, SerializableAncestryRecordTypeDef]) -> str:
"""Dump set of history records to disk and return the filepath"""
temp_fp = os.path.join(parent_dir, datetime.now().isoformat().replace(":", "-"))
log.info(f"Dumping history to {temp_fp} for later merging...")
log.debug(f"Dumping history to {temp_fp} for later merging...")
with open(temp_fp, "w+") as outfile:
json.dump(history, outfile)
log.debug(" complete!")
@@ -37,7 +37,7 @@ def dump_history_to_disk(parent_dir: str, history: Dict[str, SerializableAncestr


def merge_matching_history_chunks(dest_fp: str, src_fps: List[str], max_chunk_size: Union[int, None] = None):
log.info(f"Performing merges into {dest_fp} using max_chunk_size={max_chunk_size}")
log.debug(f"Performing merges into {dest_fp} using max_chunk_size={max_chunk_size}")
with open(dest_fp, "r") as dest_infile:
dest_file_content: Dict[str, SerializableAncestryRecordTypeDef] = json.load(dest_infile)

@@ -114,7 +114,7 @@ def split_chunk_if_oversized(max_chunk_size: Union[int, None], parent_dir: str,
split_content[k] = content.pop(k)

split_filepath = dump_history_to_disk(parent_dir, split_content)
log.info(f"split off excess chunk content to new file: {split_filepath}")
log.debug(f"split off excess chunk content to new file: {split_filepath}")
return split_filepath


28 changes: 14 additions & 14 deletions src/pds/registrysweepers/utils/db/__init__.py
@@ -37,12 +37,12 @@ def query_registry_db(

scroll_keepalive = f"{scroll_keepalive_minutes}m"
query_id = get_random_hex_id() # This is just used to differentiate queries during logging
log.info(f"Initiating query (id {query_id}) of index {index_name}: {json.dumps(query)}")
log.debug(f"Initiating query (id {query_id}) of index {index_name}: {json.dumps(query)}")

served_hits = 0

last_info_log_at_percentage = 0
log.info(f"Query {query_id} progress: 0%")
log.debug(f"Query {query_id} progress: 0%")

more_data_exists = True
scroll_id = None
@@ -78,7 +78,7 @@ def fetch_func(_scroll_id: str = scroll_id):

total_hits = results["hits"]["total"]["value"]
if served_hits == 0:
log.info(f"Query {query_id} returns {total_hits} total hits")
log.debug(f"Query {query_id} returns {total_hits} total hits")

response_hits = results["hits"]["hits"]
for hit in response_hits:
@@ -87,7 +87,7 @@ def fetch_func(_scroll_id: str = scroll_id):
percentage_of_hits_served = int(served_hits / total_hits * 100)
if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5):
last_info_log_at_percentage = percentage_of_hits_served
log.info(f"Query {query_id} progress: {percentage_of_hits_served}%")
log.debug(f"Query {query_id} progress: {percentage_of_hits_served}%")

yield hit

@@ -113,7 +113,7 @@ def fetch_func(_scroll_id: str = scroll_id):
logger=log,
)

log.info(f"Query {query_id} complete!")
log.debug(f"Query {query_id} complete!")


def query_registry_db_with_search_after(
@@ -138,12 +138,12 @@ def query_registry_db_with_search_after(
sort_fields = sort_fields or ["lidvid"]

query_id = get_random_hex_id() # This is just used to differentiate queries during logging
log.info(f"Initiating query with id {query_id}: {json.dumps(query)}")
log.debug(f"Initiating query with id {query_id}: {json.dumps(query)}")

served_hits = 0

last_info_log_at_percentage = 0
log.info(f"Query {query_id} progress: 0%")
log.debug(f"Query {query_id} progress: 0%")

more_data_exists = True
search_after_values: Union[List, None] = None
@@ -152,7 +152,7 @@ def query_registry_db_with_search_after(
while more_data_exists:
if search_after_values is not None:
query["search_after"] = search_after_values
- log.info(
+ log.debug(
f"Query {query_id} paging {page_size} hits (page {current_page} of {expected_pages}) with sort fields {sort_fields} and search-after values {search_after_values}"
)

@@ -180,7 +180,7 @@ def fetch_func():
current_page += 1
expected_pages = math.ceil(total_hits / page_size)
if served_hits == 0:
log.info(f"Query {query_id} returns {total_hits} total hits")
log.debug(f"Query {query_id} returns {total_hits} total hits")

response_hits = results["hits"]["hits"]
for hit in response_hits:
@@ -189,7 +189,7 @@ def fetch_func():
percentage_of_hits_served = int(served_hits / total_hits * 100)
if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5):
last_info_log_at_percentage = percentage_of_hits_served
log.info(f"Query {query_id} progress: {percentage_of_hits_served}%")
log.debug(f"Query {query_id} progress: {percentage_of_hits_served}%")

yield hit

@@ -209,7 +209,7 @@ def fetch_func():

more_data_exists = served_hits < results["hits"]["total"]["value"]

log.info(f"Query {query_id} complete!")
log.debug(f"Query {query_id} complete!")


def query_registry_db_or_mock(
@@ -260,7 +260,7 @@ def write_updated_docs(
)

if flush_threshold_reached:
- log.info(
+ log.debug(
f"Bulk update buffer has reached {threshold_log_str} threshold - writing {buffered_updates_count} document updates to db..."
)
_write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)
@@ -276,7 +276,7 @@ def write_updated_docs(
updated_doc_count += 1

if len(bulk_updates_buffer) > 0:
log.info(f"Writing documents updates for {buffered_updates_count} remaining products to db...")
log.debug(f"Writing documents updates for {buffered_updates_count} remaining products to db...")
_write_bulk_updates_chunk(client, index_name, bulk_updates_buffer)

log.info(f"Updated documents for {updated_doc_count} total products!")
@@ -332,7 +332,7 @@ def get_ids_list_str(ids: List[str]) -> str:
f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids_str}"
)
else:
log.info("Successfully wrote bulk update chunk")
log.debug("Successfully wrote bulk update chunk")


def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]:
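
Most of the lines demoted in this file belong to query-progress reporting, which emits a message roughly every 5% of hits served. A standalone sketch of that throttling pattern, under illustrative names (serve_hits is not a function in the source):

import logging

log = logging.getLogger(__name__)


def serve_hits(hits, total_hits):
    """Yield hits, logging progress only when another ~5% of the total has been served."""
    served_hits = 0
    last_log_at_percentage = 0
    log.debug("Query progress: 0%")
    for hit in hits:
        served_hits += 1
        # max(total_hits, 1) guards against division by zero on an empty result set
        percentage_served = int(served_hits / max(total_hits, 1) * 100)
        if percentage_served >= last_log_at_percentage + 5:
            last_log_at_percentage = percentage_served
            log.debug(f"Query progress: {percentage_served}%")
        yield hit
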
