diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py
index a2f07f3..7a72803 100644
--- a/src/pds/registrysweepers/ancestry/__init__.py
+++ b/src/pds/registrysweepers/ancestry/__init__.py
@@ -1,4 +1,7 @@
+import json
 import logging
+import os
+import tempfile
 from itertools import chain
 from typing import Callable
 from typing import Dict
@@ -6,17 +9,23 @@ from typing import List
 from typing import Optional
 from typing import Set
+from typing import TextIO
 from typing import Tuple
 from typing import Union

 from opensearchpy import OpenSearch
 from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
+from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY
+from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY
 from pds.registrysweepers.ancestry.generation import generate_nonaggregate_and_collection_records_iteratively
 from pds.registrysweepers.ancestry.generation import get_bundle_ancestry_records
 from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records
 from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records
+from pds.registrysweepers.ancestry.queries import get_existing_ancestry_for_product
 from pds.registrysweepers.ancestry.queries import get_orphaned_documents
 from pds.registrysweepers.ancestry.queries import get_orphaned_documents_count
+from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef
+from pds.registrysweepers.ancestry.utils import update_from_record
 from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
 from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
 from pds.registrysweepers.utils import configure_logging
@@ -25,12 +34,10 @@ from pds.registrysweepers.utils.db.client import get_opensearch_client
 from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
 from pds.registrysweepers.utils.db.update import Update
+from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

 log = logging.getLogger(__name__)

-METADATA_PARENT_BUNDLE_KEY = "ops:Provenance/ops:parent_bundle_identifier"
-METADATA_PARENT_COLLECTION_KEY = "ops:Provenance/ops:parent_collection_identifier"
-

 def run(
     client: OpenSearch,
@@ -51,7 +58,10 @@ def run(
     ancestry_records = chain(collection_and_nonaggregate_records, bundle_records)
     ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records)

-    updates = generate_updates(ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink)
+    deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
+    updates = generate_updates(
+        ancestry_records_to_write, deferred_records_file.name, ancestry_records_accumulator, bulk_updates_sink
+    )

     if bulk_updates_sink is None:
         log.info("Ensuring metadata keys are present in database index...")
@@ -73,6 +83,16 @@ def run(
             updates,
             index_name="registry",
         )
+        log.info("Generating updates from deferred records...")
+        deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)
+
+        log.info("Writing deferred updates to database...")
+        write_updated_docs(
+            client,
+            deferred_updates,
+            index_name="registry",
+        )
+        os.remove(deferred_records_file.name)  # clean up the deferred-records spill file
     else:
         # consume generator to dump bulk updates to sink
         for _ in updates:
@@ -108,39 +127,90 @@ def orphan_counter_mock(_, __):


 def generate_updates(
-    ancestry_records: Iterable[AncestryRecord], ancestry_records_accumulator=None, bulk_updates_sink=None
+    ancestry_records: Iterable[AncestryRecord],
+    deferred_records_filepath: str,
+    ancestry_records_accumulator=None,
+    bulk_updates_sink=None,
 ) -> Iterable[Update]:
-    updates: Set[str] = set()
+    """
+    Given a collection of AncestryRecords, yield corresponding Update objects, excluding any deferred updates, which
+    must be generated separately.
+
+    Ideally, there should be one record per product, but this is not necessarily the case, as nonaggregate products
+    may be shared between collections with different LIDs. In that case, it is necessary to:
+    - defer processing of all records which conflict with a previously-processed record
+    - ensure all non-deferred records have been written to the db
+    - retrieve the conflicting records which have been written to the db, since the streaming collection-iterative
+      approach prevents efficiently detecting conflicts until the first partial history is already processed/written
+    - merge all deferred/retrieved partial histories into a full history for each distinct product lidvid
+    - yield those full-history updates, which will overwrite the partial histories initially written to the db
+    """
+    updated_doc_ids: Set[str] = set()

     log.info("Generating document bulk updates for AncestryRecords...")

-    for record in ancestry_records:
-        # Tee the stream of records into the accumulator, if one was provided (functional testing).
-        if ancestry_records_accumulator is not None:
-            ancestry_records_accumulator.append(record)
+    # stream/yield Updates for AncestryRecords, deferring processing of conflicting AncestryRecords and storing
+    # them in a temporary file
+    with open(deferred_records_filepath, mode="w+") as deferred_records_file:
+        for record in ancestry_records:
+            # Tee the stream of records into the accumulator, if one was provided (functional testing).
+            if ancestry_records_accumulator is not None:
+                ancestry_records_accumulator.append(record)

-        if record.lidvid.is_collection() and len(record.parent_bundle_lidvids) == 0:
-            log.warning(f"Collection {record.lidvid} is not referenced by any bundle.")
+            if record.lidvid.is_collection() and len(record.parent_bundle_lidvids) == 0:
+                log.warning(f"Collection {record.lidvid} is not referenced by any bundle.")

-        doc_id = str(record.lidvid)
-        update_content = {
-            METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids],
-            METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids],
-            SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: int(SWEEPERS_ANCESTRY_VERSION),
-        }
+            update = update_from_record(record)

-        # Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
-        if bulk_updates_sink is not None:
-            bulk_updates_sink.append((doc_id, update_content))
+            # Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
+            if bulk_updates_sink is not None:
+                bulk_updates_sink.append((update.id, update.content))

-        if doc_id in updates:
-            log.error(
-                f"Multiple updates detected for doc_id {doc_id} - cannot create update! "
-                f"(new content {update_content} will not be written)"
-            )
-            continue

-        updates.add(doc_id)
-        yield Update(id=doc_id, content=update_content)
+            if update.id in updated_doc_ids:
+                log.debug(
+                    f"Multiple updates detected for doc_id {update.id} - deferring subsequent parts"
+                    f" - storing in {deferred_records_file.name}"
+                )
+                deferred_records_file.write(json.dumps(record.to_dict(sort_lists=False)) + "\n")
+                deferred_records_file.flush()
+                continue
+
+            updated_doc_ids.add(update.id)
+            yield update
+
+
+def generate_deferred_updates(
+    client: OpenSearch, deferred_records_filepath: str, registry_db_mock: DbMockTypeDef = None
+) -> Iterable[Update]:
+    # Merge all deferred records with matching lidvids
+    with open(deferred_records_filepath, "r") as deferred_records_file:  # type: ignore
+        deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {}
+        for line in deferred_records_file:
+            record = AncestryRecord.from_dict(json.loads(line))
+            if record.lidvid in deferred_records_by_lidvid:
+                deferred_records_by_lidvid[record.lidvid].update_with(record)
+            else:
+                deferred_records_by_lidvid.update({record.lidvid: record})
+
+        # Retrieve the first partial history (already written to db) for each lidvid, merge it with the deferred
+        # history, then yield a full-history update for that lidvid
+        for record in deferred_records_by_lidvid.values():
+            doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock)
+            try:
+                partial_record_from_db = AncestryRecord.from_dict(
+                    {
+                        "lidvid": doc["_source"]["lidvid"],
+                        "parent_bundle_lidvids": doc["_source"][METADATA_PARENT_BUNDLE_KEY],
+                        "parent_collection_lidvids": doc["_source"][METADATA_PARENT_COLLECTION_KEY],
+                    }
+                )
+                record.update_with(partial_record_from_db)
+                update = update_from_record(record)
+                yield update
+            except (KeyError, ValueError) as err:
+                log.error(f'Failed to parse valid AncestryRecord from document with id "{doc["_id"]}": {err}')
+
+    # TODO: Check that ancestry version is equal to current, throw if not.


 if __name__ == "__main__":
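
A minimal sketch of the defer-and-merge round trip implemented above, assuming only behaviour visible in this diff: AncestryRecord.to_dict()/from_dict() round-trip a JSON-serializable dict, and update_with() merges the parent sets of two partial histories for the same product (as its usage in generate_deferred_updates implies). The lidvids are illustrative.

    import json

    from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord

    # A conflicting record is spilled to the deferred-records file as one JSON line...
    deferred = AncestryRecord.from_dict(
        {
            "lidvid": "a:b:c:bundle_b:collection_b:shared_product::1.0",
            "parent_bundle_lidvids": ["a:b:c:bundle_b::1.0"],
            "parent_collection_lidvids": ["a:b:c:bundle_b:collection_b::1.0"],
        }
    )
    spilled_line = json.dumps(deferred.to_dict(sort_lists=False))

    # ...then re-hydrated and merged with the partial history already written to
    # the db, yielding the product's full history.
    rehydrated = AncestryRecord.from_dict(json.loads(spilled_line))
    partial_from_db = AncestryRecord.from_dict(
        {
            "lidvid": "a:b:c:bundle_b:collection_b:shared_product::1.0",
            "parent_bundle_lidvids": ["a:b:c:bundle_a::1.0"],
            "parent_collection_lidvids": ["a:b:c:bundle_a:collection_a::1.0"],
        }
    )
    rehydrated.update_with(partial_from_db)  # now carries both bundles and both collections
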
diff --git a/src/pds/registrysweepers/ancestry/ancestryrecord.py b/src/pds/registrysweepers/ancestry/ancestryrecord.py
index 62bfcac..9f3c1b8 100644
--- a/src/pds/registrysweepers/ancestry/ancestryrecord.py
+++ b/src/pds/registrysweepers/ancestry/ancestryrecord.py
@@ -20,6 +20,10 @@ class AncestryRecord:
     # equivalent record is known to already exist due to up-to-date ancestry version flag in the source document
     skip_write: bool = False

+    def __post_init__(self):
+        if not isinstance(self.lidvid, PdsLidVid):
+            raise ValueError('Cannot initialise AncestryRecord with non-PdsLidVid value for "lidvid"')
+
     def __repr__(self):
         return f"AncestryRecord(lidvid={self.lidvid}, parent_collection_lidvids={sorted([str(x) for x in self.parent_collection_lidvids])}, parent_bundle_lidvids={sorted([str(x) for x in self.parent_bundle_lidvids])})"
diff --git a/src/pds/registrysweepers/ancestry/constants.py b/src/pds/registrysweepers/ancestry/constants.py
new file mode 100644
index 0000000..1fd0cb2
--- /dev/null
+++ b/src/pds/registrysweepers/ancestry/constants.py
@@ -0,0 +1,2 @@
+METADATA_PARENT_BUNDLE_KEY = "ops:Provenance/ops:parent_bundle_identifier"
+METADATA_PARENT_COLLECTION_KEY = "ops:Provenance/ops:parent_collection_identifier"
diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py
index bebed6e..f50951e 100644
--- a/src/pds/registrysweepers/ancestry/generation.py
+++ b/src/pds/registrysweepers/ancestry/generation.py
@@ -17,13 +17,13 @@ import psutil  # type: ignore
 from opensearchpy import OpenSearch
 from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
-from pds.registrysweepers.ancestry.queries import DbMockTypeDef
 from pds.registrysweepers.ancestry.queries import get_bundle_ancestry_records_query
 from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_bundles_query
 from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_collections_query
 from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_for_collection_lid_query
 from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_query
 from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants
+from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef
 from pds.registrysweepers.ancestry.utils import dump_history_to_disk
 from pds.registrysweepers.ancestry.utils import gb_mem_to_size
 from pds.registrysweepers.ancestry.utils import load_partial_history_to_records
@@ -90,19 +90,6 @@ def get_ancestry_by_collection_lidvid(collections_docs: Iterable[Dict]) -> Mappi
     return ancestry_by_collection_lidvid


-def get_collection_aliases_by_lid(collections_docs: Iterable[Dict]) -> Dict[PdsLid, Set[PdsLid]]:
-    aliases_by_lid: Dict[PdsLid, Set[PdsLid]] = {}
-    for doc in collections_docs:
-        alternate_ids: List[str] = doc["_source"].get("alternate_ids", [])
-        lids: Set[PdsLid] = {PdsProductIdentifierFactory.from_string(id).lid for id in alternate_ids}
-        for lid in lids:
-            if lid not in aliases_by_lid:
-                aliases_by_lid[lid] = set()
-            aliases_by_lid[lid].update(lids)
-
-    return aliases_by_lid
-
-
 def get_ancestry_by_collection_lid(
     ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord]
 ) -> Mapping[PdsLid, Set[AncestryRecord]]:
@@ -124,9 +111,6 @@ def get_collection_ancestry_records(
     bundles_docs = get_collection_ancestry_records_bundles_query(client, registry_db_mock)
     collections_docs = list(get_collection_ancestry_records_collections_query(client, registry_db_mock))

-    # Prepare LID alias sets for every LID
-    collection_aliases_by_lid: Dict[PdsLid, Set[PdsLid]] = get_collection_aliases_by_lid(collections_docs)
-
     # Prepare empty ancestry records for collections, with fast access by LID or LIDVID
     ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord] = get_ancestry_by_collection_lidvid(
         collections_docs
@@ -167,9 +151,8 @@ def get_collection_ancestry_records(
             )
         elif isinstance(identifier, PdsLid):
             try:
-                for alias in collection_aliases_by_lid[identifier]:
-                    for record in ancestry_by_collection_lid[alias]:
-                        record.parent_bundle_lidvids.add(bundle_lidvid)
+                for record in ancestry_by_collection_lid[identifier.lid]:
+                    record.parent_bundle_lidvids.add(bundle_lidvid)
             except KeyError:
                 log.warning(
                     f"No versions of collection {identifier} referenced by bundle {bundle_lidvid} "
@@ -261,7 +244,18 @@ def get_nonaggregate_ancestry_records_for_collection_lid(
                 continue

             collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
-            nonaggregate_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]]
+            referenced_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]]
+            nonaggregate_lidvids = [id for id in referenced_lidvids if id.is_basic_product()]
+
+            erroneous_lidvids = [id for id in referenced_lidvids if not id.is_basic_product()]
+            if len(erroneous_lidvids) > 0:
+                log.error(
+                    f'registry-refs document with id {doc["_id"]} references one or more aggregate products in its product_lidvid refs list: {[str(id) for id in erroneous_lidvids]}'
+                )
+
+        except IndexError:
+            doc_id = doc["_id"]
+            log.warning(f'Encountered document with unexpected _id: "{doc_id}"')
         except (ValueError, KeyError) as err:
             log.warning(
                 'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
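
For reference, each registry-refs page consumed by the loop above has the shape below (mirroring the functional-test mocks later in this diff; values illustrative). The new code partitions the product_lidvid refs into basic products, which are kept, and aggregate products, which are logged as erroneous:

    # Shape of one registry-refs page document, per the test mocks in this diff.
    doc = {
        "_id": "a:b:c:bundle:collection::1.0::P1",
        "_source": {
            "collection_lidvid": "a:b:c:bundle:collection::1.0",
            "batch_id": 1,
            "product_lidvid": [
                "a:b:c:bundle:collection:product_1::1.0",  # basic product: kept
                "a:b:c:bundle:collection::1.0",  # aggregate ref: logged as erroneous
            ],
        },
    }
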
id "%s" due to %s: %s', diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index 4bbff06..612fb03 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -1,23 +1,23 @@ import logging from enum import auto from enum import Enum -from typing import Callable from typing import Dict from typing import Iterable -from typing import Optional from opensearchpy import OpenSearch +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants +from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY from pds.registrysweepers.utils.db import get_query_hits_count from pds.registrysweepers.utils.db import query_registry_db_or_mock from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid +from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid log = logging.getLogger(__name__) -DbMockTypeDef = Optional[Callable[[str], Iterable[Dict]]] - class ProductClass(Enum): BUNDLE = (auto(),) @@ -60,7 +60,7 @@ def get_collection_ancestry_records_collections_query( ) -> Iterable[Dict]: # Query the registry for all collection identifiers query = product_class_query_factory(ProductClass.COLLECTION) - _source = {"includes": ["lidvid", "alternate_ids", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} + _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True) docs = query_f(client, "registry", query, _source) @@ -108,7 +108,11 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query( "seq_no_primary_term": True, } _source = {"includes": ["collection_lidvid", "batch_id", "product_lidvid"]} - query_f = query_registry_db_or_mock(registry_db_mock, "get_nonaggregate_ancestry_records", use_search_after=True) + query_f = query_registry_db_or_mock( + registry_db_mock, + f"get_nonaggregate_ancestry_records_for_collection_lid-{collection_lid}", + use_search_after=True, + ) # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( @@ -150,3 +154,39 @@ def get_orphaned_documents_count(client: OpenSearch, index_name: str) -> int: # Query an index documents without an up-to-date ancestry version reference - this would indicate a product which is # orphaned and is getting missed in processing return get_query_hits_count(client, index_name, _orphaned_docs_query) + + +def get_existing_ancestry_for_product( + client: OpenSearch, product_lidvid: PdsLidVid, registry_db_mock: DbMockTypeDef +) -> Dict: + # Retrieve ancestry for a single document. It would be simpler to just pull it by id, but this is compatible with + # the existing functional testing framework. 
diff --git a/src/pds/registrysweepers/ancestry/typedefs.py b/src/pds/registrysweepers/ancestry/typedefs.py
index 7e1e9c6..57cd54d 100644
--- a/src/pds/registrysweepers/ancestry/typedefs.py
+++ b/src/pds/registrysweepers/ancestry/typedefs.py
@@ -1,5 +1,9 @@
+from typing import Callable
 from typing import Dict
+from typing import Iterable
 from typing import List
+from typing import Optional
 from typing import Union

 SerializableAncestryRecordTypeDef = Dict[str, Union[str, List[str]]]
+DbMockTypeDef = Optional[Callable[[str], Iterable[Dict]]]
diff --git a/src/pds/registrysweepers/ancestry/utils.py b/src/pds/registrysweepers/ancestry/utils.py
index 5cd4c7b..212c99a 100644
--- a/src/pds/registrysweepers/ancestry/utils.py
+++ b/src/pds/registrysweepers/ancestry/utils.py
@@ -11,8 +11,13 @@ from typing import Set
 from typing import Union

-from pds.registrysweepers.ancestry import AncestryRecord
+from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
+from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY
+from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY
 from pds.registrysweepers.ancestry.typedefs import SerializableAncestryRecordTypeDef
+from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
+from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
+from pds.registrysweepers.utils.db import Update

 log = logging.getLogger(__name__)

@@ -138,3 +143,13 @@ def load_partial_history_to_records(fn: str) -> Iterable[AncestryRecord]:
 def gb_mem_to_size(desired_mem_usage_gb) -> int:
     # rough estimated ratio of memory size to sys.getsizeof() report
     return desired_mem_usage_gb / 3.1 * 2621536
+
+
+def update_from_record(record: AncestryRecord) -> Update:
+    doc_id = str(record.lidvid)
+    content = {
+        METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids],
+        METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids],
+        SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: int(SWEEPERS_ANCESTRY_VERSION),
+    }
+    return Update(id=doc_id, content=content)
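
For illustration, the Update emitted by the new update_from_record helper looks like this (lidvids illustrative; the version value reflects the bump to 3 in the next file, and list ordering is not guaranteed since parents are stored as sets):

    from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
    from pds.registrysweepers.ancestry.utils import update_from_record
    from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

    record = AncestryRecord(
        lidvid=PdsLidVid.from_string("a:b:c:bundle:collection:product::1.0"),
        parent_bundle_lidvids={PdsLidVid.from_string("a:b:c:bundle::1.0")},
        parent_collection_lidvids={PdsLidVid.from_string("a:b:c:bundle:collection::1.0")},
    )

    update = update_from_record(record)
    # update.id      == "a:b:c:bundle:collection:product::1.0"
    # update.content == {
    #     "ops:Provenance/ops:parent_bundle_identifier": ["a:b:c:bundle::1.0"],
    #     "ops:Provenance/ops:parent_collection_identifier": ["a:b:c:bundle:collection::1.0"],
    #     "ops:Provenance/ops:registry_sweepers_ancestry_version": 3,
    # }
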
collection of product documents...") + log.info("Writing document updates...") updated_doc_count = 0 bulk_buffer_max_size_mb = 30.0 @@ -275,7 +275,7 @@ def write_updated_docs( log.debug(f"Writing documents updates for {buffered_updates_count} remaining products to db...") _write_bulk_updates_chunk(client, index_name, bulk_updates_buffer) - log.info(f"Updated documents for {updated_doc_count} total products!") + log.info(f"Updated documents for {updated_doc_count} products!") def update_as_statements(update: Update) -> Iterable[str]: @@ -294,7 +294,7 @@ def update_as_statements(update: Update) -> Iterable[str]: def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]): bulk_data = "\n".join(bulk_updates) + "\n" - request_timeout = 90 + request_timeout = 180 response_content = client.bulk(index=index_name, body=bulk_data, request_timeout=request_timeout) if response_content.get("errors"): diff --git a/src/pds/registrysweepers/utils/db/update.py b/src/pds/registrysweepers/utils/db/update.py index 2e149dd..1f65222 100644 --- a/src/pds/registrysweepers/utils/db/update.py +++ b/src/pds/registrysweepers/utils/db/update.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from typing import Dict from typing import Union diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json new file mode 100644 index 0000000..b1e9389 --- /dev/null +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json @@ -0,0 +1,70 @@ +{ + "--note": [ + "Example history for two collections with different LIDVIDs, each having two unique non-agg products and", + "another which exists in both collections.", + "This tests correct accumulation of history when a non-aggregate product derives its history from collections", + "which are processed in different chunks (as records are chunked by collection LID)" + ], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:matching_bundle:matching_collection": [ + { + "_id": "a:b:c:matching_bundle:matching_collection::1.0::P1", + "_source": { + "collection_lidvid": "a:b:c:matching_bundle:matching_collection::1.0", + "batch_id": 1, + "product_lidvid": [ + "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_1::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + }, + { + "_id": "a:b:c:matching_bundle:matching_collection::1.0::P2", + "_source": { + "collection_lidvid": "a:b:c:matching_bundle:matching_collection::1.0", + "batch_id": 2, + "product_lidvid": [ + "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_2::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + } + ], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:nonmatching_bundle:nonmatching_collection": [ + { + "_id": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0::P1", + "_source": { + "collection_lidvid": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0", + "batch_id": 1, + "product_lidvid": [ + "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_1::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + }, + { + "_id": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0::P2", + "_source": { + 
"collection_lidvid": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0", + "batch_id": 2, + "product_lidvid": [ + "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_2::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + } + ], + "get_existing_ancestry_for_product-a:b:c:matching_bundle:matching_collection:overlapping_product::1.0": [ + { + "_source": { + "lidvid": "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0", + "ops:Provenance/ops:parent_bundle_identifier": [ + "a:b:c:matching_bundle::1.0" + ], + "ops:Provenance/ops:parent_collection_identifier": [ + "a:b:c:matching_bundle:matching_collection::1.0" + ], + "ops:Provenance/ops:registry_sweepers_ancestry_version": 1 + } + } + ] +} diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json index 6035e9f..a6684e6 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json @@ -55,10 +55,12 @@ } } ], - "get_nonaggregate_ancestry_records": [ + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [ { + "_id": "a:b:c:bundle:lidrefcollection::1.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidrefcollection::1.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::1.0", "a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" @@ -66,17 +68,23 @@ } }, { + "_id": "a:b:c:bundle:lidrefcollection::2.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidrefcollection::2.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::2.0", "a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" ] } - }, + } + ], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidvidrefcollection": [ { + "_id": "a:b:c:bundle:lidvidrefcollection::1.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidvidrefcollection::1.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidvidrefcollection:collectionuniqueproduct::1.0", "a:b:c:bundle:lidvidrefcollection:collectionsharedproduct::1.0" @@ -84,8 +92,10 @@ } }, { + "_id": "a:b:c:bundle:lidvidrefcollection::2.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidvidrefcollection::2.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidvidrefcollection:collectionuniqueproduct::2.0", "a:b:c:bundle:lidvidrefcollection:collectionsharedproduct::1.0" diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json index 018eacb..49a371b 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json @@ -63,19 +63,12 @@ } } ], - "get_nonaggregate_ancestry_records": [ + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [ { + "_id": "a:b:c:matching_bundle:matching_collection::1.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidrefcollection::1.0", - "product_lidvid": [ - "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::1.0", - 
"a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" - ] - } - }, - { - "_source": { - "_": "example of record with missing 'collection_lidvid' field", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::1.0", "a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" diff --git a/tests/pds/registrysweepers/ancestry/test_ancestry.py b/tests/pds/registrysweepers/ancestry/test_ancestry.py index 84c7508..cb701cc 100644 --- a/tests/pds/registrysweepers/ancestry/test_ancestry.py +++ b/tests/pds/registrysweepers/ancestry/test_ancestry.py @@ -1,16 +1,24 @@ import itertools +import logging import os.path +import tempfile import unittest from typing import Dict from typing import List from typing import Tuple from pds.registrysweepers import ancestry -from pds.registrysweepers.ancestry import AncestryRecord -from pds.registrysweepers.ancestry import get_collection_ancestry_records -from pds.registrysweepers.ancestry import get_nonaggregate_ancestry_records -from pds.registrysweepers.ancestry import SWEEPERS_ANCESTRY_VERSION -from pds.registrysweepers.ancestry import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY +from pds.registrysweepers.ancestry import generate_deferred_updates +from pds.registrysweepers.ancestry import generate_updates +from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY +from pds.registrysweepers.ancestry.generation import generate_nonaggregate_and_collection_records_iteratively +from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records +from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records +from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION +from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY +from pds.registrysweepers.utils import configure_logging from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid from tests.mocks.registryquerymock import RegistryQueryMock @@ -81,6 +89,14 @@ def setUpClass(cls) -> None: cls.updates_by_lidvid_str = {id: content for id, content in cls.bulk_updates} + def test_correct_record_counts(self): + self.assertEqual(1, len(self.bundle_records)) + self.assertEqual(4, len(self.collection_records)) + self.assertEqual(6, len(self.nonaggregate_records)) + + def test_correct_update_counts(self): + self.assertEqual(11, len(self.updates_by_lidvid_str)) + def test_bundles_have_no_ancestry(self): for record in self.bundle_records: self.assertTrue(len(record.parent_bundle_lidvids) == 0) @@ -170,6 +186,10 @@ def test_ancestry_completes_without_fatal_error(self): str(r.lidvid): r for r in self.ancestry_records if r.lidvid.is_basic_product() } + self.assertEqual(1, len(self.bundle_records)) + self.assertEqual(1, len(self.collection_records)) + self.assertEqual(2, len(self.nonaggregate_records)) + self.updates_by_lidvid_str = {id: content for id, content in self.bulk_updates} @@ -252,5 +272,72 @@ def test_ancestor_reference_aggregation(self): self.assertIn(record, collection_ancestry_records, msg=f"Expected record is produced") +class AncestryDeferredPartialUpdatesTestCase(unittest.TestCase): + input_file_path = os.path.abspath( + "./tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json" + ) + registry_query_mock = 
+
+    def test_ancestor_partial_history_accumulation(self):
+        """
+        Test that when a nonaggregate product is shared between collections with different LIDs, the first partial
+        update written for it contains only the initially-processed collection's history, and the subsequent
+        deferred update merges in the partial history retrieved from the db to yield the product's full history.
+        """
+
+        configure_logging(filepath=None, log_level=logging.DEBUG)
+
+        mb = PdsLidVid.from_string("a:b:c:matching_bundle::1.0")
+        nmb = PdsLidVid.from_string("a:b:c:nonmatching_bundle::1.0")
+        mc = PdsLidVid.from_string("a:b:c:matching_bundle:matching_collection::1.0")
+        nmc = PdsLidVid.from_string("a:b:c:nonmatching_bundle:nonmatching_collection::1.0")
+
+        # the unique-product lidvids document the mock contents; assertions currently focus on the shared product
+        mup1 = PdsLidVid.from_string(
+            "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_1::1.0"
+        )
+        mup2 = PdsLidVid.from_string(
+            "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_2::1.0"
+        )
+        nmup1 = PdsLidVid.from_string(
+            "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_1::1.0"
+        )
+        nmup2 = PdsLidVid.from_string(
+            "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_2::1.0"
+        )
+        op = PdsLidVid.from_string("a:b:c:matching_bundle:matching_collection:overlapping_product::1.0")
+
+        query_mock_f = self.registry_query_mock.get_mocked_query
+        collection_ancestry_records = [
+            AncestryRecord(lidvid=mc, parent_bundle_lidvids={mb}, parent_collection_lidvids=set()),
+            AncestryRecord(lidvid=nmc, parent_bundle_lidvids={nmb}, parent_collection_lidvids=set()),
+        ]
+
+        collection_and_nonaggregate_records = list(
+            generate_nonaggregate_and_collection_records_iteratively(None, collection_ancestry_records, query_mock_f)
+        )
+
+        deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
+        non_deferred_updates = list(
+            generate_updates(collection_and_nonaggregate_records, deferred_records_file.name, None, None)
+        )
+        deferred_updates = list(generate_deferred_updates(None, deferred_records_file.name, query_mock_f))
+        updates = non_deferred_updates + deferred_updates
+        os.remove(deferred_records_file.name)
+
+        # TODO: increase to two nonmatching collections and two shared products
+
+        incomplete_opu1 = next(
+            u for u in updates if u.id == str(op) and len(u.content[METADATA_PARENT_COLLECTION_KEY]) == 1
+        )
+        self.assertIn(str(mb), incomplete_opu1.content[METADATA_PARENT_BUNDLE_KEY])
+        self.assertNotIn(str(nmb), incomplete_opu1.content[METADATA_PARENT_BUNDLE_KEY])
+        self.assertIn(str(mc), incomplete_opu1.content[METADATA_PARENT_COLLECTION_KEY])
+        self.assertNotIn(str(nmc), incomplete_opu1.content[METADATA_PARENT_COLLECTION_KEY])
+
+        opu1 = next(u for u in updates if u.id == str(op) and len(u.content[METADATA_PARENT_COLLECTION_KEY]) > 1)
+        self.assertIn(str(mb), opu1.content[METADATA_PARENT_BUNDLE_KEY])
+        self.assertIn(str(nmb), opu1.content[METADATA_PARENT_BUNDLE_KEY])
+        self.assertIn(str(mc), opu1.content[METADATA_PARENT_COLLECTION_KEY])
+        self.assertIn(str(nmc), opu1.content[METADATA_PARENT_COLLECTION_KEY])
+
+
 if __name__ == "__main__":
     unittest.main()
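
Taken together, the two-phase write that run() now performs reduces to the following sketch (client setup and record generation elided; `client` and `ancestry_records_to_write` are assumed to be prepared as in run() itself):

    import os
    import tempfile

    from pds.registrysweepers.ancestry import generate_deferred_updates
    from pds.registrysweepers.ancestry import generate_updates
    from pds.registrysweepers.utils.db import write_updated_docs

    spill_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)

    # Pass 1: stream partial-history updates, spilling conflicting records to disk
    updates = generate_updates(ancestry_records_to_write, spill_file.name)
    write_updated_docs(client, updates, index_name="registry")

    # Pass 2: merge spilled records with the partial histories now in the db and
    # overwrite them with full-history updates
    deferred_updates = generate_deferred_updates(client, spill_file.name)
    write_updated_docs(client, deferred_updates, index_name="registry")

    os.remove(spill_file.name)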