Collection iteration optimization #115

Merged · 40 commits · Apr 14, 2024

Commits
82392b4 bump peak memory estimation factor (alexdunnjpl, Feb 6, 2024)
3f0bde5 dump chunks every ten minutes (alexdunnjpl, Mar 4, 2024)
56750ce add debug log (alexdunnjpl, Mar 27, 2024)
2eb33cd switch ancestry from json dump to pickle dump (alexdunnjpl, Mar 27, 2024)
5b3a4f7 add run-configs to .gitig (alexdunnjpl, Mar 27, 2024)
48db330 back to manual management of chunk size (alexdunnjpl, Mar 28, 2024)
8f0cebb improve query progress logging (alexdunnjpl, Apr 2, 2024)
a001d52 remove deprecated non-chunked ancestry processing (alexdunnjpl, Apr 2, 2024)
b4dbdc2 fix consumable iterator, as null-ancestry metadata still needs to be … (alexdunnjpl, Apr 2, 2024)
53e1d66 implement utils.misc.bin_elements() (alexdunnjpl, Apr 2, 2024)
2da05ba overhaul ancestry with collection-iterative approach (alexdunnjpl, Apr 2, 2024)
c813e0e Revert "remove deprecated non-chunked ancestry processing" (alexdunnjpl, Apr 2, 2024)
410aee1 squash! overhaul ancestry with collection-iterative approach (alexdunnjpl, Apr 3, 2024)
fe3ecbb [fatally broken] improve handling of registry-refs pages where collec… (alexdunnjpl, Apr 3, 2024)
c2c70aa implement intentional skipping of secondary-collection registry-refs … (alexdunnjpl, Apr 3, 2024)
691769e squash! overhaul ancestry with collection-iterative approach (alexdunnjpl, Apr 3, 2024)
53c5de8 squash! [fatally broken] improve handling of registry-refs pages wher… (alexdunnjpl, Apr 3, 2024)
8f9c27d increment ancestry version (alexdunnjpl, Apr 3, 2024)
60fd01d remove alternate_ids ancestry test case (alexdunnjpl, Apr 3, 2024)
b167982 back to memory-based dumping for chunked behaviour (alexdunnjpl, Apr 9, 2024)
ee2a112 clean up cruft (alexdunnjpl, Apr 9, 2024)
fdfea21 minor additions (alexdunnjpl, Apr 10, 2024)
1a483a7 implement ancestry.utils.update.from_record() (alexdunnjpl, Apr 10, 2024)
31d974b implement ancestry partial-record deferral/handling (alexdunnjpl, Apr 10, 2024)
e57d6cc add query stubs for pre-existing ancestry tests (alexdunnjpl, Apr 11, 2024)
3ae1db2 add tests for record/update count (alexdunnjpl, Apr 11, 2024)
c10f268 update existing ancestry tests for overhaul (alexdunnjpl, Apr 11, 2024)
482146a generate deferred updates only after non-deferred updates have been w… (alexdunnjpl, Apr 11, 2024)
bff5810 update log messages (alexdunnjpl, Apr 11, 2024)
5360632 fix bug in json structure (alexdunnjpl, Apr 11, 2024)
e753e28 bump default request timeout (alexdunnjpl, Apr 11, 2024)
ab8bd2b fix assignment of bundle history to LID collection references (alexdunnjpl, Apr 11, 2024)
c81f2ed excise behaviour related to aliases/alternate_ids, as this property i… (alexdunnjpl, Apr 11, 2024)
dc65c84 minor fix (alexdunnjpl, Apr 11, 2024)
303b9db add type validation to AncestryRecord.lidvid (alexdunnjpl, Apr 12, 2024)
c112482 demote log (alexdunnjpl, Apr 12, 2024)
083b3fe handle case where registry-refs page erroneously includes collection … (alexdunnjpl, Apr 12, 2024)
a8a1b86 increment SWEEPERS_ANCESTRY_VERSION (alexdunnjpl, Apr 12, 2024)
7c4c1ed Merge pull request #116 from NASA-PDS/deferred-partial-updates (alexdunnjpl, Apr 14, 2024)
1b09ce6 tqdm pin tweak (alexdunnjpl, Apr 14, 2024)
1 change: 1 addition & 0 deletions .gitignore
@@ -106,3 +106,4 @@ typescript
!.gitattributes
!.gitignore
!.gitkeep
/.run/
2 changes: 2 additions & 0 deletions setup.cfg
@@ -33,6 +33,7 @@ install_requires =
# this is a temporary dependency, we published the repo https://github.com/o19s/solr-to-es to pypi ourselves
# until the ticket https://github.com/o19s/solr-to-es/issues/23 is resolved.
pds.solr-to-es==0.3.0
tqdm==4.66.2


# Change this to False if you use things like __file__ or __path__—which you
@@ -78,6 +79,7 @@ dev =
types_requests~=2.28
types-retry~=0.9.9.4
types-setuptools~=68.1.0.0
types-tqdm~=4.66.0
Jinja2<3.1

[options.entry_points]
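
A note on the `tqdm` pins above: per the "improve query progress logging" commit, the dependency supports progress reporting during long-running registry scans. A minimal sketch of the kind of usage this enables (the wrapper name and description string here are illustrative, not taken from this PR):

```python
from typing import Iterable, Iterator, Optional

from tqdm import tqdm


def with_progress(docs: Iterable[dict], total: Optional[int] = None) -> Iterator[dict]:
    # Wrap any document iterator in a progress bar; passing total enables
    # a percentage/ETA display when the hit count is known up front.
    yield from tqdm(docs, desc="processing registry-refs pages", total=total)
```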
14 changes: 6 additions & 8 deletions src/pds/registrysweepers/ancestry/__init__.py
@@ -11,6 +11,7 @@

from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.ancestryrecord import AncestryRecord
from pds.registrysweepers.ancestry.generation import generate_nonaggregate_and_collection_records_iteratively
from pds.registrysweepers.ancestry.generation import get_bundle_ancestry_records
from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records
from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records
@@ -43,15 +44,12 @@ def run(

log.info(f"Starting ancestry v{SWEEPERS_ANCESTRY_VERSION} sweeper processing...")

bundle_records = get_bundle_ancestry_records(client, registry_mock_query_f)
collection_records = list(get_collection_ancestry_records(client, registry_mock_query_f))
nonaggregate_records = get_nonaggregate_ancestry_records(client, collection_records, registry_mock_query_f)
bundle_records = list(get_bundle_ancestry_records(client, registry_mock_query_f))
collection_and_nonaggregate_records = generate_nonaggregate_and_collection_records_iteratively(
client, get_collection_ancestry_records(client, registry_mock_query_f), registry_mock_query_f
)

# the order of this chain is now important - writing descendants first ensures that if an ancestor is given a
# "processed by sweeper version" flag, it may be assumed that all its descendants have also been processed
# this avoids the potential for a bundle/collection to be metadata-marked as up-to-date when execution failed before
# its descendants were updated (due to execution interruption, e.g. database overload)
ancestry_records = chain(nonaggregate_records, collection_records, bundle_records)
ancestry_records = chain(collection_and_nonaggregate_records, bundle_records)
ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records)
updates = generate_updates(ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink)

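
The reordering above is the core correctness change: because `generate_updates` consumes this chain lazily, every non-aggregate record is written before the collection record it belongs to, and all collection and non-aggregate records are written before the bundle records. A toy sketch of why generator yield order provides this guarantee (the names here are illustrative, not the PR's code):

```python
from itertools import chain


def child_records():
    for i in range(3):
        print(f"write child {i}")  # children reach the writer first
        yield i


def parent_records():
    # Only reached after child_records() is exhausted, so an interruption
    # mid-stream leaves the parent unmarked and safe to reprocess later.
    print("write parent")
    yield "parent"


for record in chain(child_records(), parent_records()):
    pass  # stand-in for the bulk-update sink
```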
126 changes: 121 additions & 5 deletions src/pds/registrysweepers/ancestry/generation.py
@@ -5,6 +5,8 @@
import sys
import tempfile
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
from typing import Dict
from typing import Iterable
from typing import List
@@ -19,16 +21,19 @@
from pds.registrysweepers.ancestry.queries import get_bundle_ancestry_records_query
from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_bundles_query
from pds.registrysweepers.ancestry.queries import get_collection_ancestry_records_collections_query
from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_for_collection_lid_query
from pds.registrysweepers.ancestry.queries import get_nonaggregate_ancestry_records_query
from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants
from pds.registrysweepers.ancestry.utils import dump_history_to_disk
from pds.registrysweepers.ancestry.utils import gb_mem_to_size
from pds.registrysweepers.ancestry.utils import load_partial_history_to_records
from pds.registrysweepers.ancestry.utils import make_history_serializable
from pds.registrysweepers.ancestry.utils import merge_matching_history_chunks
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import Update
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.misc import bin_elements
from pds.registrysweepers.utils.misc import coerce_list_type
from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid
@@ -181,6 +186,39 @@ def get_collection_ancestry_records(
return ancestry_by_collection_lidvid.values()


def generate_nonaggregate_and_collection_records_iteratively(
client: OpenSearch,
all_collections_records: Iterable[AncestryRecord],
registry_db_mock: DbMockTypeDef = None,
) -> Iterable[AncestryRecord]:
"""
Iteratively generate nonaggregate records in chunks, each chunk sharing a common collection LID. This
avoids the need to hold data in memory for a large volume of nonaggregate records simultaneously.

After the non-aggregate records for a collection LID are generated, the corresponding collections' records are
yielded, ensuring that a collection is only processed and marked up-to-date once its non-aggregates have
successfully been updated.
"""

collection_records_by_lid = bin_elements(all_collections_records, lambda r: r.lidvid.lid)

for lid, collections_records_for_lid in collection_records_by_lid.items():
if all([record.skip_write for record in collections_records_for_lid]):
log.info(f"Skipping updates for up-to-date collection family: {str(lid)}")
continue
else:
log.info(
f"Processing all versions of collection {str(lid)}: {[str(id) for id in sorted([r.lidvid for r in collections_records_for_lid])]}"
)

for non_aggregate_record in get_nonaggregate_ancestry_records_for_collection_lid(
client, lid, collections_records_for_lid, registry_db_mock
):
yield non_aggregate_record

for collection_record in collections_records_for_lid:
yield collection_record


def get_nonaggregate_ancestry_records(
client: OpenSearch,
collection_ancestry_records: Iterable[AncestryRecord],
@@ -195,6 +233,64 @@ def get_nonaggregate_ancestry_records(
return f(client, collection_ancestry_records, registry_db_mock)


def get_nonaggregate_ancestry_records_for_collection_lid(
client: OpenSearch,
collection_lid: PdsLid,
collection_ancestry_records: Iterable[AncestryRecord],
registry_db_mock: DbMockTypeDef = None,
) -> Iterable[AncestryRecord]:
log.info(
f"Generating AncestryRecords for non-aggregate products of collections with LID {str(collection_lid)}, using non-chunked input/output..."
)

# Generate lookup for the parent bundles of all collections - these will be applied to non-aggregate products too.
bundle_ancestry_by_collection_lidvid = {
record.lidvid: record.parent_bundle_lidvids for record in collection_ancestry_records
}

collection_refs_query_docs = get_nonaggregate_ancestry_records_for_collection_lid_query(
client, collection_lid, registry_db_mock
)

nonaggregate_ancestry_records_by_lidvid = {}
# For each collection, add the collection and its bundle ancestry to all products the collection contains
for doc in collection_refs_query_docs:
try:
if doc["_id"].split("::")[2].startswith("S"):
log.info(f'Skipping secondary-collection document {doc["_id"]}')
continue

collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
nonaggregate_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]]
except (ValueError, KeyError) as err:
log.warning(
'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
doc.get("_index"),
doc.get("_id"),
type(err).__name__,
err,
)
continue

try:
bundle_ancestry = bundle_ancestry_by_collection_lidvid[collection_lidvid]
except KeyError:
log.debug(
f'Failed to resolve history for page {doc.get("_id")} in index {doc.get("_index")} with collection_lidvid {collection_lidvid} - no such collection exists in registry.'
)
continue

for lidvid in nonaggregate_lidvids:
if lidvid not in nonaggregate_ancestry_records_by_lidvid:
nonaggregate_ancestry_records_by_lidvid[lidvid] = AncestryRecord(lidvid=lidvid)

record = nonaggregate_ancestry_records_by_lidvid[lidvid]
record.parent_bundle_lidvids.update(bundle_ancestry)
record.parent_collection_lidvids.add(collection_lidvid)

return nonaggregate_ancestry_records_by_lidvid.values()


def _get_nonaggregate_ancestry_records_without_chunking(
client: OpenSearch,
collection_ancestry_records: Iterable[AncestryRecord],
Expand All @@ -213,8 +309,11 @@ def _get_nonaggregate_ancestry_records_without_chunking(
# For each collection, add the collection and its bundle ancestry to all products the collection contains
for doc in collection_refs_query_docs:
try:
if doc["_id"].split("::")[2].startswith("S"):
log.info(f'Skipping secondary-collection document {doc["_id"]}')
continue

collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
bundle_ancestry = bundle_ancestry_by_collection_lidvid[collection_lidvid]
nonaggregate_lidvids = [PdsLidVid.from_string(s) for s in doc["_source"]["product_lidvid"]]
except (ValueError, KeyError) as err:
log.warning(
@@ -226,6 +325,14 @@
)
continue

try:
bundle_ancestry = bundle_ancestry_by_collection_lidvid[collection_lidvid]
except KeyError:
log.debug(
f'Failed to resolve history for page {doc.get("_id")} in index {doc.get("_index")} with collection_lidvid {collection_lidvid} - no such collection exists in registry.'
)
continue

for lidvid in nonaggregate_lidvids:
if lidvid not in nonaggregate_ancestry_records_by_lidvid:
nonaggregate_ancestry_records_by_lidvid[lidvid] = AncestryRecord(lidvid=lidvid)
@@ -264,8 +371,8 @@ def _get_nonaggregate_ancestry_records_with_chunking(
user_configured_max_memory_usage = AncestryRuntimeConstants.max_acceptable_memory_usage
available_processing_memory = user_configured_max_memory_usage - baseline_memory_usage
disk_dump_memory_threshold = baseline_memory_usage + (
available_processing_memory / 2.5
) # peak expected memory use is during merge, where two dump files are open simultaneously. 0.5 added for overhead after testing revealed 2.0 was insufficient
available_processing_memory / 3.0
) # peak expected memory use is during merge, where two dump files are open simultaneously. 1.0 added for overhead after testing revealed 2.5 was insufficient
log.info(
f"Max memory use set at {user_configured_max_memory_usage}% - dumps will trigger when memory usage reaches {disk_dump_memory_threshold:.1f}%"
)
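
For concreteness, a worked example of the threshold arithmetic above, with invented numbers (the real values come from `AncestryRuntimeConstants` and `psutil` at runtime):

```python
# Illustrative values only - not taken from the PR.
user_configured_max_memory_usage = 80.0  # max % of system memory allowed
baseline_memory_usage = 20.0             # % already in use before processing

available_processing_memory = user_configured_max_memory_usage - baseline_memory_usage  # 60.0
disk_dump_memory_threshold = baseline_memory_usage + available_processing_memory / 3.0  # 40.0

# The divisor 3.0 budgets one share for each of the two chunk files open
# simultaneously during merge, plus one share of overhead headroom.
```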
@@ -275,13 +382,21 @@

most_recent_attempted_collection_lidvid: Union[PdsLidVid, None] = None
nonaggregate_ancestry_records_by_lidvid = {}

for doc in collection_refs_query_docs:
try:
collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
most_recent_attempted_collection_lidvid = collection_lidvid
for nonaggregate_lidvid_str in doc["_source"]["product_lidvid"]:

try:
bundle_ancestry = bundle_ancestry_by_collection_lidvid[collection_lidvid]
except KeyError:
log.debug(
f'Failed to resolve history for page {doc.get("_id")} in index {doc.get("_index")} with collection_lidvid {collection_lidvid} - no such collection exists in registry.'
)
continue

for nonaggregate_lidvid_str in doc["_source"]["product_lidvid"]:
if nonaggregate_lidvid_str not in nonaggregate_ancestry_records_by_lidvid:
nonaggregate_ancestry_records_by_lidvid[nonaggregate_lidvid_str] = {
"lidvid": nonaggregate_lidvid_str,
@@ -294,7 +409,7 @@
record_dict["parent_collection_lidvids"].add(str(collection_lidvid))

if psutil.virtual_memory().percent >= disk_dump_memory_threshold:
log.debug(
log.info(
f"Memory threshold {disk_dump_memory_threshold:.1f}% reached - dumping serialized history to disk for {len(nonaggregate_ancestry_records_by_lidvid)} products"
)
make_history_serializable(nonaggregate_ancestry_records_by_lidvid)
@@ -303,6 +418,7 @@
chunk_size_max, sys.getsizeof(nonaggregate_ancestry_records_by_lidvid)
) # slightly problematic due to reuse of pointers vs actual values, but let's try it
nonaggregate_ancestry_records_by_lidvid = {}
last_dump_time = datetime.now()

# mark collection for metadata update
touched_ref_documents.append(
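
`bin_elements()` is added in `utils/misc.py` by this PR, though its implementation is not shown in this diff; it is what lets `generate_nonaggregate_and_collection_records_iteratively` group all versions of a collection under a single LID. A plausible sketch consistent with its call site `bin_elements(all_collections_records, lambda r: r.lidvid.lid)` (the actual implementation may differ):

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List, TypeVar

T = TypeVar("T")
K = TypeVar("K", bound=Hashable)


def bin_elements(elements: Iterable[T], key_f: Callable[[T], K]) -> Dict[K, List[T]]:
    """Bin elements by key_f(element), preserving encounter order within each bin."""
    bins: Dict[K, List[T]] = defaultdict(list)
    for element in elements:
        bins[key_f(element)].append(element)
    return dict(bins)
```

With `key_f=lambda r: r.lidvid.lid`, every AncestryRecord version of a given collection lands in one bin, so each processing chunk touches exactly one collection family and peak memory scales with the largest family rather than the whole registry.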
31 changes: 31 additions & 0 deletions src/pds/registrysweepers/ancestry/queries.py
@@ -12,6 +12,7 @@
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import get_query_hits_count
from pds.registrysweepers.utils.db import query_registry_db_or_mock
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid

log = logging.getLogger(__name__)

@@ -93,6 +94,36 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock
return docs


def get_nonaggregate_ancestry_records_for_collection_lid_query(
client: OpenSearch, collection_lid: PdsLid, registry_db_mock: DbMockTypeDef
) -> Iterable[Dict]:
# Query the registry-refs index for the contents of all collections
query: Dict = {
"query": {
"bool": {
"must_not": [{"range": {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: {"gte": SWEEPERS_ANCESTRY_VERSION}}}],
"filter": [{"term": {"collection_lid": str(collection_lid)}}],
}
},
"seq_no_primary_term": True,
}
_source = {"includes": ["collection_lidvid", "batch_id", "product_lidvid"]}
query_f = query_registry_db_or_mock(registry_db_mock, "get_nonaggregate_ancestry_records", use_search_after=True)

# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
client,
"registry-refs",
query,
_source,
page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
request_timeout_seconds=30,
sort_fields=["collection_lidvid", "batch_id"],
)

return docs


_orphaned_docs_query = {
"query": {
"bool": {"must_not": [{"range": {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: {"gte": SWEEPERS_ANCESTRY_VERSION}}}]}
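
Each hit streamed back by this query is one page of a collection's product references. An illustrative hit with invented identifiers, shaped to match the fields consumed by `get_nonaggregate_ancestry_records_for_collection_lid` above (the exact `_id` layout is inferred from the `split("::")[2]` secondary-page check and may differ in practice):

```python
# Hypothetical example document - structure mirrors the parsing code, values are invented.
doc = {
    "_index": "registry-refs",
    "_id": "urn:nasa:pds:some_bundle:some_collection::1.0::P1",  # "S..." in the third "::"-field marks a secondary page
    "_source": {
        "collection_lidvid": "urn:nasa:pds:some_bundle:some_collection::1.0",
        "batch_id": 1,
        "product_lidvid": [
            "urn:nasa:pds:some_bundle:some_collection:product_a::1.0",
            "urn:nasa:pds:some_bundle:some_collection:product_b::1.0",
        ],
    },
}
```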