Skip to content

Commit

Permalink
fix refactors
Browse files Browse the repository at this point in the history
Signed-off-by: Sajid Alam <[email protected]>
  • Loading branch information
SajidAlamQB committed Oct 24, 2024
1 parent 3c59894 commit 579fc82
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 29 deletions.
13 changes: 11 additions & 2 deletions package/kedro_viz/models/flowchart/model_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ def _parse_filepath(dataset_description: Dict[str, Any]) -> Optional[str]:


def _extract_wrapped_func(func: FunctionType) -> FunctionType:
"""Extract a wrapped decorated function to inspect the source code if available."""
"""Extract a wrapped decorated function to inspect the source code if available.
Adapted from https://stackoverflow.com/a/43506509/1684058
"""
if func.__closure__ is None:
return func
closure = (c.cell_contents for c in func.__closure__)
wrapped_func = next((c for c in closure if isinstance(c, FunctionType)), None)
# return the original function if it's not a decorated function
return func if wrapped_func is None else wrapped_func


Expand All @@ -36,7 +39,13 @@ def get_dataset_type(dataset: AbstractDataset) -> str:


class NamedEntity(BaseModel):
"""Represent a named entity (Tag/Registered Pipeline) in a Kedro project."""
"""Represent a named entity (Tag/Registered Pipeline) in a Kedro project
Args:
id (str): Id of the registered pipeline
Raises:
AssertionError: If id is not supplied during instantiation
"""

id: str
name: Optional[str] = Field(
Expand Down
85 changes: 75 additions & 10 deletions package/kedro_viz/models/flowchart/node_metadata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import inspect
import logging
from abc import ABC
from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Union, cast

from kedro.pipeline.node import Node as KedroNode
from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator

try:
Expand All @@ -25,23 +28,33 @@


class GraphNodeMetadata(BaseModel, ABC):
"""Abstract base class representing metadata of a graph node."""
"""Represent a graph node's metadata"""


class TaskNodeMetadata(GraphNodeMetadata):
"""Represent the metadata of a TaskNode."""
"""Represent the metadata of a TaskNode
Args:
task_node (TaskNode): Task node to which this metadata belongs to.
Raises:
AssertionError: If task_node is not supplied during instantiation
"""

task_node: TaskNode = Field(..., exclude=True)

code: Optional[str] = Field(
default=None,
validate_default=True,
description="Source code of the node's function",
)

filepath: Optional[str] = Field(
default=None,
validate_default=True,
description="Path to the file where the node is defined",
)

parameters: Optional[Dict] = Field(
default=None,
validate_default=True,
Expand All @@ -52,6 +65,7 @@ class TaskNodeMetadata(GraphNodeMetadata):
validate_default=True,
description="The command to run the pipeline to this node",
)

inputs: Optional[List[str]] = Field(
default=None, validate_default=True, description="The inputs to the TaskNode"
)
Expand All @@ -74,25 +88,32 @@ def set_task_and_kedro_node(cls, task_node):
@field_validator("code")
@classmethod
def set_code(cls, code):
# this is required to handle partial, curry functions
if inspect.isfunction(cls.kedro_node.func):
code = inspect.getsource(_extract_wrapped_func(cls.kedro_node.func))
return code

return None

@field_validator("filepath")
@classmethod
def set_filepath(cls, filepath):
# this is required to handle partial, curry functions
if inspect.isfunction(cls.kedro_node.func):
code_full_path = (
Path(inspect.getfile(cls.kedro_node.func)).expanduser().resolve()
)

try:
filepath = code_full_path.relative_to(Path.cwd().parent)
except ValueError:
except ValueError: # pragma: no cover
# if the filepath can't be resolved relative to the current directory,
# e.g. either during tests or during launching development server
# outside of a Kedro project, simply return the fullpath to the file.
filepath = code_full_path

return str(filepath)

return None

@field_validator("parameters")
Expand All @@ -117,33 +138,52 @@ def set_outputs(cls, _):


class DataNodeMetadata(GraphNodeMetadata):
"""Represent the metadata of a DataNode."""
"""Represent the metadata of a DataNode
Args:
data_node (DataNode): Data node to which this metadata belongs to.
Attributes:
is_all_previews_enabled (bool): Class-level attribute to determine if
previews are enabled for all nodes. This can be configured via CLI
or UI to manage the preview settings.
Raises:
AssertionError: If data_node is not supplied during instantiation
"""

data_node: DataNode = Field(..., exclude=True)

is_all_previews_enabled: ClassVar[bool] = True

type: Optional[str] = Field(
default=None, validate_default=True, description="The type of the data node"
)

filepath: Optional[str] = Field(
default=None,
validate_default=True,
description="The path to the actual data file for the underlying dataset",
)

run_command: Optional[str] = Field(
default=None,
validate_default=True,
description="Command to run the pipeline to this node",
)

preview: Optional[Union[Dict, str]] = Field(
default=None,
validate_default=True,
description="Preview data for the underlying data node",
description="Preview data for the underlying datanode",
)

preview_type: Optional[str] = Field(
default=None,
validate_default=True,
description="Type of preview for the dataset",
)

stats: Optional[Dict] = Field(
default=None,
validate_default=True,
Expand All @@ -165,6 +205,9 @@ def set_is_all_previews_enabled(cls, value: bool):
def set_data_node_and_dataset(cls, data_node):
cls.data_node = data_node
cls.dataset = cast(AbstractDataset, data_node.kedro_obj)

# dataset.release clears the cache before loading to ensure that this issue
# does not arise: https://github.com/kedro-org/kedro-viz/pull/573.
cls.dataset.release()

@field_validator("type")
Expand Down Expand Up @@ -203,7 +246,7 @@ def set_preview(cls, _):
return cls.dataset.preview()
return cls.dataset.preview(**preview_args)

except Exception as exc:
except Exception as exc: # pylint: disable=broad-except
logger.warning(
"'%s' could not be previewed. Full exception: %s: %s",
cls.data_node.name,
Expand All @@ -226,12 +269,14 @@ def set_preview_type(cls, _):
preview_type_annotation = inspect.signature(
cls.dataset.preview
).return_annotation
# Attempt to get the name attribute, if it exists.
# Otherwise, use str to handle the annotation directly.
preview_type_name = getattr(
preview_type_annotation, "__name__", str(preview_type_annotation)
)
return preview_type_name

except Exception as exc:
except Exception as exc: # pylint: disable=broad-except # pragma: no cover
logger.warning(
"'%s' did not have preview type. Full exception: %s: %s",
cls.data_node.name,
Expand All @@ -247,14 +292,24 @@ def set_stats(cls, _):


class TranscodedDataNodeMetadata(GraphNodeMetadata):
"""Represent the metadata of a TranscodedDataNode."""
"""Represent the metadata of a TranscodedDataNode
Args:
transcoded_data_node (TranscodedDataNode): The underlying transcoded
data node to which this metadata belongs to.
Raises:
AssertionError: If transcoded_data_node is not supplied during instantiation
"""

transcoded_data_node: TranscodedDataNode = Field(..., exclude=True)

# Only available if the dataset has filepath set.
filepath: Optional[str] = Field(
default=None,
validate_default=True,
description="The path to the actual data file for the underlying dataset",
)

run_command: Optional[str] = Field(
default=None,
validate_default=True,
Expand All @@ -263,13 +318,15 @@ class TranscodedDataNodeMetadata(GraphNodeMetadata):
original_type: Optional[str] = Field(
default=None,
validate_default=True,
description="The dataset type of the original version",
description="The dataset type of the underlying transcoded data node original version",
)
transcoded_types: Optional[List[str]] = Field(
default=None,
validate_default=True,
description="The list of all dataset types for the transcoded versions",
)

# Statistics for the underlying data node
stats: Optional[Dict] = Field(
default=None,
validate_default=True,
Expand Down Expand Up @@ -316,7 +373,15 @@ def set_stats(cls, _):


class ParametersNodeMetadata(GraphNodeMetadata):
"""Represent the metadata of a ParametersNode."""
"""Represent the metadata of a ParametersNode
Args:
parameters_node (ParametersNode): The underlying parameters node
for the parameters metadata node.
Raises:
AssertionError: If parameters_node is not supplied during instantiation
"""

parameters_node: ParametersNode = Field(..., exclude=True)
parameters: Optional[Dict] = Field(
Expand Down
Loading

0 comments on commit 579fc82

Please sign in to comment.