From 482146ade666f18e13a06bed13d656a05dd8fbab Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 12:11:28 -0700 Subject: [PATCH] generate deferred updates only after non-deferred updates have been written to db --- src/pds/registrysweepers/ancestry/__init__.py | 76 +++++++++++-------- .../ancestry/test_ancestry.py | 10 ++- 2 files changed, 54 insertions(+), 32 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index 337dc60..ef98299 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -9,6 +9,7 @@ from typing import List from typing import Optional from typing import Set +from typing import TextIO from typing import Tuple from typing import Union @@ -57,8 +58,9 @@ def run( ancestry_records = chain(collection_and_nonaggregate_records, bundle_records) ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records) + deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) updates = generate_updates( - client, ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink, registry_mock_query_f + ancestry_records_to_write, deferred_records_file.name, ancestry_records_accumulator, bulk_updates_sink ) if bulk_updates_sink is None: @@ -81,6 +83,15 @@ def run( updates, index_name="registry", ) + log.info("Generating updates from deferred records...") + deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f) + + log.info("Writing deferred updates to database...") + write_updated_docs( + client, + deferred_updates, + index_name="registry", + ) else: # consume generator to dump bulk updates to sink for _ in updates: @@ -116,18 +127,19 @@ def orphan_counter_mock(_, __): def generate_updates( - client: OpenSearch, ancestry_records: Iterable[AncestryRecord], + deferred_records_filepath: str, ancestry_records_accumulator=None, 
bulk_updates_sink=None, - registry_db_mock: DbMockTypeDef = None, ) -> Iterable[Update]: """ - Given a collection of AncestryRecords, yield corresponding Update objects. + Given a collection of AncestryRecords, yield corresponding Update objects, excluding any deferred updates, which + must be generated separately. Ideally, there should be one record per product, but this is not necessarily the case due to the potential of nonaggregate products to be shared between collections with different LIDs. In that case, it is necessary to: - defer processing of all records which conflict with a previously-processed record + - ensure all non-deferred records have been written to the db - retrieve the conflicting records which have been written to db, since the streaming collection-iterative approach prevents efficiently detecting conflicts until the first partial history is already processed/written. - merge all deferred/retrieved partial histories into a full history for each distinct product lidvid @@ -139,7 +151,7 @@ def generate_updates( # stream/yield Updates for AncestryRecords, deferring processing of conflicting AncestryRecords and storing them in # a temporary file - with tempfile.NamedTemporaryFile(mode="w+", delete=False) as deferred_records_file: + with open(deferred_records_filepath, mode="w+") as deferred_records_file: for record in ancestry_records: # Tee the stream of records into the accumulator, if one was provided (functional testing). 
if ancestry_records_accumulator is not None: @@ -166,33 +178,35 @@ def generate_updates( updated_doc_ids.add(update.id) yield update - # Merge all deferred records with matching lidvids - with open(deferred_records_file.name, "r") as deferred_records_file: # type: ignore - deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {} - for l in deferred_records_file.readlines(): - record = AncestryRecord.from_dict(json.loads(l)) - if record.lidvid in deferred_records_by_lidvid: - deferred_records_by_lidvid[record.lidvid].update_with(record) - else: - deferred_records_by_lidvid.update({record.lidvid: record}) - - # Retrieve the first partial history (already written to db) for each lidvid, merge with its deferred history, - # then yield a full-history-update for that lidvid - for record in deferred_records_by_lidvid.values(): - doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock) - partial_record_from_db = AncestryRecord.from_dict( - { - "lidvid": doc["lidvid"], - "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY], - "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY], - } - ) - record.update_with(partial_record_from_db) - update = update_from_record(record) - yield update - # TODO: Check that ancestry version is equal to current, throw if not. 
- os.remove(deferred_records_file.name) +def generate_deferred_updates( + client: OpenSearch, deferred_records_filepath: str, registry_db_mock: DbMockTypeDef = None +) -> Iterable[Update]: + # Merge all deferred records with matching lidvids + with open(deferred_records_filepath, "r") as deferred_records_file: # type: ignore + deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {} + for l in deferred_records_file.readlines(): + record = AncestryRecord.from_dict(json.loads(l)) + if record.lidvid in deferred_records_by_lidvid: + deferred_records_by_lidvid[record.lidvid].update_with(record) + else: + deferred_records_by_lidvid.update({record.lidvid: record}) + + # Retrieve the first partial history (already written to db) for each lidvid, merge with its deferred history, + # then yield a full-history-update for that lidvid + for record in deferred_records_by_lidvid.values(): + doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock) + partial_record_from_db = AncestryRecord.from_dict( + { + "lidvid": doc["lidvid"], + "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY], + "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY], + } + ) + record.update_with(partial_record_from_db) + update = update_from_record(record) + yield update + # TODO: Check that ancestry version is equal to current, throw if not. 
if __name__ == "__main__": diff --git a/tests/pds/registrysweepers/ancestry/test_ancestry.py b/tests/pds/registrysweepers/ancestry/test_ancestry.py index eb06491..cb701cc 100644 --- a/tests/pds/registrysweepers/ancestry/test_ancestry.py +++ b/tests/pds/registrysweepers/ancestry/test_ancestry.py @@ -1,12 +1,14 @@ import itertools import logging import os.path +import tempfile import unittest from typing import Dict from typing import List from typing import Tuple from pds.registrysweepers import ancestry +from pds.registrysweepers.ancestry import generate_deferred_updates from pds.registrysweepers.ancestry import generate_updates from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY @@ -312,7 +314,13 @@ def test_ancestor_partial_history_accumulation(self): generate_nonaggregate_and_collection_records_iteratively(None, collection_ancestry_records, query_mock_f) ) - updates = list(generate_updates(None, collection_and_nonaggregate_records, None, None, query_mock_f)) + deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) + non_deferred_updates = list( + generate_updates(collection_and_nonaggregate_records, deferred_records_file.name, None, None) + ) + deferred_updates = list(generate_deferred_updates(None, deferred_records_file.name, query_mock_f)) + updates = non_deferred_updates + deferred_updates + os.remove(deferred_records_file.name) # TODO: increase to two nonmatching collections and two shared products