Added mypy and fixed square bracket splitting #26

Merged
merged 1 commit into from
Apr 2, 2024
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -17,3 +17,9 @@ repos:
args: [ --fix ]
# Run the formatter.
- id: ruff-format
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.9.0' # Use the sha / tag you want to point at
hooks:
- id: mypy
additional_dependencies: ["types-requests"]
exclude: "test_script.py"
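The `types-requests` entry supplies the type stubs for the `requests` package used in `parsing.py`; without it, mypy reports that library stubs are not installed for that import. A minimal, hypothetical sketch (not part of this repo) of the kind of annotation those stubs let mypy check:

```python
import requests


def fetch_status(url: str) -> int:
    # With types-requests installed, mypy knows requests.get returns a
    # requests.Response whose .status_code attribute is an int.
    resp: requests.Response = requests.get(url, timeout=10)
    return resp.status_code
```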
8 changes: 6 additions & 2 deletions README.md
@@ -24,9 +24,13 @@ For example, if you want to find all the zonal (`uo`) and meridional (`vo`) velocities
```python
from pangeo_forge_esgf.parsing import parse_instance_ids
parse_iids = [
"CMIP6.PMIP.*.*.lgm.*.*.uo.*.*",
"CMIP6.PMIP.*.*.lgm.*.*.vo.*.*",
"CMIP6.PMIP.*.*.lgm.*.*.[uo, vo].*.*",
]
# Comma separated values in square brackets will be expanded and the above is equivalent to passing:
# parse_iids = [
#     "CMIP6.PMIP.*.*.lgm.*.*.uo.*.*",
#     "CMIP6.PMIP.*.*.lgm.*.*.vo.*.*",
# ]
iids = []
for piid in parse_iids:
iids.extend(parse_instance_ids(piid))
68 changes: 43 additions & 25 deletions pangeo_forge_esgf/parsing.py
@@ -1,5 +1,5 @@
import requests
from typing import Optional
from typing import Optional, List

from .utils import facets_from_iid

@@ -24,32 +24,50 @@ def instance_ids_from_request(json_dict):
return uniqe_iids


def parse_instance_ids(iid: str, search_node: Optional[str] = None) -> list[str]:
"""Parse an instance id with wildcards"""
facets = facets_from_iid(iid)
# convert string to list if square brackets are found
for k, v in facets.items():
if "[" in v:
v = (
v.replace("[", "")
.replace("]", "")
.replace("'", "")
.replace(" ", "")
.split(",")
)
facets[k] = v
facets_filtered = {k: v for k, v in facets.items() if v != "*"}
def split_square_brackets(facet_string: str) -> List[str]:
## split a string like this `a.[b1, b2].c.[d1, d2]` into a list like this: ['a.b1.c.d1', 'a.b1.c.d2', 'a.b2.c.d1', 'a.b2.c.d2']
if "[" not in facet_string:
return [facet_string]

start_index = facet_string.find("[")
end_index = facet_string.find("]")
prefix = facet_string[:start_index]
suffix = facet_string[end_index + 1 :]

inner_parts = [
part.strip() for part in facet_string[start_index + 1 : end_index].split(",")
]

split_iid_combinations = []
for part in inner_parts:
inner_combinations = split_square_brackets(part + suffix)
for inner_combination in inner_combinations:
split_iid_combinations.append(prefix + inner_combination)

return split_iid_combinations
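For illustration, a minimal usage sketch of the expansion implemented above (the parametrized test added in `test_parsing.py` below covers the same cases):

```python
from pangeo_forge_esgf.parsing import split_square_brackets

# The prefix before the first "[" is combined with each comma-separated inner
# part, and the remaining suffix is expanded by the recursive call.
print(split_square_brackets("a.[b1, b2].c.[d1, d2]"))
# ['a.b1.c.d1', 'a.b1.c.d2', 'a.b2.c.d1', 'a.b2.c.d2']
```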


def parse_instance_ids(iid_string: str, search_node: Optional[str] = None) -> list[str]:
"""Parse an instance id with wildcards"""
if search_node is None:
# search_node = "https://esgf-node.llnl.gov/esg-search/search"
search_node = "https://esgf-data.dkrz.de/esg-search/search"
# FIXME: I got some really weird flakiness with the LLNL node. This is a dumb way to test this...
# TODO: how do I iterate over this more efficiently?
# Maybe we do not want to allow more than x files parsed?
resp = request_from_facets(search_node, **facets_filtered)
if resp.status_code != 200:
print(f"Request [{resp.url}] failed with {resp.status_code}")
return resp
else:
json_dict = resp.json()
return instance_ids_from_request(json_dict)

# first resolve the square brackets
split_iids: List[str] = split_square_brackets(iid_string)

parsed_iids: List[str] = []
for iid in split_iids:
facets = facets_from_iid(iid)
facets_filtered = {
k: v for k, v in facets.items() if v != "*"
} # leaving out the wildcards here will just request everything for that facet

resp = request_from_facets(search_node, **facets_filtered)
if resp.status_code != 200:
print(f"Request [{resp.url}] failed with {resp.status_code}")
else:
json_dict = resp.json()
parsed_iids.extend(instance_ids_from_request(json_dict))
return parsed_iids
33 changes: 20 additions & 13 deletions pangeo_forge_esgf/recipe_inputs.py
@@ -4,7 +4,7 @@
import backoff

from .utils import facets_from_iid
from typing import Dict, List, Tuple, Any, Optional
from typing import Dict, List, Tuple, Any, Optional, Union

# import backoff #might still be using the backoff stuff later
from tqdm.asyncio import tqdm
@@ -33,14 +33,19 @@ async def url_responsive(
semaphore: asyncio.BoundedSemaphore,
url: str,
timeout: int,
) -> bool:
) -> Union[None, str]:
async with semaphore:
try:
async with session.get(url, timeout=timeout) as resp:
if (
resp.status <= 300
): # TODO: Is this a good way to check if the search node and data_url is responsive?
return url
else:
resp.raise_for_status()
return None  # mypy did not like the missing return, but this smells to me.
# Maybe I should rethink this design. In the end I want to back off on None, but also get the actual exception back.
# There might be a better way to achieve this.
except Exception as e:
logger.debug(f"Responsivness check for {url=} failed with: {e}")
return None
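One option for the design question raised in the comments above, sketched here as a possibility rather than a recommendation: keep the `None` return and let `backoff` retry on it with a predicate, so the exception is still recorded by the `logger.debug` call inside the function:

```python
import backoff

# Retry url_responsive while it returns None, giving up after a few attempts.
# backoff.on_predicate works with coroutine functions, so the wrapped callable
# can still be awaited like the original.
url_responsive_with_retry = backoff.on_predicate(
    backoff.expo, lambda result: result is None, max_tries=3
)(url_responsive)
```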
Expand Down Expand Up @@ -70,7 +75,7 @@ async def get_response_data(
url: str,
params: Dict[str, str],
timeout: int,
) -> str:
) -> Union[None, str]:
async with semaphore:
try:
async with session.get(
@@ -112,7 +117,7 @@ async def get_first_responsive_url(
session: aiohttp.ClientSession,
semaphore: asyncio.BoundedSemaphore,
iid_url_tuple: Tuple[str, List[str]],
) -> Tuple[str, str]:
) -> Union[Tuple[str, str], Tuple[str, None]]:
"""Filters a list of search nodes for those that are responsive."""
label, url_list = iid_url_tuple
try:
@@ -139,7 +144,7 @@ async def get_urls_for_iid(
iid: str,
node_url: str,
timeout: int,
) -> str:
) -> Union[None, Dict[str, List[Dict[str, str]]]]:
params = esgf_params_from_iid({}, iid)
logger.debug(f"{iid=} Requesting from {node_url=} {params =}")
iid_response = await get_response_data(
@@ -164,13 +169,15 @@ def get_http(urls: list[str]) -> str:
return url


def nest_dict_from_keyed_list(keyed_list: List[Tuple[str, Any]], sep: str = "|"):
def nest_dict_from_keyed_list(
keyed_list: List[Tuple[str, Any]], sep: str = "|"
) -> Dict[str, Dict[str, str]]:
"""
Creates nested dict from a flat list of tuples (key, value)
where key is a string with a separator indicating the levels of the dict.
This is not general (it only works on two levels), but works for the specific case of the ESGF search API
"""
new_dict = {}
new_dict: Dict[str, Any] = {}

for label, url in keyed_list:
# split label
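As an illustration of the transformation this helper performs, a sketch with hypothetical keys of the form `"<iid>|<filename>"` (the separator defaults to `"|"`):

```python
flat_urls_per_file = [
    ("iid_a|file_1.nc", "https://node1/file_1.nc"),
    ("iid_a|file_2.nc", "https://node1/file_2.nc"),
    ("iid_b|file_1.nc", "https://node2/file_1.nc"),
]
# nest_dict_from_keyed_list(flat_urls_per_file) would return roughly:
# {
#     "iid_a": {"file_1.nc": "https://node1/file_1.nc",
#               "file_2.nc": "https://node1/file_2.nc"},
#     "iid_b": {"file_1.nc": "https://node2/file_1.nc"},
# }
```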
@@ -190,7 +197,7 @@ def sort_urls_by_time(urls: List[str]) -> List[str]:

def url_result_processing(
flat_urls_per_file: List[Tuple[str, str]], expected_files: Dict[str, List[str]]
):
) -> Dict[str, List[str]]:
filtered_dict = nest_dict_from_keyed_list(flat_urls_per_file)

# now check which files are missing per iid
@@ -249,16 +256,16 @@ def get_unique_filenames(
return filename_dict


def esgf_params_from_iid(params: Dict[str, str], iid: str):
def esgf_params_from_iid(params: Dict[str, str], iid: str) -> Dict[str, str]:
"""Generates parameters for a GET request to the ESGF API based on the instance id."""
# set default search parameters
default_params = {
default_params: Dict[str, str] = {
"type": "File",
"retracted": "false",
"format": "application/solr+json",
"fields": "id, url, title, latest, replica, data_node",
"distrib": "true",
"limit": 500, # This determines the number of urls/files that are returned. I dont expect this to be ever more than 500?
"limit": "500", # This determines the number of urls/files that are returned. I dont expect this to be ever more than 500?
}
params = default_params | params
facets = facets_from_iid(iid)
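The `limit` value is quoted so the literal matches the `Dict[str, str]` annotation (mypy would otherwise flag the int entry as an incompatible type). A small illustrative sketch, with hypothetical values, of the annotation and the `|` merge with caller-supplied params (dict union requires Python 3.9+):

```python
from typing import Dict

default_params: Dict[str, str] = {
    "type": "File",
    "limit": "500",  # an int literal (500) here would fail the Dict[str, str] check
}
# dict union: keys from the right-hand operand take precedence
params = default_params | {"limit": "100"}
assert params["limit"] == "100"
```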
@@ -413,7 +420,7 @@ async def get_urls_from_esgf(

logger.info("Processing responses: Group results")
# aggregate urls of results per iid and filename
group_dict = {}
group_dict: Dict[str, List[str]] = {}
for r in keyed_results:
if r[0] not in group_dict:
group_dict[r[0]] = []
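A note on the design: the membership check and manual list initialization could also be replaced by `collections.defaultdict`; a minimal equivalent sketch, assuming `keyed_results` is a list of `(label, value)` tuples as used above:

```python
from collections import defaultdict
from typing import DefaultDict, List

group_dict: DefaultDict[str, List[str]] = defaultdict(list)
for label, value in keyed_results:  # assumes (label, value) tuples as in the loop above
    group_dict[label].append(value)
```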
@@ -451,7 +458,7 @@ async def get_urls_from_esgf(
filtered_urls_per_file, expected_files_per_iid
)

missing_iids = set(iids) - set(final_url_dict.keys())
missing_iids = list(set(iids) - set(final_url_dict.keys()))
if len(missing_iids) > 0:
logger.warn(
f"Was not able to construct url list for ({len(missing_iids)}/{len(iids)}) iids"
19 changes: 17 additions & 2 deletions pangeo_forge_esgf/tests/test_parsing.py
@@ -1,11 +1,11 @@
from pangeo_forge_esgf.parsing import parse_instance_ids
import pytest


def test_readme_example():
# This is possibly flaky (due to the dependence on the ESGF API)
parse_iids = [
"CMIP6.PMIP.*.*.lgm.*.*.uo.*.*",
"CMIP6.PMIP.*.*.lgm.*.*.vo.*.*",
"CMIP6.PMIP.*.*.lgm.*.*.[uo,vo].*.*",
]
iids = []
for piid in parse_iids:
@@ -29,3 +29,18 @@ def test_readme_example():

for iid in expected_iids:
assert iid in iids


@pytest.mark.parametrize(
"facet_iid, expected",
[
("a.b.c.d", ["a.b.c.d"]),
("a.[b1, b2].c.[d1, d2]", ["a.b1.c.d1", "a.b1.c.d2", "a.b2.c.d1", "a.b2.c.d2"]),
("a.[b1, b2].c.d", ["a.b1.c.d", "a.b2.c.d"]),
("a.b.c.[d1, d2]", ["a.b.c.d1", "a.b.c.d2"]),
],
)
def test_split_square_brackets(facet_iid, expected):
from pangeo_forge_esgf.parsing import split_square_brackets

assert split_square_brackets(facet_iid) == expected
1 change: 1 addition & 0 deletions pyproject.toml
@@ -40,6 +40,7 @@ test = [
dev = [
"ruff",
"mypy",
"types-requests",
"pre-commit",
"pangeo-forge-esgf[test]"
]