Skip to content

Commit

Permalink
Check automatically for the version of the api through /status/param…
Browse files Browse the repository at this point in the history
…eters (#128)

* Implemented version control

* Check for the version of the api in /status/parameters

* Remove type from _api_version

* Fix typo

* Update types and cli

* Add empty line

* Fix typo

* Update implementation to get automatically the api version

* Update implementation

* Fix type

* Fix doc

---------

Co-authored-by: Ali <[email protected]>
  • Loading branch information
ekouts and khsrali authored Oct 21, 2024
1 parent ec79533 commit bb45370
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 24 deletions.
59 changes: 54 additions & 5 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import firecrest.types as t
from firecrest.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload
from firecrest.utilities import (
parse_retry_after, slurm_state_completed, time_block
async_validate_api_version_compatibility,
parse_retry_after,
slurm_state_completed,
time_block
)


Expand Down Expand Up @@ -200,7 +203,6 @@ def __init__(
self.polling_sleep_times: list = 250 * [0]
#: Disable all logging from the client.
self.disable_client_logging: bool = False
self._api_version: Version = parse("1.15.0")
self._session = httpx.AsyncClient(verify=self._verify)

#: Seconds between requests in each microservice
Expand Down Expand Up @@ -247,11 +249,18 @@ def __init__(
"/tasks": None,
}

self._api_version = parse("1.15.0")
self._query_api_version = True

def set_api_version(self, api_version: str) -> None:
"""Set the version of the api of firecrest. By default it will be assumed that you are
using version 1.13.1 or compatible. The version is parsed by the `packaging` library.
"""Set the version of the api of firecrest manually. By default, the
client will query the api, through the
/status endpoint. This information is only available for
version>=1.16.1, so for older deployments the default will be 1.15.0.
The version is parsed by the `packaging` library.
"""
self._api_version = parse(api_version)
self._query_api_version = False

async def close_session(self) -> None:
"""Close the httpx session"""
Expand Down Expand Up @@ -708,8 +717,30 @@ async def parameters(self) -> t.Parameters:
:calls: GET `/status/parameters`
"""
resp = await self._get_request(endpoint="/status/parameters")
return self._json_response([resp], 200)["out"]
json_response = self._json_response([resp], 200)["out"]
if self._query_api_version:
self._query_api_version = False
try:
general_params = json_response["general"]
for g in general_params:
if g["name"] == "FIRECREST_VERSION":
self._api_version = parse(g["value"])
return json_response

raise KeyError

except KeyError:
self.log(
logging.WARNING,
"Could not get the version of the api from firecREST. "
"The version will be set to 1.15.0, but you can manually "
"set it with the method `set_api_version`."
)
self._api_version = parse("1.15.0")

return json_response

@async_validate_api_version_compatibility()
async def filesystems(self, system_name: Optional[str] = None) -> dict[str, List[t.Filesystem]]:
"""Returns the status of the filesystems per system.
Expand All @@ -731,6 +762,7 @@ async def filesystems(self, system_name: Optional[str] = None) -> dict[str, List
return self._json_response([resp], 200)["out"]

# Utilities
@async_validate_api_version_compatibility()
async def list_files(
self, machine: str, target_path: str, show_hidden: bool = False,
recursive: bool = False
Expand All @@ -745,6 +777,13 @@ async def list_files(
.. warning:: The argument ``recursive`` is available only for FirecREST>=1.16.0
"""
if recursive and self._api_version < parse("1.16.0"):
raise fe.NotImplementedOnAPIversion(
"`recursive=True` flag is not available for "
"function `list_files` for version <1.16.0 "
"in the client."
)

params: dict[str, Any] = {"targetPath": f"{target_path}"}
if show_hidden is True:
params["showhidden"] = show_hidden
Expand Down Expand Up @@ -864,6 +903,7 @@ async def copy(self, machine: str, source_path: str, target_path: str) -> str:
self._json_response([resp], 201)
return target_path

@async_validate_api_version_compatibility()
async def compress(
self,
machine: str,
Expand Down Expand Up @@ -962,6 +1002,7 @@ async def compress(

return target_path

@async_validate_api_version_compatibility()
async def extract(
self,
machine: str,
Expand Down Expand Up @@ -1316,6 +1357,7 @@ async def whoami(self, machine=None) -> Optional[str]:
# Invalid token, cannot retrieve username
return None

@async_validate_api_version_compatibility()
async def groups(self, machine) -> t.UserId:
"""Returns the output of the `id` command, user and group ids.
Expand Down Expand Up @@ -1566,6 +1608,7 @@ async def poll_active(

return ret

@async_validate_api_version_compatibility()
async def nodes(
self,
machine: str,
Expand Down Expand Up @@ -1596,6 +1639,7 @@ async def nodes(
result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
return result

@async_validate_api_version_compatibility()
async def partitions(
self,
machine: str,
Expand Down Expand Up @@ -1626,6 +1670,7 @@ async def partitions(
result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
return result

@async_validate_api_version_compatibility()
async def reservations(
self,
machine: str,
Expand Down Expand Up @@ -1816,6 +1861,7 @@ async def submit_copy_job(
result.update({"system": job_info[1]})
return result

@async_validate_api_version_compatibility()
async def submit_compress_job(
self,
machine: str,
Expand Down Expand Up @@ -1869,6 +1915,7 @@ async def submit_compress_job(
result.update({"system": job_info[1]})
return result

@async_validate_api_version_compatibility()
async def submit_extract_job(
self,
machine: str,
Expand Down Expand Up @@ -2019,6 +2066,7 @@ async def submit_delete_job(
result.update({"system": job_info[1]})
return result

@async_validate_api_version_compatibility()
async def external_upload(
self, machine: str, source_path: str, target_path: str
) -> AsyncExternalUpload:
Expand All @@ -2037,6 +2085,7 @@ async def external_upload(
json_response = self._json_response([resp], 201)["task_id"]
return AsyncExternalUpload(self, json_response, [resp])

@async_validate_api_version_compatibility()
async def external_download(
self, machine: str, source_path: str
) -> AsyncExternalDownload:
Expand Down
63 changes: 57 additions & 6 deletions firecrest/BasicClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
from io import BytesIO
from requests.compat import json # type: ignore
from typing import Any, ContextManager, Optional, overload, Sequence, Tuple, List
from packaging.version import Version, parse
from packaging.version import parse

import firecrest.FirecrestException as fe
import firecrest.types as t
from firecrest.ExternalStorage import ExternalUpload, ExternalDownload
from firecrest.utilities import (parse_retry_after, slurm_state_completed, time_block)
from firecrest.utilities import (
parse_retry_after,
slurm_state_completed,
time_block,
validate_api_version_compatibility
)

if sys.version_info >= (3, 8):
from typing import Literal
Expand Down Expand Up @@ -157,14 +162,20 @@ def __init__(
self.polling_sleep_times: list = [1, 0.5] + 234 * [0.25]
#: Disable all logging from the client.
self.disable_client_logging: bool = False
self._api_version: Version = parse("1.15.0")
self._session = requests.Session()

self._api_version = parse("1.15.0")
self._query_api_version = True

def set_api_version(self, api_version: str) -> None:
"""Set the version of the api of firecrest. By default it will be assumed that you are
using version 1.13.1 or compatible. The version is parsed by the `packaging` library.
"""Set the version of the api of firecrest manually. By default, the
client will query the api, through the
/status endpoint. This information is only available for
version>=1.16.1, so for older deployments the default will be 1.15.0.
The version is parsed by the `packaging` library.
"""
self._api_version = parse(api_version)
self._query_api_version = False

def log(self, level: int, msg: Any) -> None:
"""Log a message with the given level on the client logger.
Expand Down Expand Up @@ -460,8 +471,30 @@ def parameters(self) -> t.Parameters:
:calls: GET `/status/parameters`
"""
resp = self._get_request(endpoint="/status/parameters")
return self._json_response([resp], 200)["out"]
json_response = self._json_response([resp], 200)["out"]
if self._query_api_version:
self._query_api_version = False
try:
general_params = json_response["general"]
for g in general_params:
if g["name"] == "FIRECREST_VERSION":
self._api_version = parse(g["value"])
return json_response

raise KeyError

except KeyError:
self.log(
logging.WARNING,
"Could not get the version of the api from firecREST. "
"The version will be set to 1.15.0, but you can manually "
"set it with the method `set_api_version`."
)
self._api_version = parse("1.15.0")

return json_response

@validate_api_version_compatibility()
def filesystems(self, system_name: Optional[str] = None) -> dict[str, List[t.Filesystem]]:
"""Returns the status of the filesystems per system.
Expand All @@ -483,6 +516,7 @@ def filesystems(self, system_name: Optional[str] = None) -> dict[str, List[t.Fil
return self._json_response([resp], 200)["out"]

# Utilities
@validate_api_version_compatibility()
def list_files(
self, machine: str, target_path: str, show_hidden: bool = False,
recursive: bool = False
Expand All @@ -497,6 +531,13 @@ def list_files(
.. warning:: The argument ``recursive`` is available only for FirecREST>=1.16.0
"""
if recursive and self._api_version < parse("1.16.0"):
raise fe.NotImplementedOnAPIversion(
"`recursive=True` flag is not available for "
"function `list_files` for version <1.16.0 "
"in the client."
)

params: dict[str, Any] = {"targetPath": f"{target_path}"}
if show_hidden is True:
params["showhidden"] = show_hidden
Expand Down Expand Up @@ -614,6 +655,7 @@ def copy(self, machine: str, source_path: str, target_path: str) -> str:
self._json_response([resp], 201)
return target_path

@validate_api_version_compatibility()
def compress(
self,
machine: str,
Expand Down Expand Up @@ -713,6 +755,7 @@ def compress(

return target_path

@validate_api_version_compatibility()
def extract(
self,
machine: str,
Expand Down Expand Up @@ -1068,6 +1111,7 @@ def whoami(self, machine=None) -> Optional[str]:
# Invalid token, cannot retrieve username
return None

@validate_api_version_compatibility()
def groups(self, machine) -> t.UserId:
"""Returns the output of the `id` command, user and group ids.
Expand Down Expand Up @@ -1340,6 +1384,7 @@ def poll_active(
)[0]
return list(dict_result.values())

@validate_api_version_compatibility()
def nodes(
self,
machine: str,
Expand Down Expand Up @@ -1372,6 +1417,7 @@ def nodes(
)[0]
return result

@validate_api_version_compatibility()
def partitions(
self,
machine: str,
Expand Down Expand Up @@ -1404,6 +1450,7 @@ def partitions(
)[0]
return result

@validate_api_version_compatibility()
def reservations(
self,
machine: str,
Expand Down Expand Up @@ -1694,6 +1741,7 @@ def submit_delete_job(
result.update({"system": transfer_info[1]})
return result

@validate_api_version_compatibility()
def external_upload(
self, machine: str, source_path: str, target_path: str
) -> ExternalUpload:
Expand All @@ -1712,6 +1760,7 @@ def external_upload(
json_response = self._json_response([resp], 201)["task_id"]
return ExternalUpload(self, json_response, [resp])

@validate_api_version_compatibility()
def external_download(self, machine: str, source_path: str) -> ExternalDownload:
"""Non blocking call for the download of larger files.
Expand All @@ -1728,6 +1777,7 @@ def external_download(self, machine: str, source_path: str) -> ExternalDownload:
self, self._json_response([resp], 201)["task_id"], [resp]
)

@validate_api_version_compatibility()
def submit_compress_job(
self,
machine: str,
Expand Down Expand Up @@ -1781,6 +1831,7 @@ def submit_compress_job(
result.update({"system": transfer_info[1]})
return result

@validate_api_version_compatibility()
def submit_extract_job(
self,
machine: str,
Expand Down
4 changes: 4 additions & 0 deletions firecrest/FirecrestException.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ def __str__(self):
f"is exhausted. Update `polling_sleep_times` of the client "
f"to increase the number of polling attempts."
)


class NotImplementedOnAPIversion(Exception):
"""Exception raised when a feature is not developed yet for the current API version"""
5 changes: 3 additions & 2 deletions firecrest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
from firecrest.FirecrestException import (
ClientsCredentialsException,
FirecrestException,
UnauthorizedException,
HeaderException,
UnexpectedStatusException,
NotImplementedOnAPIversion,
StorageDownloadException,
StorageUploadException,
UnauthorizedException,
UnexpectedStatusException,
)
Loading

0 comments on commit bb45370

Please sign in to comment.