Skip to content

Commit

Permalink
Merge pull request #68 from NASA-PDS/solr_sync_58
Browse files Browse the repository at this point in the history
Synchronization of the Legacy Solr Registry into the new registry
  • Loading branch information
tloubrieu-jpl authored Sep 12, 2023
2 parents d04bda4 + 1f01794 commit aa747c6
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 11 deletions.
5 changes: 5 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ FROM python:3.10-slim-bullseye
ENV INSTALL_WORKDIR=/tmp/registry-sweepers

COPY . $INSTALL_WORKDIR

RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y git

RUN pip install $INSTALL_WORKDIR \
&& cp $INSTALL_WORKDIR/docker/sweepers_driver.py /usr/local/bin/ \
&& rm -r $INSTALL_WORKDIR
Expand Down
12 changes: 6 additions & 6 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ Requires a running deployment of registry

To build and run (assuming registry local-dev defaults for host/credentials)

cd path/to/registry-sweepers
docker build -t registry-sweepers .
docker run -e PROV_ENDPOINT='https://localhost:9200/' -e PROV_CREDENTIALS='{"admin": "admin"}' registry-sweepers
cd path/to/registry-sweepers/
docker image build --tag registry-sweepers --file ./docker/Dockerfile .
docker run --env PROV_ENDPOINT='https://localhost:9200/' --env PROV_CREDENTIALS='{"admin": "admin"}' registry-sweepers

### Release of new versions

To release a new version for I&T, an updated image must be built and published to Docker Hub at `nasapds/registry-sweepers`

cd path/to/registry-sweepers
docker build -t nasapds/registry-sweepers .
docker push nasapds/registry-sweepers
cd path/to/registry-sweepers/docker
docker image build --tag nasapds/registry-sweepers:{version} --file ./docker/Dockerfile .
docker image push nasapds/registry-sweepers:{version}

### Production Deployment

Expand Down
23 changes: 21 additions & 2 deletions docker/sweepers_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@

import functools
import inspect
import argparse
import json
import logging
import os
from datetime import datetime
from typing import Callable

from pds.registrysweepers import provenance, ancestry, repairkit
from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
from pds.registrysweepers.utils import configure_logging, parse_log_level
from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
Expand Down Expand Up @@ -107,13 +108,31 @@ def run_factory(sweeper_f: Callable) -> Callable:
)


# Command-line interface: flags below enable optional sweepers on top of the defaults
parser = argparse.ArgumentParser(
    prog='registry-sweepers',
    description='sweeps the PDS registry with different routines meant to run regularly on the database'
)

# define optional sweepers
parser.add_argument('--legacy-sync', action='store_true')
# maps each argparse destination name (dashes become underscores) to the sweeper
# entrypoint it enables — keep keys in sync with the add_argument calls above
optional_sweepers = {
    'legacy_sync': legacy_registry_sync.run
}

args = parser.parse_args()


# Define default sweepers to be run here, in order of execution
sweepers = [
    repairkit.run,
    provenance.run,
    ancestry.run
]

# append each optional sweeper whose flag was passed on the command line
for option, sweeper in optional_sweepers.items():
    if getattr(args, option):
        sweepers.append(sweeper)

# log the module names of the sweepers selected for this run
sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[metadata]
name = registry-sweepers
name = pds.registry-sweepers
author = PDS
author_email = [email protected]
description = A set of utility scripts which transform/augment PDS registry metadata to support additional capabilities.
Expand Down Expand Up @@ -29,13 +29,13 @@ install_requires =
opensearch-py~=2.3.1
requests~=2.28
retry~=0.9.2
solr-to-es @ git+https://github.com/o19s/solr-to-es@fc758840b07873de0fcaa2ccd675c79bcafc0b99


# Change this to False if you use things like __file__ or __path__—which you
# shouldn't use anyway, because that's what ``pkg_resources`` is for 🙂
zip_safe = True
include_package_data = True
namespace_packages = pds
# base directory for code is in src/. Don't change this.
package_dir =
= src
Expand Down
1 change: 1 addition & 0 deletions src/pds/registrysweepers/legacy_registry_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .legacy_registry_sync import run # noqa
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from typing import Union

import elasticsearch.helpers # type: ignore
from elasticsearch import Elasticsearch
from pds.registrysweepers.legacy_registry_sync.opensearch_loaded_product import get_already_loaded_lidvids
from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import SolrOsWrapperIter
from pds.registrysweepers.utils import configure_logging
from solr_to_es.solrSource import SlowSolrDocs # type: ignore

log = logging.getLogger(__name__)

SOLR_URL = "https://pds.nasa.gov/services/search/search"
OS_INDEX = "legacy_registry"


def create_legacy_registry_index(es_conn=None):
    """
    Create the legacy_registry index if it does not exist yet; no-op otherwise.
    @param es_conn: elasticsearch.ElasticSearch instance for the ElasticSearch or OpenSearch connection
    @return:
    """
    if es_conn.indices.exists(OS_INDEX):
        return
    log.info("create index %s", OS_INDEX)
    # empty body: the index is created with default settings/mappings
    es_conn.indices.create(index=OS_INDEX, body={})
    log.info("index created %s", OS_INDEX)


def run(
    base_url: str,
    username: str,
    password: str,
    verify_host_certs: bool = True,
    log_filepath: Union[str, None] = None,
    log_level: int = logging.INFO,
):
    """
    Runs the Solr Legacy Registry synchronization with OpenSearch.
    @param base_url: URL of the OpenSearch service
    @param username: username used to authenticate against the OpenSearch service
    @param password: password used to authenticate against the OpenSearch service
    @param verify_host_certs: whether the OpenSearch host TLS certificates are verified
    @param log_filepath: optional path of a file to log to
    @param log_level: log level as defined by the logging module
    @return:
    """

    configure_logging(filepath=log_filepath, log_level=log_level)

    # NOTE(review): the elasticsearch client is used against an OpenSearch endpoint —
    # presumably a wire-compatible client version; confirm against the deployment.
    es_conn = Elasticsearch(hosts=base_url, verify_certs=verify_host_certs, http_auth=(username, password))

    # iterator over all ("*") documents of the legacy Solr registry, 100 rows per fetch
    solr_itr = SlowSolrDocs(SOLR_URL, "*", rows=100)

    create_legacy_registry_index(es_conn=es_conn)

    # lidvids already loaded in the new registry, used to flag each migrated
    # document with found_in_registry; restricted to the low-volume product classes
    prod_ids = get_already_loaded_lidvids(
        product_classes=["Product_Context", "Product_Collection", "Product_Bundle"], es_conn=es_conn
    )

    # stream the converted documents into the legacy_registry index in bulk,
    # logging (but not aborting on) any item that fails to index
    es_actions = SolrOsWrapperIter(solr_itr, OS_INDEX, found_ids=prod_ids)
    for ok, item in elasticsearch.helpers.streaming_bulk(
        es_conn, es_actions, chunk_size=50, max_chunk_bytes=50000000, max_retries=5, initial_backoff=10
    ):
        if not ok:
            log.error(item)
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os

import elasticsearch # type: ignore

# Optional environment variable holding the Cross Cluster Search
# connection aliases, separated by ","
CCS_CONN = "CCS_CONN"


def get_cross_cluster_indices():
    """
    Get the aliases for the Cross Cluster Search from an environment variable.
    @return: the aliases in a list including the main index.
    """
    result = ["registry"]
    remote_clusters = os.environ.get(CCS_CONN)
    if remote_clusters is not None:
        # each alias addresses the "registry" index of a remote cluster
        result += ["%s:registry" % cluster for cluster in remote_clusters.split(",")]
    return result


def get_already_loaded_lidvids(product_classes=None, es_conn=None):
    """
    Get the lidvids of the PDS4 products already loaded in the (new) registry.
    Note that this function should not be applied to the product class
    Product_Observational or to documents: there would be too many results.
    @param product_classes: list of the product classes you are interested in,
           e.g. "Product_Bundle", "Product_Collection" ...
    @param es_conn: elasticsearch.ElasticSearch instance for the ElasticSearch or OpenSearch connection
    @return: the list of the already loaded PDS4 lidvid
    """
    prod_class_prop = "pds:Identification_Area/pds:product_class"

    # one should-clause per requested product class; minimum_should_match=1
    # makes the bool query behave as an OR over those classes
    should_clauses = []
    if product_classes is not None:
        should_clauses = [{"match_phrase": {prod_class_prop: prod_class}} for prod_class in product_classes]

    query = {
        "query": {"bool": {"should": should_clauses, "minimum_should_match": 1}},
        "fields": ["_id"],
    }

    # scan() pages through all hits (3-minute scroll window), local and cross-cluster
    hits = elasticsearch.helpers.scan(es_conn, index=get_cross_cluster_indices(), query=query, scroll="3m")
    return [hit["_id"] for hit in hits]
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import logging
import os
from datetime import datetime

log = logging.getLogger(__name__)

# Mapping from the node folder name found in legacy label file paths
# to the Discipline Node code used in the (new) registry.
NODE_FOLDERS = {
    "atmos": "PDS_ATM",
    "en": "PDS_ENG",
    "geo": "PDS_GEO",
    "img": "PDS_IMG",
    "naif": "PDS_NAIF",
    "ppi": "PDS_PPI",
    "rings": "PDS_RMS",
    "rs": "PDS_RS",
    "sbn": "PDS_SBN",
}


class MissingIdentifierError(Exception):
    """Raised when a legacy registry document carries no usable identifier."""
    pass


def pds4_id_field_fun(doc):
    """
    Compute the unique identifier in the new registry from a document in the legacy registry.
    @param doc: document from the legacy registry
    @return: lidvid (identifier::version) when a version is present, otherwise the identifier alone
    """
    has_version = "version_id" in doc
    if not has_version and "identifier" not in doc:
        raise MissingIdentifierError()
    if has_version:
        # the last element of the multi-valued version_id field is the latest version
        return "::".join([doc["identifier"], doc["version_id"][-1]])
    return doc["identifier"]


def get_node_from_file_ref(file_ref: str):
    """
    Thanks to the file system storage of the labels in the legacy registry we can retrieve the
    Discipline Node in charge of each label.
    @param file_ref: location of the XML PDS4 Label in the legacy registry
    @return: the Discipline Node code used in the (new) registry.
    """
    # the 5th path component (index 4) is the node folder,
    # e.g. /data/pds4/releases/<node>/...; unknown folders default to PDS_EN
    components = os.path.normpath(os.fspath(file_ref)).split(os.sep)
    return NODE_FOLDERS.get(components[4], "PDS_EN")


class SolrOsWrapperIter:
    """Iterator over legacy Solr registry documents, yielding OpenSearch bulk actions.

    Each iteration returns the Solr document migrated for OpenSearch, containing in
    addition to the Solr document properties:
    - one identifier matching the one used in the new registry
    - the Discipline Node responsible for the product
    - a flag set to "true" if the current document was loaded in the new registry.
    Documents without a usable identifier are skipped (with a warning).
    """

    def __init__(self, solr_itr, es_index, found_ids=None, id_field_fun=None):
        """
        @param solr_itr: iterator on the solr documents. SlowSolrDocs instance from the solr-to-es repository
        @param es_index: OpenSearch/ElasticSearch index name
        @param found_ids: iterable of the lidvids already available in the new registry
        @param id_field_fun: optional function computing the document id from a Solr
               document; defaults to pds4_id_field_fun (kept backward-compatible)
        """
        self.index = es_index
        self.type = "_doc"
        # evaluated lazily so a caller-supplied function fully replaces the default
        self.id_field_fun = id_field_fun if id_field_fun is not None else pds4_id_field_fun
        # BUG FIX: original stored found_ids as-is; a None default then crashed the
        # "in self.found_ids" membership test. A set also makes that test O(1) per doc.
        self.found_ids = set(found_ids) if found_ids is not None else set()
        self.solr_itr = iter(solr_itr)

    def __iter__(self):
        return self

    def solr_doc_to_os_doc(self, doc):
        """Convert one legacy Solr document into an OpenSearch bulk-index action dict."""
        new_doc = dict()
        new_doc["_index"] = self.index
        new_doc["_type"] = self.type

        new_doc["_source"] = {}
        for k, v in doc.items():
            # get the node from the data string
            # for example : /data/pds4/releases/ppi/galileo-traj-jup-20230818
            if k == "file_ref_location":
                new_doc["_source"]["node"] = get_node_from_file_ref(v[0])

            # manage dates
            if "date" in k:
                # only keep the latest modification date, for kibana
                if k == "modification_date":
                    v = [v[-1]]

                # validate dates; invalid ones are dropped from the migrated document
                try:
                    datetime.fromisoformat(v[0].replace("Z", ""))
                    new_doc["_source"][k] = v
                except ValueError:
                    log.warning("Date %s for field %s is invalid", v, k)
            elif "year" in k:
                if len(v[0]) > 0:
                    new_doc["_source"][k] = v
                else:
                    log.warning("Year %s for field %s is invalid", v, k)
            else:
                new_doc["_source"][k] = v

        if self.id_field_fun:
            # renamed from `id` to avoid shadowing the builtin
            doc_id = self.id_field_fun(doc)
            new_doc["_id"] = doc_id
            new_doc["_source"]["found_in_registry"] = "true" if doc_id in self.found_ids else "false"
        return new_doc

    def __next__(self):
        while True:
            try:
                doc = next(self.solr_itr)
                # BUG FIX: original called self.solrDoc_to_osDoc, a method that does
                # not exist (defined as solr_doc_to_os_doc) -> AttributeError at runtime
                return self.solr_doc_to_os_doc(doc)
            except MissingIdentifierError as e:
                log.warning(str(e))
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ isolated_build = True

[testenv]
deps = .[dev]
whitelist_externals = pytest
allowlist_externals = pytest
commands = pytest

[testenv:docs]
Expand Down

0 comments on commit aa747c6

Please sign in to comment.