Skip to content

Commit

Permalink
dump chunks every ~30 minutes, instead
Browse files Browse the repository at this point in the history
  • Loading branch information
alexdunnjpl committed Mar 5, 2024
1 parent f8eebf4 commit eb33b4a
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/pds/registrysweepers/ancestry/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import sys
import tempfile
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
from typing import Dict
from typing import Iterable
from typing import List
Expand Down Expand Up @@ -275,6 +277,8 @@ def _get_nonaggregate_ancestry_records_with_chunking(

most_recent_attempted_collection_lidvid: Union[PdsLidVid, None] = None
nonaggregate_ancestry_records_by_lidvid = {}
last_dump_time = datetime.now()

for doc in collection_refs_query_docs:
try:
collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
Expand All @@ -293,11 +297,12 @@ def _get_nonaggregate_ancestry_records_with_chunking(
record_dict["parent_bundle_lidvids"].update({str(id) for id in bundle_ancestry})
record_dict["parent_collection_lidvids"].add(str(collection_lidvid))

if (
sys.getsizeof(nonaggregate_ancestry_records_by_lidvid) / 1024**2 > 1024
): # if chunk is bigger than 1GB
if datetime.now() - last_dump_time > timedelta(minutes=30):
# if (
# sys.getsizeof(nonaggregate_ancestry_records_by_lidvid) / 1024**2 > 1024
# ): # if chunk is bigger than 1GB
# if psutil.virtual_memory().percent >= disk_dump_memory_threshold:
log.debug(
log.info(
f"Memory threshold {disk_dump_memory_threshold:.1f}% reached - dumping serialized history to disk for {len(nonaggregate_ancestry_records_by_lidvid)} products"
)
make_history_serializable(nonaggregate_ancestry_records_by_lidvid)
Expand All @@ -306,6 +311,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
chunk_size_max, sys.getsizeof(nonaggregate_ancestry_records_by_lidvid)
) # slightly problematic due to reuse of pointers vs actual values, but let's try it
nonaggregate_ancestry_records_by_lidvid = {}
last_dump_time = datetime.now()

# mark collection for metadata update
touched_ref_documents.append(
Expand Down

0 comments on commit eb33b4a

Please sign in to comment.