From fdfea21fb5a0cb86879462d456b10cfcb011da90 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 10 Apr 2024 15:58:23 -0700 Subject: [PATCH 01/17] minor additions --- src/pds/registrysweepers/ancestry/constants.py | 2 ++ src/pds/registrysweepers/ancestry/generation.py | 5 ++++- src/pds/registrysweepers/ancestry/typedefs.py | 4 ++++ 3 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 src/pds/registrysweepers/ancestry/constants.py diff --git a/src/pds/registrysweepers/ancestry/constants.py b/src/pds/registrysweepers/ancestry/constants.py new file mode 100644 index 0000000..1fd0cb2 --- /dev/null +++ b/src/pds/registrysweepers/ancestry/constants.py @@ -0,0 +1,2 @@ +METADATA_PARENT_BUNDLE_KEY = "ops:Provenance/ops:parent_bundle_identifier" +METADATA_PARENT_COLLECTION_KEY = "ops:Provenance/ops:parent_collection_identifier" diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index bebed6e..8293ef2 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -17,13 +17,13 @@ import psutil # type: ignore from opensearchpy import OpenSearch from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord -from pds.registrysweepers.ancestry.queries import DbMockTypeDef from pds.registrysweepers.ancestry.queries import get_bundle_ancestry_records_query from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_bundles_query from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_collections_query from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_for_collection_lid_query from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_query from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants +from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef from pds.registrysweepers.ancestry.utils import 
dump_history_to_disk from pds.registrysweepers.ancestry.utils import gb_mem_to_size from pds.registrysweepers.ancestry.utils import load_partial_history_to_records @@ -262,6 +262,9 @@ def get_nonaggregate_ancestry_records_for_collection_lid( collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"]) nonaggregate_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]] + except IndexError as err: + doc_id = doc["_id"] + log.warning(f'Encountered document with unexpected _id: "{doc_id}"') except (ValueError, KeyError) as err: log.warning( 'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s', diff --git a/src/pds/registrysweepers/ancestry/typedefs.py b/src/pds/registrysweepers/ancestry/typedefs.py index 7e1e9c6..57cd54d 100644 --- a/src/pds/registrysweepers/ancestry/typedefs.py +++ b/src/pds/registrysweepers/ancestry/typedefs.py @@ -1,5 +1,9 @@ +from typing import Callable from typing import Dict +from typing import Iterable from typing import List +from typing import Optional from typing import Union SerializableAncestryRecordTypeDef = Dict[str, Union[str, List[str]]] +DbMockTypeDef = Optional[Callable[[str], Iterable[Dict]]] From 1a483a7c7d45beb5081165def3461c7ed2d468f8 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 10 Apr 2024 15:58:49 -0700 Subject: [PATCH 02/17] implement ancestry.utils.update.from_record() --- src/pds/registrysweepers/ancestry/utils.py | 17 ++++++++++++++++- src/pds/registrysweepers/utils/db/update.py | 2 ++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/ancestry/utils.py b/src/pds/registrysweepers/ancestry/utils.py index 5cd4c7b..212c99a 100644 --- a/src/pds/registrysweepers/ancestry/utils.py +++ b/src/pds/registrysweepers/ancestry/utils.py @@ -11,8 +11,13 @@ from typing import Set from typing import Union -from pds.registrysweepers.ancestry import AncestryRecord +from 
pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY from pds.registrysweepers.ancestry.typedefs import SerializableAncestryRecordTypeDef +from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION +from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY +from pds.registrysweepers.utils.db import Update log = logging.getLogger(__name__) @@ -138,3 +143,13 @@ def load_partial_history_to_records(fn: str) -> Iterable[AncestryRecord]: def gb_mem_to_size(desired_mem_usage_gb) -> int: # rough estimated ratio of memory size to sys.getsizeof() report return desired_mem_usage_gb / 3.1 * 2621536 + + +def update_from_record(record: AncestryRecord) -> Update: + doc_id = str(record.lidvid) + content = { + METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids], + METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids], + SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: int(SWEEPERS_ANCESTRY_VERSION), + } + return Update(id=doc_id, content=content) diff --git a/src/pds/registrysweepers/utils/db/update.py b/src/pds/registrysweepers/utils/db/update.py index 2e149dd..1f65222 100644 --- a/src/pds/registrysweepers/utils/db/update.py +++ b/src/pds/registrysweepers/utils/db/update.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from typing import Dict from typing import Union From 31d974be941bcc11b083f2ba67625240ff787256 Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 10 Apr 2024 15:59:58 -0700 Subject: [PATCH 03/17] implement ancestry partial-record deferral/handling --- src/pds/registrysweepers/ancestry/__init__.py | 114 +++++++++++++----- src/pds/registrysweepers/ancestry/queries.py | 43 ++++++- ...ncestryDeferredPartialUpdatesTestCase.json | 66 ++++++++++ 
.../ancestry/test_ancestry.py | 77 +++++++++++- 4 files changed, 259 insertions(+), 41 deletions(-) create mode 100644 tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index a2f07f3..337dc60 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -1,4 +1,7 @@ +import json import logging +import os +import tempfile from itertools import chain from typing import Callable from typing import Dict @@ -11,12 +14,17 @@ from opensearchpy import OpenSearch from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY from pds.registrysweepers.ancestry.generation import generate_nonaggregate_and_collection_records_iteratively from pds.registrysweepers.ancestry.generation import get_bundle_ancestry_records from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records +from pds.registrysweepers.ancestry.queries import get_existing_ancestry_for_product from pds.registrysweepers.ancestry.queries import get_orphaned_documents from pds.registrysweepers.ancestry.queries import get_orphaned_documents_count +from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef +from pds.registrysweepers.ancestry.utils import update_from_record from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY from pds.registrysweepers.utils import configure_logging @@ -25,12 +33,10 @@ from pds.registrysweepers.utils.db.client import get_opensearch_client from 
pds.registrysweepers.utils.db.indexing import ensure_index_mapping from pds.registrysweepers.utils.db.update import Update +from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid log = logging.getLogger(__name__) -METADATA_PARENT_BUNDLE_KEY = "ops:Provenance/ops:parent_bundle_identifier" -METADATA_PARENT_COLLECTION_KEY = "ops:Provenance/ops:parent_collection_identifier" - def run( client: OpenSearch, @@ -51,7 +57,9 @@ def run( ancestry_records = chain(collection_and_nonaggregate_records, bundle_records) ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records) - updates = generate_updates(ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink) + updates = generate_updates( + client, ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink, registry_mock_query_f + ) if bulk_updates_sink is None: log.info("Ensuring metadata keys are present in database index...") @@ -108,39 +116,83 @@ def orphan_counter_mock(_, __): def generate_updates( - ancestry_records: Iterable[AncestryRecord], ancestry_records_accumulator=None, bulk_updates_sink=None + client: OpenSearch, + ancestry_records: Iterable[AncestryRecord], + ancestry_records_accumulator=None, + bulk_updates_sink=None, + registry_db_mock: DbMockTypeDef = None, ) -> Iterable[Update]: - updates: Set[str] = set() + """ + Given a collection of AncestryRecords, yield corresponding Update objects. + + Ideally, there should be one record per product, but this is not necessarily the case due to the potential of + nonaggregate products to be shared between collections with different LIDs. In that case, it is necessary to: + - defer processing of all records which conflict with a previously-processed record + - retrieve the conflicting records which have been written to db, since the streaming collection-iterative + approach prevents efficiently detecting conflicts until the first partial history is already processed/written. 
+ - merge all deferred/retrieved partial histories into a full history for each distinct product lidvid + - yield those full-history updates, which will overwrite the partial histories initially written to db + """ + updated_doc_ids: Set[str] = set() log.info("Generating document bulk updates for AncestryRecords...") - for record in ancestry_records: - # Tee the stream of records into the accumulator, if one was provided (functional testing). - if ancestry_records_accumulator is not None: - ancestry_records_accumulator.append(record) - - if record.lidvid.is_collection() and len(record.parent_bundle_lidvids) == 0: - log.warning(f"Collection {record.lidvid} is not referenced by any bundle.") - - doc_id = str(record.lidvid) - update_content = { - METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids], - METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids], - SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: int(SWEEPERS_ANCESTRY_VERSION), - } - - # Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing). - if bulk_updates_sink is not None: - bulk_updates_sink.append((doc_id, update_content)) - - if doc_id in updates: - log.error( - f"Multiple updates detected for doc_id {doc_id} - cannot create update! (new content {update_content} will not be written)" + # stream/yield Updates for AncestryRecords, deferring processing of conflicting AncestryRecords and storing them in + # a temporary file + with tempfile.NamedTemporaryFile(mode="w+", delete=False) as deferred_records_file: + for record in ancestry_records: + # Tee the stream of records into the accumulator, if one was provided (functional testing). 
+ if ancestry_records_accumulator is not None: + ancestry_records_accumulator.append(record) + + if record.lidvid.is_collection() and len(record.parent_bundle_lidvids) == 0: + log.warning(f"Collection {record.lidvid} is not referenced by any bundle.") + + update = update_from_record(record) + + # Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing). + if bulk_updates_sink is not None: + bulk_updates_sink.append((update.id, update.content)) + + if update.id in updated_doc_ids: + log.error( + f"Multiple updates detected for doc_id {update.id} - deferring subsequent parts" + " - storing in {deferred_updates_file.name}" + ) + deferred_records_file.write(json.dumps(record.to_dict(sort_lists=False)) + "\n") + deferred_records_file.flush() + continue + + updated_doc_ids.add(update.id) + yield update + + # Merge all deferred records with matching lidvids + with open(deferred_records_file.name, "r") as deferred_records_file: # type: ignore + deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {} + for l in deferred_records_file.readlines(): + record = AncestryRecord.from_dict(json.loads(l)) + if record.lidvid in deferred_records_by_lidvid: + deferred_records_by_lidvid[record.lidvid].update_with(record) + else: + deferred_records_by_lidvid.update({record.lidvid: record}) + + # Retrieve the first partial history (already written to db) for each lidvid, merge with its deferred history, + # then yield a full-history-update for that lidvid + for record in deferred_records_by_lidvid.values(): + doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock) + partial_record_from_db = AncestryRecord.from_dict( + { + "lidvid": doc["lidvid"], + "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY], + "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY], + } ) - continue + record.update_with(partial_record_from_db) + update = update_from_record(record) + yield update + # TODO: Check that ancestry 
version is equal to current, throw if not. - updates.add(doc_id) - yield Update(id=doc_id, content=update_content) + os.remove(deferred_records_file.name) if __name__ == "__main__": diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index 4bbff06..a176bc0 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -1,23 +1,23 @@ import logging from enum import auto from enum import Enum -from typing import Callable from typing import Dict from typing import Iterable -from typing import Optional from opensearchpy import OpenSearch +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants +from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY from pds.registrysweepers.utils.db import get_query_hits_count from pds.registrysweepers.utils.db import query_registry_db_or_mock from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid +from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid log = logging.getLogger(__name__) -DbMockTypeDef = Optional[Callable[[str], Iterable[Dict]]] - class ProductClass(Enum): BUNDLE = (auto(),) @@ -108,7 +108,11 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query( "seq_no_primary_term": True, } _source = {"includes": ["collection_lidvid", "batch_id", "product_lidvid"]} - query_f = query_registry_db_or_mock(registry_db_mock, "get_nonaggregate_ancestry_records", use_search_after=True) + query_f = query_registry_db_or_mock( + registry_db_mock, + f"get_nonaggregate_ancestry_records_for_collection_lid-{collection_lid}", + 
use_search_after=True, + ) # each document will have many product lidvids, so a smaller page size is warranted here docs = query_f( @@ -150,3 +154,32 @@ def get_orphaned_documents_count(client: OpenSearch, index_name: str) -> int: # Query an index documents without an up-to-date ancestry version reference - this would indicate a product which is # orphaned and is getting missed in processing return get_query_hits_count(client, index_name, _orphaned_docs_query) + + +def get_existing_ancestry_for_product( + client: OpenSearch, product_lidvid: PdsLidVid, registry_db_mock: DbMockTypeDef +) -> Dict: + # Retrieve ancestry for a single document. It would be simpler to just pull it by id, but this is compatible with + # the existing functional testing framework. + query: Dict = { + "query": { + "bool": { + "filter": [ + {"term": {"lidvid": str(product_lidvid)}}, + ], + } + }, + } + _source = {"includes": ["lidvid", METADATA_PARENT_BUNDLE_KEY, METADATA_PARENT_COLLECTION_KEY]} + query_f = query_registry_db_or_mock( + registry_db_mock, f"get_existing_ancestry_for_product-{product_lidvid}", use_search_after=False + ) + + docs = query_f( + client, + "registry", + query, + _source, + ) + + return list(docs)[0] diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json new file mode 100644 index 0000000..bfbb387 --- /dev/null +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json @@ -0,0 +1,66 @@ +{ + "--note": [ + "Example history for two collections with different LIDVIDs, each having two unique non-agg products and", + "another which exists in both collections.", + "This tests correct accumulation of history when a non-aggregate product derives its history from collections", + "which are processed in different chunks (as records are chunked by 
collection LID)" + ], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:matching_bundle:matching_collection": [ + { + "_id": "a:b:c:matching_bundle:matching_collection::1.0::P1", + "_source": { + "collection_lidvid": "a:b:c:matching_bundle:matching_collection::1.0", + "batch_id": 1, + "product_lidvid": [ + "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_1::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + }, + { + "_id": "a:b:c:matching_bundle:matching_collection::1.0::P2", + "_source": { + "collection_lidvid": "a:b:c:matching_bundle:matching_collection::1.0", + "batch_id": 2, + "product_lidvid": [ + "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_2::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + } + ], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:nonmatching_bundle:nonmatching_collection": [ + { + "_id": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0::P1", + "_source": { + "collection_lidvid": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0", + "batch_id": 1, + "product_lidvid": [ + "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_1::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + }, + { + "_id": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0::P2", + "_source": { + "collection_lidvid": "a:b:c:nonmatching_bundle:nonmatching_collection::1.0", + "batch_id": 2, + "product_lidvid": [ + "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_2::1.0", + "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0" + ] + } + } + ], + "get_existing_ancestry_for_product-a:b:c:matching_bundle:matching_collection:overlapping_product::1.0": [{ + "lidvid": "a:b:c:matching_bundle:matching_collection:overlapping_product::1.0", + "ops:Provenance/ops:parent_bundle_identifier": [ + 
"a:b:c:matching_bundle::1.0" + ], + "ops:Provenance/ops:parent_collection_identifier": [ + "a:b:c:matching_bundle:matching_collection::1.0" + ], + "ops:Provenance/ops:registry_sweepers_ancestry_version": 1 + }] +} diff --git a/tests/pds/registrysweepers/ancestry/test_ancestry.py b/tests/pds/registrysweepers/ancestry/test_ancestry.py index 84c7508..34c9017 100644 --- a/tests/pds/registrysweepers/ancestry/test_ancestry.py +++ b/tests/pds/registrysweepers/ancestry/test_ancestry.py @@ -1,4 +1,5 @@ import itertools +import logging import os.path import unittest from typing import Dict @@ -6,11 +7,16 @@ from typing import Tuple from pds.registrysweepers import ancestry -from pds.registrysweepers.ancestry import AncestryRecord -from pds.registrysweepers.ancestry import get_collection_ancestry_records -from pds.registrysweepers.ancestry import get_nonaggregate_ancestry_records -from pds.registrysweepers.ancestry import SWEEPERS_ANCESTRY_VERSION -from pds.registrysweepers.ancestry import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY +from pds.registrysweepers.ancestry import generate_updates +from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY +from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY +from pds.registrysweepers.ancestry.generation import generate_nonaggregate_and_collection_records_iteratively +from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records +from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records +from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION +from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY +from pds.registrysweepers.utils import configure_logging from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid from tests.mocks.registryquerymock import RegistryQueryMock @@ -252,5 
+258,66 @@ def test_ancestor_reference_aggregation(self): self.assertIn(record, collection_ancestry_records, msg=f"Expected record is produced") +class AncestryDeferredPartialUpdatesTestCase(unittest.TestCase): + input_file_path = os.path.abspath( + "./tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json" + ) + registry_query_mock = RegistryQueryMock(input_file_path) + + def test_ancestor_partial_history_accumulation(self): + """ + TODO: document + """ + + configure_logging(filepath=None, log_level=logging.DEBUG) + + mb = PdsLidVid.from_string("a:b:c:matching_bundle::1.0") + nmb = PdsLidVid.from_string("a:b:c:nonmatching_bundle::1.0") + mc = PdsLidVid.from_string("a:b:c:matching_bundle:matching_collection::1.0") + nmc = PdsLidVid.from_string("a:b:c:nonmatching_bundle:nonmatching_collection::1.0") + + mup1 = PdsLidVid.from_string( + "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_1::1.0" + ) + mup2 = PdsLidVid.from_string( + "a:b:c:matching_bundle:matching_collection:matching_collection_unique_product_2::1.0" + ) + nmup1 = PdsLidVid.from_string( + "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_1::1.0" + ) + nmup2 = PdsLidVid.from_string( + "a:b:c:nonmatching_bundle:nonmatching_collection:nonmatching_collection_unique_product_2::1.0" + ) + op = PdsLidVid.from_string("a:b:c:matching_bundle:matching_collection:overlapping_product::1.0") + + query_mock_f = self.registry_query_mock.get_mocked_query + collection_ancestry_records = [ + AncestryRecord(lidvid=mc, parent_bundle_lidvids={mb}, parent_collection_lidvids=set()), + AncestryRecord(lidvid=nmc, parent_bundle_lidvids={nmb}, parent_collection_lidvids=set()), + ] + + collection_and_nonaggregate_records = list( + generate_nonaggregate_and_collection_records_iteratively(None, collection_ancestry_records, query_mock_f) + ) + + updates = list(generate_updates(None, 
collection_and_nonaggregate_records, None, None, query_mock_f)) + + # TODO: increase to two nonmatching collections and two shared products + + incomplete_opu1 = next( + u for u in updates if u.id == str(op) and len(u.content[METADATA_PARENT_COLLECTION_KEY]) == 1 + ) + self.assertIn(str(mb), incomplete_opu1.content[METADATA_PARENT_BUNDLE_KEY]) + self.assertNotIn(str(nmb), incomplete_opu1.content[METADATA_PARENT_BUNDLE_KEY]) + self.assertIn(str(mc), incomplete_opu1.content[METADATA_PARENT_COLLECTION_KEY]) + self.assertNotIn(str(nmc), incomplete_opu1.content[METADATA_PARENT_COLLECTION_KEY]) + + opu1 = next(u for u in updates if u.id == str(op) and len(u.content[METADATA_PARENT_COLLECTION_KEY]) > 1) + self.assertIn(str(mb), opu1.content[METADATA_PARENT_BUNDLE_KEY]) + self.assertIn(str(nmb), opu1.content[METADATA_PARENT_BUNDLE_KEY]) + self.assertIn(str(mc), opu1.content[METADATA_PARENT_COLLECTION_KEY]) + self.assertIn(str(nmc), opu1.content[METADATA_PARENT_COLLECTION_KEY]) + + if __name__ == "__main__": unittest.main() From e57d6cc16cae81affd0f95926633f9f10a4c505b Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 10 Apr 2024 17:13:37 -0700 Subject: [PATCH 04/17] add query stubs for pre-existing ancestry tests --- .../test_ancestry_mock_AncestryFunctionalTestCase.json | 4 +++- .../test_ancestry_mock_AncestryMalformedDocsTestCase.json | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json index 6035e9f..f797be4 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json @@ -93,5 +93,7 @@ } } ], - "get_orphaned_ancestry_docs": [] + "get_orphaned_ancestry_docs": [], + 
"get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidvidrefcollection": [] } diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json index 018eacb..e57aeee 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json @@ -89,5 +89,6 @@ } } ], - "get_orphaned_ancestry_docs": [] + "get_orphaned_ancestry_docs": [], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [] } From 3ae1db2f39800b157ecc7f79b7373f5eeba35efd Mon Sep 17 00:00:00 2001 From: edunn Date: Wed, 10 Apr 2024 17:35:40 -0700 Subject: [PATCH 05/17] add tests for record/update count previously, nil records/updates would not cause failure of some test cases --- tests/pds/registrysweepers/ancestry/test_ancestry.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/pds/registrysweepers/ancestry/test_ancestry.py b/tests/pds/registrysweepers/ancestry/test_ancestry.py index 34c9017..eb06491 100644 --- a/tests/pds/registrysweepers/ancestry/test_ancestry.py +++ b/tests/pds/registrysweepers/ancestry/test_ancestry.py @@ -87,6 +87,14 @@ def setUpClass(cls) -> None: cls.updates_by_lidvid_str = {id: content for id, content in cls.bulk_updates} + def test_correct_record_counts(self): + self.assertEqual(1, len(self.bundle_records)) + self.assertEqual(4, len(self.collection_records)) + self.assertEqual(6, len(self.nonaggregate_records)) + + def test_correct_update_counts(self): + self.assertEqual(11, len(self.updates_by_lidvid_str)) + def test_bundles_have_no_ancestry(self): for record in self.bundle_records: 
self.assertTrue(len(record.parent_bundle_lidvids) == 0) @@ -176,6 +184,10 @@ def test_ancestry_completes_without_fatal_error(self): str(r.lidvid): r for r in self.ancestry_records if r.lidvid.is_basic_product() } + self.assertEqual(1, len(self.bundle_records)) + self.assertEqual(1, len(self.collection_records)) + self.assertEqual(2, len(self.nonaggregate_records)) + self.updates_by_lidvid_str = {id: content for id, content in self.bulk_updates} From c10f26888dc4b1ec45137487c2359f9983bddb5a Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 09:54:06 -0700 Subject: [PATCH 06/17] update existing ancestry tests for overhaul --- ...cestry_mock_AncestryFunctionalTestCase.json | 18 +++++++++++++----- ...try_mock_AncestryMalformedDocsTestCase.json | 16 ++++------------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json index f797be4..a6684e6 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryFunctionalTestCase.json @@ -55,10 +55,12 @@ } } ], - "get_nonaggregate_ancestry_records": [ + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [ { + "_id": "a:b:c:bundle:lidrefcollection::1.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidrefcollection::1.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::1.0", "a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" @@ -66,17 +68,23 @@ } }, { + "_id": "a:b:c:bundle:lidrefcollection::2.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidrefcollection::2.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::2.0", 
"a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" ] } - }, + } + ], + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidvidrefcollection": [ { + "_id": "a:b:c:bundle:lidvidrefcollection::1.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidvidrefcollection::1.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidvidrefcollection:collectionuniqueproduct::1.0", "a:b:c:bundle:lidvidrefcollection:collectionsharedproduct::1.0" @@ -84,8 +92,10 @@ } }, { + "_id": "a:b:c:bundle:lidvidrefcollection::2.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidvidrefcollection::2.0", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidvidrefcollection:collectionuniqueproduct::2.0", "a:b:c:bundle:lidvidrefcollection:collectionsharedproduct::1.0" @@ -93,7 +103,5 @@ } } ], - "get_orphaned_ancestry_docs": [], - "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [], - "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidvidrefcollection": [] + "get_orphaned_ancestry_docs": [] } diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json index e57aeee..49a371b 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryMalformedDocsTestCase.json @@ -63,19 +63,12 @@ } } ], - "get_nonaggregate_ancestry_records": [ + "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [ { + "_id": "a:b:c:matching_bundle:matching_collection::1.0::P1", "_source": { "collection_lidvid": "a:b:c:bundle:lidrefcollection::1.0", - "product_lidvid": [ - "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::1.0", - "a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" - ] - } - }, - { - 
"_source": { - "_": "example of record with missing 'collection_lidvid' field", + "batch_id": 1, "product_lidvid": [ "a:b:c:bundle:lidrefcollection:collectionuniqueproduct::1.0", "a:b:c:bundle:lidrefcollection:collectionsharedproduct::1.0" @@ -89,6 +82,5 @@ } } ], - "get_orphaned_ancestry_docs": [], - "get_nonaggregate_ancestry_records_for_collection_lid-a:b:c:bundle:lidrefcollection": [] + "get_orphaned_ancestry_docs": [] } From 482146ade666f18e13a06bed13d656a05dd8fbab Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 12:11:28 -0700 Subject: [PATCH 07/17] generate deferred updates only after non-deferred updates have been written to db --- src/pds/registrysweepers/ancestry/__init__.py | 76 +++++++++++-------- .../ancestry/test_ancestry.py | 10 ++- 2 files changed, 54 insertions(+), 32 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index 337dc60..ef98299 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -9,6 +9,7 @@ from typing import List from typing import Optional from typing import Set +from typing import TextIO from typing import Tuple from typing import Union @@ -57,8 +58,9 @@ def run( ancestry_records = chain(collection_and_nonaggregate_records, bundle_records) ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records) + deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) updates = generate_updates( - client, ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink, registry_mock_query_f + ancestry_records_to_write, deferred_records_file.name, ancestry_records_accumulator, bulk_updates_sink ) if bulk_updates_sink is None: @@ -81,6 +83,15 @@ def run( updates, index_name="registry", ) + log.info("Generating updates from deferred records...") + deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f) + + 
log.info("Writing deferred updates to database...") + write_updated_docs( + client, + deferred_updates, + index_name="registry", + ) else: # consume generator to dump bulk updates to sink for _ in updates: @@ -116,18 +127,19 @@ def orphan_counter_mock(_, __): def generate_updates( - client: OpenSearch, ancestry_records: Iterable[AncestryRecord], + deferred_records_filepath: str, ancestry_records_accumulator=None, bulk_updates_sink=None, - registry_db_mock: DbMockTypeDef = None, ) -> Iterable[Update]: """ - Given a collection of AncestryRecords, yield corresponding Update objects. + Given a collection of AncestryRecords, yield corresponding Update objects, excluding any deferred updates, which + must be generated separately. Ideally, there should be one record per product, but this is not necessarily the case due to the potential of nonaggregate products to be shared between collections with different LIDs. In that case, it is necessary to: - defer processing of all records which conflict with a previously-processed record + - ensure all non-deferred records have been written to the db - retrieve the conflicting records which have been written to db, since the streaming collection-iterative approach prevents efficiently detecting conflicts until the first partial history is already processed/written. - merge all deferred/retrieved partial histories into a full history for each distinct product lidvid @@ -139,7 +151,7 @@ # stream/yield Updates for AncestryRecords, deferring processing of conflicting AncestryRecords and storing them in # a temporary file - with tempfile.NamedTemporaryFile(mode="w+", delete=False) as deferred_records_file: + with open(deferred_records_filepath, mode="w+") as deferred_records_file: for record in ancestry_records: # Tee the stream of records into the accumulator, if one was provided (functional testing). 
if ancestry_records_accumulator is not None: @@ -166,33 +178,35 @@ def generate_updates( updated_doc_ids.add(update.id) yield update - # Merge all deferred records with matching lidvids - with open(deferred_records_file.name, "r") as deferred_records_file: # type: ignore - deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {} - for l in deferred_records_file.readlines(): - record = AncestryRecord.from_dict(json.loads(l)) - if record.lidvid in deferred_records_by_lidvid: - deferred_records_by_lidvid[record.lidvid].update_with(record) - else: - deferred_records_by_lidvid.update({record.lidvid: record}) - - # Retrieve the first partial history (already written to db) for each lidvid, merge with its deferred history, - # then yield a full-history-update for that lidvid - for record in deferred_records_by_lidvid.values(): - doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock) - partial_record_from_db = AncestryRecord.from_dict( - { - "lidvid": doc["lidvid"], - "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY], - "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY], - } - ) - record.update_with(partial_record_from_db) - update = update_from_record(record) - yield update - # TODO: Check that ancestry version is equal to current, throw if not. 
- os.remove(deferred_records_file.name) +def generate_deferred_updates( + client: OpenSearch, deferred_records_filepath: str, registry_db_mock: DbMockTypeDef = None +) -> Iterable[Update]: + # Merge all deferred records with matching lidvids + with open(deferred_records_filepath, "r") as deferred_records_file: # type: ignore + deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {} + for l in deferred_records_file.readlines(): + record = AncestryRecord.from_dict(json.loads(l)) + if record.lidvid in deferred_records_by_lidvid: + deferred_records_by_lidvid[record.lidvid].update_with(record) + else: + deferred_records_by_lidvid.update({record.lidvid: record}) + + # Retrieve the first partial history (already written to db) for each lidvid, merge with its deferred history, + # then yield a full-history-update for that lidvid + for record in deferred_records_by_lidvid.values(): + doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock) + partial_record_from_db = AncestryRecord.from_dict( + { + "lidvid": doc["lidvid"], + "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY], + "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY], + } + ) + record.update_with(partial_record_from_db) + update = update_from_record(record) + yield update + # TODO: Check that ancestry version is equal to current, throw if not. 
if __name__ == "__main__": diff --git a/tests/pds/registrysweepers/ancestry/test_ancestry.py b/tests/pds/registrysweepers/ancestry/test_ancestry.py index eb06491..cb701cc 100644 --- a/tests/pds/registrysweepers/ancestry/test_ancestry.py +++ b/tests/pds/registrysweepers/ancestry/test_ancestry.py @@ -1,12 +1,14 @@ import itertools import logging import os.path +import tempfile import unittest from typing import Dict from typing import List from typing import Tuple from pds.registrysweepers import ancestry +from pds.registrysweepers.ancestry import generate_deferred_updates from pds.registrysweepers.ancestry import generate_updates from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY @@ -312,7 +314,13 @@ def test_ancestor_partial_history_accumulation(self): generate_nonaggregate_and_collection_records_iteratively(None, collection_ancestry_records, query_mock_f) ) - updates = list(generate_updates(None, collection_and_nonaggregate_records, None, None, query_mock_f)) + deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False) + non_deferred_updates = list( + generate_updates(collection_and_nonaggregate_records, deferred_records_file.name, None, None) + ) + deferred_updates = list(generate_deferred_updates(None, deferred_records_file.name, query_mock_f)) + updates = non_deferred_updates + deferred_updates + os.remove(deferred_records_file.name) # TODO: increase to two nonmatching collections and two shared products From bff5810c303aeecf0155e67172aaf9518d822fc3 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 14:06:03 -0700 Subject: [PATCH 08/17] update log messages --- src/pds/registrysweepers/utils/db/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 27e106e..1c1fc22 100644 --- 
a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -238,7 +238,7 @@ def write_updated_docs( index_name: str, bulk_chunk_max_update_count: Union[int, None] = None, ): - log.info("Updating a lazily-generated collection of product documents...") + log.info("Writing document updates...") updated_doc_count = 0 bulk_buffer_max_size_mb = 30.0 @@ -275,7 +275,7 @@ def write_updated_docs( log.debug(f"Writing documents updates for {buffered_updates_count} remaining products to db...") _write_bulk_updates_chunk(client, index_name, bulk_updates_buffer) - log.info(f"Updated documents for {updated_doc_count} total products!") + log.info(f"Updated documents for {updated_doc_count} products!") def update_as_statements(update: Update) -> Iterable[str]: From 53606322693fd875b901db0e6820f6ce4328741d Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 14:10:23 -0700 Subject: [PATCH 09/17] fix bug in json structure --- src/pds/registrysweepers/ancestry/__init__.py | 17 ++++++++++------- src/pds/registrysweepers/ancestry/queries.py | 11 +++++++++-- ..._AncestryDeferredPartialUpdatesTestCase.json | 8 ++++++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index ef98299..a7aa040 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -196,13 +196,16 @@ def generate_deferred_updates( # then yield a full-history-update for that lidvid for record in deferred_records_by_lidvid.values(): doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock) - partial_record_from_db = AncestryRecord.from_dict( - { - "lidvid": doc["lidvid"], - "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY], - "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY], - } - ) + try: + partial_record_from_db = AncestryRecord.from_dict( + { + "lidvid": 
doc["_source"]["lidvid"], + "parent_bundle_lidvids": doc["_source"][METADATA_PARENT_BUNDLE_KEY], + "parent_collection_lidvids": doc["_source"][METADATA_PARENT_COLLECTION_KEY], + } + ) + except (KeyError, ValueError) as err: + log.error(f'Failed to parse valid AncestryRecord from document with id "{doc["_id"]}: {err}"') record.update_with(partial_record_from_db) update = update_from_record(record) yield update diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index a176bc0..f565fba 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -170,9 +170,16 @@ def get_existing_ancestry_for_product( } }, } - _source = {"includes": ["lidvid", METADATA_PARENT_BUNDLE_KEY, METADATA_PARENT_COLLECTION_KEY]} + _source = { + "includes": [ + "lidvid", + METADATA_PARENT_BUNDLE_KEY, + METADATA_PARENT_COLLECTION_KEY, + SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, + ] + } query_f = query_registry_db_or_mock( - registry_db_mock, f"get_existing_ancestry_for_product-{product_lidvid}", use_search_after=False + registry_db_mock, f"get_existing_ancestry_for_product-{product_lidvid}", use_search_after=True ) docs = query_f( diff --git a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json index bfbb387..b1e9389 100644 --- a/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json +++ b/tests/pds/registrysweepers/ancestry/resources/test_ancestry_mock_AncestryDeferredPartialUpdatesTestCase.json @@ -53,7 +53,9 @@ } } ], - "get_existing_ancestry_for_product-a:b:c:matching_bundle:matching_collection:overlapping_product::1.0": [{ + "get_existing_ancestry_for_product-a:b:c:matching_bundle:matching_collection:overlapping_product::1.0": [ + { + "_source": { "lidvid": 
"a:b:c:matching_bundle:matching_collection:overlapping_product::1.0", "ops:Provenance/ops:parent_bundle_identifier": [ "a:b:c:matching_bundle::1.0" @@ -62,5 +64,7 @@ "a:b:c:matching_bundle:matching_collection::1.0" ], "ops:Provenance/ops:registry_sweepers_ancestry_version": 1 - }] + } + } + ] } From e753e28727f973d3db41bc537f6965e58e706302 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 14:26:39 -0700 Subject: [PATCH 10/17] bump default request timeout bulk updates are often taking longer than 90 seconds --- src/pds/registrysweepers/utils/db/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/utils/db/__init__.py b/src/pds/registrysweepers/utils/db/__init__.py index 1c1fc22..e8e3c5f 100644 --- a/src/pds/registrysweepers/utils/db/__init__.py +++ b/src/pds/registrysweepers/utils/db/__init__.py @@ -294,7 +294,7 @@ def update_as_statements(update: Update) -> Iterable[str]: def _write_bulk_updates_chunk(client: OpenSearch, index_name: str, bulk_updates: Iterable[str]): bulk_data = "\n".join(bulk_updates) + "\n" - request_timeout = 90 + request_timeout = 180 response_content = client.bulk(index=index_name, body=bulk_data, request_timeout=request_timeout) if response_content.get("errors"): From ab8bd2bed4e27aa2e45474646044117980e28357 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 16:54:35 -0700 Subject: [PATCH 11/17] fix assignment of bundle history to LID collection references aliases are not working, and have been removed --- src/pds/registrysweepers/ancestry/generation.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index 8293ef2..f726787 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -167,9 +167,8 @@ def get_collection_ancestry_records( ) elif isinstance(identifier, PdsLid): try: - for alias in 
collection_aliases_by_lid[identifier]: - for record in ancestry_by_collection_lid[alias]: - record.parent_bundle_lidvids.add(bundle_lidvid) + for record in ancestry_by_collection_lid[identifier.lid]: + record.parent_bundle_lidvids.add(bundle_lidvid) except KeyError: log.warning( f"No versions of collection {identifier} referenced by bundle {bundle_lidvid} " From c81f2edf78a922a86669cb57d60f6d27e16b4148 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 16:58:51 -0700 Subject: [PATCH 12/17] excise behaviour related to aliases/alternate_ids, as this property is not reliable and should not be used --- src/pds/registrysweepers/ancestry/generation.py | 16 ---------------- src/pds/registrysweepers/ancestry/queries.py | 2 +- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index f726787..6fc4c9d 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -90,19 +90,6 @@ def get_ancestry_by_collection_lidvid(collections_docs: Iterable[Dict]) -> Mappi return ancestry_by_collection_lidvid -def get_collection_aliases_by_lid(collections_docs: Iterable[Dict]) -> Dict[PdsLid, Set[PdsLid]]: - aliases_by_lid: Dict[PdsLid, Set[PdsLid]] = {} - for doc in collections_docs: - alternate_ids: List[str] = doc["_source"].get("alternate_ids", []) - lids: Set[PdsLid] = {PdsProductIdentifierFactory.from_string(id).lid for id in alternate_ids} - for lid in lids: - if lid not in aliases_by_lid: - aliases_by_lid[lid] = set() - aliases_by_lid[lid].update(lids) - - return aliases_by_lid - - def get_ancestry_by_collection_lid( ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord] ) -> Mapping[PdsLid, Set[AncestryRecord]]: @@ -124,9 +111,6 @@ def get_collection_ancestry_records( bundles_docs = get_collection_ancestry_records_bundles_query(client, registry_db_mock) collections_docs = 
list(get_collection_ancestry_records_collections_query(client, registry_db_mock)) - # Prepare LID alias sets for every LID - collection_aliases_by_lid: Dict[PdsLid, Set[PdsLid]] = get_collection_aliases_by_lid(collections_docs) - # Prepare empty ancestry records for collections, with fast access by LID or LIDVID ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord] = get_ancestry_by_collection_lidvid( collections_docs diff --git a/src/pds/registrysweepers/ancestry/queries.py b/src/pds/registrysweepers/ancestry/queries.py index f565fba..612fb03 100644 --- a/src/pds/registrysweepers/ancestry/queries.py +++ b/src/pds/registrysweepers/ancestry/queries.py @@ -60,7 +60,7 @@ def get_collection_ancestry_records_collections_query( ) -> Iterable[Dict]: # Query the registry for all collection identifiers query = product_class_query_factory(ProductClass.COLLECTION) - _source = {"includes": ["lidvid", "alternate_ids", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} + _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]} query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True) docs = query_f(client, "registry", query, _source) From dc65c8489e8f1f78ff7f00e83fd748d27df87b15 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 16:59:19 -0700 Subject: [PATCH 13/17] minor fix --- src/pds/registrysweepers/ancestry/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index a7aa040..99eb810 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -204,11 +204,12 @@ def generate_deferred_updates( "parent_collection_lidvids": doc["_source"][METADATA_PARENT_COLLECTION_KEY], } ) + record.update_with(partial_record_from_db) + update = update_from_record(record) + yield update except (KeyError, ValueError) as err: 
log.error(f'Failed to parse valid AncestryRecord from document with id "{doc["_id"]}: {err}"') - record.update_with(partial_record_from_db) - update = update_from_record(record) - yield update + # TODO: Check that ancestry version is equal to current, throw if not. From 303b9dbe9d1b75b52b62f39a36906d51db9bb428 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 17:21:13 -0700 Subject: [PATCH 14/17] add type validation to AncestryRecord.lidvid --- src/pds/registrysweepers/ancestry/ancestryrecord.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/pds/registrysweepers/ancestry/ancestryrecord.py b/src/pds/registrysweepers/ancestry/ancestryrecord.py index 62bfcac..9f3c1b8 100644 --- a/src/pds/registrysweepers/ancestry/ancestryrecord.py +++ b/src/pds/registrysweepers/ancestry/ancestryrecord.py @@ -20,6 +20,10 @@ class AncestryRecord: # equivalent record is known to already exist due to up-to-date ancestry version flag in the source document skip_write: bool = False + def __post_init__(self): + if not isinstance(self.lidvid, PdsLidVid): + raise ValueError('Cannot initialise AncestryRecord with non-PdsLidVid value for "lidvid"') + def __repr__(self): return f"AncestryRecord(lidvid={self.lidvid}, parent_collection_lidvids={sorted([str(x) for x in self.parent_collection_lidvids])}, parent_bundle_lidvids={sorted([str(x) for x in self.parent_bundle_lidvids])})" From c11248218b841da07ee8f3589d5c8b9a8314843b Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 19:35:49 -0700 Subject: [PATCH 15/17] demote log --- src/pds/registrysweepers/ancestry/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/ancestry/__init__.py b/src/pds/registrysweepers/ancestry/__init__.py index 99eb810..7a72803 100644 --- a/src/pds/registrysweepers/ancestry/__init__.py +++ b/src/pds/registrysweepers/ancestry/__init__.py @@ -167,7 +167,7 @@ def generate_updates( bulk_updates_sink.append((update.id, update.content)) if update.id 
in updated_doc_ids: - log.error( + log.debug( f"Multiple updates detected for doc_id {update.id} - deferring subsequent parts" " - storing in {deferred_updates_file.name}" ) From 083b3feef6b5cc7d22c48ee0ab9d6d1d6f9b6402 Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 21:19:37 -0700 Subject: [PATCH 16/17] handle case where registry-refs page erroneously includes collection in list of its own references --- src/pds/registrysweepers/ancestry/generation.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/ancestry/generation.py b/src/pds/registrysweepers/ancestry/generation.py index 6fc4c9d..f50951e 100644 --- a/src/pds/registrysweepers/ancestry/generation.py +++ b/src/pds/registrysweepers/ancestry/generation.py @@ -244,7 +244,15 @@ def get_nonaggregate_ancestry_records_for_collection_lid( continue collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"]) - nonaggregate_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]] + referenced_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]] + nonaggregate_lidvids = [id for id in referenced_lidvids if id.is_basic_product()] + + erroneous_lidvids = [id for id in referenced_lidvids if not id.is_basic_product()] + if len(erroneous_lidvids) > 0: + log.error( + f'registry-refs document with id {doc["_id"]} references one or more aggregate products in its product_lidvid refs list: {[str(id) for id in erroneous_lidvids]}' + ) + except IndexError as err: doc_id = doc["_id"] log.warning(f'Encountered document with unexpected _id: "{doc_id}"') From a8a1b8654e53da1f1efd72d0132eef23844dd6fe Mon Sep 17 00:00:00 2001 From: edunn Date: Thu, 11 Apr 2024 21:22:30 -0700 Subject: [PATCH 17/17] increment SWEEPERS_ANCESTRY_VERSION --- src/pds/registrysweepers/ancestry/versioning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pds/registrysweepers/ancestry/versioning.py 
b/src/pds/registrysweepers/ancestry/versioning.py index 9728b90..926d05c 100644 --- a/src/pds/registrysweepers/ancestry/versioning.py +++ b/src/pds/registrysweepers/ancestry/versioning.py @@ -3,5 +3,5 @@ # previously-processed data from pds.registrysweepers.utils.misc import get_sweeper_version_metadata_key -SWEEPERS_ANCESTRY_VERSION = 2 +SWEEPERS_ANCESTRY_VERSION = 3 SWEEPERS_ANCESTRY_VERSION_METADATA_KEY = get_sweeper_version_metadata_key("ancestry")