# docker/sweepers_driver.py -- driver additions for optional sweepers.
# Parses a --legacy-sync CLI flag and conditionally runs the legacy
# registry sync sweeper after the always-on sweepers.

parser = argparse.ArgumentParser(
    prog='registry-sweepers',
    description='sweeps the PDS registry with different routines meant to run regularly on the database'
)
parser.add_argument('--legacy-sync', action='store_true')
args = parser.parse_args()

# Define sweepers to be run here, in order of execution
sweepers = [
    repairkit.run,
    provenance.run,
    ancestry.run
]

# Sweepers that only run when the matching CLI flag is set.  Keys are the
# argparse attribute names produced by the flags declared above
# (--legacy-sync -> args.legacy_sync).
optional_sweepers = {
    'legacy_sync': legacy_registry_sync.run
}

sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

execution_begin = datetime.now()

for sweeper in sweepers:
    run_sweeper_f = run_factory(sweeper)
    run_sweeper_f()

# BUG FIX 1: iterating a dict yields keys only, so the original
# ``for o, s in optional_sweepers:`` raised ValueError on unpacking --
# iterate ``.items()`` instead.
# BUG FIX 2: argparse always defines the attribute for a ``store_true``
# flag (value False when the flag is absent), so ``hasattr(args, o)`` was
# always True and the optional sweeper ran unconditionally -- test the
# attribute's *value* with getattr instead.
for opt_name, sweeper in optional_sweepers.items():
    if getattr(args, opt_name, False):
        run_sweeper_f = run_factory(sweeper)
        run_sweeper_f()

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
log = logging.getLogger(__name__)

# Source Solr endpoint of the legacy PDS registry.
SOLR_URL = 'https://pds.nasa.gov/services/search/search'
# Target OpenSearch index receiving the synchronized documents.
OS_INDEX = "legacy_registry"


def create_legacy_registry_index(es_conn=None):
    """Create the ``legacy_registry`` index if it does not exist yet.

    :param es_conn: Elasticsearch/OpenSearch client connected to the
        target cluster; required despite the ``None`` default.
    """
    if not es_conn.indices.exists(OS_INDEX):
        log.info("create index %s", OS_INDEX)
        es_conn.indices.create(
            index=OS_INDEX,
            body={}
        )
        log.info("index created %s", OS_INDEX)


def run(
    base_url: str,
    username: str,
    password: str,
    verify_host_certs: bool = True,
    log_filepath: Union[str, None] = None,
    log_level: int = logging.INFO,
):
    """Synchronize the legacy Solr registry into the OpenSearch
    ``legacy_registry`` index.

    Streams every document from the legacy Solr service and bulk-indexes
    it into OpenSearch, tagging each document with whether its lidvid is
    already present in the new registry indices.

    :param base_url: OpenSearch host URL.
    :param username: OpenSearch user.
    :param password: OpenSearch password.
    :param verify_host_certs: verify TLS certificates of the host.
    :param log_filepath: optional log file destination.
    :param log_level: logging level for this run.
    """
    configure_logging(filepath=log_filepath, log_level=log_level)

    es_conn = Elasticsearch(
        hosts=base_url,
        verify_certs=verify_host_certs,
        http_auth=(username, password)
    )

    solr_itr = SlowSolrDocs(SOLR_URL, "*", rows=100)

    create_legacy_registry_index(es_conn=es_conn)

    # PERF FIX: membership ("id in found_ids") is tested once per Solr
    # document downstream; a set gives O(1) lookups where the raw list
    # would be O(n) per document.
    prod_ids = set(get_already_loaded_lidvids(
        product_classes=["Product_Context", "Product_Collection", "Product_Bundle"],
        es_conn=es_conn
    ))

    es_actions = SolrOsWrapperIter(solr_itr, OS_INDEX, found_ids=prod_ids)
    # BUG FIX: with the default raise_on_error=True the ``if not ok``
    # branch was unreachable -- the first failed action raised
    # BulkIndexError instead of being logged.  Log failures and continue.
    for ok, item in elasticsearch.helpers.streaming_bulk(
            es_conn,
            es_actions,
            chunk_size=50,
            max_chunk_bytes=50000000,
            max_retries=5,
            initial_backoff=10,
            raise_on_error=False):
        if not ok:
            log.error(item)
def get_cross_cluster_indices():
    """Return all registry index names to query: the local ``registry``
    index plus the cross-cluster-search alias of every discipline node."""
    clusters = [
        "atm-prod-ccs",
        "geo-prod-ccs",
        "img-prod-ccs",
        "naif-prod-ccs",
        "ppi-prod-ccs",
        "psa-prod",
        "rms-prod",
        "sbnpsi-prod-ccs",
        "sbnumd-prod-ccs"
    ]
    indices = ["registry"]
    indices.extend(f"{cluster}:registry" for cluster in clusters)
    return indices


def get_already_loaded_lidvids(product_classes=None, es_conn=None):
    """Return the ``_id`` values (lidvids) of products of the given
    classes already loaded in the new registry, across all clusters.

    :param product_classes: PDS4 ``product_class`` values to match
        (e.g. ``"Product_Bundle"``); defaults to an empty list.
    :param es_conn: Elasticsearch/OpenSearch client; required.
    :return: list of matching document ids.
    """
    # BUG FIX: ``import elasticsearch`` alone does not bind the
    # ``helpers`` submodule; the module only worked when another module
    # happened to import elasticsearch.helpers first.  Import it here so
    # this module is self-contained.
    import elasticsearch.helpers

    # BUG FIX: the original used a mutable default argument ([]), which
    # is shared across calls.
    if product_classes is None:
        product_classes = []

    prod_class_prop = "pds:Identification_Area/pds:product_class"
    query = {
        "query": {
            "bool": {
                # match any one of the requested product classes
                "should": [
                    {"match_phrase": {prod_class_prop: prod_class}}
                    for prod_class in product_classes
                ],
                "minimum_should_match": 1
            }
        },
        # only the _id is needed; do not fetch full documents
        "fields": ["_id"]
    }

    prod_id_resp = elasticsearch.helpers.scan(
        es_conn,
        index=get_cross_cluster_indices(),
        query=query,
        scroll="3m")
    return [p["_id"] for p in prod_id_resp]
log = logging.getLogger(__name__)

# Node directory name (path component at index 4 of file_ref_location)
# mapped to the official PDS node identifier.
NODE_FOLDERS = {
    "atmos": "PDS_ATM",
    "en": "PDS_ENG",
    "geo": "PDS_GEO",
    "img": "PDS_IMG",
    "naif": "PDS_NAIF",
    "ppi": "PDS_PPI",
    "rings": "PDS_RMS",
    "rs": "PDS_RS",
    "sbn": "PDS_SBN",
}


class MissingIdentifierError(Exception):
    """Raised when a Solr document lacks the mandatory 'identifier' field."""
    pass


def pds4_id_field_fun(doc):
    """Return the OpenSearch ``_id`` for a Solr doc: ``lid::vid`` when a
    version_id is present, otherwise the bare identifier.

    NOTE(review): uses ``version_id[-1]`` -- the last element of a Solr
    multivalued field (or the last *character* if the field is a plain
    string); assumes a list with the latest version last -- TODO confirm.

    :raises MissingIdentifierError: when the doc has no 'identifier'.
    """
    if 'version_id' in doc:
        return doc['identifier'] + '::' + doc['version_id'][-1]
    elif 'identifier' in doc:
        return doc['identifier']
    else:
        raise MissingIdentifierError()


def get_node_from_file_ref(file_ref: str):
    """Extract the PDS node id from a file reference path.

    e.g. /data/pds4/releases/ppi/galileo-traj-jup-20230818 -> PDS_PPI
    (the path component at index 4 is the node folder).
    """
    path = os.path.normpath(os.fspath(file_ref))
    dirs = path.split(os.sep)
    # BUG FIX: paths with fewer than 5 components previously raised
    # IndexError; fall back to the engineering node default instead.
    if len(dirs) <= 4:
        return "PDS_EN"
    return NODE_FOLDERS.get(dirs[4], "PDS_EN")


class SolrOsWrapperIter:
    """Iterator adapting legacy Solr documents into OpenSearch bulk
    actions suitable for ``elasticsearch.helpers.streaming_bulk``."""

    def __init__(self, solr_itr, es_index, found_ids=None):
        """
        :param solr_itr: iterable of Solr documents (dicts whose values
            are lists, as returned by the Solr search API).
        :param es_index: target OpenSearch index name.
        :param found_ids: set of lidvids already loaded in the new
            registry, used to tag docs with 'found_in_registry'.
        """
        self.index = es_index
        self.type = "_doc"
        self.id_field_fun = pds4_id_field_fun
        # BUG FIX: the None default previously made ``in self.found_ids``
        # raise TypeError for every document.
        self.found_ids = found_ids if found_ids is not None else set()
        self.solr_itr = iter(solr_itr)

    def __iter__(self):
        return self

    def solrDoc_to_osDoc(self, doc):
        """Convert one Solr document into an OpenSearch bulk-action dict
        (``_index``/``_type``/``_id``/``_source``)."""
        new_doc = dict()
        new_doc['_index'] = self.index
        new_doc['_type'] = self.type

        new_doc['_source'] = {}
        for k, v in doc.items():
            # derive the node from the data path string,
            # for example: /data/pds4/releases/ppi/galileo-traj-jup-20230818
            if k == "file_ref_location":
                new_doc['_source']['node'] = get_node_from_file_ref(v[0])

            # manage dates
            if "date" in k:
                # only keep the latest modification date, for kibana
                if k == "modification_date":
                    v = [v[-1]]

                # drop dates OpenSearch would reject as unparseable
                try:
                    datetime.fromisoformat(v[0].replace("Z", ""))
                    new_doc['_source'][k] = v
                except ValueError:
                    log.warning("Date %s for field %s is invalid", v, k)
            elif "year" in k:
                if len(v[0]) > 0:
                    new_doc['_source'][k] = v
                else:
                    log.warning("Year %s for field %s is invalid", v, k)
            else:
                new_doc['_source'][k] = v

        if self.id_field_fun:
            # renamed from ``id`` to avoid shadowing the builtin
            doc_id = self.id_field_fun(doc)
            new_doc['_id'] = doc_id
            new_doc['_source']['found_in_registry'] = "true" if doc_id in self.found_ids else "false"
        return new_doc

    def __next__(self):
        # Skip documents without an identifier instead of aborting the
        # whole sync; StopIteration from the underlying iterator
        # propagates to end the iteration.
        while True:
            try:
                doc = next(self.solr_itr)
                return self.solrDoc_to_osDoc(doc)
            except MissingIdentifierError:
                # BUG FIX: the original logged str(e), which is always
                # the empty string for this exception.
                log.warning("skipping Solr document without identifier")