Implement reindexer sweeper #149

Draft
wants to merge 39 commits into base: main
Changes from 8 commits
Commits (39)
eeb3797
[skeleton] implement reindexer sweeper
alexdunnjpl Sep 19, 2024
016ce3a
update metadata flag to use date instead of boolean
alexdunnjpl Oct 2, 2024
c3ddf8d
[revert me] add test init code
alexdunnjpl Oct 2, 2024
401c600
implement reindexer sweeper, with types resolved from the *-dd index,…
alexdunnjpl Oct 2, 2024
54e663c
implement harvest-time filter to ensure that products harvested mid-s…
alexdunnjpl Oct 9, 2024
4dcacae
implement logging of problematic harvest timestamp span and harvest s…
alexdunnjpl Oct 9, 2024
b4eeeb5
clean up code
alexdunnjpl Oct 9, 2024
5e2fa2e
remove test code
alexdunnjpl Oct 9, 2024
f3dfc41
improve comment
alexdunnjpl Oct 10, 2024
7730d20
add mypy ignores - None-guard is provided by conditionals
alexdunnjpl Oct 10, 2024
ba72ed4
add registry-dd to allowed index types for resolve_multitenant_index_…
alexdunnjpl Oct 14, 2024
36f9da1
ensure reindexer sweeper captures all relevant documents with a singl…
alexdunnjpl Oct 15, 2024
f5ccbb7
improve logging
alexdunnjpl Oct 15, 2024
5683972
implement batching approach in reindexer sweeper
alexdunnjpl Oct 23, 2024
9e0fa68
squash! implement batching approach in reindexer sweeper
alexdunnjpl Oct 23, 2024
4b3fef1
[weeeird bugfix] Patch apparent issues when paginating. See comments
alexdunnjpl Oct 23, 2024
223c820
map special-case properties onto their types and incorporate them int…
alexdunnjpl Oct 23, 2024
de13cae
implement stall while update indexing queue is backed up
alexdunnjpl Oct 24, 2024
0649a2c
disable noisy log
alexdunnjpl Oct 24, 2024
3090b5f
tweak stall time/log
alexdunnjpl Oct 24, 2024
f4fe83b
make reindexer hits count more human-friendly in logs
alexdunnjpl Oct 24, 2024
d2c1dcb
clean up logging
alexdunnjpl Oct 24, 2024
499682f
exclude ops:Provenance* properties from canonical_type_undefined_prop…
alexdunnjpl Oct 24, 2024
43f7c10
bump hits_stall_tolerance from 5% to 10% of batch_size_limit
alexdunnjpl Oct 24, 2024
1b00dd9
fix stall logic
alexdunnjpl Oct 24, 2024
58c8d81
fix format_hits_count()
alexdunnjpl Oct 24, 2024
249b937
change type hint to indicate that consumable iterators are not approp…
alexdunnjpl Oct 31, 2024
2bc9096
Incorporate detection/log/retry of HTTP429 (circuit-breaking throttle)
alexdunnjpl Oct 31, 2024
88671d3
remove manual stall logic
alexdunnjpl Oct 31, 2024
d71622b
disable default typing, per jpadams
alexdunnjpl Oct 31, 2024
0e9d6b2
re-enable generation of updates for docs having properties not in map…
alexdunnjpl Nov 1, 2024
7147d65
support all ISO-formatted harvest timestamp strings
alexdunnjpl Nov 5, 2024
1be6f54
correct erroneous log message
alexdunnjpl Nov 5, 2024
2392224
bugfix edge-cases
alexdunnjpl Nov 5, 2024
9cfbbba
demote noisy log
alexdunnjpl Nov 6, 2024
80d293a
deduplicate missing sweepers property logs
alexdunnjpl Nov 7, 2024
821dd84
remove cruft
alexdunnjpl Nov 11, 2024
a413ba3
flesh out static type mappings
alexdunnjpl Nov 11, 2024
5bb6586
fix infinite loop when there are fewer hits than a full batch
alexdunnjpl Nov 12, 2024
4 changes: 3 additions & 1 deletion docker/sweepers_driver.py
@@ -64,6 +64,7 @@
 from typing import Callable

 from pds.registrysweepers import provenance, ancestry, repairkit, legacy_registry_sync
+from pds.registrysweepers.reindexer import main as reindexer
 from pds.registrysweepers.utils import configure_logging, parse_log_level
 from pds.registrysweepers.utils.db.client import get_opensearch_client_from_environment
 from pds.registrysweepers.utils.misc import get_human_readable_elapsed_since
@@ -107,7 +108,8 @@ def run_factory(sweeper_f: Callable) -> Callable:
 sweepers = [
     repairkit.run,
     provenance.run,
-    ancestry.run
+    ancestry.run,
+    reindexer.run
 ]

 for option, sweeper in optional_sweepers.items():
Empty file.
1 change: 1 addition & 0 deletions src/pds/registrysweepers/reindexer/constants.py
@@ -0,0 +1 @@
REINDEXER_FLAG_METADATA_KEY = "ops:Provenance/ops:reindexed_at"
238 changes: 238 additions & 0 deletions src/pds/registrysweepers/reindexer/main.py
@@ -0,0 +1,238 @@
import logging
from datetime import datetime
from datetime import timezone
from typing import Dict
from typing import Iterable
from typing import Union

from opensearchpy import OpenSearch
from pds.registrysweepers.reindexer.constants import REINDEXER_FLAG_METADATA_KEY
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils.db import query_registry_db_with_search_after
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.db.client import get_userpass_opensearch_client
from pds.registrysweepers.utils.db.indexing import ensure_index_mapping
from pds.registrysweepers.utils.db.multitenancy import resolve_multitenant_index_name
from pds.registrysweepers.utils.db.update import Update

log = logging.getLogger(__name__)


def get_docs_query(filter_to_harvested_before: datetime):
    """
    Return a query to get all docs which haven't been reindexed by this sweeper and which haven't been harvested
    since this sweeper process instance started running
    """
    # TODO: Remove this once query_registry_db_with_search_after is modified to remove mutation side-effects
    return {
        "query": {
            "bool": {
                "must_not": [{"exists": {"field": REINDEXER_FLAG_METADATA_KEY}}],
                "must": {
                    "range": {
                        "ops:Harvest_Info/ops:harvest_date_time": {
                            "lt": filter_to_harvested_before.astimezone(timezone.utc).isoformat()
                        }
                    }
                },
            }
        }
    }

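# For illustration only (not part of this changeset): with a hypothetical cutoff of
# datetime(2024, 10, 1, tzinfo=timezone.utc), get_docs_query() produces the following body,
# i.e. "docs not yet flagged as reindexed which were harvested before the sweeper started":
#
#     {
#         "query": {
#             "bool": {
#                 "must_not": [{"exists": {"field": "ops:Provenance/ops:reindexed_at"}}],
#                 "must": {
#                     "range": {
#                         "ops:Harvest_Info/ops:harvest_date_time": {"lt": "2024-10-01T00:00:00+00:00"}
#                     }
#                 },
#             }
#         }
#     }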

def fetch_dd_field_types(client: OpenSearch) -> Dict[str, str]:
    dd_index_name = resolve_multitenant_index_name("registry-dd")
    name_key = "es_field_name"
    type_key = "es_data_type"
    dd_docs = query_registry_db_with_search_after(
        client,
        dd_index_name,
        _source={"includes": ["es_field_name", "es_data_type"]},
        query={"query": {"match_all": {}}},
        sort_fields=[name_key],
    )
    doc_sources = iter(doc["_source"] for doc in dd_docs)
    dd_types = {
        source[name_key]: source[type_key] for source in doc_sources if name_key in source and type_key in source
    }
    return dd_types

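# For illustration only (not part of this changeset): fetch_dd_field_types() returns a flat
# mapping of OpenSearch field name onto the datatype declared in the registry-dd index, along
# the lines of (hypothetical field names):
#
#     {
#         "lid": "keyword",
#         "ops:Harvest_Info/ops:harvest_date_time": "date",
#     }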

def accumulate_missing_mappings(
    dd_field_types_by_name: Dict[str, str], mapping_field_types_by_field_name: Dict[str, str], docs: Iterable[dict]
) -> Dict[str, str]:
    """
    Iterate over all properties of all docs, test whether they are present in the given set of mapping keys, and
    return a mapping of the missing properties onto their types.
    @param dd_field_types_by_name: a mapping of document property names onto their types, derived from the data-dictionary db data
    @param mapping_field_types_by_field_name: a mapping of document property names onto their types, derived from the existing index mappings
    @param docs: an iterable collection of product documents
    """
    missing_mapping_updates = {}

    dd_not_defines_type_property_names = set()  # used to prevent duplicate WARN logs
    bad_mapping_property_names = set()  # used to log mappings requiring manual attention

    earliest_problem_doc_harvested_at = None
    latest_problem_doc_harvested_at = None
    problematic_harvest_versions = set()
    problem_docs_count = 0
    total_docs_count = 0
    for doc in docs:
        problem_detected_in_document_already = False
        total_docs_count += 1

        for property_name, value in doc["_source"].items():
            canonical_type = dd_field_types_by_name.get(property_name)
            current_mapping_type = mapping_field_types_by_field_name.get(property_name)

            mapping_missing = property_name not in mapping_field_types_by_field_name
            dd_defines_type_for_property = property_name in dd_field_types_by_name
            mapping_is_bad = all(
                [canonical_type != current_mapping_type, canonical_type is not None, current_mapping_type is not None]
            )

            if not dd_defines_type_for_property and property_name not in dd_not_defines_type_property_names:
                log.warning(
                    f"Property {property_name} does not have an entry in the DD index - this may indicate a problem"
                )
                dd_not_defines_type_property_names.add(property_name)

            if mapping_is_bad and property_name not in bad_mapping_property_names:
                log.warning(
                    f'Property {property_name} is defined in the data dictionary as type "{canonical_type}" but exists in the index mapping as type "{current_mapping_type}".'
                )
                bad_mapping_property_names.add(property_name)

            if (mapping_missing or mapping_is_bad) and not problem_detected_in_document_already:
                problem_detected_in_document_already = True
                problem_docs_count += 1
                try:
                    doc_harvest_time = datetime.fromisoformat(
                        doc["_source"]["ops:Harvest_Info/ops:harvest_date_time"][0].replace("Z", ""),
                    )
                    earliest_problem_doc_harvested_at = min(
                        doc_harvest_time, earliest_problem_doc_harvested_at or datetime.max
                    )
                    latest_problem_doc_harvested_at = max(
                        doc_harvest_time, latest_problem_doc_harvested_at or datetime.min
                    )
                except (KeyError, ValueError) as err:
                    log.warning(
                        f'Unable to parse "ops:Harvest_Info/ops:harvest_date_time" as zulu-formatted date from document {doc["_id"]}: {err}'
                    )

                try:
                    problematic_harvest_versions.update(doc["_source"]["ops:Harvest_Info/ops:harvest_version"])
                except KeyError as err:
                    log.warning(f'Unable to extract harvest version from document {doc["_id"]}: {err}')

            if mapping_missing and property_name not in missing_mapping_updates:
                if dd_defines_type_for_property:
                    log.info(
                        f'Property {property_name} will be updated to type "{canonical_type}" from data dictionary'
                    )
                    missing_mapping_updates[property_name] = canonical_type
                else:
                    default_type = "keyword"
                    log.warning(
                        f'Property {property_name} is missing from the index mappings and does not have an entry in the data dictionary index - defaulting to type "{default_type}"'
                    )
                    missing_mapping_updates[property_name] = default_type

    log.info(
        f"Detected {problem_docs_count} docs with {len(missing_mapping_updates)} missing mappings and {len(bad_mapping_property_names)} mappings conflicting with the DD, out of a total of {total_docs_count} docs"
    )

    if problem_docs_count > 0:
        log.warning(
            f"Problems were detected with docs having harvest timestamps between {earliest_problem_doc_harvested_at.isoformat()} and {latest_problem_doc_harvested_at.isoformat()}"
        )
        log.warning(f"Problems were detected with docs having harvest versions {sorted(problematic_harvest_versions)}")

    if len(bad_mapping_property_names) > 0:
        log.error(
            f"The following mappings have a type which does not match the type described by the data dictionary: {bad_mapping_property_names} - in-place update is not possible, so data will need to be manually reindexed with manual updates (or that functionality must be added to this sweeper)"
        )

    return missing_mapping_updates

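# For illustration only (not part of this changeset): a minimal worked example of
# accumulate_missing_mappings() with hypothetical property names.
#
#     dd_types = {"existing_prop": "keyword", "new_prop": "date"}
#     mapping_types = {"existing_prop": "keyword"}
#     docs = [
#         {
#             "_id": "urn:nasa:pds:example::1.0",
#             "_source": {"existing_prop": "x", "new_prop": "2024-01-01", "unknown_prop": "y"},
#         }
#     ]
#     accumulate_missing_mappings(dd_types, mapping_types, docs)
#     # returns {"new_prop": "date", "unknown_prop": "keyword"}: "new_prop" takes its type
#     # from the data dictionary, while "unknown_prop" has no DD entry and falls back to
#     # the default "keyword" type.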

def generate_updates(timestamp: datetime, docs: Iterable[Dict]) -> Iterable[Update]:
    for document in docs:
        id = document["_id"]
        yield Update(id=id, content={REINDEXER_FLAG_METADATA_KEY: timestamp.isoformat()})


def run(
    client: OpenSearch,
    log_filepath: Union[str, None] = None,
    log_level: int = logging.INFO,
):
    configure_logging(filepath=log_filepath, log_level=log_level)

    sweeper_start_timestamp = datetime.now()
    products_index_name = resolve_multitenant_index_name("registry")
    ensure_index_mapping(client, products_index_name, REINDEXER_FLAG_METADATA_KEY, "date")

    dd_field_types_by_field_name = fetch_dd_field_types(client)
    mapping_field_types_by_field_name = {
        k: v["type"]
        for k, v in client.indices.get_mapping(products_index_name)[products_index_name]["mappings"][
            "properties"
        ].items()
    }
    missing_mappings = accumulate_missing_mappings(
        dd_field_types_by_field_name,
        mapping_field_types_by_field_name,
        query_registry_db_with_search_after(
            client, products_index_name, _source={}, query=get_docs_query(sweeper_start_timestamp)
        ),
    )
    for property, mapping_typename in missing_mappings.items():
        log.info(f"Updating index {products_index_name} with missing mapping ({property}, {mapping_typename})")
        ensure_index_mapping(client, products_index_name, property, mapping_typename)

    updates = generate_updates(
        sweeper_start_timestamp,
        query_registry_db_with_search_after(
            client, products_index_name, _source={}, query=get_docs_query(sweeper_start_timestamp)
        ),
    )
    log.info(
        f"Updating newly-processed documents with {REINDEXER_FLAG_METADATA_KEY}={sweeper_start_timestamp.isoformat()}..."
    )
    write_updated_docs(
        client,
        updates,
        index_name=products_index_name,
    )

    log.info("Completed reindexer sweeper processing!")


if __name__ == "__main__":
    cli_description = f"""
Tests untested documents in the registry index to ensure that all properties are present in the index mapping (i.e. that
they are searchable). Mapping types are derived from <<<to be determined>>>

When a document is tested, metadata attribute {REINDEXER_FLAG_METADATA_KEY} is given a value equal to the timestamp
at sweeper runtime. The presence of attribute {REINDEXER_FLAG_METADATA_KEY} indicates that the document has been
tested and may be skipped in future.

Writing a new value to this attribute triggers a re-index of the entire document, ensuring that the document is
fully-searchable.

"""

    args = parse_args(description=cli_description)
    client = get_userpass_opensearch_client(
        endpoint_url=args.base_URL, username=args.username, password=args.password, verify_certs=not args.insecure
    )

    run(
        client=client,
        log_level=args.log_level,
        log_filepath=args.log_file,
    )
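# For illustration only (not part of this changeset): the per-document update generated by
# generate_updates() is a partial-update payload of the form
#
#     {"ops:Provenance/ops:reindexed_at": "2024-10-01T00:00:00"}
#
# so re-running the sweeper skips any document that already carries the flag, and (per the
# description above) writing the flag is what triggers a re-index of the whole document
# against the updated mappings.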