Skip to content

Commit

Permalink
add fs endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
rsarm committed Oct 23, 2024
1 parent 3f5546c commit ecb981f
Showing 1 changed file with 386 additions and 1 deletion.
387 changes: 386 additions & 1 deletion firecrest/v2/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import httpx
import json
import logging
import pathlib
import ssl

from typing import Any, Optional, List
from io import BytesIO
from contextlib import nullcontext
from typing import Any, ContextManager, Optional, List
from packaging.version import Version, parse

from firecrest.utilities import (
Expand Down Expand Up @@ -263,3 +266,385 @@ async def partitions(
endpoint=f"/status/{system_name}/partitions"
)
return self._json_response(resp, 200)["partitions"]

async def list_files(
self,
system_name: str,
path: str,
show_hidden: bool = False,
recursive: bool = False,
numeric_uid: bool = False,
follow_links: bool = False
) -> List[dict]:
"""Returns a list of files in a directory.
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:param show_hidden: show hidden files
:param recursive: recursively list directories encountered
:param follow_links: Follow symbolik links
:calls: GET `/filesystem/{system_name}/ops/ls`
"""
params: dict[str, Any] = {"path": f"{path}"}
if show_hidden is True:
params["showHidden"] = show_hidden

if recursive is True:
params["recursive"] = recursive

if numeric_uid is True:
params["numericUid"] = numeric_uid

if follow_links is True:
params["followLinks"] = follow_links

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/ls",
params=params
)
return self._json_response(resp, 200)

async def head(
self,
system_name: str,
path: str,
bytes: Optional[int] = None,
lines: Optional[int] = None,
skip_ending: bool = False,
) -> List[dict]:
"""Display the beginning of a specified file.
By default 10 lines will be returned.
Bytes and lines cannot be specified simultaneously.
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:param bytes: The output will be the first NUM bytes of each file
:param lines: The output will be the first NUM lines of each file
:param skip_ending: The output will be the whole file, without the last
NUM bytes/lines of each file. NUM should be
specified in the respective argument through
``bytes`` or ``lines``.
:calls: GET `/filesystem/{system_name}/ops/head`
"""
params: dict[str, Any] = {"path": f"{path}"}
if bytes is True:
params["bytes"] = bytes

if lines is True:
params["lines"] = lines

if skip_ending is True:
params["skipEnding"] = skip_beginning

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/head",
params=params
)
return self._json_response(resp, 200)

async def tail(
self,
system_name: str,
path: str,
bytes: Optional[int] = None,
lines: Optional[int] = None,
skip_beginning: bool = False,
) -> List[dict]:
"""Display the ending of a specified file.
By default 10 lines will be returned.
Bytes and lines cannot be specified simultaneously.
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:param bytes: The output will be the last NUM bytes of each file
:param lines: The output will be the last NUM lines of each file
:param skip_beginning: The output will be the whole file, without the last
NUM bytes/lines of each file. NUM should be
specified in the respective argument through
``bytes`` or ``lines``.
:calls: GET `/filesystem/{system_name}/ops/tail`
"""
params: dict[str, Any] = {"path": f"{path}"}
if bytes is True:
params["bytes"] = bytes

if lines is True:
params["lines"] = lines

if skip_beginning is True:
params["skipBeginning"] = skip_beginning

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/tail",
params=params
)
return self._json_response(resp, 200)

async def view(
self,
system_name: str,
path: str,
) -> List[dict]:
"""
View full file content (up to 5MB files)
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:calls: GET `/filesystem/{system_name}/ops/view`
"""
params: dict[str, str] = {"path": f"{path}"}

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/view",
params=params
)
return self._json_response(resp, 200)

async def checksum(
self,
system_name: str,
path: str,
) -> List[dict]:
"""
Calculate the SHA256 (256-bit) checksum of a specified file.
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:calls: GET `/filesystem/{system_name}/ops/checksum`
"""
params: dict[str, str] = {"path": f"{path}"}

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/checksum",
params=params
)
return self._json_response(resp, 200)

async def file_type(
self,
system_name: str,
path: str,
) -> List[dict]:
"""
Uses the `file` linux application to determine the type of a file.
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:calls: GET `/filesystem/{system_name}/ops/checksum`
"""
params: dict[str, str] = {"path": f"{path}"}

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/file",
params=params
)
return self._json_response(resp, 200)

async def chmod(
self,
system_name: str,
path: str,
mode: str
) -> List[dict]:
"""Changes the file mod bits of a given file according to the specified mode.
:param machine: the machine name where the filesystem belongs to
:param path: the absolute target path
:param mode: same as numeric mode of linux chmod tool
:calls: PUT `/filesystem/{system_name}/ops/chmod`
"""
params: dict[str, str] = {
"path": f"{path}",
"mode": f"{mode}"
}
resp = await self._put_request(
endpoint=f"/filesystem/{system_name}/ops/chmod",
params=params
)
return self._json_response(resp, 200)

async def chown(
self,
system_name: str,
path: str,
owner: str,
group: str
) -> List[dict]:
"""Changes the user and/or group ownership of a given file.
If only owner or group information is passed, only that information will be updated.
:param machine: the machine name where the filesystem belongs to
:param path: the absolute target path
:param owner: owner ID for target
:param group: group ID for target
:calls: PUT `/filesystem/{system_name}/ops/chown`
"""
params: dict[str, str] = {"path": f"{path}"}
if owner:
data["owner"] = owner

if group:
data["group"] = group

resp = await self._put_request(
endpoint=f"/filesystem/{system_name}/ops/chown",
params=params
)
return self._json_response(resp, 200)


async def stat(
self,
system_name: str,
path: str,
) -> List[dict]:
"""
Uses the stat linux application to determine the status of a file on the machine's filesystem.
The result follows: https://docs.python.org/3/library/os.html#os.stat_result.
:param system_name: the machine name where the filesystem belongs to
:param path: the absolute target path
:param dereference: follow symbolic links
:calls: GET `/filesystem/{system_name}/ops/checksum`
"""
params: dict[str, str] = {"path": f"{path}"}

if dereference is True:
params["dereference"] = dereference

resp = await self._get_request(
endpoint=f"/filesystem/{system_name}/ops/stat",
params=params
)
return self._json_response(resp, 200)

async def upload(
self,
system_name: str,
source_path: str,
target_path: str,
filename: Optional[str] = None,
) -> List[dict]:
"""Blocking call to upload a small file.
The file that will be uploaded will have the same name as the source_path.
:param machine: the machine name where the filesystem belongs to
:param source_path: the source path of the file or binary stream
:param target_path: the absolute target path of the directory where the file will be uploaded
:param filename: naming target file to filename (default is same as the local one)
:calls: POST `/filesystem/{system_name}/transfer/upload`
"""
context: ContextManager[BytesIO] = (
open(source_path, "rb") # type: ignore
if isinstance(source_path, str) or isinstance(source_path, pathlib.Path)
else nullcontext(source_path)
)
with context as f:
# Set filename
if filename is not None:
f = (filename, f) # type: ignore

params: dict[str, str] = {
"fileName": f,
"targetPath": f"{target_path}"
}

resp = await self._post_request(
endpoint=f"/filesystem/{system_name}/transfer/upload",
params=params
)
return self._json_response(resp, 200)

async def download(
self,
system_name: str,
source_path: str,
target_path: str,
) -> List[dict]:
"""Blocking call to download a small file.
:param machine: the machine name where the filesystem belongs to
:param source_path: the absolute source path of the file or binary stream
:param target_path: the target path in the local filesystem or binary stream
:calls: POST `/filesystem/{system_name}/transfer/upload`
"""
params: dict[str, str] = {"source_path": f"{sourcePath}"}
resp = await self._post_request(
endpoint=f"/filesystem/{system_name}/transfer/upload",
params=params
)
self._json_response([resp], 200, allow_none_result=True)
context: ContextManager[BytesIO] = (
open(target_path, "wb") # type: ignore
if isinstance(target_path, str) or isinstance(target_path, pathlib.Path)
else nullcontext(target_path)
)
with context as f:
f.write(resp.content)

async def mv(
self,
system_name: str,
source_path: str,
target_path: str
) -> List[dict]:
"""Rename/move a file, directory, or symlink at the `source_path` to the `target_path` on `machine`'s filesystem.
When successful, the method returns a string with the new path of the file.
:param machine: the machine name where the filesystem belongs to
:param source_path: the absolute source path
:param target_path: the absolute target path
:calls: POST `/filesystem/{system_name}/transfer/mv`
"""
params: dict[str, str] = {
"sourcePath": f"{source_path}",
"targetPath": f"{target_path}"
}
resp = await self._post_request(
endpoint=f"/filesystem/{system_name}/transfer/mv",
params=params
)
return self._json_response(resp, 200)

async def cp(
self,
system_name: str,
source_path: str,
target_path: str
) -> List[dict]:
"""Copies file from `source_path` to `target_path`.
When successful, the method returns a string with the path of the newly created file.
:param machine: the machine name where the filesystem belongs to
:param source_path: the absolute source path
:param target_path: the absolute target path
:calls: POST `/filesystem/{system_name}/transfer/cp`
"""
params: dict[str, str] = {
"sourcePath": f"{source_path}",
"targetPath": f"{target_path}"
}
resp = await self._post_request(
endpoint=f"/filesystem/{system_name}/transfer/cp",
params=params
)
return self._json_response(resp, 200)

async def rm(
self,
system_name: str,
path: str,
target_path: str
) -> List[dict]:
"""Blocking call to delete a small file.
:param machine: the machine name where the filesystem belongs to
:param path: the absolute target path
:calls: DELETE `/filesystem/{system_name}/transfer/rm`
"""
params: dict[str, str] = {"path": f"{path}"}
resp = await self._post_request(
endpoint=f"/filesystem/{system_name}/transfer/rm",
params=params
)
return self._json_response(resp, 200)

0 comments on commit ecb981f

Please sign in to comment.