Commit
Merge pull request #116 from NASA-PDS/deferred-partial-updates
Deferred partial updates
alexdunnjpl authored Apr 14, 2024
2 parents ee2a112 + a8a1b86 commit 7c4c1ed
Showing 14 changed files with 367 additions and 76 deletions.
124 changes: 97 additions & 27 deletions src/pds/registrysweepers/ancestry/__init__.py
@@ -1,22 +1,31 @@
import json
import logging
import os
import tempfile
from itertools import chain
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import TextIO
from typing import Tuple
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY
from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY
from pds.registrysweepers.ancestry.generation import generate_nonaggregate_and_collection_records_iteratively
from pds.registrysweepers.ancestry.generation import get_bundle_ancestry_records
from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records
from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records
from pds.registrysweepers.ancestry.queries import get_existing_ancestry_for_product
from pds.registrysweepers.ancestry.queries import get_orphaned_documents
from pds.registrysweepers.ancestry.queries import get_orphaned_documents_count
from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef
from pds.registrysweepers.ancestry.utils import update_from_record
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils import configure_logging
@@ -25,12 +34,10 @@
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

log = logging.getLogger(__name__)



def run(
client: OpenSearch,
@@ -51,7 +58,10 @@ def run(

ancestry_records = chain(collection_and_nonaggregate_records, bundle_records)
ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records)
deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
updates = generate_updates(
ancestry_records_to_write, deferred_records_file.name, ancestry_records_accumulator, bulk_updates_sink
)

if bulk_updates_sink is None:
log.info("Ensuring metadata keys are present in database index...")
@@ -73,6 +83,15 @@
updates,
index_name="registry",
)
log.info("Generating updates from deferred records...")
deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)

log.info("Writing deferred updates to database...")
write_updated_docs(
client,
deferred_updates,
index_name="registry",
)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
@@ -108,39 +127,90 @@ def orphan_counter_mock(_, __):


def generate_updates(
    ancestry_records: Iterable[AncestryRecord],
    deferred_records_filepath: str,
    ancestry_records_accumulator=None,
    bulk_updates_sink=None,
) -> Iterable[Update]:
    """
    Given a collection of AncestryRecords, yield corresponding Update objects, excluding any deferred updates, which
    must be generated separately.

    Ideally, there should be one record per product, but this is not necessarily the case due to the potential for
    nonaggregate products to be shared between collections with different LIDs. In that case, it is necessary to:
    - defer processing of all records which conflict with a previously-processed record
    - ensure all non-deferred records have been written to the db
    - retrieve the conflicting records which have been written to db, since the streaming collection-iterative
      approach prevents efficiently detecting conflicts until the first partial history is already processed/written
    - merge all deferred/retrieved partial histories into a full history for each distinct product lidvid
    - yield those full-history updates, which will overwrite the partial histories initially written to db
    """
    updated_doc_ids: Set[str] = set()

    log.info("Generating document bulk updates for AncestryRecords...")

    # Stream/yield Updates for AncestryRecords, deferring processing of conflicting AncestryRecords and storing them
    # in a temporary file.
    with open(deferred_records_filepath, mode="w+") as deferred_records_file:
        for record in ancestry_records:
            # Tee the stream of records into the accumulator, if one was provided (functional testing).
            if ancestry_records_accumulator is not None:
                ancestry_records_accumulator.append(record)

            if record.lidvid.is_collection() and len(record.parent_bundle_lidvids) == 0:
                log.warning(f"Collection {record.lidvid} is not referenced by any bundle.")

            update = update_from_record(record)

            # Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
            if bulk_updates_sink is not None:
                bulk_updates_sink.append((update.id, update.content))

            if update.id in updated_doc_ids:
                log.debug(
                    f"Multiple updates detected for doc_id {update.id} - deferring subsequent parts"
                    f" - storing in {deferred_records_file.name}"
                )
                deferred_records_file.write(json.dumps(record.to_dict(sort_lists=False)) + "\n")
                deferred_records_file.flush()
                continue

            updated_doc_ids.add(update.id)
            yield update
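The loop above is the first pass of a two-pass scheme: the first record seen for each doc id streams straight through as an update, while repeats are parked on disk for later merging. The following self-contained sketch illustrates that pattern with plain tuples and hypothetical names standing in for AncestryRecord and Update; it is an illustration of the control flow, not the project's actual code.

import json
import tempfile
from typing import Dict, Iterable, List, Set, Tuple

# Hypothetical stand-in for an AncestryRecord: (doc_id, parent_ids).
RecordTuple = Tuple[str, List[str]]

def first_pass(records: Iterable[RecordTuple], deferred_path: str) -> Iterable[RecordTuple]:
    """Yield the first record seen for each doc id; park repeats in a JSON-lines file."""
    seen: Set[str] = set()
    with open(deferred_path, mode="w") as deferred:
        for doc_id, parents in records:
            if doc_id in seen:
                deferred.write(json.dumps({"doc_id": doc_id, "parents": parents}) + "\n")
                continue
            seen.add(doc_id)
            yield doc_id, parents

def merge_deferred(deferred_path: str) -> Dict[str, Set[str]]:
    """Collapse parked partial histories into one merged parent set per doc id."""
    merged: Dict[str, Set[str]] = {}
    with open(deferred_path) as deferred:
        for line in deferred:
            entry = json.loads(line)
            merged.setdefault(entry["doc_id"], set()).update(entry["parents"])
    return merged

stream = [("p::1.0", ["c1::1.0"]), ("p::1.0", ["c2::1.0"]), ("q::1.0", ["c1::1.0"])]
path = tempfile.NamedTemporaryFile(delete=False).name
written = list(first_pass(stream, path))  # first occurrences - these would go to the db
print(merge_deferred(path))  # {'p::1.0': {'c2::1.0'}} - still needs merging with the db copy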


def generate_deferred_updates(
    client: OpenSearch, deferred_records_filepath: str, registry_db_mock: DbMockTypeDef = None
) -> Iterable[Update]:
    # Merge all deferred records with matching lidvids
    with open(deferred_records_filepath, "r") as deferred_records_file:  # type: ignore
        deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {}
        for line in deferred_records_file.readlines():
            record = AncestryRecord.from_dict(json.loads(line))
            if record.lidvid in deferred_records_by_lidvid:
                deferred_records_by_lidvid[record.lidvid].update_with(record)
            else:
                deferred_records_by_lidvid[record.lidvid] = record

    # Retrieve the first partial history (already written to db) for each lidvid, merge it with the deferred history,
    # then yield a full-history update for that lidvid
    for record in deferred_records_by_lidvid.values():
        doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock)
        try:
            partial_record_from_db = AncestryRecord.from_dict(
                {
                    "lidvid": doc["_source"]["lidvid"],
                    "parent_bundle_lidvids": doc["_source"][METADATA_PARENT_BUNDLE_KEY],
                    "parent_collection_lidvids": doc["_source"][METADATA_PARENT_COLLECTION_KEY],
                }
            )
            record.update_with(partial_record_from_db)
            update = update_from_record(record)
            yield update
        except (KeyError, ValueError) as err:
            log.error(f'Failed to parse valid AncestryRecord from document with id "{doc["_id"]}": {err}')

    # TODO: Check that ancestry version is equal to current, throw if not.
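The merge step hinges on AncestryRecord.update_with(), whose implementation is not part of this diff. A minimal sketch of plausible merge semantics, using a simplified stand-in class - the names, defaults, and guard below are assumptions, not the project's actual code:

from dataclasses import dataclass, field
from typing import Set

@dataclass
class AncestryRecordSketch:
    """Simplified stand-in for AncestryRecord, for illustration only."""

    lidvid: str
    parent_bundle_lidvids: Set[str] = field(default_factory=set)
    parent_collection_lidvids: Set[str] = field(default_factory=set)

    def update_with(self, other: "AncestryRecordSketch") -> None:
        # Merging histories for two different products would be a logic error.
        if self.lidvid != other.lidvid:
            raise ValueError(f"lidvid mismatch: {self.lidvid} != {other.lidvid}")
        # Union the partial parent histories into a full history.
        self.parent_bundle_lidvids |= other.parent_bundle_lidvids
        self.parent_collection_lidvids |= other.parent_collection_lidvids

Under these assumed semantics, merging the deferred partial history into the db-sourced partial history is order-independent, which is what lets the second pass simply fold every retrieved document into the accumulated record.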


if __name__ == "__main__":
4 changes: 4 additions & 0 deletions src/pds/registrysweepers/ancestry/ancestryrecord.py
@@ -20,6 +20,10 @@ class AncestryRecord:
# equivalent record is known to already exist due to up-to-date ancestry version flag in the source document
skip_write: bool = False

def __post_init__(self):
if not isinstance(self.lidvid, PdsLidVid):
raise ValueError('Cannot initialise AncestryRecord with non-PdsLidVid value for "lidvid"')

def __repr__(self):
return f"AncestryRecord(lidvid={self.lidvid}, parent_collection_lidvids={sorted([str(x) for x in self.parent_collection_lidvids])}, parent_bundle_lidvids={sorted([str(x) for x in self.parent_bundle_lidvids])})"

2 changes: 2 additions & 0 deletions src/pds/registrysweepers/ancestry/constants.py
@@ -0,0 +1,2 @@
METADATA_PARENT_BUNDLE_KEY = "ops:Provenance/ops:parent_bundle_identifier"
METADATA_PARENT_COLLECTION_KEY = "ops:Provenance/ops:parent_collection_identifier"
36 changes: 15 additions & 21 deletions src/pds/registrysweepers/ancestry/generation.py
@@ -17,13 +17,13 @@
import psutil # type: ignore
from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
from pds.registrysweepers.ancestry.queries import get_bundle_ancestry_records_query
from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_bundles_query
from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_collections_query
from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_for_collection_lid_query
from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_query
from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants
from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef
from pds.registrysweepers.ancestry.utils import dump_history_to_disk
from pds.registrysweepers.ancestry.utils import gb_mem_to_size
from pds.registrysweepers.ancestry.utils import load_partial_history_to_records
@@ -90,19 +90,6 @@ def get_ancestry_by_collection_lidvid(collections_docs: Iterable[Dict]) -> Mappi
return ancestry_by_collection_lidvid



def get_ancestry_by_collection_lid(
ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord]
) -> Mapping[PdsLid, Set[AncestryRecord]]:
@@ -124,9 +111,6 @@ def get_collection_ancestry_records(
bundles_docs = get_collection_ancestry_records_bundles_query(client, registry_db_mock)
collections_docs = list(get_collection_ancestry_records_collections_query(client, registry_db_mock))


# Prepare empty ancestry records for collections, with fast access by LID or LIDVID
ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord] = get_ancestry_by_collection_lidvid(
collections_docs
@@ -167,9 +151,8 @@ def get_collection_ancestry_records(
)
elif isinstance(identifier, PdsLid):
try:
for record in ancestry_by_collection_lid[identifier.lid]:
record.parent_bundle_lidvids.add(bundle_lidvid)
except KeyError:
log.warning(
f"No versions of collection {identifier} referenced by bundle {bundle_lidvid} "
@@ -261,7 +244,18 @@ def get_nonaggregate_ancestry_records_for_collection_lid(
continue

collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
referenced_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]]
nonaggregate_lidvids = [id for id in referenced_lidvids if id.is_basic_product()]

erroneous_lidvids = [id for id in referenced_lidvids if not id.is_basic_product()]
if len(erroneous_lidvids) > 0:
log.error(
f'registry-refs document with id {doc["_id"]} references one or more aggregate products in its product_lidvid refs list: {[str(id) for id in erroneous_lidvids]}'
)

except IndexError as err:
doc_id = doc["_id"]
log.warning(f'Encountered document with unexpected _id: "{doc_id}"')
except (ValueError, KeyError) as err:
log.warning(
'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
52 changes: 46 additions & 6 deletions src/pds/registrysweepers/ancestry/queries.py
@@ -1,23 +1,23 @@
import logging
from enum import auto
from enum import Enum
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Optional

from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY
from pds.registrysweepers.ancestry.constants import METADATA_PARENT_COLLECTION_KEY
from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants
from pds.registrysweepers.ancestry.typedefs import DbMockTypeDef
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import get_query_hits_count
from pds.registrysweepers.utils.db import query_registry_db_or_mock
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

log = logging.getLogger(__name__)



class ProductClass(Enum):
BUNDLE = (auto(),)
@@ -60,7 +60,7 @@ def get_collection_ancestry_records_collections_query(
) -> Iterable[Dict]:
# Query the registry for all collection identifiers
query = product_class_query_factory(ProductClass.COLLECTION)
_source = {"includes": ["lidvid", "alternate_ids", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
_source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True)
docs = query_f(client, "registry", query, _source)

@@ -108,7 +108,11 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query(
"seq_no_primary_term": True,
}
_source = {"includes": ["collection_lidvid", "batch_id", "product_lidvid"]}
query_f = query_registry_db_or_mock(registry_db_mock, "get_nonaggregate_ancestry_records", use_search_after=True)
query_f = query_registry_db_or_mock(
registry_db_mock,
f"get_nonaggregate_ancestry_records_for_collection_lid-{collection_lid}",
use_search_after=True,
)

# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
@@ -150,3 +154,39 @@ def get_orphaned_documents_count(client: OpenSearch, index_name: str) -> int:
# Query an index for documents without an up-to-date ancestry version reference - this would indicate a product which is
# orphaned and is getting missed in processing
return get_query_hits_count(client, index_name, _orphaned_docs_query)
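get_query_hits_count presumably wraps OpenSearch's count API; a direct equivalent might look like the sketch below. The query body is an assumption, since _orphaned_docs_query is defined above the excerpted region - it inverts the version term used by the sweeper's up-to-date check.

response = client.count(
    index=index_name,
    body={
        "query": {
            # Assumed shape: match documents lacking the current sweeper version stamp.
            "bool": {"must_not": [{"term": {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: SWEEPERS_ANCESTRY_VERSION}}]}
        }
    },
)
orphaned_count = response["count"]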


def get_existing_ancestry_for_product(
client: OpenSearch, product_lidvid: PdsLidVid, registry_db_mock: DbMockTypeDef
) -> Dict:
# Retrieve ancestry for a single document. It would be simpler to just pull it by id, but this is compatible with
# the existing functional testing framework.
query: Dict = {
"query": {
"bool": {
"filter": [
{"term": {"lidvid": str(product_lidvid)}},
],
}
},
}
_source = {
"includes": [
"lidvid",
METADATA_PARENT_BUNDLE_KEY,
METADATA_PARENT_COLLECTION_KEY,
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]
}
query_f = query_registry_db_or_mock(
registry_db_mock, f"get_existing_ancestry_for_product-{product_lidvid}", use_search_after=True
)

docs = query_f(
client,
"registry",
query,
_source,
)

return list(docs)[0]
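Stripped of the mock-aware wrapper, the lookup above amounts to a single filtered term query. A standalone sketch against a raw opensearch-py client - the host, port, and example lidvid are placeholders, not project configuration:

from opensearchpy import OpenSearch

# Hypothetical client setup for illustration only.
client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

response = client.search(
    index="registry",
    body={
        "query": {"bool": {"filter": [{"term": {"lidvid": "urn:nasa:pds:bundle:collection:product::1.0"}}]}},
        "_source": {
            "includes": [
                "lidvid",
                "ops:Provenance/ops:parent_bundle_identifier",
                "ops:Provenance/ops:parent_collection_identifier",
            ]
        },
    },
)
doc = response["hits"]["hits"][0]  # same document shape consumed by generate_deferred_updates()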
4 changes: 4 additions & 0 deletions src/pds/registrysweepers/ancestry/typedefs.py
@@ -1,5 +1,9 @@
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union

SerializableAncestryRecordTypeDef = Dict[str, Union[str, List[str]]]
DbMockTypeDef = Optional[Callable[[str], Iterable[Dict]]]