Skip to content

Commit

Permalink
reimplement provenance sweeper with non-redundant processing
Browse files Browse the repository at this point in the history
record skipping is based on provenance software version
this updated version writes null values for latest products, rather than not writing the metadata attribute
  • Loading branch information
alexdunnjpl committed Jan 26, 2024
1 parent 82962a6 commit 3f94d33
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 41 deletions.
121 changes: 80 additions & 41 deletions src/pds/registrysweepers/provenance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
# It is important to note that the document is updated, not any dependent
# index.
#
import functools
import itertools
import logging
from typing import Dict
from typing import Iterable
Expand All @@ -47,17 +49,73 @@
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.utils import _vid_as_tuple_of_int
from pds.registrysweepers.provenance.constants import METADATA_SUCCESSOR_KEY
from pds.registrysweepers.provenance.provenancerecord import ProvenanceRecord
from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION
from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION_METADATA_KEY
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils.db import get_extant_lidvids
from pds.registrysweepers.utils.db import query_registry_db_with_search_after
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid

log = logging.getLogger(__name__)

METADATA_SUCCESSOR_KEY = "ops:Provenance/ops:superseded_by"

def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]:
    """
    Lazily query the registry for documents whose archive status is "archived" or "certified" and yield each one
    as a ProvenanceRecord.

    Documents which fail to parse with a ValueError are reported at warning level and omitted from the output.

    :param client: the OpenSearch client passed through to query_registry_db_with_search_after
    :return: a generator yielding one ProvenanceRecord per successfully-parsed document
    """
    log.info("Fetching docs and generating records...")

    query = {
        "query": {"bool": {"must": [{"terms": {"ops:Tracking_Meta/ops:archive_status": ["archived", "certified"]}}]}}
    }
    # The sweeper-version attribute must be requested explicitly: ProvenanceRecord.from_source() reads it to decide
    # whether a document may be skipped, and a field which is filtered out of _source always looks unprocessed.
    _source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]}

    docs = query_registry_db_with_search_after(client, "registry", query, _source)

    for doc in docs:
        # keep the try body to the parse alone, so only parsing errors are handled here
        try:
            record = ProvenanceRecord.from_doc(doc)
        except ValueError as err:
            log.warning(
                f'Failed to parse ProvenanceRecord from doc with id {doc["_id"]} due to {err} - source was {doc["_source"]}'
            )
            continue

        yield record


def create_record_chains(
    records: Iterable[ProvenanceRecord], drop_singletons: bool = True
) -> Iterable[List[ProvenanceRecord]]:
    """
    Group records into unsorted per-LID collections ("chains").

    :param records: the records to group
    :param drop_singletons: when True, chains containing a single record are omitted, as they have no links
    :return: the chains, one per LID, as the values view of a dict keyed by LID
    """

    # collect every record under its LID, keeping LIDs in first-seen order
    chains_by_lid: Dict[PdsLid, List[ProvenanceRecord]] = {}
    for rec in records:
        chains_by_lid.setdefault(rec.lidvid.lid, []).append(rec)

    if drop_singletons:
        # keep only the chains which actually have something to link
        chains_by_lid = {lid: chain for lid, chain in chains_by_lid.items() if len(chain) > 1}

    return chains_by_lid.values()


def link_records_in_chain(record_chain: List[ProvenanceRecord]):
    """
    Sort a list of same-LID ProvenanceRecords by LIDVID (in place) and point each record at the one following it.

    The final record of the sorted list has no following record, so set_successor() is never called on it.
    """
    record_chain.sort(key=lambda rec: rec.lidvid)

    # pair every record with its immediate neighbour in the sorted order
    for predecessor, following in zip(record_chain, record_chain[1:]):
        predecessor.set_successor(following.lidvid)


def run(
Expand All @@ -69,11 +127,12 @@ def run(

log.info("Starting provenance sweeper processing...")

# host = Host(password, base_url, username, verify_host_certs)
records = get_records(client)
record_chains = create_record_chains(records)
for record_chain in record_chains:
link_records_in_chain(record_chain)

extant_lidvids = get_extant_lidvids(client)
successors = get_successors_by_lidvid(extant_lidvids)
updates = generate_updates(successors)
updates = generate_updates(itertools.chain(*record_chains))

write_updated_docs(
client,
Expand All @@ -84,42 +143,22 @@ def run(
log.info("Completed provenance sweeper processing!")


def get_successors_by_lidvid(extant_lidvids: Iterable[str]) -> Mapping[str, str]:
    """
    Build a mapping from LIDVID to direct-successor LIDVID for the given collection of LIDVIDs.

    LIDVIDs are grouped by LID (the text preceding the first "::").  Each group holding more than one LIDVID is
    sorted in descending order of _vid_as_tuple_of_int, and every element other than the first is mapped to the
    element immediately before it in that order.  Groups with a single LIDVID contribute no entries.
    """

    log.info("Generating updated history...")

    all_lidvids = list(extant_lidvids)  # materialize, since the input may be a single-use iterator

    lids = {lidvid.partition("::")[0] for lidvid in all_lidvids}

    log.info(" ...binning LIDVIDs by LID...")
    lidvids_by_lid: Dict[str, List[str]] = {lid: [] for lid in lids}
    for lidvid in all_lidvids:
        lidvids_by_lid[lidvid.partition("::")[0]].append(lidvid)

    log.info(" ...determining updated successors for LIDVIDs...")
    successors_by_lidvid = {}
    for versions in lidvids_by_lid.values():
        if len(versions) <= 1:
            # nothing to link when only one version exists
            continue

        versions.sort(key=_vid_as_tuple_of_int, reverse=True)

        # TODO: consider converting this dict accumulation to a tuple/dict generator (yield) for memory optimization
        for lidvid, successor in zip(versions[1:], versions):
            successors_by_lidvid[lidvid] = successor

    log.info(f"Successors will be updated for {len(successors_by_lidvid)} LIDVIDs!")

    return successors_by_lidvid
def generate_updates(records: Iterable[ProvenanceRecord]) -> Iterable[Update]:
    """
    Lazily generate one Update per record which is not flagged skip_write.

    Each update carries the record's successor and the current provenance sweeper version.  A record without a
    successor is written with a null successor value.

    :param records: the records to generate updates for
    :return: a generator of Update objects, identified by the stringified LIDVID of their record
    """
    update_count = 0
    skipped_count = 0
    for record in records:
        if record.skip_write:
            skipped_count += 1
            continue

        successor = record.successor
        update_content = {
            # str(None) would store the literal text "None" rather than a null value, so None is passed through as-is
            METADATA_SUCCESSOR_KEY: None if successor is None else str(successor),
            SWEEPERS_PROVENANCE_VERSION_METADATA_KEY: SWEEPERS_PROVENANCE_VERSION,
        }
        update_count += 1
        yield Update(id=str(record.lidvid), content=update_content)

    # only reached once the consumer has exhausted the generator
    log.info(f"Generated provenance updates for {update_count} products, skipping {skipped_count} up-to-date products")


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions src/pds/registrysweepers/provenance/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
METADATA_SUCCESSOR_KEY = "ops:Provenance/ops:superseded_by"
44 changes: 44 additions & 0 deletions src/pds/registrysweepers/provenance/provenancerecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from typing import Dict
from typing import Optional

from pds.registrysweepers.provenance.constants import METADATA_SUCCESSOR_KEY
from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION
from pds.registrysweepers.provenance.versioning import SWEEPERS_PROVENANCE_VERSION_METADATA_KEY
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid


class ProvenanceRecord:
    """
    The provenance state of a single product: its LIDVID, its direct successor (if any), and whether its document
    may be left unwritten.
    """

    lidvid: PdsLidVid
    _successor: Optional[PdsLidVid]
    # when True, no update is generated for this record
    skip_write: bool

    def __init__(self, lidvid: PdsLidVid, successor: Optional[PdsLidVid], skip_write: bool = False):
        self.lidvid = lidvid
        self._successor = successor
        self.skip_write = skip_write

    @property
    def successor(self) -> Optional[PdsLidVid]:
        """The LIDVID of the direct successor, or None if there is none."""
        return self._successor

    def set_successor(self, successor: PdsLidVid):
        """
        Replace the successor if it differs from the current one.  A change clears skip_write, so that the new
        value is written even for a record which was otherwise up-to-date.
        """
        if successor != self._successor:
            self._successor = successor
            self.skip_write = False

    @staticmethod
    def from_source(_source: Dict) -> ProvenanceRecord:
        """
        Build a record from a document's _source content.

        A successor attribute which is absent or null yields a record with no successor.  skip_write is set when
        the stored sweeper version (0 if absent) is at least SWEEPERS_PROVENANCE_VERSION.
        """
        # a null successor value must be treated like a missing one, instead of being handed to the parser
        raw_successor = _source.get(METADATA_SUCCESSOR_KEY)
        successor = PdsLidVid.from_string(raw_successor) if raw_successor is not None else None

        skip_write = _source.get(SWEEPERS_PROVENANCE_VERSION_METADATA_KEY, 0) >= SWEEPERS_PROVENANCE_VERSION
        return ProvenanceRecord(
            lidvid=PdsLidVid.from_string(_source["lidvid"]), successor=successor, skip_write=skip_write
        )

    @staticmethod
    def from_doc(doc: Dict) -> ProvenanceRecord:
        """Build a record from a full document, using the content under its "_source" key."""
        return ProvenanceRecord.from_source(doc["_source"])
7 changes: 7 additions & 0 deletions src/pds/registrysweepers/provenance/versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Defines constants used for versioning updated documents with the in-use version of the provenance sweeper
# SWEEPERS_PROVENANCE_VERSION must be incremented any time the provenance sweeper is changed in a way which
# requires reprocessing of previously-processed data
from pds.registrysweepers.utils.misc import get_sweeper_version_metadata_key

SWEEPERS_PROVENANCE_VERSION = 1
SWEEPERS_PROVENANCE_VERSION_METADATA_KEY = get_sweeper_version_metadata_key("provenance")

0 comments on commit 3f94d33

Please sign in to comment.