Skip to content

Commit

Permalink
fix dynamics flow and more graphs
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Dec 7, 2023
1 parent 2b32951 commit 138a7b1
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 33 deletions.
13 changes: 13 additions & 0 deletions src/jobflow_remote/cli/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ def graph(
help="Show the graph in a dash app",
),
] = False,
print_mermaid: Annotated[
bool,
typer.Option(
"--mermaid",
"-m",
help="Print the mermaid graph",
),
] = False,
):
"""
Provide detailed information on a Flow
Expand All @@ -245,6 +253,11 @@ def graph(
if not flows_info:
exit_with_error_msg("No data matching the request")

if print_mermaid:
from jobflow_remote.jobs.graph import get_mermaid

print(get_mermaid(flows_info[0]))

if dash_plot:
plot_dash(flows_info[0])
else:
Expand Down
18 changes: 14 additions & 4 deletions src/jobflow_remote/cli/formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from jobflow_remote.cli.utils import ReprStr, fmt_datetime
from jobflow_remote.config.base import ExecutionConfig, WorkerBase
from jobflow_remote.jobs.data import FlowInfo, JobInfo
from jobflow_remote.jobs.data import FlowInfo, JobDoc, JobInfo
from jobflow_remote.jobs.state import JobState
from jobflow_remote.utils.data import convert_utc_time

Expand Down Expand Up @@ -130,14 +130,24 @@ def get_flow_info_table(flows_info: list[FlowInfo], verbosity: int):
return table


def format_job_info(job_info: JobInfo, show_none: bool = False):
def format_job_info(
job_info: JobInfo | JobDoc, verbosity: int, show_none: bool = False
):
d = job_info.dict(exclude_none=not show_none)
if verbosity == 1:
d.pop("job", None)

# convert dates at the first level and for the remote error
for k, v in d.items():
if isinstance(v, datetime.datetime):
d[k] = convert_utc_time(v)
d[k] = convert_utc_time(v).strftime(fmt_datetime)

if d["remote"]["retry_time_limit"]:
d["remote"]["retry_time_limit"] = convert_utc_time(
d["remote"]["retry_time_limit"]
).strftime(fmt_datetime)

d = jsanitize(d, allow_bson=False, enum_values=True)
d = jsanitize(d, allow_bson=True, enum_values=True, strict=True)
error = d.get("error")
if error:
d["error"] = ReprStr(error)
Expand Down
8 changes: 5 additions & 3 deletions src/jobflow_remote/cli/jf.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ def main(
try:
project_data = cm.get_project_data()
text = Text.from_markup(
f"The selected project is [green]{project_data.project.name}[/green] from config file [green]{project_data.filepath}[/green]"
f"The selected project is [green]{project_data.project.name}[/green] "
f"from config file [green]{project_data.filepath}[/green]"
)
out_console.print(text)
except ConfigError as e:
out_console.print(f"Current project could not be determined: {e}", style="red")
except ConfigError:
# no warning printed if not needed as this seems to be confusing for the user
pass

if profile:
profiler.disable()
Expand Down
23 changes: 16 additions & 7 deletions src/jobflow_remote/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def job_info(
help="Show the data whose values are None. Usually hidden",
),
] = False,
verbosity: verbosity_opt = 0,
):
"""
Detail information on a specific job
Expand All @@ -171,15 +172,23 @@ def job_info(
with loading_spinner():
jc = get_job_controller()

job_info = jc.get_job_info(
job_id=job_id,
job_index=job_index,
db_id=db_id,
)
if not job_info:
if verbosity > 0:
job_data = jc.get_job_doc(
job_id=job_id,
job_index=job_index,
db_id=db_id,
)
else:
job_data = jc.get_job_info(
job_id=job_id,
job_index=job_index,
db_id=db_id,
)

if not job_data:
exit_with_error_msg("No data matching the request")

out_console.print(format_job_info(job_info, show_none=show_none))
out_console.print(format_job_info(job_data, verbosity, show_none=show_none))


@app_job.command()
Expand Down
63 changes: 62 additions & 1 deletion src/jobflow_remote/jobs/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,71 @@ def displayTapNodeData(data):
app.run(debug=True)


def get_mermaid(flow: FlowInfo, show_subflows: bool = True):
nodes, edges, hosts = get_graph_elements(flow)
from monty.collections import tree

hosts_hierarchy = tree()
for db_id, job_hosts in hosts.items():
d = hosts_hierarchy
for host in reversed(job_hosts):
d = d[host]
d[db_id] = None

lines = ["flowchart TD"]

# add style classes
for state, color in COLOR_MAPPING.items():
# this could be optimised by compressing in one line and using a
# same class for states with the same color
lines.append(f" classDef {state} fill:{color}")

# add edges
for parent_db_id, child_db_id in edges:
parent = nodes[parent_db_id]
child = nodes[child_db_id]
line = (
f" {parent_db_id}({parent['name']}) --> {child_db_id}({child['name']})"
)
lines.append(line)

subgraph_styles = []

# add subgraphs
def add_subgraph(nested_hosts_hierarchy, indent_level=0):
if show_subflows:
prefix = " " * indent_level
else:
prefix = " "

for ref_id in sorted(nested_hosts_hierarchy, key=lambda x: str(x)):
subhosts = nested_hosts_hierarchy[ref_id]
if subhosts:
if indent_level > 0 and show_subflows:
# don't put any title
lines.append(f"{prefix}subgraph {ref_id}['']")
subgraph_styles.append(
f" style {ref_id} fill:#2B65EC,opacity:0.2"
)

add_subgraph(subhosts, indent_level=indent_level + 1)

if indent_level > 0 and show_subflows:
lines.append(f"{prefix}end")
else:
job = nodes[ref_id]
lines.append(f"{prefix}{ref_id}:::{job['state'].value}")

add_subgraph(hosts_hierarchy)
lines.extend(subgraph_styles)

return "\n".join(lines)


BLUE_COLOR = "#5E6BFF"
RED_COLOR = "#fC3737"
COLOR_MAPPING = {
JobState.WAITING.value: "grey",
JobState.WAITING.value: "#aaaaaa",
JobState.READY.value: "#DAF7A6",
JobState.CHECKED_OUT.value: BLUE_COLOR,
JobState.UPLOADED.value: BLUE_COLOR,
Expand Down
37 changes: 19 additions & 18 deletions src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1258,19 +1258,19 @@ def get_job_info_by_job_uuid(
raise ValueError(f"job_index value: {job_index} is not supported")
return self.jobs.find_one(query, projection=projection, sort=sort)

def get_job_doc_by_job_uuid(self, job_uuid: str, job_index: int | str = "last"):
query: dict[str, Any] = {"uuid": job_uuid}
sort = None
if isinstance(job_index, int):
query["index"] = job_index
elif job_index == "last":
sort = [("index", -1)]
else:
raise ValueError(f"job_index value: {job_index} is not supported")
doc = self.jobs.find_one(query, sort=sort)
if doc:
return JobDoc.model_validate(doc)
return None
def get_job_doc(
self,
job_id: str | None = None,
db_id: int | None = None,
job_index: int | None = None,
) -> JobDoc | None:
query, sort = self.generate_job_id_query(db_id, job_id, job_index)

data = list(self.jobs.find(query, sort=sort, limit=1))
if not data:
return None

return JobDoc.model_validate(data[0])

def get_jobs(self, query, projection: list | dict | None = None):
return list(self.jobs.find(query, projection=projection))
Expand Down Expand Up @@ -1436,12 +1436,7 @@ def _append_flow(
resources=resources,
)
)
# if job.index > 1:
# flow_dict["parents"][job.uuid][str(job.index)] = parents
# else:
# flow_dict["parents"][job.uuid] = {str(job.index): parents}
flow_updates["$set"][f"parents.{job.uuid}.{job.index}"] = parents
# flow_dict["ids"].append((job_dicts[-1]["db_id"], job.uuid, job.index))
ids_to_push.append((job_dicts[-1]["db_id"], job.uuid, job.index))
flow_updates["$push"]["ids"] = {"$each": ids_to_push}

Expand Down Expand Up @@ -1649,6 +1644,8 @@ def checkin_job(
response.replace,
response_type=DynamicResponseType.REPLACE,
worker=job_doc.worker,
exec_config=job_doc.exec_config,
resources=job_doc.resources,
)

if response.addition is not None:
Expand All @@ -1658,6 +1655,8 @@ def checkin_job(
response.addition,
response_type=DynamicResponseType.ADDITION,
worker=job_doc.worker,
exec_config=job_doc.exec_config,
resources=job_doc.resources,
)

if response.detour is not None:
Expand All @@ -1667,6 +1666,8 @@ def checkin_job(
response.detour,
response_type=DynamicResponseType.DETOUR,
worker=job_doc.worker,
exec_config=job_doc.exec_config,
resources=job_doc.resources,
)

if response.stored_data is not None:
Expand Down

0 comments on commit 138a7b1

Please sign in to comment.