Skip to content

Commit

Permalink
move internal (private) params to x_ prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
philipmac authored and annshress committed Dec 7, 2023
1 parent cf4e9b6 commit ccc1158
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 81 deletions.
14 changes: 7 additions & 7 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,20 +515,20 @@ def brt_flow(
# end user facing adoc params
file_share: str,
input_dir: str,
file_name: Optional[str] = None,
x_file_name: Optional[str] = None,
callback_url: Optional[str] = None,
token: Optional[str] = None,
no_api: bool = False,
keep_workdir: bool = False,
x_no_api: bool = False,
x_keep_workdir: bool = False,
adoc_template: str = "plastic_brt",
):
utils.notify_api_running(no_api, token, callback_url)
utils.notify_api_running(x_no_api, token, callback_url)

# a single input_dir will have n tomograms
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)
# input_dir_fp = utils.get_input_dir(input_dir=input_dir)
input_fps = utils.list_files(
input_dir=input_dir_fp, exts=["MRC", "ST", "mrc", "st"], single_file=file_name
input_dir=input_dir_fp, exts=["MRC", "ST", "mrc", "st"], single_file=x_file_name
)

fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
Expand Down Expand Up @@ -640,10 +640,10 @@ def brt_flow(
utils.callback_with_cleanup(
fps=fps,
callback_result=callback_with_tilt_mov,
no_api=no_api,
x_no_api=x_no_api,
callback_url=callback_url,
token=token,
keep_workdir=keep_workdir,
x_keep_workdir=x_keep_workdir,
)

"""
Expand Down
12 changes: 6 additions & 6 deletions em_workflows/czi/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,18 @@ def update_file_metadata(file_path: FilePath, callback_with_zarr: Dict) -> Dict:
async def czi_flow(
file_share: str,
input_dir: str,
file_name: Optional[str] = None,
x_file_name: Optional[str] = None,
callback_url: Optional[str] = None,
token: Optional[str] = None,
no_api: bool = False,
keep_workdir: bool = False,
x_no_api: bool = False,
x_keep_workdir: bool = False,
):
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)

input_fps = utils.list_files(
input_dir_fp,
VALID_CZI_INPUTS,
single_file=file_name,
single_file=x_file_name,
)
fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
prim_fps = utils.gen_prim_fps.map(fp_in=fps)
Expand All @@ -242,8 +242,8 @@ async def czi_flow(
utils.callback_with_cleanup(
fps=fps,
callback_result=callback_with_zarrs,
no_api=no_api,
x_no_api=x_no_api,
callback_url=callback_url,
token=token,
keep_workdir=keep_workdir,
x_keep_workdir=x_keep_workdir,
)
12 changes: 6 additions & 6 deletions em_workflows/dm_conversion/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ def convert_intermediate_files(fps):
def dm_flow(
file_share: str,
input_dir: str,
file_name: Optional[str] = None,
x_file_name: Optional[str] = None,
callback_url: Optional[str] = None,
token: Optional[str] = None,
no_api: bool = False,
keep_workdir: bool = False,
x_no_api: bool = False,
x_keep_workdir: bool = False,
):
# run_config=LocalRun(labels=[utils.get_environment()]),
"""
Expand All @@ -232,7 +232,7 @@ def dm_flow(
input_fps = utils.list_files(
input_dir_fp,
VALID_2D_INPUT_EXTS,
single_file=file_name,
single_file=x_file_name,
)
fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
# logs = utils.init_log.map(file_path=fps)
Expand All @@ -253,8 +253,8 @@ def dm_flow(
utils.callback_with_cleanup(
fps=fps,
callback_result=callback_with_keyimgs,
no_api=no_api,
x_no_api=x_no_api,
callback_url=callback_url,
token=token,
keep_workdir=keep_workdir,
x_keep_workdir=x_keep_workdir,
)
16 changes: 8 additions & 8 deletions em_workflows/lrg_2d_rgb/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,19 @@ def gen_thumb(file_path: FilePath):
def lrg_2d_flow(
file_share: str,
input_dir: str,
file_name: Optional[str] = None,
x_file_name: Optional[str] = None,
callback_url: Optional[str] = None,
token: Optional[str] = None,
no_api: bool = False,
keep_workdir: bool = False,
x_no_api: bool = False,
x_keep_workdir: bool = False,
):
"""
-list all png inputs (assumes all are "large")
-create tmp dir for each.
-convert to tiff -> zarr -> jpegs (thumb)
"""
if no_api:
utils.notify_api_running(no_api=no_api)
if x_no_api:
utils.notify_api_running(x_no_api=x_no_api)
else:
utils.notify_api_running(token=token, callback_url=callback_url)

Expand All @@ -167,7 +167,7 @@ def lrg_2d_flow(
input_fps = utils.list_files(
input_dir_fp,
VALID_LRG_2D_RGB_INPUTS,
single_file=file_name,
single_file=x_file_name,
)
fps = utils.gen_fps(share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps)
tiffs = convert_png_to_tiff.map(file_path=fps)
Expand All @@ -185,10 +185,10 @@ def lrg_2d_flow(
utils.callback_with_cleanup(
fps=fps,
callback_result=callback_with_pyramids,
no_api=no_api,
x_no_api=x_no_api,
callback_url=callback_url,
token=token,
keep_workdir=keep_workdir,
x_keep_workdir=x_keep_workdir,
)

"""
Expand Down
10 changes: 5 additions & 5 deletions em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ def gen_ng_metadata(fp_in: FilePath) -> Dict:
def sem_tomo_flow(
file_share: str,
input_dir: str,
file_name: Optional[str] = None,
x_file_name: Optional[str] = None,
callback_url: Optional[str] = None,
token: Optional[str] = None,
no_api: bool = False,
keep_workdir: bool = False,
x_no_api: bool = False,
x_keep_workdir: bool = False,
tilt_angle: float = 0,
):
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)
Expand Down Expand Up @@ -376,8 +376,8 @@ def sem_tomo_flow(
utils.callback_with_cleanup(
fps=fps,
callback_result=callback_with_corr_movies,
no_api=no_api,
x_no_api=x_no_api,
callback_url=callback_url,
token=token,
keep_workdir=keep_workdir,
x_keep_workdir=x_keep_workdir,
)
40 changes: 20 additions & 20 deletions em_workflows/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,16 @@ def add_asset(prim_fp: dict, asset: dict, image_idx: int = None) -> dict:

# triggers like "always_run" are managed when calling the task itself
@task(retries=3, retry_delay_seconds=10)
def cleanup_workdir(fps: List[FilePath], keep_workdir: bool):
def cleanup_workdir(fps: List[FilePath], x_keep_workdir: bool):
"""
:param fp: a FilePath which has a working_dir to be removed
| working_dir isn't needed after run, so remove unless "keep_workdir" is True.
| working_dir isn't needed after run, so remove unless "x_keep_workdir" is True.
| task wrapper on the FilePath rm_workdir method.
"""
if keep_workdir is True:
log("keep_workdir is set to True, skipping removal.")
if x_keep_workdir is True:
log("x_keep_workdir is set to True, skipping removal.")
else:
for fp in fps:
log(f"Trying to remove {fp.working_dir}")
Expand Down Expand Up @@ -449,13 +449,13 @@ def list_dirs(input_dir_fp: Path) -> List[Path]:

@task(retries=1, retry_delay_seconds=10)
def notify_api_running(
no_api: bool = None, token: str = None, callback_url: str = None
x_no_api: bool = None, token: str = None, callback_url: str = None
):
"""
tells API the workflow has started to run.
"""
if no_api:
log("no_api flag used, not interacting with API")
if x_no_api:
log("x_no_api flag used, not interacting with API")
return
elif not callback_url or not token:
raise RuntimeError("impossible args for notify_api_running")
Expand Down Expand Up @@ -496,8 +496,8 @@ def notify_api_running(
# else:
# message = "error"
# ns = state
# if prefect.context.parameters.get("no_api"):
# log(f"no_api flag used, terminal: success is {message}")
# if prefect.context.parameters.get("x_no_api"):
# log(f"x_no_api flag used, terminal: success is {message}")
# else:
# callback_url = prefect.context.parameters.get("callback_url")
# token = prefect.context.parameters.get("token")
Expand All @@ -523,12 +523,12 @@ def notify_api_completion(flow: Flow, flow_run: FlowRun, state: State):
https://docs.prefect.io/core/concepts/notifications.html#state-handlers
"""
status = "Completed" if state.is_completed else "Failed"
no_api = flow_run.parameters.get("no_api", True)
x_no_api = flow_run.parameters.get("x_no_api", True)
token = flow_run.parameters.get("token", "")
callback_url = flow_run.parameters.get("callback_url", "")

if no_api:
log(f"no_api flag used\nCompletion status: {status}")
if x_no_api:
log(f"x_no_api flag used\nCompletion status: {status}")
return

headers = {
Expand Down Expand Up @@ -603,7 +603,7 @@ def gen_fps(share_name: str, input_dir: Path, fps_in: List[Path]) -> List[FilePa
# TODO handle "trigger=any_successful"
@task(retries=3, retry_delay_seconds=60)
def send_callback_body(
no_api: bool,
x_no_api: bool,
files_elts: List[Dict],
token: str = None,
callback_url: str = None,
Expand All @@ -617,8 +617,8 @@ def send_callback_body(
Refer to docs/demo_callback.json for expected
"""
data = {"files": files_elts}
if no_api is True:
log("no_api flag used, not interacting with API")
if x_no_api is True:
log("x_no_api flag used, not interacting with API")
log(json.dumps(data))
return

Expand All @@ -639,29 +639,29 @@ def send_callback_body(
raise RuntimeError(msg)
else:
raise RuntimeError(
"Invalid state - need callback_url and token, OR set no_api to True."
"Invalid state - need callback_url and token, OR set x_no_api to True."
)


def callback_with_cleanup(
fps: List[FilePath],
callback_result: List,
no_api: bool = False,
x_no_api: bool = False,
callback_url: str = None,
token: str = None,
keep_workdir: bool = False,
x_keep_workdir: bool = False,
):
cp_wd_logs_to_assets = copy_workdir_logs.map(fps, wait_for=[callback_result])

cb = send_callback_body(
no_api=no_api,
x_no_api=x_no_api,
token=token,
callback_url=callback_url,
files_elts=callback_result,
)
cleanup_workdir(
fps,
keep_workdir,
x_keep_workdir,
wait_for=[allow_failure(cb), allow_failure(cp_wd_logs_to_assets)],
)

Expand Down
8 changes: 4 additions & 4 deletions test/test_brt.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def test_brt(mock_nfs_mount):
THICKNESS=30,
file_share="test",
input_dir=input_dir,
no_api=True,
keep_workdir=True,
x_no_api=True,
x_keep_workdir=True,
return_state=True,
)
assert result.is_completed(), "`result` is not successful!"
Expand Down Expand Up @@ -60,8 +60,8 @@ def test_brt_callback(mock_nfs_mount, caplog, mock_callback_data):
THICKNESS=30,
file_share="test",
input_dir=input_dir,
no_api=True,
keep_workdir=True,
x_no_api=True,
x_keep_workdir=True,
return_state=True,
)
assert result.is_completed(), "`result` is not successful!"
Expand Down
6 changes: 3 additions & 3 deletions test/test_czi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def test_input_fname(mock_nfs_mount, caplog, mock_reuse_zarr):
state = await czi_flow(
file_share="test",
input_dir="test/input_files/IF_czi/Projects/Cropped_Image/",
no_api=True,
x_no_api=True,
return_state=True,
)
assert state.is_completed()
Expand All @@ -35,7 +35,7 @@ async def test_no_mount_point_flow_fails(mock_binaries, monkeypatch, caplog):
await czi_flow(
file_share=share_name,
input_dir="test/input_files/IF_czi/Projects/Cropped_Image/",
no_api=True,
x_no_api=True,
)
assert f"{share_name} doesn't exist. Failing!" in caplog.text, caplog.text

Expand All @@ -56,7 +56,7 @@ async def test_czi_workflow_callback_structure(
state = await czi_flow(
file_share="test",
input_dir=input_dir,
no_api=True,
x_no_api=True,
return_state=True,
)
assert state.is_completed()
Expand Down
Loading

0 comments on commit ccc1158

Please sign in to comment.