Synchronization of the Solr legacy registry into a new legacy_registry index in OpenSearch, to track which legacy products are already loaded in the new registry
thomas loubrieu committed Aug 30, 2023
1 parent 5a1ca1e commit a0077aa
Showing 5 changed files with 225 additions and 1 deletion.
21 changes: 20 additions & 1 deletion docker/sweepers_driver.py
@@ -56,13 +56,15 @@

import functools
import inspect
import argparse
import json
import logging
import os
from datetime import datetime
from typing import Callable

-from pds.registrysweepers import provenance, ancestry, repairkit
+from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync

from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since, parse_log_level

configure_logging(filepath=None, log_level=logging.INFO)
@@ -108,13 +110,24 @@ def run_factory(sweeper_f: Callable) -> Callable:
)


parser = argparse.ArgumentParser(
    prog='registry-sweepers',
    description='sweeps the PDS registry with different routines meant to run regularly on the database'
)
parser.add_argument('--legacy-sync', action='store_true')
args = parser.parse_args()

# Define sweepers to be run here, in order of execution
sweepers = [
    repairkit.run,
    provenance.run,
    ancestry.run
]

# Optional sweepers, each enabled by the matching command-line flag
optional_sweepers = {
    'legacy_sync': legacy_registry_sync.run
}

sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

@@ -124,4 +137,10 @@ def run_factory(sweeper_f: Callable) -> Callable:
    run_sweeper_f = run_factory(sweeper)
    run_sweeper_f()

for option, sweeper in optional_sweepers.items():
    # argparse defines the attribute in every case (store_true defaults to
    # False), so test the flag's value rather than its presence
    if getattr(args, option):
        run_sweeper_f = run_factory(sweeper)
        run_sweeper_f()


log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
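
The enablement check above is subtle enough to deserve a note: a store_true flag always defines its attribute, so a presence test cannot distinguish --legacy-sync from its absence. A minimal standalone sketch (not part of the commit) illustrating this:

import argparse

parser = argparse.ArgumentParser(prog='registry-sweepers')
parser.add_argument('--legacy-sync', action='store_true')

args = parser.parse_args([])                 # flag omitted
assert hasattr(args, 'legacy_sync')          # attribute exists anyway...
assert args.legacy_sync is False             # ...with a default of False

args = parser.parse_args(['--legacy-sync'])  # flag passed
assert args.legacy_sync is True              # only now should the sweeper run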
1 change: 1 addition & 0 deletions src/pds/registrysweepers/legacy_registry_sync/__init__.py
@@ -0,0 +1 @@
from .legacy_registry_sync import run # noqa
64 changes: 64 additions & 0 deletions src/pds/registrysweepers/legacy_registry_sync/legacy_registry_sync.py
@@ -0,0 +1,64 @@
from typing import Union
from solr_to_es.solrSource import SlowSolrDocs
from elasticsearch import Elasticsearch
import elasticsearch.helpers
import logging
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import SolrOsWrapperIter
from pds.registrysweepers.legacy_registry_sync.opensearch_loaded_product import get_already_loaded_lidvids

log = logging.getLogger(__name__)

SOLR_URL = 'https://pds.nasa.gov/services/search/search'
OS_INDEX = "legacy_registry"


def create_legacy_registry_index(es_conn=None):
    if not es_conn.indices.exists(OS_INDEX):
        log.info("create index %s", OS_INDEX)
        _ = es_conn.indices.create(
            index=OS_INDEX,
            body={}
        )
        log.info("index created %s", OS_INDEX)


def run(
    base_url: str,
    username: str,
    password: str,
    verify_host_certs: bool = True,
    log_filepath: Union[str, None] = None,
    log_level: int = logging.INFO,
):
    configure_logging(filepath=log_filepath, log_level=log_level)

    es_conn = Elasticsearch(
        hosts=base_url,
        verify_certs=verify_host_certs,
        http_auth=(username, password)
    )

    # iterate over the whole legacy Solr registry, 100 rows at a time
    solr_itr = SlowSolrDocs(SOLR_URL, "*", rows=100)

    create_legacy_registry_index(es_conn=es_conn)

    # lidvids of legacy products already loaded in the new registry
    prod_ids = get_already_loaded_lidvids(
        product_classes=["Product_Context", "Product_Collection", "Product_Bundle"],
        es_conn=es_conn
    )

    es_actions = SolrOsWrapperIter(solr_itr, OS_INDEX, found_ids=prod_ids)
    for ok, item in elasticsearch.helpers.streaming_bulk(
            es_conn,
            es_actions,
            chunk_size=50,
            max_chunk_bytes=50000000,
            max_retries=5,
            initial_backoff=10):
        if not ok:
            log.error(item)



43 changes: 43 additions & 0 deletions src/pds/registrysweepers/legacy_registry_sync/opensearch_loaded_product.py
@@ -0,0 +1,43 @@
import elasticsearch.helpers

def get_cross_cluster_indices():
    # use the CCS connection aliases
    clusters = [
        "atm-prod-ccs",
        "geo-prod-ccs",
        "img-prod-ccs",
        "naif-prod-ccs",
        "ppi-prod-ccs",
        "psa-prod",
        "rms-prod",
        "sbnpsi-prod-ccs",
        "sbnumd-prod-ccs"
    ]
    indices = ["registry"]
    indices.extend([f"{c}:registry" for c in clusters])
    return indices
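
# Note: the list above resolves to the local "registry" index plus one
# cross-cluster alias per remote node (for example "atm-prod-ccs:registry"),
# so a single scan query fans out across every discipline node's registry.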


def get_already_loaded_lidvids(product_classes=None, es_conn=None):
    query = {
        "query": {
            "bool": {
                "should": [],
                "minimum_should_match": 1
            }
        },
        "fields": ["_id"]
    }

    # match any of the requested product classes
    prod_class_prop = "pds:Identification_Area/pds:product_class"
    query["query"]["bool"]["should"] = [
        dict(match_phrase={prod_class_prop: prod_class}) for prod_class in (product_classes or [])
    ]

    prod_id_resp = elasticsearch.helpers.scan(
        es_conn,
        index=get_cross_cluster_indices(),
        query=query,
        scroll="3m")
    return [p["_id"] for p in prod_id_resp]
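
For the three product classes passed in from run(), the query body assembled above amounts to the following (illustrative, derived directly from the construction in the function):

{
    "query": {
        "bool": {
            "should": [
                {"match_phrase": {"pds:Identification_Area/pds:product_class": "Product_Context"}},
                {"match_phrase": {"pds:Identification_Area/pds:product_class": "Product_Collection"}},
                {"match_phrase": {"pds:Identification_Area/pds:product_class": "Product_Bundle"}}
            ],
            "minimum_should_match": 1
        }
    },
    "fields": ["_id"]
}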
97 changes: 97 additions & 0 deletions src/pds/registrysweepers/legacy_registry_sync/solr_doc_export_to_opensearch.py
@@ -0,0 +1,97 @@
import os
import logging
from datetime import datetime

log = logging.getLogger(__name__)

NODE_FOLDERS = {
    "atmos": "PDS_ATM",
    "en": "PDS_ENG",
    "geo": "PDS_GEO",
    "img": "PDS_IMG",
    "naif": "PDS_NAIF",
    "ppi": "PDS_PPI",
    "rings": "PDS_RMS",
    "rs": "PDS_RS",
    "sbn": "PDS_SBN",
}


class MissingIdentifierError(Exception):
    pass


def pds4_id_field_fun(doc):
    if 'version_id' in doc:
        return doc['identifier'] + '::' + doc['version_id'][-1]
    elif 'identifier' in doc:
        return doc['identifier']
    else:
        raise MissingIdentifierError()


def get_node_from_file_ref(file_ref: str):
    file_path = os.fspath(file_ref)
    path = os.path.normpath(file_path)
    dirs = path.split(os.sep)
    # e.g. /data/pds4/releases/<node>/...: the 5th path component names the node
    return NODE_FOLDERS.get(dirs[4], "PDS_EN")


class SolrOsWrapperIter:
    def __init__(self, solr_itr, es_index, found_ids=None):
        self.index = es_index
        self.type = "_doc"
        self.id_field_fun = pds4_id_field_fun
        self.found_ids = found_ids or set()
        self.solr_itr = iter(solr_itr)

    def __iter__(self):
        return self

    def solrDoc_to_osDoc(self, doc):
        new_doc = dict()
        new_doc['_index'] = self.index
        new_doc['_type'] = self.type

        # remove empty fields
        new_doc['_source'] = {}
        for k, v in doc.items():
            # get the node from the data string
            # for example: /data/pds4/releases/ppi/galileo-traj-jup-20230818
            if k == "file_ref_location":
                new_doc['_source']['node'] = get_node_from_file_ref(v[0])

            # manage dates
            if "date" in k:
                # only keep the latest modification date, for kibana
                if k == "modification_date":
                    v = [v[-1]]

                # validate dates
                try:
                    datetime.fromisoformat(v[0].replace("Z", ""))
                    new_doc['_source'][k] = v
                except ValueError:
                    log.warning("Date %s for field %s is invalid", v, k)
            elif "year" in k:
                if len(v[0]) > 0:
                    new_doc['_source'][k] = v
                else:
                    log.warning("Year %s for field %s is invalid", v, k)
            else:
                new_doc['_source'][k] = v

        if self.id_field_fun:
            doc_id = self.id_field_fun(doc)
            new_doc['_id'] = doc_id
            new_doc['_source']['found_in_registry'] = "true" if doc_id in self.found_ids else "false"
        return new_doc

    def __next__(self):
        # skip Solr documents that carry no identifier
        while True:
            try:
                doc = next(self.solr_itr)
                return self.solrDoc_to_osDoc(doc)
            except MissingIdentifierError as e:
                log.warning(str(e))
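
To make the field handling concrete, here is a hypothetical transformation of a single Solr document; the identifier, path, and dates are illustrative values, not real registry data:

wrapper = SolrOsWrapperIter([], "legacy_registry", found_ids=set())
solr_doc = {
    "identifier": "urn:nasa:pds:insight_rad",
    "version_id": ["1.0", "2.1"],
    "file_ref_location": ["/data/pds4/releases/geo/insight_rad/bundle.xml"],
    "modification_date": ["2020-01-01T00:00:00Z", "2021-06-15T00:00:00Z"],
}
os_doc = wrapper.solrDoc_to_osDoc(solr_doc)
# os_doc["_id"]                          -> "urn:nasa:pds:insight_rad::2.1" (latest version_id)
# os_doc["_source"]["node"]              -> "PDS_GEO" (5th component of the file path)
# os_doc["_source"]["modification_date"] -> ["2021-06-15T00:00:00Z"] (latest date only)
# os_doc["_source"]["found_in_registry"] -> "false" (identifier not among found_ids)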
