Skip to content

Commit

Permalink
remove default index value for db.query_registry_db() for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
alexdunnjpl committed Jan 26, 2024
1 parent 00b749b commit 60d3819
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
10 changes: 5 additions & 5 deletions src/pds/registrysweepers/ancestry/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True)
docs = query_f(client, query, _source)
docs = query_f(client, "registry", query, _source)

return docs

Expand All @@ -48,7 +48,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", "ref_lid_collection"]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True)
docs = query_f(client, query, _source)
docs = query_f(client, "registry", query, _source)

return docs

Expand All @@ -60,7 +60,7 @@ def get_collection_ancestry_records_collections_query(
query = product_class_query_factory(ProductClass.COLLECTION)
_source = {"includes": ["lidvid", "alternate_ids", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True)
docs = query_f(client, query, _source)
docs = query_f(client, "registry", query, _source)

return docs

Expand All @@ -81,9 +81,9 @@ def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock
# each document will have many product lidvids, so a smaller page size is warranted here
docs = query_f(
client,
"registry-refs",
query,
_source,
index_name="registry-refs",
page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
request_timeout_seconds=30,
sort_fields=["collection_lidvid", "batch_id"],
Expand All @@ -109,6 +109,6 @@ def get_orphaned_documents(client: OpenSearch, registry_db_mock: DbMockTypeDef,
["collection_lidvid", "batch_id"] if index_name == "registry-refs" else None
) # use default for registry

docs = query_f(client, query, _source, index_name=index_name, sort_fields=sort_fields_override)
docs = query_f(client, index_name, query, _source, sort_fields=sort_fields_override)

return docs
2 changes: 1 addition & 1 deletion src/pds/registrysweepers/repairkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def run(

# page_size and bulk_chunk_max_update_count constraints are necessary to avoid choking nodes with very-large docs
# i.e. ATM and GEO
all_docs = query_registry_db(client, unprocessed_docs_query, {}, page_size=1000)
all_docs = query_registry_db(client, "registry", unprocessed_docs_query, {}, page_size=1000)
updates = generate_updates(all_docs, SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, SWEEPERS_REPAIRKIT_VERSION)
ensure_index_mapping(client, "registry", SWEEPERS_REPAIRKIT_VERSION_METADATA_KEY, "integer")
write_updated_docs(client, updates, index_name="registry", bulk_chunk_max_update_count=20000)
Expand Down
12 changes: 7 additions & 5 deletions src/pds/registrysweepers/utils/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

def query_registry_db(
client: OpenSearch,
index_name: str,
query: Dict,
_source: Dict,
index_name: str = "registry",
page_size: int = 10000,
scroll_keepalive_minutes: int = 10,
request_timeout_seconds: int = 20,
Expand Down Expand Up @@ -119,9 +119,9 @@ def fetch_func(_scroll_id: str = scroll_id):

def query_registry_db_with_search_after(
client: OpenSearch,
index_name: str,
query: Dict,
_source: Dict,
index_name: str = "registry",
page_size: int = 10000,
sort_fields: Union[List[str], None] = None,
request_timeout_seconds: int = 20,
Expand Down Expand Up @@ -214,15 +214,17 @@ def fetch_func():


def query_registry_db_or_mock(
mock_f: Optional[Callable[[str], Iterable[Dict]]], mock_query_id: str, use_search_after: bool = False
mock_f: Optional[Callable[[str], Iterable[Dict]]],
mock_query_id: str,
use_search_after: bool = False,
):
if mock_f is not None:

def mock_wrapper(
client: OpenSearch,
index_name: str,
query: Dict,
_source: Dict,
index_name: str = "registry",
page_size: int = 10000,
scroll_validity_duration_minutes: int = 10,
request_timeout_seconds: int = 20,
Expand Down Expand Up @@ -372,6 +374,6 @@ def get_extant_lidvids(client: OpenSearch) -> Iterable[str]:
}
_source = {"includes": ["lidvid"]}

results = query_registry_db(client, query, _source, scroll_keepalive_minutes=1)
results = query_registry_db(client, "registry", query, _source, scroll_keepalive_minutes=1)

return map(lambda doc: doc["_source"]["lidvid"], results)

0 comments on commit 60d3819

Please sign in to comment.