From 3f94d339edc53d38098b01117c291655bab90266 Mon Sep 17 00:00:00 2001
From: Alex Dunn
Date: Fri, 26 Jan 2024 00:31:40 -0800
Subject: [PATCH] reimplement provenance sweeper with non-redundant processing

record skipping is based on provenance software version
this updated version writes null values for latest products, rather than not writing the metadata attribute
---
 .../registrysweepers/provenance/__init__.py   | 121 ++++++++++++------
 .../registrysweepers/provenance/constants.py  |   1 +
 .../provenance/provenancerecord.py            |  44 +++++++
 .../registrysweepers/provenance/versioning.py |   7 +
 4 files changed, 132 insertions(+), 41 deletions(-)
 create mode 100644 src/pds/registrysweepers/provenance/constants.py
 create mode 100644 src/pds/registrysweepers/provenance/provenancerecord.py
 create mode 100644 src/pds/registrysweepers/provenance/versioning.py

diff --git a/src/pds/registrysweepers/provenance/__init__.py b/src/pds/registrysweepers/provenance/__init__.py
index 42b9689..0030154 100644
--- a/src/pds/registrysweepers/provenance/__init__.py
+++ b/src/pds/registrysweepers/provenance/__init__.py
@@ -39,6 +39,8 @@
 # It is important to note that the document is updated, not any dependent
 # index.
# +import functools +import itertools import logging from typing import Dict from typing import Iterable @@ -47,17 +49,73 @@ from typing import Union from opensearchpy import OpenSearch -from pds.registrysweepers.utils import _vid_as_tuple_of_int +from pds.registrysweepers.provenance.constants import METADATA_SUCCESSOR_KEY +from pds.registrysweepers.provenance.provenancerecord import ProvenanceRecord +from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION +from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION_METADATA_KEY from pds.registrysweepers.utils import configure_logging from pds.registrysweepers.utils import parse_args -from pds.registrysweepers.utils.db import get_extant_lidvids +from pds.registrysweepers.utils.db import query_registry_db_with_search_after from pds.registrysweepers.utils.db import write_updated_docs from pds.registrysweepers.utils.db.client import get_opensearch_client from pds.registrysweepers.utils.db.update import Update +from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid log = logging.getLogger(__name__) -METADATA_SUCCESSOR_KEY = "ops:Provenance/ops:superseded_by" + +def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]: + log.info("Fetching docs and generating records...") + + query = { + "query": {"bool": {"must": [{"terms": {"ops:Tracking_Meta/ops:archive_status": ["archived", "certified"]}}]}} + } + _source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY]} + + docs = query_registry_db_with_search_after(client, "registry", query, _source) + + for doc in docs: + try: + yield ProvenanceRecord.from_doc(doc) + except ValueError as err: + log.warning( + f'Failed to parse ProvenanceRecord from doc with id {doc["_id"]} due to {err} - source was {doc["_source"]}' + ) + + +def create_record_chains( + records: Iterable[ProvenanceRecord], drop_singletons: bool = True +) -> Iterable[List[ProvenanceRecord]]: + """ + Create an iterable of unsorted 
collections of records which share LIDs. + If drop_singletons is True, single-element collections will be removed for efficiency as they have no links + """ + + # bin chains by LID + record_chains: Dict[PdsLid, List[ProvenanceRecord]] = {} + for record in records: + if record.lidvid.lid not in record_chains: + record_chains[record.lidvid.lid] = [] + record_chains[record.lidvid.lid].append(record) + + if drop_singletons: + for lid, record_chain in list(record_chains.items()): + if not len(record_chain) > 1: + record_chains.pop(lid) + + return record_chains.values() + + +def link_records_in_chain(record_chain: List[ProvenanceRecord]): + """ + Given a List of ProvenanceRecords sharing the same LID, sort the list and create all elements' successor links + """ + record_chain.sort(key=lambda record: record.lidvid) + + for i in range(len(record_chain) - 1): + record = record_chain[i] + successor_record = record_chain[i + 1] + record.set_successor(successor_record.lidvid) def run( @@ -69,11 +127,12 @@ def run( log.info("Starting provenance sweeper processing...") - # host = Host(password, base_url, username, verify_host_certs) + records = get_records(client) + record_chains = create_record_chains(records) + for record_chain in record_chains: + link_records_in_chain(record_chain) - extant_lidvids = get_extant_lidvids(client) - successors = get_successors_by_lidvid(extant_lidvids) - updates = generate_updates(successors) + updates = generate_updates(itertools.chain(*record_chains)) write_updated_docs( client, @@ -84,42 +143,22 @@ def run( log.info("Completed provenance sweeper processing!") -def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]: - """ - Given a collection of LIDVIDs, return a new mapping to their updated direct successors. 
-    """
-
-    log.info("Generating updated history...")
-
-    extant_lidvids = list(extant_lidvids)  # ensure against consumable iterator
-
-    unique_lids = {lidvid.split("::")[0] for lidvid in extant_lidvids}
-
-    log.info(" ...binning LIDVIDs by LID...")
-    lidvid_aggregates_by_lid: Dict[str, List[str]] = {lid: [] for lid in unique_lids}
-    for lidvid in extant_lidvids:
-        lid = lidvid.split("::")[0]
-        lidvid_aggregates_by_lid[lid].append(lidvid)
-
-    log.info(" ...determining updated successors for LIDVIDs...")
-    successors_by_lidvid = {}
-    lidvid_aggregates_with_multiple_versions = filter(lambda l: 1 < len(l), lidvid_aggregates_by_lid.values())
-    for lidvids in lidvid_aggregates_with_multiple_versions:
-        lidvids.sort(key=_vid_as_tuple_of_int, reverse=True)
-
-        for successor_idx, lidvid in enumerate(lidvids[1:]):
-            # TODO: consider converting this dict accumulation to a tuple/dict generator (yield) for memory optimization
-            successors_by_lidvid[lidvid] = lidvids[successor_idx]
-
-    log.info(f"Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!")
-
-    return successors_by_lidvid
+def generate_updates(records: Iterable[ProvenanceRecord]) -> Iterable[Update]:
+    update_count = 0
+    skipped_count = 0
+    for record in records:
+        # a latest product has no successor: write a real null for it, never the string "None"
+        successor = None if record.successor is None else str(record.successor)
+        update_content = {METADATA_SUCCESSOR_KEY: successor}
+        update_content[SWEEPERS_PROVENANCE_VERSION_METADATA_KEY] = SWEEPERS_PROVENANCE_VERSION
 
+        if record.skip_write:
+            skipped_count += 1
+        else:
+            update_count += 1
+            yield Update(id=str(record.lidvid), content=update_content)
 
-def generate_updates(successors_by_id: Mapping[str, str]) -> Iterable[Update]:
-    for id, successor in successors_by_id.items():
-        update_content = {METADATA_SUCCESSOR_KEY: successor}
-        yield Update(id=id, content=update_content)
+    log.info(f"Generated provenance updates for {update_count} products, skipping {skipped_count} up-to-date products")
 
 
 if __name__ == "__main__":
diff --git a/src/pds/registrysweepers/provenance/constants.py b/src/pds/registrysweepers/provenance/constants.py
new file mode 100644
index 0000000..8aa59a0
--- /dev/null
+++ b/src/pds/registrysweepers/provenance/constants.py
@@ -0,0 +1 @@
+METADATA_SUCCESSOR_KEY = "ops:Provenance/ops:superseded_by"
diff --git a/src/pds/registrysweepers/provenance/provenancerecord.py b/src/pds/registrysweepers/provenance/provenancerecord.py
new file mode 100644
index 0000000..eb7f51f
--- /dev/null
+++ b/src/pds/registrysweepers/provenance/provenancerecord.py
@@ -0,0 +1,44 @@
+from __future__ import annotations
+
+from typing import Dict
+from typing import Optional
+
+from pds.registrysweepers.provenance.constants import METADATA_SUCCESSOR_KEY
+from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION
+from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION_METADATA_KEY
+from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid
+
+
+class ProvenanceRecord:
+    lidvid: PdsLidVid
+    _successor: Optional[PdsLidVid]
+    skip_write: bool
+
+    def __init__(self, lidvid: PdsLidVid, successor: Optional[PdsLidVid], skip_write: bool = False):
+        self.lidvid = lidvid
+        self._successor = successor
+        self.skip_write = skip_write
+
+    @property
+    def successor(self) -> Optional[PdsLidVid]:
+        return self._successor
+
+    def set_successor(self, successor: PdsLidVid):
+        if successor != self._successor:
+            self._successor = successor
+            self.skip_write = False
+
+    @staticmethod
+    def from_source(_source: Dict) -> ProvenanceRecord:
+        # the successor may be missing (never swept) or an explicit null (latest product) - neither is parseable
+        raw_successor = _source.get(METADATA_SUCCESSOR_KEY)
+        successor = PdsLidVid.from_string(raw_successor) if raw_successor is not None else None
+        # a record is up-to-date if it was last written by this (or a newer) provenance sweeper version
+        skip_write = _source.get(SWEEPERS_PROVENANCE_VERSION_METADATA_KEY, 0) >= SWEEPERS_PROVENANCE_VERSION
+        return ProvenanceRecord(
+            lidvid=PdsLidVid.from_string(_source["lidvid"]), successor=successor, skip_write=skip_write
+        )
+
+    @staticmethod
+    def from_doc(doc: Dict) -> ProvenanceRecord:
+        return ProvenanceRecord.from_source(doc["_source"])
diff --git a/src/pds/registrysweepers/provenance/versioning.py b/src/pds/registrysweepers/provenance/versioning.py
new file mode 100644
index 0000000..abae199
--- /dev/null
+++ b/src/pds/registrysweepers/provenance/versioning.py
@@ -0,0 +1,7 @@
+# Defines constants used for versioning updated documents with the in-use version of sweepers
+# SWEEPERS_PROVENANCE_VERSION must be incremented any time sweepers is changed in a way which requires
+# reprocessing of previously-processed data
+from pds.registrysweepers.utils.misc import get_sweeper_version_metadata_key
+
+SWEEPERS_PROVENANCE_VERSION = 1
+SWEEPERS_PROVENANCE_VERSION_METADATA_KEY = get_sweeper_version_metadata_key("provenance")