Commit: add docstring

thomas loubrieu committed Sep 6, 2023
1 parent 2143bf7 commit d80fbfe
Showing 6 changed files with 99 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/pds/registrysweepers/legacy_registry_sync/__init__.py
@@ -1 +1 @@
-from .legacy_registry_sync import run # noqa
+from .legacy_registry_sync import run  # noqa
src/pds/registrysweepers/legacy_registry_sync/legacy_registry_sync.py
@@ -1,25 +1,29 @@
+import logging
 from typing import Union
-from solr_to_es.solrSource import SlowSolrDocs
-from elasticsearch import Elasticsearch
+
 import elasticsearch.helpers
-import logging
-from pds.registrysweepers.utils import configure_logging
-from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import SolrOsWrapperIter
+from elasticsearch import Elasticsearch
 from pds.registrysweepers.legacy_registry_sync.opensearch_loaded_product import get_already_loaded_lidvids
+from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import SolrOsWrapperIter
+from pds.registrysweepers.utils import configure_logging
+from solr_to_es.solrSource import SlowSolrDocs  # type: ignore
 
 log = logging.getLogger(__name__)
 
-SOLR_URL = 'https://pds.nasa.gov/services/search/search'
+SOLR_URL = "https://pds.nasa.gov/services/search/search"
 OS_INDEX = "legacy_registry"
 
 
 def create_legacy_registry_index(es_conn=None):
+    """
+    Create the legacy_registry index if it does not already exist.
+    @param es_conn: elasticsearch.Elasticsearch instance for the ElasticSearch or OpenSearch connection
+    """
     if not es_conn.indices.exists(OS_INDEX):
         log.info("create index %s", OS_INDEX)
-        es_conn.indices.create(
-            index=OS_INDEX,
-            body={}
-        )
+        es_conn.indices.create(index=OS_INDEX, body={})
         log.info("index created %s", OS_INDEX)

@@ -31,34 +35,33 @@ def run(
     log_filepath: Union[str, None] = None,
     log_level: int = logging.INFO,
 ):
+    """
+    Run the synchronization of the Solr legacy registry with OpenSearch.
+    @param base_url: URL of the OpenSearch service
+    @param username: username for the OpenSearch connection
+    @param password: password for the OpenSearch connection
+    @param verify_host_certs: True to verify the host certificates of the OpenSearch service
+    @param log_filepath: path of the log file, see configure_logging
+    @param log_level: standard logging level, e.g. logging.INFO
+    """
 
     configure_logging(filepath=log_filepath, log_level=log_level)
 
-    es_conn = Elasticsearch(
-        hosts=base_url,
-        verify_certs=verify_host_certs,
-        http_auth=(username, password)
-    )
+    es_conn = Elasticsearch(hosts=base_url, verify_certs=verify_host_certs, http_auth=(username, password))
 
     solr_itr = SlowSolrDocs(SOLR_URL, "*", rows=100)
 
     create_legacy_registry_index(es_conn=es_conn)
 
     prod_ids = get_already_loaded_lidvids(
-        product_classes=["Product_Context", "Product_Collection", "Product_Bundle"],
-        es_conn=es_conn
+        product_classes=["Product_Context", "Product_Collection", "Product_Bundle"], es_conn=es_conn
     )
 
     es_actions = SolrOsWrapperIter(solr_itr, OS_INDEX, found_ids=prod_ids)
     for ok, item in elasticsearch.helpers.streaming_bulk(
-        es_conn,
-        es_actions,
-        chunk_size=50,
-        max_chunk_bytes=50000000,
-        max_retries=5,
-        initial_backoff=10):
+        es_conn, es_actions, chunk_size=50, max_chunk_bytes=50000000, max_retries=5, initial_backoff=10
+    ):
         if not ok:
             log.error(item)



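For context, a minimal sketch of how this sweeper's entry point might be invoked; the endpoint and credentials below are placeholders, not values from this repository:

    from pds.registrysweepers.legacy_registry_sync import run

    run(
        base_url="https://localhost:9200",  # placeholder OpenSearch endpoint
        username="admin",  # placeholder credentials
        password="admin",
        verify_host_certs=False,
    )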
src/pds/registrysweepers/legacy_registry_sync/opensearch_loaded_product.py
@@ -1,42 +1,47 @@
-import elasticsearch
 import os
 
+import elasticsearch
+
 # Optional environment variable used for the Cross Cluster Search
 # connection aliases. Each element is separated by a ","
 CCS_CONN = "CCS_CONN"
 
 
 def get_cross_cluster_indices():
+    """
+    Get the aliases for the Cross Cluster Search from an environment variable.
+    @return: the aliases in a list, including the main index.
+    """
 
     indices = ["registry"]
 
     if CCS_CONN in os.environ:
-        clusters = os.environ[CCS_CONN].split(',')
+        clusters = os.environ[CCS_CONN].split(",")
         indices.extend([f"{c}:registry" for c in clusters])
 
     return indices
 
 
-def get_already_loaded_lidvids(product_classes=[], es_conn=None):
+def get_already_loaded_lidvids(product_classes=None, es_conn=None):
+    """
+    Get the lidvids of the PDS4 products already loaded in the (new) registry.
+    Note that this function should not be applied to the Product_Observational or document product
+    classes; there would be too many results.
+    @param product_classes: list of the product classes you are interested in,
+           e.g. "Product_Bundle", "Product_Collection" ...
+    @param es_conn: elasticsearch.Elasticsearch instance for the ElasticSearch or OpenSearch connection
+    @return: the list of the already loaded PDS4 lidvids
+    """
 
-    query = {
-        "query": {
-            "bool": {
-                "should": [],
-                "minimum_should_match": 1
-            }
-        },
-        "fields": ["_id"]
-    }
+    query = {"query": {"bool": {"should": [], "minimum_should_match": 1}}, "fields": ["_id"]}
 
     prod_class_prop = "pds:Identification_Area/pds:product_class"
-    query["query"]["bool"]["should"] = [
-        dict(match_phrase={prod_class_prop: prod_class}) for prod_class in product_classes
-    ]
-
-    prod_id_resp = elasticsearch.helpers.scan(
-        es_conn,
-        index=get_cross_cluster_indices(),
-        query=query,
-        scroll="3m")
 
+    if product_classes is not None:
+        query["query"]["bool"]["should"] = [
+            dict(match_phrase={prod_class_prop: prod_class}) for prod_class in product_classes
+        ]
+
+    prod_id_resp = elasticsearch.helpers.scan(es_conn, index=get_cross_cluster_indices(), query=query, scroll="3m")
     return [p["_id"] for p in prod_id_resp]
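A quick sketch of how the CCS_CONN variable drives get_cross_cluster_indices; the cluster aliases here are made up:

    import os

    from pds.registrysweepers.legacy_registry_sync.opensearch_loaded_product import get_cross_cluster_indices

    os.environ["CCS_CONN"] = "cluster-a,cluster-b"  # hypothetical Cross Cluster Search aliases
    print(get_cross_cluster_indices())
    # ['registry', 'cluster-a:registry', 'cluster-b:registry']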
src/pds/registrysweepers/legacy_registry_sync/solr_doc_export_to_opensearch.py
@@ -1,5 +1,5 @@
-import os
 import logging
+import os
 from datetime import datetime
 
 log = logging.getLogger(__name__)
@@ -22,15 +22,28 @@ class MissingIdentifierError(Exception):
 
 
 def pds4_id_field_fun(doc):
-    if 'version_id' in doc:
-        return doc['identifier'] + '::' + doc['version_id'][-1]
-    elif 'identifier' in doc:
-        return doc['identifier']
+    """
+    Compute the unique identifier used in the new registry from a document of the legacy registry.
+    @param doc: document from the legacy registry
+    @return: lidvid
+    """
+    if "version_id" in doc:
+        return doc["identifier"] + "::" + doc["version_id"][-1]
+    elif "identifier" in doc:
+        return doc["identifier"]
     else:
         raise MissingIdentifierError()
 
 
 def get_node_from_file_ref(file_ref: str):
+    """
+    Thanks to the file system storage of the labels in the legacy registry, we can retrieve the
+    Discipline Node in charge of each label.
+    @param file_ref: location of the PDS4 XML label in the legacy registry
+    @return: the Discipline Node code used in the (new) registry
+    """
     file_path = os.fspath(file_ref)
     path = os.path.normpath(file_path)
     dirs = path.split(os.sep)
@@ -39,6 +52,17 @@ def get_node_from_file_ref(file_ref: str):
 
 class SolrOsWrapperIter:
     def __init__(self, solr_itr, es_index, found_ids=None):
+        """
+        Iterable over the Solr legacy registry documents, returning the migrated document at each iteration (next).
+        In addition to the Solr document properties, the migrated documents contain:
+        - an identifier matching the one used in the new registry
+        - the Discipline Node responsible for the product
+        - a flag set to True if the current document was loaded in the new registry
+        @param solr_itr: iterator on the Solr documents, a SlowSolrDocs instance from the solr-to-es repository
+        @param es_index: OpenSearch/ElasticSearch index name
+        @param found_ids: list of the lidvids already available in the new registry
+        """
         self.index = es_index
         self.type = "_doc"
         self.id_field_fun = pds4_id_field_fun
@@ -48,18 +72,18 @@ def __init__(self, solr_itr, es_index, found_ids=None):
     def __iter__(self):
         return self
 
-    def solrDoc_to_osDoc(self, doc):
+    def solr_doc_to_os_doc(self, doc):
         new_doc = dict()
-        new_doc['_index'] = self.index
-        new_doc['_type'] = self.type
+        new_doc["_index"] = self.index
+        new_doc["_type"] = self.type
 
         # remove empty fields
-        new_doc['_source'] = {}
+        new_doc["_source"] = {}
         for k, v in doc.items():
             # get the node from the data string
             # for example: /data/pds4/releases/ppi/galileo-traj-jup-20230818
             if k == "file_ref_location":
-                new_doc['_source']['node'] = get_node_from_file_ref(v[0])
+                new_doc["_source"]["node"] = get_node_from_file_ref(v[0])
 
             # manage dates
             if "date" in k:
@@ -70,21 +94,21 @@ def solr_doc_to_os_doc(self, doc):
                 # validate dates
                 try:
                     datetime.fromisoformat(v[0].replace("Z", ""))
-                    new_doc['_source'][k] = v
+                    new_doc["_source"][k] = v
                 except ValueError:
                     log.warning("Date %s for field %s is invalid", v, k)
             elif "year" in k:
                 if len(v[0]) > 0:
-                    new_doc['_source'][k] = v
+                    new_doc["_source"][k] = v
                 else:
                     log.warning("Year %s for field %s is invalid", v, k)
             else:
-                new_doc['_source'][k] = v
+                new_doc["_source"][k] = v
 
         if self.id_field_fun:
             id = self.id_field_fun(doc)
-            new_doc['_id'] = id
-            new_doc['_source']['found_in_registry'] = "true" if id in self.found_ids else "false"
+            new_doc["_id"] = id
+            new_doc["_source"]["found_in_registry"] = "true" if id in self.found_ids else "false"
         return new_doc
 
     def __next__(self):
@@ -94,4 +118,3 @@ def __next__(self):
-            return self.solrDoc_to_osDoc(doc)
+            return self.solr_doc_to_os_doc(doc)
         except MissingIdentifierError as e:
             log.warning(str(e))
-
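To illustrate pds4_id_field_fun, a small sketch; the identifier is an invented PDS4 urn, and Solr multi-valued fields are lists, with the last version_id entry taken as the latest:

    from pds.registrysweepers.legacy_registry_sync.solr_doc_export_to_opensearch import pds4_id_field_fun

    doc = {"identifier": "urn:nasa:pds:example_bundle", "version_id": ["1.0", "1.1"]}
    print(pds4_id_field_fun(doc))  # urn:nasa:pds:example_bundle::1.1

    doc = {"identifier": "urn:nasa:pds:example_bundle"}
    print(pds4_id_field_fun(doc))  # urn:nasa:pds:example_bundle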
6 changes: 3 additions & 3 deletions src/pds/registrysweepers/utils/db/__init__.py
@@ -15,8 +15,8 @@
 from pds.registrysweepers.utils.misc import auto_raise_for_status
 from pds.registrysweepers.utils.misc import get_random_hex_id
 from requests import HTTPError
-from retry import retry
-from retry.api import retry_call
+from retry import retry  # type: ignore
+from retry.api import retry_call  # type: ignore
 
 log = logging.getLogger(__name__)
@@ -152,7 +152,7 @@ def write_updated_docs(host: Host, updates: Iterable[Update], index_name: str =
         update_statement_strs = update_as_statements(update)
 
         for s in update_statement_strs:
-            bulk_buffer_size_mb += sys.getsizeof(s) / 1024**2
+            bulk_buffer_size_mb += sys.getsizeof(s) / 1024 ** 2
 
         bulk_updates_buffer.extend(update_statement_strs)
         updated_doc_count += 1
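The accounting above accumulates each bulk statement's in-memory size in megabytes so the buffer can be flushed before it grows too large; a standalone sketch of the idea, with a made-up flush threshold (the real threshold is not shown in this diff):

    import sys

    bulk_updates_buffer = []
    bulk_buffer_size_mb = 0.0
    FLUSH_THRESHOLD_MB = 30  # hypothetical threshold, not taken from this file

    for s in ['{"update": {"_id": "doc-1"}}', '{"field": "value"}']:
        bulk_buffer_size_mb += sys.getsizeof(s) / 1024 ** 2
        bulk_updates_buffer.append(s)
        if bulk_buffer_size_mb >= FLUSH_THRESHOLD_MB:
            bulk_updates_buffer.clear()  # the flush to OpenSearch would happen here
            bulk_buffer_size_mb = 0.0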
2 changes: 1 addition & 1 deletion src/pds/registrysweepers/utils/misc.py
@@ -30,7 +30,7 @@ def get_human_readable_elapsed_since(begin: datetime) -> str:
 
 
 def get_random_hex_id(id_len: int = 6) -> str:
-    val = random.randint(0, 16**id_len)
+    val = random.randint(0, 16 ** id_len)
     return hex(val)[2:]
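Usage note for get_random_hex_id: hex() drops leading zeros, so the returned id can be shorter than id_len digits. For example:

    from pds.registrysweepers.utils.misc import get_random_hex_id

    tag = get_random_hex_id(8)  # e.g. 'a3f09b12'; typically 8 hex digits, sometimes fewer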


