Synchronization of Legacy Solr Registry in the new #68

Merged · 7 commits · Sep 12, 2023
Changes from 3 commits
5 changes: 5 additions & 0 deletions docker/Dockerfile
@@ -45,6 +45,11 @@ FROM python:3.10-slim-bullseye
ENV INSTALL_WORKDIR=/tmp/registry-sweepers

COPY . $INSTALL_WORKDIR

RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y git

RUN pip install $INSTALL_WORKDIR \
&& cp $INSTALL_WORKDIR/docker/sweepers_driver.py /usr/local/bin/ \
&& rm -r $INSTALL_WORKDIR
10 changes: 5 additions & 5 deletions docker/README.md
@@ -13,17 +13,17 @@ Requires a running deployment of registry

To build and run (assuming registry local-dev defaults for host/credentials)

-    cd path/to/registry-sweepers
-    docker build -t registry-sweepers .
+    cd path/to/registry-sweepers/
+    docker build -t registry-sweepers docker --file ./docker/Dockerfile .

     docker run -e PROV_ENDPOINT='https://localhost:9200/' -e PROV_CREDENTIALS='{"admin": "admin"}' registry-sweepers

### Release of new versions

To release a new version for I&T, an updated image must be built and published to Docker Hub at `nasapds/registry-sweepers`.

-    cd path/to/registry-sweepers
-    docker build -t nasapds/registry-sweepers .
-    docker push nasapds/registry-sweepers
+    cd path/to/registry-sweepers/docker
+    docker build -t nasapds/registry-sweepers:{version} --file ./docker/Dockerfile .
+    docker push nasapds/registry-sweepers:{version}

### Production Deployment

20 changes: 19 additions & 1 deletion docker/sweepers_driver.py
Review comment (Contributor):

request refactor - rather than defining sweepers and optional sweepers, then maintaining separate execution paths for each, it is preferable to

  1. define a collection of sweepers (either declaratively or by declaring the always-run sweepers then conditionally mutating that collection immediately to add the desired optional sweepers)
  2. run a common execution path for the set of all sweepers (always + conditional), treating them alike

if that's not clear, let me know and I'll modify it first-thing Tuesday when I'm back at work - it's a quick change.
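For concreteness, the collection-then-single-execution-path shape the reviewer describes could be sketched like this (stub functions stand in for `repairkit.run`, `provenance.run`, `ancestry.run`, and `legacy_registry_sync.run`; the structure is illustrative, not the final implementation):

```python
from types import SimpleNamespace

# Stub sweepers standing in for the real module-level run functions
def repairkit_run(): return "repairkit"
def provenance_run(): return "provenance"
def ancestry_run(): return "ancestry"
def legacy_sync_run(): return "legacy_sync"

# As produced by argparse with action='store_true'
args = SimpleNamespace(legacy_sync=True)

# 1. declare the always-run sweepers, then conditionally mutate the collection
sweepers = [repairkit_run, provenance_run, ancestry_run]
if args.legacy_sync:
    sweepers.append(legacy_sync_run)

# 2. one common execution path, treating all sweepers alike
results = [sweeper() for sweeper in sweepers]
```

With this shape there is a single loop to maintain, and adding another optional sweeper is one `append` guarded by its flag.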

Review comment (Contributor):

request: use descriptive variable names in preference to o, s

request: use a more idiomatic approach to the optional conditional check

  • argument should set a binary flag with `action='store_true'`
  • conditional check should use something like if sweeper_name in args and arg[sweeper_name] == True (I forget the syntax for pulling variable-named args from argparser, but you get the idea)
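The idiomatic pattern the reviewer is reaching for might look like this (a sketch; `vars()` on the parsed namespace gives the dict-style, variable-named lookup):

```python
import argparse

parser = argparse.ArgumentParser(prog='registry-sweepers')
# store_true makes the option a binary flag: absent -> False, present -> True
parser.add_argument('--legacy-sync', action='store_true')

# argparse converts '--legacy-sync' into the attribute name 'legacy_sync';
# vars(args) exposes the parsed namespace as a plain dict
enabled = vars(parser.parse_args(['--legacy-sync'])).get('legacy_sync', False)
disabled = vars(parser.parse_args([])).get('legacy_sync', False)
```

`getattr(args, 'legacy_sync', False)` is an equivalent attribute-style spelling; either avoids the always-true `hasattr` check.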

Reply (Member Author):

Thanks Alex, indeed I only tested with the option present.

@@ -56,13 +56,14 @@

import functools
import inspect
import argparse
import json
import logging
import os
from datetime import datetime
from typing import Callable

-from pds.registrysweepers import provenance, ancestry, repairkit
+from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
from pds.registrysweepers.utils import configure_logging, parse_log_level
from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since

@@ -109,13 +110,24 @@ def run_factory(sweeper_f: Callable) -> Callable:
)


parser = argparse.ArgumentParser(
prog='registry-sweepers',
description='sweeps the PDS registry with different routines meant to run regularly on the database'
)
parser.add_argument('--legacy-sync', action='store_true')
args = parser.parse_args()

# Define sweepers to be run here, in order of execution
sweepers = [
repairkit.run,
provenance.run,
ancestry.run
]

optional_sweepers = {
'legacy_sync': legacy_registry_sync.run
}

sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

@@ -125,4 +137,10 @@ def run_factory(sweeper_f: Callable) -> Callable:
run_sweeper_f = run_factory(sweeper)
run_sweeper_f()

for sweeper_name, sweeper in optional_sweepers.items():
    # argparse always sets the attribute (default False), so check the flag's
    # value rather than its presence; hasattr() would always be True here
    if getattr(args, sweeper_name, False):
        run_sweeper_f = run_factory(sweeper)
        run_sweeper_f()


log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
1 change: 1 addition & 0 deletions setup.cfg
@@ -27,6 +27,7 @@ classifiers =
install_requires =
requests~=2.28
retry~=0.9.2
solr-to-es @ git+https://github.com/o19s/solr-to-es@master


# Change this to False if you use things like __file__ or __path__—which you
@@ -0,0 +1 @@
from .legacy_registry_sync import run # noqa
@@ -0,0 +1,64 @@
from typing import Union
from solr_to_es.solrSource import SlowSolrDocs
from elasticsearch import Elasticsearch
import elasticsearch.helpers
import logging
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import SolrOsWrapperIter
from pds.registrysweepers.legacy_registry_sync.opensearch_loaded_product import get_already_loaded_lidvids

log = logging.getLogger(__name__)

SOLR_URL = 'https://pds.nasa.gov/services/search/search'
OS_INDEX = "legacy_registry"


def create_legacy_registry_index(es_conn=None):
if not es_conn.indices.exists(OS_INDEX):
log.info("create index %s", OS_INDEX)
_ = es_conn.indices.create(
index=OS_INDEX,
body={}
)
log.info("index created %s", OS_INDEX)


def run(
base_url: str,
username: str,
password: str,
verify_host_certs: bool = True,
log_filepath: Union[str, None] = None,
log_level: int = logging.INFO,
):

configure_logging(filepath=log_filepath, log_level=log_level)

es_conn = Elasticsearch(
hosts=base_url,
verify_certs=verify_host_certs,
http_auth=(username, password)
)

solr_itr = SlowSolrDocs(SOLR_URL, "*", rows=100)

create_legacy_registry_index(es_conn=es_conn)

prod_ids = get_already_loaded_lidvids(
product_classes=["Product_Context", "Product_Collection", "Product_Bundle"],
es_conn=es_conn
)

es_actions = SolrOsWrapperIter(solr_itr, OS_INDEX, found_ids=prod_ids)
for ok, item in elasticsearch.helpers.streaming_bulk(
es_conn,
es_actions,
chunk_size=50,
max_chunk_bytes=50000000,
max_retries=5,
initial_backoff=10):
if not ok:
log.error(item)



@@ -0,0 +1,42 @@
import elasticsearch
import os

# Optional Environment variable used for the Cross Cluster Search
# connections aliases. Each element is separated by a ","
CCS_CONN = "CCS_CONN"


def get_cross_cluster_indices():

indices = ["registry"]

if CCS_CONN in os.environ:
clusters = os.environ[CCS_CONN].split(',')
indices.extend([f"{c}:registry" for c in clusters])

return indices
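For illustration, the CCS_CONN expansion above behaves as follows with a hypothetical value of two cross-cluster connection aliases (the alias names here are made up; real values come from the deployment environment):

```python
import os

# Hypothetical aliases, comma-separated as the code above expects
os.environ["CCS_CONN"] = "remote-a,remote-b"

indices = ["registry"]
if "CCS_CONN" in os.environ:
    clusters = os.environ["CCS_CONN"].split(',')
    # each remote cluster's registry index is addressed as "<alias>:registry"
    indices.extend([f"{c}:registry" for c in clusters])
```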


def get_already_loaded_lidvids(product_classes=[], es_conn=None):

query = {
"query": {
"bool": {
"should": [],
"minimum_should_match": 1
}
},
"fields": ["_id"]
}

prod_class_prop = "pds:Identification_Area/pds:product_class"
query["query"]["bool"]["should"] = [
dict(match_phrase={prod_class_prop: prod_class}) for prod_class in product_classes
]

prod_id_resp = elasticsearch.helpers.scan(
es_conn,
index=get_cross_cluster_indices(),
query=query,
scroll="3m")
return [p["_id"] for p in prod_id_resp]
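For reference, with the product classes passed from `run()`, the query body assembled above ends up shaped like this (a sketch of the constructed dict only, no search call):

```python
product_classes = ["Product_Context", "Product_Collection", "Product_Bundle"]
prod_class_prop = "pds:Identification_Area/pds:product_class"

query = {
    "query": {
        "bool": {
            # one match_phrase clause per product class; any single match suffices
            "should": [
                {"match_phrase": {prod_class_prop: pc}} for pc in product_classes
            ],
            "minimum_should_match": 1,
        }
    },
    # only document ids are needed to build the found_ids set
    "fields": ["_id"],
}
```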
@@ -0,0 +1,97 @@
import os
import logging
from datetime import datetime

log = logging.getLogger(__name__)

NODE_FOLDERS = {
"atmos": "PDS_ATM",
"en": "PDS_ENG",
"geo": "PDS_GEO",
"img": "PDS_IMG",
"naif": "PDS_NAIF",
"ppi": "PDS_PPI",
"rings": "PDS_RMS",
"rs": "PDS_RS",
"sbn": "PDS_SBN",
}


class MissingIdentifierError(Exception):
pass


def pds4_id_field_fun(doc):
if 'version_id' in doc:
return doc['identifier'] + '::' + doc['version_id'][-1]
elif 'identifier' in doc:
return doc['identifier']
else:
raise MissingIdentifierError()


def get_node_from_file_ref(file_ref: str):
file_path = os.fspath(file_ref)
path = os.path.normpath(file_path)
dirs = path.split(os.sep)
return NODE_FOLDERS.get(dirs[4], "PDS_EN")
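A worked example of the path parsing above, using the sample `file_ref_location` that appears in a comment later in this diff — the node folder is the fifth path component (`dirs[4]`):

```python
import os

# Subset of the NODE_FOLDERS mapping above, enough for this illustration
NODE_FOLDERS = {"ppi": "PDS_PPI"}

file_ref = "/data/pds4/releases/ppi/galileo-traj-jup-20230818"
dirs = os.path.normpath(os.fspath(file_ref)).split(os.sep)
# dirs == ['', 'data', 'pds4', 'releases', 'ppi', 'galileo-traj-jup-20230818']
node = NODE_FOLDERS.get(dirs[4], "PDS_EN")
```

Paths that do not follow the `/data/pds4/releases/<node>/...` layout fall back to the default, `"PDS_EN"`.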


class SolrOsWrapperIter:
def __init__(self, solr_itr, es_index, found_ids=None):
self.index = es_index
self.type = "_doc"
self.id_field_fun = pds4_id_field_fun
self.found_ids = found_ids
self.solr_itr = iter(solr_itr)

def __iter__(self):
return self

def solrDoc_to_osDoc(self, doc):
new_doc = dict()
new_doc['_index'] = self.index
new_doc['_type'] = self.type

# remove empty fields
new_doc['_source'] = {}
for k, v in doc.items():
# get the node from the data string
# for example : /data/pds4/releases/ppi/galileo-traj-jup-20230818
if k == "file_ref_location":
new_doc['_source']['node'] = get_node_from_file_ref(v[0])

# manage dates
if "date" in k:
# only keep the latest modification date, for kibana
if k == "modification_date":
v = [v[-1]]

# validate dates
try:
datetime.fromisoformat(v[0].replace("Z", ""))
new_doc['_source'][k] = v
except ValueError:
log.warning("Date %s for field %s is invalid", v, k)
elif "year" in k:
if len(v[0]) > 0:
new_doc['_source'][k] = v
else:
log.warning("Year %s for field %s is invalid", v, k)
else:
new_doc['_source'][k] = v

if self.id_field_fun:
id = self.id_field_fun(doc)
new_doc['_id'] = id
new_doc['_source']['found_in_registry'] = "true" if id in self.found_ids else "false"
return new_doc

def __next__(self):
while True:
try:
doc = next(self.solr_itr)
return self.solrDoc_to_osDoc(doc)
except MissingIdentifierError as e:
log.warning(str(e))
