Skip to content

Commit

Permalink
generate deferred updates only after non-deferred updates have been w…
Browse files Browse the repository at this point in the history
…ritten to db
  • Loading branch information
alexdunnjpl committed Apr 11, 2024
1 parent c10f268 commit 482146a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 32 deletions.
76 changes: 45 additions & 31 deletions src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import List
from typing import Optional
from typing import Set
from typing import TextIO
from typing import Tuple
from typing import Union

Expand Down Expand Up @@ -57,8 +58,9 @@ def run(

ancestry_records = chain(collection_and_nonaggregate_records, bundle_records)
ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records)
deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
updates = generate_updates(
client, ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink, registry_mock_query_f
ancestry_records_to_write, deferred_records_file.name, ancestry_records_accumulator, bulk_updates_sink
)

if bulk_updates_sink is None:
Expand All @@ -81,6 +83,15 @@ def run(
updates,
index_name="registry",
)
log.info("Generating updates from deferred records...")
deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)

log.info("Writing deferred updates to database...")
write_updated_docs(
client,
deferred_updates,
index_name="registry",
)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
Expand Down Expand Up @@ -116,18 +127,19 @@ def orphan_counter_mock(_, __):


def generate_updates(
client: OpenSearch,
ancestry_records: Iterable[AncestryRecord],
deferred_records_filepath: str,
ancestry_records_accumulator=None,
bulk_updates_sink=None,
registry_db_mock: DbMockTypeDef = None,
) -> Iterable[Update]:
"""
Given a collection of AncestryRecords, yield corresponding Update objects.
Given a collection of AncestryRecords, yield corresponding Update objects, excluding any deferred updates, which
must be generated separately.
Ideally, there should be one record per product, but this is not necessarily the case due to the potential of
nonaggregate products to be shared between collections with different LIDs. In that case, it is necessary to:
- defer processing of all records which conflict with a previously-processed record
- ensure all non-deferred records have been written to the db
- retrieve the conflicting records which have been written to db, since the streaming collection-iterative
approach prevents efficiently detecting conflicts until the first partial history is already processed/written.
- merge all deferred/retrieved partial histories into a full history for each distinct product lidvid
Expand All @@ -139,7 +151,7 @@ def generate_updates(

# stream/yield Updates for AncestryRecords, deferring processing of conflicting AncestryRecords and storing them in
# a temporary file
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as deferred_records_file:
with open(deferred_records_filepath, mode="w+") as deferred_records_file:
for record in ancestry_records:
# Tee the stream of records into the accumulator, if one was provided (functional testing).
if ancestry_records_accumulator is not None:
Expand All @@ -166,33 +178,35 @@ def generate_updates(
updated_doc_ids.add(update.id)
yield update

# Merge all deferred records with matching lidvids
with open(deferred_records_file.name, "r") as deferred_records_file: # type: ignore
deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {}
for l in deferred_records_file.readlines():
record = AncestryRecord.from_dict(json.loads(l))
if record.lidvid in deferred_records_by_lidvid:
deferred_records_by_lidvid[record.lidvid].update_with(record)
else:
deferred_records_by_lidvid.update({record.lidvid: record})

# Retrieve the first partial history (already written to db) for each lidvid, merge with its deferred history,
# then yield a full-history-update for that lidvid
for record in deferred_records_by_lidvid.values():
doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock)
partial_record_from_db = AncestryRecord.from_dict(
{
"lidvid": doc["lidvid"],
"parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY],
"parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY],
}
)
record.update_with(partial_record_from_db)
update = update_from_record(record)
yield update
# TODO: Check that ancestry version is equal to current, throw if not.

os.remove(deferred_records_file.name)
def generate_deferred_updates(
    client: OpenSearch, deferred_records_filepath: str, registry_db_mock: DbMockTypeDef = None
) -> Iterable[Update]:
    """
    Yield full-history Updates for AncestryRecords whose processing was deferred by
    generate_updates() due to lidvid conflicts with previously-processed records.

    Reads the deferred records (one JSON-serialized AncestryRecord per line) from the temporary
    file written by generate_updates(), merges all partial histories sharing a lidvid, folds in
    the partial history already written to the db, and yields one Update per distinct lidvid.

    NOTE(review): must only be called after all non-deferred updates have been written to the db,
    since it retrieves the previously-written partial history for each conflicting product.

    :param client: OpenSearch client used to fetch the already-written partial histories.
    :param deferred_records_filepath: path of the file populated by generate_updates().
    :param registry_db_mock: optional query mock for functional testing.
    """
    # Merge all deferred records with matching lidvids
    deferred_records_by_lidvid: Dict[PdsLidVid, AncestryRecord] = {}
    with open(deferred_records_filepath, "r") as deferred_records_file:
        # Iterate the file lazily rather than materializing every line via readlines().
        for line in deferred_records_file:
            record = AncestryRecord.from_dict(json.loads(line))
            existing = deferred_records_by_lidvid.get(record.lidvid)
            if existing is not None:
                existing.update_with(record)
            else:
                deferred_records_by_lidvid[record.lidvid] = record

    # Retrieve the first partial history (already written to db) for each lidvid, merge with its
    # deferred history, then yield a full-history-update for that lidvid
    for record in deferred_records_by_lidvid.values():
        doc = get_existing_ancestry_for_product(client, record.lidvid, registry_db_mock)
        partial_record_from_db = AncestryRecord.from_dict(
            {
                "lidvid": doc["lidvid"],
                "parent_bundle_lidvids": doc[METADATA_PARENT_BUNDLE_KEY],
                "parent_collection_lidvids": doc[METADATA_PARENT_COLLECTION_KEY],
            }
        )
        record.update_with(partial_record_from_db)
        yield update_from_record(record)
        # TODO: Check that ancestry version is equal to current, throw if not.


if __name__ == "__main__":
Expand Down
10 changes: 9 additions & 1 deletion tests/pds/registrysweepers/ancestry/test_ancestry.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import itertools
import logging
import os.path
import tempfile
import unittest
from typing import Dict
from typing import List
from typing import Tuple

from pds.registrysweepers import ancestry
from pds.registrysweepers.ancestry import generate_deferred_updates
from pds.registrysweepers.ancestry import generate_updates
from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
from pds.registrysweepers.ancestry.constants import METADATA_PARENT_BUNDLE_KEY
Expand Down Expand Up @@ -312,7 +314,13 @@ def test_ancestor_partial_history_accumulation(self):
generate_nonaggregate_and_collection_records_iteratively(None, collection_ancestry_records, query_mock_f)
)

updates = list(generate_updates(None, collection_and_nonaggregate_records, None, None, query_mock_f))
deferred_records_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
non_deferred_updates = list(
generate_updates(collection_and_nonaggregate_records, deferred_records_file.name, None, None)
)
deferred_updates = list(generate_deferred_updates(None, deferred_records_file.name, query_mock_f))
updates = non_deferred_updates + deferred_updates
os.remove(deferred_records_file.name)

# TODO: increase to two nonmatching collections and two shared products

Expand Down

0 comments on commit 482146a

Please sign in to comment.