Skip to content

Commit

Permalink
[incomplete] initial multitenancy implementation - does not actually …
Browse files Browse the repository at this point in the history
…resolve index names
  • Loading branch information
alexdunnjpl committed Apr 18, 2024
1 parent 8c3ed6c commit def5a14
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 23 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which

#### Environment Variables
```
MULTITENANCY_NODE_ID= // If running in a multitenant environment, the id of the node, used to distinguish registry/registry-refs index instances
PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password
PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port
LOGLEVEL - an integer log level or anycase string matching a python log level like `INFO` (optional - defaults to `INFO`))
Expand Down
12 changes: 7 additions & 5 deletions src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

Expand Down Expand Up @@ -70,18 +71,18 @@ def run(
METADATA_PARENT_COLLECTION_KEY,
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, "registry", metadata_key, "keyword")
ensure_index_mapping(client, resolve_multitenant_index_name("registry"), metadata_key, "keyword")

for metadata_key in [
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, "registry-refs", metadata_key, "keyword")
ensure_index_mapping(client, resolve_multitenant_index_name("registry-refs"), metadata_key, "keyword")

log.info("Writing bulk updates to database...")
write_updated_docs(
client,
updates,
index_name="registry",
index_name=resolve_multitenant_index_name("registry"),
)
log.info("Generating updates from deferred records...")
deferred_updates = generate_deferred_updates(client, deferred_records_file.name, registry_mock_query_f)
Expand All @@ -90,15 +91,16 @@ def run(
write_updated_docs(
client,
deferred_updates,
index_name="registry",
index_name=resolve_multitenant_index_name("registry"),
)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
pass

log.info("Checking indexes for orphaned documents")
for index_name in ["registry", "registry-refs"]:
index_names = [resolve_multitenant_index_name(index_label) for index_label in ["registry", "registry-refs"]]
for index_name in index_names:
if log.isEnabledFor(logging.DEBUG):
orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name)
orphaned_doc_ids = [doc.get("_id") for doc in orphaned_docs]
Expand Down
7 changes: 5 additions & 2 deletions src/pds/registrysweepers/ancestry/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import Update
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.misc import bin_elements
from pds.registrysweepers.utils.misc import coerce_list_type
from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory
Expand Down Expand Up @@ -424,7 +425,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
isinstance(err, KeyError)
and most_recent_attempted_collection_lidvid not in bundle_ancestry_by_collection_lidvid
):
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index "registry" for registry-refs doc with id "{doc.get("_id")}"'
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index {resolve_multitenant_index_name("registry")} for {resolve_multitenant_index_name("registry-refs")} doc with id "{doc.get("_id")}"'
elif isinstance(err, ValueError):
probable_cause = f'[Probable Cause]: Failed to parse collection and/or product LIDVIDs from document with id "{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}'
else:
Expand Down Expand Up @@ -488,5 +489,7 @@ def generate_update(doc: RefDocBookkeepingEntry) -> Update:
logging.info(
f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}"
)
write_updated_docs(client, updates, index_name="registry-refs", bulk_chunk_max_update_count=20000)
write_updated_docs(
client, updates, index_name=resolve_multitenant_index_name("registry-refs"), bulk_chunk_max_update_count=20000
)
logging.info("registry-refs metadata update complete")
15 changes: 8 additions & 7 deletions src/pds/registrysweepers/ancestry/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import get_query_hits_count
from pds.registrysweepers.utils.db import query_registry_db_or_mock
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

Expand Down Expand Up @@ -41,7 +42,7 @@ def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True)
docs = query_f(client, "registry", query, _source)
docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source)

return docs

Expand All @@ -50,7 +51,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", "ref_lid_collection"]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True)
docs = query_f(client, "registry", query, _source)
docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source)

return docs

Expand All @@ -62,7 +63,7 @@ def get_collection_ancestry_records_collections_query(
query = product_class_query_factory(ProductClass.COLLECTION)
_source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True)
docs = query_f(client, "registry", query, _source)
docs = query_f(client, resolve_multitenant_index_name("registry"), query, _source)

return docs

Expand All @@ -83,7 +84,7 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock
# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
client,
"registry-refs",
resolve_multitenant_index_name("registry-refs"),
query,
_source,
page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
Expand Down Expand Up @@ -117,7 +118,7 @@ def get_nonaggregate_ancestry_records_for_collection_lid_query(
# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
client,
"registry-refs",
resolve_multitenant_index_name("registry-refs"),
query,
_source,
page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
Expand All @@ -142,7 +143,7 @@ def get_orphaned_documents(client: OpenSearch, registry_db_mock: DbMockTypeDef,
query_f = query_registry_db_or_mock(registry_db_mock, "get_orphaned_ancestry_docs", use_search_after=True)

sort_fields_override = (
["collection_lidvid", "batch_id"] if index_name == "registry-refs" else None
["collection_lidvid", "batch_id"] if "registry-refs" in index_name else None
) # use default for registry

docs = query_f(client, index_name, _orphaned_docs_query, _source, sort_fields=sort_fields_override)
Expand Down Expand Up @@ -184,7 +185,7 @@ def get_existing_ancestry_for_product(

docs = query_f(
client,
"registry",
resolve_multitenant_index_name("registry"),
query,
_source,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
import logging
import sys
import opensearchpy.helpers

from opensearchpy import OpenSearch
from time import sleep
from typing import Union

import opensearchpy.helpers
from opensearchpy import OpenSearch
from pds.registrysweepers.legacy_registry_sync.opensearch_loaded_product import get_already_loaded_lidvids
from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import SolrOsWrapperIter
from pds.registrysweepers.utils import configure_logging
Expand All @@ -17,6 +17,7 @@
OS_INDEX = "legacy_registry"
MAX_RETRIES = 5


def create_legacy_registry_index(es_conn=None):
"""
Creates if not already created the legacy_registry index.
Expand Down Expand Up @@ -61,4 +62,3 @@ def run(
):
if not ok:
log.error(item)

5 changes: 3 additions & 2 deletions src/pds/registrysweepers/provenance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from pds.registrysweepers.utils.db import query_registry_db_with_search_after
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid

Expand All @@ -72,7 +73,7 @@ def get_records(client: OpenSearch) -> Iterable[ProvenanceRecord]:
}
_source = {"includes": ["lidvid", METADATA_SUCCESSOR_KEY, SWEEPERS_PROVENANCE_VERSION_METADATA_KEY]}

docs = query_registry_db_with_search_after(client, "registry", query, _source)
docs = query_registry_db_with_search_after(client, resolve_multitenant_index_name("registry"), query, _source)

for doc in docs:
try:
Expand Down Expand Up @@ -140,7 +141,7 @@ def run(
write_updated_docs(
client,
updates,
index_name="registry",
index_name=resolve_multitenant_index_name("registry"),
)

log.info("Completed provenance sweeper processing!")
Expand Down
13 changes: 10 additions & 3 deletions src/pds/registrysweepers/repairkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_opensearch_client
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update

"""
Expand Down Expand Up @@ -95,10 +96,16 @@ def run(

# page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
# i.e. ATM and GEO
all_docs = query_registry_db_with_search_after(client, "registry", unprocessed_docs_query, {}, page_size=5000)
all_docs = query_registry_db_with_search_after(
client, resolve_multitenant_index_name("registry"), unprocessed_docs_query, {}, page_size=5000
)
updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION)
ensure_index_mapping(client, "registry", SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer")
write_updated_docs(client, updates, index_name="registry", bulk_chunk_max_update_count=20000)
ensure_index_mapping(
client, resolve_multitenant_index_name("registry"), SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer"
)
write_updated_docs(
client, updates, index_name=resolve_multitenant_index_name("registry"), bulk_chunk_max_update_count=20000
)

log.info("Repairkit sweeper processing complete!")

Expand Down
14 changes: 14 additions & 0 deletions src/pds/registrysweepers/utils/db/multitenancy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os


def resolve_multitenant_index_name(index_type: str):
supported_index_types = {"registry", "registry-refs"}
node_id = os.environ.get("MULTITENANCY_NODE_ID", "").strip(" ")

if node_id == "":
return index_type
elif index_type not in supported_index_types:
raise ValueError(f'index_type "{index_type}" not supported (expected one of {supported_index_types})')
else:
raise NotImplementedError("the format has not yet been confirmed")
return f"{node_id}_{index_type}"

0 comments on commit def5a14

Please sign in to comment.