From 579fc826055b3d25a1ff0d52db117825d2b64f97 Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Thu, 24 Oct 2024 14:51:15 +0100 Subject: [PATCH] fix refactors Signed-off-by: Sajid Alam --- .../kedro_viz/models/flowchart/model_utils.py | 13 +- .../models/flowchart/node_metadata.py | 85 ++++++++- package/kedro_viz/models/flowchart/nodes.py | 180 ++++++++++++++++-- .../kedro_viz/models/flowchart/pipelines.py | 17 +- 4 files changed, 266 insertions(+), 29 deletions(-) diff --git a/package/kedro_viz/models/flowchart/model_utils.py b/package/kedro_viz/models/flowchart/model_utils.py index 5b209f70e..52098d9ac 100644 --- a/package/kedro_viz/models/flowchart/model_utils.py +++ b/package/kedro_viz/models/flowchart/model_utils.py @@ -22,11 +22,14 @@ def _parse_filepath(dataset_description: Dict[str, Any]) -> Optional[str]: def _extract_wrapped_func(func: FunctionType) -> FunctionType: - """Extract a wrapped decorated function to inspect the source code if available.""" + """Extract a wrapped decorated function to inspect the source code if available. 
+ Adapted from https://stackoverflow.com/a/43506509/1684058 + """ if func.__closure__ is None: return func closure = (c.cell_contents for c in func.__closure__) wrapped_func = next((c for c in closure if isinstance(c, FunctionType)), None) + # return the original function if it's not a decorated function return func if wrapped_func is None else wrapped_func @@ -36,7 +39,13 @@ def get_dataset_type(dataset: AbstractDataset) -> str: class NamedEntity(BaseModel): - """Represent a named entity (Tag/Registered Pipeline) in a Kedro project.""" + """Represent a named entity (Tag/Registered Pipeline) in a Kedro project + Args: + id (str): Id of the registered pipeline + + Raises: + AssertionError: If id is not supplied during instantiation + """ id: str name: Optional[str] = Field( diff --git a/package/kedro_viz/models/flowchart/node_metadata.py b/package/kedro_viz/models/flowchart/node_metadata.py index ffaeab07b..6129a62aa 100644 --- a/package/kedro_viz/models/flowchart/node_metadata.py +++ b/package/kedro_viz/models/flowchart/node_metadata.py @@ -1,7 +1,10 @@ +import inspect import logging from abc import ABC +from pathlib import Path from typing import Any, ClassVar, Dict, List, Optional, Union, cast +from kedro.pipeline.node import Node as KedroNode from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator try: @@ -25,23 +28,33 @@ class GraphNodeMetadata(BaseModel, ABC): - """Abstract base class representing metadata of a graph node.""" + """Represent a graph node's metadata""" class TaskNodeMetadata(GraphNodeMetadata): - """Represent the metadata of a TaskNode.""" + """Represent the metadata of a TaskNode + + Args: + task_node (TaskNode): Task node to which this metadata belongs to. 
+ + Raises: + AssertionError: If task_node is not supplied during instantiation + """ task_node: TaskNode = Field(..., exclude=True) + code: Optional[str] = Field( default=None, validate_default=True, description="Source code of the node's function", ) + filepath: Optional[str] = Field( default=None, validate_default=True, description="Path to the file where the node is defined", ) + parameters: Optional[Dict] = Field( default=None, validate_default=True, @@ -52,6 +65,7 @@ class TaskNodeMetadata(GraphNodeMetadata): validate_default=True, description="The command to run the pipeline to this node", ) + inputs: Optional[List[str]] = Field( default=None, validate_default=True, description="The inputs to the TaskNode" ) @@ -74,14 +88,17 @@ def set_task_and_kedro_node(cls, task_node): @field_validator("code") @classmethod def set_code(cls, code): + # this is required to handle partial, curry functions if inspect.isfunction(cls.kedro_node.func): code = inspect.getsource(_extract_wrapped_func(cls.kedro_node.func)) return code + return None @field_validator("filepath") @classmethod def set_filepath(cls, filepath): + # this is required to handle partial, curry functions if inspect.isfunction(cls.kedro_node.func): code_full_path = ( Path(inspect.getfile(cls.kedro_node.func)).expanduser().resolve() @@ -89,10 +106,14 @@ def set_filepath(cls, filepath): try: filepath = code_full_path.relative_to(Path.cwd().parent) - except ValueError: + except ValueError: # pragma: no cover + # if the filepath can't be resolved relative to the current directory, + # e.g. either during tests or during launching development server + # outside of a Kedro project, simply return the fullpath to the file. 
filepath = code_full_path return str(filepath) + return None @field_validator("parameters") @@ -117,33 +138,52 @@ def set_outputs(cls, _): class DataNodeMetadata(GraphNodeMetadata): - """Represent the metadata of a DataNode.""" + """Represent the metadata of a DataNode + + Args: + data_node (DataNode): Data node to which this metadata belongs to. + + Attributes: + is_all_previews_enabled (bool): Class-level attribute to determine if + previews are enabled for all nodes. This can be configured via CLI + or UI to manage the preview settings. + + Raises: + AssertionError: If data_node is not supplied during instantiation + """ data_node: DataNode = Field(..., exclude=True) + is_all_previews_enabled: ClassVar[bool] = True + type: Optional[str] = Field( default=None, validate_default=True, description="The type of the data node" ) + filepath: Optional[str] = Field( default=None, validate_default=True, description="The path to the actual data file for the underlying dataset", ) + run_command: Optional[str] = Field( default=None, validate_default=True, description="Command to run the pipeline to this node", ) + preview: Optional[Union[Dict, str]] = Field( default=None, validate_default=True, - description="Preview data for the underlying data node", + description="Preview data for the underlying datanode", ) + preview_type: Optional[str] = Field( default=None, validate_default=True, description="Type of preview for the dataset", ) + stats: Optional[Dict] = Field( default=None, validate_default=True, @@ -165,6 +205,9 @@ def set_is_all_previews_enabled(cls, value: bool): def set_data_node_and_dataset(cls, data_node): cls.data_node = data_node cls.dataset = cast(AbstractDataset, data_node.kedro_obj) + + # dataset.release clears the cache before loading to ensure that this issue + # does not arise: https://github.com/kedro-org/kedro-viz/pull/573. 
cls.dataset.release() @field_validator("type") @@ -203,7 +246,7 @@ def set_preview(cls, _): return cls.dataset.preview() return cls.dataset.preview(**preview_args) - except Exception as exc: + except Exception as exc: # pylint: disable=broad-except logger.warning( "'%s' could not be previewed. Full exception: %s: %s", cls.data_node.name, @@ -226,12 +269,14 @@ def set_preview_type(cls, _): preview_type_annotation = inspect.signature( cls.dataset.preview ).return_annotation + # Attempt to get the name attribute, if it exists. + # Otherwise, use str to handle the annotation directly. preview_type_name = getattr( preview_type_annotation, "__name__", str(preview_type_annotation) ) return preview_type_name - except Exception as exc: + except Exception as exc: # pylint: disable=broad-except # pragma: no cover logger.warning( "'%s' did not have preview type. Full exception: %s: %s", cls.data_node.name, @@ -247,14 +292,24 @@ def set_stats(cls, _): class TranscodedDataNodeMetadata(GraphNodeMetadata): - """Represent the metadata of a TranscodedDataNode.""" + """Represent the metadata of a TranscodedDataNode + Args: + transcoded_data_node (TranscodedDataNode): The underlying transcoded + data node to which this metadata belongs to. + + Raises: + AssertionError: If transcoded_data_node is not supplied during instantiation + """ transcoded_data_node: TranscodedDataNode = Field(..., exclude=True) + + # Only available if the dataset has filepath set. 
filepath: Optional[str] = Field( default=None, validate_default=True, description="The path to the actual data file for the underlying dataset", ) + run_command: Optional[str] = Field( default=None, validate_default=True, @@ -263,13 +318,15 @@ class TranscodedDataNodeMetadata(GraphNodeMetadata): original_type: Optional[str] = Field( default=None, validate_default=True, - description="The dataset type of the original version", + description="The dataset type of the underlying transcoded data node original version", ) transcoded_types: Optional[List[str]] = Field( default=None, validate_default=True, description="The list of all dataset types for the transcoded versions", ) + + # Statistics for the underlying data node stats: Optional[Dict] = Field( default=None, validate_default=True, @@ -316,7 +373,15 @@ def set_stats(cls, _): class ParametersNodeMetadata(GraphNodeMetadata): - """Represent the metadata of a ParametersNode.""" + """Represent the metadata of a ParametersNode + + Args: + parameters_node (ParametersNode): The underlying parameters node + for the parameters metadata node. + + Raises: + AssertionError: If parameters_node is not supplied during instantiation + """ parameters_node: ParametersNode = Field(..., exclude=True) parameters: Optional[Dict] = Field( diff --git a/package/kedro_viz/models/flowchart/nodes.py b/package/kedro_viz/models/flowchart/nodes.py index c20c31061..ff9e2408e 100644 --- a/package/kedro_viz/models/flowchart/nodes.py +++ b/package/kedro_viz/models/flowchart/nodes.py @@ -37,7 +37,10 @@ class GraphNodeType(str, Enum): - """Represent all possible node types in the graph representation of a Kedro pipeline.""" + """Represent all possible node types in the graph representation of a Kedro pipeline. + The type needs to inherit from str as well so FastAPI can serialise it. 
See: + https://fastapi.tiangolo.com/tutorial/path-params/#working-with-python-enumerations + """ TASK = "task" DATA = "data" @@ -46,21 +49,41 @@ class GraphNodeType(str, Enum): class GraphNode(BaseModel, ABC): - """Abstract base class representing a node in the graph.""" + """Represent a node in the graph representation of a Kedro pipeline. + All node models except the metadata node models should inherit from this class + + Args: + id (str): A unique identifier for the node in the graph, + obtained by hashing the node's string representation. + name (str): The full name of this node obtained from the underlying Kedro object + type (str): The type of the graph node + tags (Set[str]): The tags associated with this node. Defaults to `set()`. + kedro_obj (Optional[Union[KedroNode, AbstractDataset]]): The underlying Kedro object + for each graph node, if any. Defaults to `None`. + pipelines (Set[str]): The set of registered pipeline IDs this + node belongs to. Defaults to `set()`. + modular_pipelines (Optional[Set(str)]): A set of modular pipeline names + this node belongs to. + + """ id: str name: str type: str - tags: Set[str] = Field(set()) + tags: Set[str] = Field(set(), description="The tags associated with this node") kedro_obj: Optional[Union[KedroNode, AbstractDataset]] = Field( None, description="The underlying Kedro object for each graph node, if any", exclude=True, ) - pipelines: Set[str] = Field(set()) + pipelines: Set[str] = Field( + set(), description="The set of registered pipeline IDs this node belongs to" + ) + modular_pipelines: Optional[Set[str]] = Field( default=None, validate_default=True, + description="The modular_pipelines this node belongs to", ) model_config = ConfigDict(arbitrary_types_allowed=True) @@ -68,6 +91,14 @@ class GraphNode(BaseModel, ABC): def create_task_node( cls, node: KedroNode, node_id: str, modular_pipelines: Optional[Set[str]] ) -> "TaskNode": + """Create a graph node of type task for a given Kedro Node instance. 
+ Args: + node: A node in a Kedro pipeline. + node_id: Id of the task node. + modular_pipelines: A set of modular_pipeline_ids the node belongs to. + Returns: + An instance of TaskNode. + """ node_name = node._name or node._func_name return TaskNode( id=node_id, @@ -78,6 +109,7 @@ def create_task_node( ) @classmethod + # pylint: disable=too-many-positional-arguments def create_data_node( cls, dataset_id: str, @@ -89,6 +121,22 @@ def create_data_node( modular_pipelines: Optional[Set[str]], is_free_input: bool = False, ) -> Union["DataNode", "TranscodedDataNode"]: + """Create a graph node of type data for a given Kedro Dataset instance. + Args: + dataset_id: A hashed id for the dataset node + dataset_name: The name of the dataset, including namespace, e.g. + data_science.master_table. + layer: The optional layer that the dataset belongs to. + tags: The set of tags to assign to the graph representation + of this dataset. N.B. currently it's derived from the node's tags. + dataset: A dataset in a Kedro pipeline. + stats: The dictionary of dataset statistics, e.g. + {"rows":2, "columns":3, "file_size":100} + modular_pipelines: A set of modular_pipeline_ids the node belongs to. + is_free_input: Whether the dataset is a free input in the pipeline + Returns: + An instance of DataNode or TranscodedDataNode. + """ is_transcoded_dataset = TRANSCODING_SEPARATOR in dataset_name if is_transcoded_dataset: name = _strip_transcoding(dataset_name) @@ -114,6 +162,7 @@ def create_data_node( ) @classmethod + # pylint: disable=too-many-positional-arguments def create_parameters_node( cls, dataset_id: str, @@ -123,6 +172,19 @@ def create_parameters_node( parameters: AbstractDataset, modular_pipelines: Optional[Set[str]], ) -> "ParametersNode": + """Create a graph node of type parameters for a given Kedro parameters dataset instance. + Args: + dataset_id: A hashed id for the parameters node + dataset_name: The name of the dataset, including namespace, e.g.
+ data_science.test_split_ratio + layer: The optional layer that the parameters belong to. + tags: The set of tags to assign to the graph representation + of this dataset. N.B. currently it's derived from the node's tags. + parameters: A parameters dataset in a Kedro pipeline. + modular_pipelines: A set of modular_pipeline_ids the node belongs to. + Returns: + An instance of ParametersNode. + """ return ParametersNode( id=dataset_id, name=dataset_name, @@ -136,25 +198,50 @@ def create_parameters_node( def create_modular_pipeline_node( cls, modular_pipeline_id: str ) -> "ModularPipelineNode": + """Create a graph node of type modularPipeline for a given modular pipeline ID. + This is used to visualise all modular pipelines in a Kedro project on the graph. + Args: + modular_pipeline_id: The ID of the modular pipeline to convert into a graph node. + Returns: + An instance of ModularPipelineNode. + Example: + >>> node = GraphNode.create_modular_pipeline_node("pipeline.data_science") + >>> assert node.id == "pipeline.data_science" + >>> assert node.name == "pipeline.data_science" + >>> assert node.type == GraphNodeType.MODULAR_PIPELINE + """ return ModularPipelineNode(id=modular_pipeline_id, name=modular_pipeline_id) def add_pipeline(self, pipeline_id: str): + """Add a pipeline_id to the set of pipelines that this node belongs to.""" self.pipelines.add(pipeline_id) def belongs_to_pipeline(self, pipeline_id: str) -> bool: + """Check whether this graph node belongs to a given pipeline_id.""" return pipeline_id in self.pipelines def has_metadata(self) -> bool: + """Check whether this graph node has metadata. + Since metadata of a graph node is derived from the underlying Kedro object, + we just need to check whether the underlying object exists.
+ """ return self.kedro_obj is not None class TaskNode(GraphNode): - """Represent a graph node of type task.""" + """Represent a graph node of type task + + Raises: + AssertionError: If kedro_obj is not supplied during instantiation + """ parameters: Dict = Field( {}, description="A dictionary of parameter values for the task node" ) + + # The type for Task node type: str = GraphNodeType.TASK.value + namespace: Optional[str] = Field( default=None, validate_default=True, @@ -173,8 +260,18 @@ def set_namespace(cls, _, info: ValidationInfo): return info.data["kedro_obj"].namespace +# pylint: disable=missing-function-docstring class DataNode(GraphNode): - """Represent a graph node of type data.""" + """Represent a graph node of type data + + Args: + layer (Optional[str]): The layer that this data node belongs to. Defaults to `None`. + is_free_input (bool): Determines whether the data node is a free input. Defaults to `False`. + stats (Optional[Dict]): Statistics for the data node. Defaults to `None`. 
+ + Raises: + AssertionError: If kedro_obj, name are not supplied during instantiation + """ layer: Optional[str] = Field( None, description="The layer that this data node belongs to" @@ -183,17 +280,22 @@ class DataNode(GraphNode): False, description="Determines whether the data node is a free input" ) stats: Optional[Dict] = Field(None, description="The statistics for the data node.") + dataset_type: Optional[str] = Field( default=None, validate_default=True, description="The concrete type of the underlying kedro_obj", ) + viz_metadata: Optional[Dict] = Field( default=None, validate_default=True, description="The metadata for data node" ) + run_command: Optional[str] = Field( None, description="The command to run the pipeline to this node" ) + + # The type for data node type: str = GraphNodeType.DATA.value @model_validator(mode="before") @@ -219,16 +321,30 @@ def set_viz_metadata(cls, _, info: ValidationInfo): return None def get_preview_args(self): + """Gets the preview arguments for a dataset""" return self.viz_metadata.get("preview_args", None) def is_preview_enabled(self): + """Checks if the dataset has a preview enabled at the node level.""" return ( self.viz_metadata is None or self.viz_metadata.get("preview") is not False ) class TranscodedDataNode(GraphNode): - """Represent a graph node of type data for transcoded datasets.""" + """Represent a graph node of type data + + Args: + layer (Optional[str]): The layer that this transcoded data + node belongs to. Defaults to `None`. + is_free_input (bool): Determines whether the transcoded data + node is a free input. Defaults to `False`. 
+ stats (Optional[Dict]): Statistics for the data node + + Raises: + AssertionError: If name is not supplied during instantiation + + """ layer: Optional[str] = Field( None, description="The layer that this transcoded data node belongs to" @@ -244,12 +360,16 @@ class TranscodedDataNode(GraphNode): original_name: Optional[str] = Field( None, description="The original name for the generated run command" ) + run_command: Optional[str] = Field( None, description="The command to run the pipeline to this node" ) + # The transcoded versions of the transcoded data nodes. transcoded_versions: Set[AbstractDataset] = Field( - set(), description="The transcoded versions of the data nodes" + set(), description="The transcoded versions of the transcoded data nodes" ) + + # The type for data node type: str = GraphNodeType.DATA.value def has_metadata(self) -> bool: @@ -257,11 +377,19 @@ def has_metadata(self) -> bool: class ParametersNode(GraphNode): - """Represent a graph node of type parameters.""" + """Represent a graph node of type parameters + Args: + layer (Optional[str]): The layer that this parameters node belongs to. Defaults to `None`. 
+ + Raises: + AssertionError: If kedro_obj, name are not supplied during instantiation + """ layer: Optional[str] = Field( None, description="The layer that this parameters node belongs to" ) + + # The type for Parameters Node type: str = GraphNodeType.PARAMETERS.value @model_validator(mode="before") @@ -272,26 +400,42 @@ def check_kedro_obj_and_name_exists(cls, values): return values def is_all_parameters(self) -> bool: + """Check whether the graph node represents all parameters in the pipeline""" return self.name == "parameters" def is_single_parameter(self) -> bool: + """Check whether the graph node represents a single parameter in the pipeline""" return not self.is_all_parameters() @property def parameter_name(self) -> str: + """Get a normalised parameter name without the "params:" prefix""" return self.name.replace("params:", "") @property def parameter_value(self) -> Any: + """Load the parameter value from the underlying dataset""" if not (self.kedro_obj and hasattr(self.kedro_obj, "load")): return None try: actual_parameter_value = self.kedro_obj.load() + # Return only json serializable value return jsonable_encoder(actual_parameter_value) except (TypeError, ValueError, RecursionError): + # In case the parameter is not JSON serializable, + # return the string representation return str(actual_parameter_value) - except Exception as exc: + except (AttributeError, DatasetError): + # This except clause triggers if the user passes a parameter that is not + # defined in the catalog (DatasetError); it also catches any case where + # the kedro_obj is None (AttributeError) -- GH#1231 + logger.warning( + "Cannot find parameter `%s` in the catalog.", self.parameter_name + ) + return None + # pylint: disable=broad-exception-caught + except Exception as exc: # pragma: no cover logger.error( "An error occurred when loading parameter `%s` in the catalog :: %s", self.parameter_name, @@ -301,16 +445,30 @@ def parameter_value(self) -> Any: class ModularPipelineNode(GraphNode): - 
"""Represent a modular pipeline node in the graph.""" + """Represent a modular pipeline node in the graph""" + # A modular pipeline doesn't belong to any other modular pipeline, + # in the same sense as other types of GraphNode do. + # Therefore, it's default to None. + # The parent-child relationship between modular pipeline themselves is modelled explicitly. modular_pipelines: Optional[Set[str]] = None + + # Model the modular pipelines tree using a child-references representation of a tree. + # See: https://docs.mongodb.com/manual/tutorial/model-tree-structures-with-child-references/ + # for more details. + # For example, if a node namespace is "uk.data_science", + # the "uk" modular pipeline node's children are ["uk.data_science"] children: Set[ModularPipelineChild] = Field( set(), description="The children for the modular pipeline node" ) + inputs: Set[str] = Field( set(), description="The input datasets to the modular pipeline node" ) + outputs: Set[str] = Field( set(), description="The output datasets from the modular pipeline node" ) + + # The type for Modular Pipeline Node type: str = GraphNodeType.MODULAR_PIPELINE.value diff --git a/package/kedro_viz/models/flowchart/pipelines.py b/package/kedro_viz/models/flowchart/pipelines.py index a569c3bb8..ce023c710 100644 --- a/package/kedro_viz/models/flowchart/pipelines.py +++ b/package/kedro_viz/models/flowchart/pipelines.py @@ -6,8 +6,17 @@ from .nodes import GraphNodeType -class ModularPipelineChild(BaseModel): - """Represent a child of a modular pipeline.""" +class RegisteredPipeline(NamedEntity): + """Represent a registered pipeline in a Kedro project.""" + + +class ModularPipelineChild(BaseModel, frozen=True): + """Represent a child of a modular pipeline. 
+ + Args: + id (str): Id of the modular pipeline child + type (GraphNodeType): Type of modular pipeline child + """ id: str type: GraphNodeType @@ -29,7 +38,3 @@ class ModularPipelineNode(BaseModel): set(), description="The output datasets from the modular pipeline node" ) type: str = GraphNodeType.MODULAR_PIPELINE.value - - -class RegisteredPipeline(NamedEntity): - """Represent a registered pipeline in a Kedro project."""