diff --git a/pyproject.toml b/pyproject.toml index 2b879726..1f6eef49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,23 @@ exclude = [ ] [tool.ruff.lint] -select = ["I", "B", "E", "F", "SIM", "W", "C90"] +select = ["I", "B", "E", "D", "F", "SIM", "W", "C90", "EXE"] +ignore = [ + "D407", # Missing dashed underline after section + "D203", # conflicts with D211 + "D212", # conflicts with D213 + "D413", # Missing blank line after last section + + # TODO: Remove these once we have docstrings + "D100", # Missing docstring in public module + "D102", # Missing docstring in public method + "D103", # Missing docstring in public function + "D104", # Missing docstring in public package + "D107", # Missing docstring in `__init__` +] + +[tool.ruff.lint.per-file-ignores] +"**/tests/*" = ["D"] [tool.ruff.format] indent-style = "space" diff --git a/setup.py b/setup.py index 78efba0d..3de6086d 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,16 @@ class build_grpc(build): + """ + Custom build class to include gRPC build commands. + + This class modifies the default build process to include additional sub-commands + necessary for building gRPC components. + + Attributes + sub_commands (list): List of sub-commands to be executed during the build process. + """ + sub_commands = [("build_grpc", None)] + build.sub_commands diff --git a/src/converter/converter.py b/src/converter/converter.py index 4f47a245..c0cd701e 100644 --- a/src/converter/converter.py +++ b/src/converter/converter.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import argparse import logging import sys diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index aa29b531..62723bc3 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import json import logging from typing import IO, Dict, List, Optional, Set, Tuple @@ -28,7 +26,7 @@ class PyTorchConverter: compatible with Chakra, a performance analysis tool. It handles the intricate mappings and transformations required to accurately represent the execution in a different format. - Attributes: + Attributes input_filename (str): Input file name containing PyTorch execution trace. output_filename (str): Output file name for the converted Chakra trace. logger (logging.Logger): Logger for logging information during conversion. @@ -36,8 +34,7 @@ class PyTorchConverter: def __init__(self, input_filename: str, output_filename: str, logger: logging.Logger) -> None: """ - Initializes the PyTorch to Chakra converter. It sets up necessary - attributes and prepares the environment for the conversion process. + Initialize the PyTorch to Chakra converter. It sets up necessary attributes and prepares the environment. Args: input_filename (str): Name of the input file containing PyTorch execution trace. @@ -49,10 +46,7 @@ def __init__(self, input_filename: str, output_filename: str, logger: logging.Lo self.logger = logger def convert(self) -> None: - """ - Converts PyTorch execution traces into the Chakra format. Orchestrates the conversion process including trace - loading, trace opening, phase end node construction, node splitting, and node conversion. - """ + """Convert PyTorch execution traces into the Chakra format.""" pytorch_et_data = self.load_pytorch_execution_traces() ( pytorch_schema, @@ -85,15 +79,15 @@ def convert(self) -> None: def load_pytorch_execution_traces(self) -> Dict: """ - Loads PyTorch execution traces from a file. + Load PyTorch execution traces from a file. 
- Reads and parses the PyTorch execution trace data from a file, creating PyTorchNode objects and establishing + Read and parse the PyTorch execution trace data from a file, creating PyTorchNode objects and establishing node relationships. - Raises: + Raises Exception: If there is an IOError in opening the file. - Returns: + Returns Dict: The loaded PyTorch execution trace data. """ self.logger.info("Loading PyTorch execution traces from file.") @@ -108,12 +102,12 @@ def _parse_and_instantiate_nodes( self, pytorch_et_data: Dict ) -> Tuple[str, int, str, int, int, Dict[int, PyTorchNode]]: """ - Parses and instantiates PyTorch nodes from execution trace data. + Parse and instantiate PyTorch nodes from execution trace data. Args: pytorch_et_data (Dict): The execution trace data. - Extracts node information, sorts nodes by timestamp, and establishes parent-child relationships among them. + Extract node information, sort nodes by timestamp, and establish parent-child relationships among them. Returns: Tuple: A tuple containing PyTorch schema, PID, time, start timestamp, finish timestamp, and dictionary of @@ -136,7 +130,7 @@ def _establish_parent_child_relationships( self, pytorch_node_objects: Dict[int, PyTorchNode], pytorch_root_nids: List[int] ) -> Dict[int, PyTorchNode]: """ - Establishes parent-child relationships among PyTorch nodes and counts the node types. + Establish parent-child relationships among PyTorch nodes and count the node types. Args: pytorch_node_objects (Dict[int, PyTorchNode]): Dictionary of PyTorch node objects. @@ -165,9 +159,9 @@ def _establish_parent_child_relationships( def _initialize_node_type_counts(self) -> Dict[str, int]: """ - Initializes counters for different types of nodes. + Initialize counters for different types of nodes. - Returns: + Returns Dict[str, int]: A dictionary with node type counters initialized to zero. """ return { @@ -181,7 +175,7 @@ def _initialize_node_type_counts(self) -> Dict[str, int]: def _is_root_node(self, pytorch_node: PyTorchNode) -> bool: """ - Checks if a given PyTorch node is a root node. + Check if a given PyTorch node is a root node. Args: pytorch_node (PyTorchNode): The PyTorch node to check. @@ -198,7 +192,7 @@ def _process_parent_child_relationships( self, pytorch_node_objects: Dict[int, PyTorchNode], pytorch_node: PyTorchNode, parent_id: int ) -> None: """ - Processes the parent-child relationships for PyTorch nodes. + Process the parent-child relationships for PyTorch nodes. Args: pytorch_node_objects (Dict[int, PyTorchNode]): Dictionary of PyTorch node objects. @@ -219,7 +213,7 @@ def _process_parent_child_relationships( def _update_node_type_counts(self, node_type_counts: Dict[str, int], pytorch_node: PyTorchNode) -> None: """ - Updates the node type counts based on the current PyTorch node. + Update the node type counts based on the current PyTorch node. Args: node_type_counts (Dict[str, int]): Dictionary of node type counts. @@ -237,7 +231,7 @@ def _update_node_type_counts(self, node_type_counts: Dict[str, int], pytorch_nod def open_chakra_execution_trace(self, output_filename: str) -> IO[bytes]: """ - Opens the Chakra execution trace file for writing. + Open the Chakra execution trace file for writing. Args: output_filename (str): Name of the output file for the converted Chakra trace. 
@@ -259,7 +253,7 @@ def open_chakra_execution_trace(self, output_filename: str) -> IO[bytes]: def convert_nodes(self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dict[int, ChakraNode]) -> None: """ - Converts PyTorch nodes to Chakra nodes. + Convert PyTorch nodes to Chakra nodes. This method traverses through the PyTorch nodes and converts them to Chakra nodes. It also handles special cases for GPU nodes and collective communication types. @@ -287,7 +281,7 @@ def convert_nodes(self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dic def convert_to_chakra_node(self, chakra_nodes: Dict[int, ChakraNode], pytorch_node: PyTorchNode) -> ChakraNode: """ - Converts a PyTorchNode to a ChakraNode. + Convert a PyTorchNode to a ChakraNode. Args: chakra_nodes (Dict[int, ChakraNode]): Dictionary of existing Chakra nodes. @@ -326,7 +320,7 @@ def convert_to_chakra_node(self, chakra_nodes: Dict[int, ChakraNode], pytorch_no def get_chakra_node_type_from_pytorch_node(self, pytorch_node: PyTorchNode) -> int: """ - Determines the Chakra node type from a PyTorch node. + Determine the Chakra node type from a PyTorch node. Args: pytorch_node (PyTorchNode): The PyTorch node to determine the type of. @@ -344,7 +338,7 @@ def get_chakra_node_type_from_pytorch_node(self, pytorch_node: PyTorchNode) -> i def get_collective_comm_type(self, name: str) -> int: """ - Returns the collective communication type of the node. + Return the collective communication type of the node. Args: name (str): The name of the node. @@ -374,7 +368,7 @@ def get_collective_comm_type(self, name: str) -> int: def is_root_node(self, node: ChakraNode) -> bool: """ - Determines whether a given node is a root node in the execution trace. + Determine whether a given node is a root node in the execution trace. In the context of PyTorch execution traces, root nodes are the starting points of execution graphs or execution traces. These nodes typically do not have parent nodes and act as the original sources of execution flow. This @@ -396,9 +390,9 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 chakra_node: ChakraNode, ) -> None: """ - Converts control dependencies to data dependencies in Chakra nodes. + Convert control dependencies to data dependencies in Chakra nodes. - Traverses nodes based on control dependencies (parent nodes) and encodes data dependencies appropriately. This + Traverse nodes based on control dependencies (parent nodes) and encode data dependencies appropriately. This method is crucial for converting the dependency structure from PyTorch execution traces to Chakra execution traces. In PyTorch traces, control dependencies are represented by a parent field in each node, denoting the parent node ID. This structure indicates which functions (operators) are called by a particular operator. @@ -483,7 +477,7 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 def remove_dangling_nodes(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int, ChakraNode]: """ - Removes any dangling nodes from the chakra_nodes dictionary. + Remove any dangling nodes from the chakra_nodes dictionary. A node is considered dangling if it has no parents and no children. @@ -512,7 +506,7 @@ def remove_dangling_nodes(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int def update_parent_to_children_map(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int, List[int]]: """ - Updates the parent_to_children_map based on the data dependencies of each node. + Update the parent_to_children_map based on the data dependencies of each node. 
This map is used to efficiently simulate node execution based on data dependencies. """ @@ -526,13 +520,13 @@ def update_parent_to_children_map(self, chakra_nodes: Dict[int, ChakraNode]) -> def identify_cyclic_dependencies(self, chakra_nodes: Dict[int, ChakraNode]) -> None: """ - Identifies if there are any cyclic dependencies among Chakra nodes. + Identify if there are any cyclic dependencies among Chakra nodes. This method checks for cycles in the graph of Chakra nodes using a depth-first search (DFS) algorithm. It logs an error message and raises an exception if a cycle is detected, ensuring the graph is a Directed Acyclic Graph (DAG). - Raises: + Raises Exception: If a cyclic dependency is detected among the Chakra nodes. """ visited = set() @@ -581,9 +575,9 @@ def write_chakra_et( chakra_nodes: Dict[int, ChakraNode], ) -> None: """ - Writes the Chakra execution trace by encoding global metadata and nodes. + Write the Chakra execution trace by encoding global metadata and nodes. - Encodes and writes both the metadata and individual nodes to create a + Encode and write both the metadata and individual nodes to create a complete execution trace. """ self.logger.info("Writing Chakra execution trace.") @@ -603,7 +597,7 @@ def _write_global_metadata( pytorch_finish_ts: int, ) -> None: """ - Encodes and writes global metadata for the Chakra execution trace. + Encode and write global metadata for the Chakra execution trace. This process includes encoding metadata like schema, process ID, timestamps, and other relevant information for the Chakra execution trace. @@ -622,7 +616,7 @@ def _write_global_metadata( def _encode_and_write_nodes(self, chakra_et: IO[bytes], chakra_nodes: Dict[int, ChakraNode]) -> None: """ - Encodes and writes nodes for the Chakra execution trace. + Encode and write nodes for the Chakra execution trace. Each node from the PyTorch execution trace is encoded and written into the Chakra format. This includes node IDs, names, types, dependencies, and other attributes. @@ -640,9 +634,9 @@ def _encode_and_write_nodes(self, chakra_et: IO[bytes], chakra_nodes: Dict[int, def close_chakra_execution_trace(self, chakra_et: IO[bytes]) -> None: """ - Closes the Chakra execution trace file if it is open. + Close the Chakra execution trace file if it is open. - Ensures proper closure of the trace file to preserve data integrity. + Ensure proper closure of the trace file to preserve data integrity. Args: chakra_et (IO[bytes]): File handle for the Chakra execution trace output file. @@ -658,7 +652,7 @@ def simulate_execution( parent_to_children_map: Dict[int, List[int]], ) -> None: """ - Simulates the execution of Chakra nodes based on data dependencies. + Simulate the execution of Chakra nodes based on data dependencies. This method considers both CPU and GPU nodes. Nodes are issued for execution based on the readiness determined by dependency resolution. A simplistic global clock is used to model the execution time. diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py index f4a67bdc..aec4f9fc 100644 --- a/src/converter/pytorch_node.py +++ b/src/converter/pytorch_node.py @@ -6,6 +6,15 @@ class PyTorchNodeType(Enum): + """ + Enum representing the type of a PyTorch node in an execution trace. + + Attributes + CPU_OP (int): Represents a CPU operation. + GPU_OP (int): Represents a GPU operation. + LABEL (int): Represents a non-operator node (e.g., labels). 
+ """ + CPU_OP = 1 GPU_OP = 2 LABEL = 3 # Non-operator nodes @@ -15,7 +24,7 @@ class PyTorchNode: """ Represents a node in a PyTorch execution trace, initialized based on a schema version. - Attributes: + Attributes schema (str): Schema version used for initialization. data_deps (List[PyTorchNode]): List of data-dependent parent nodes. children (List[PyTorchNode]): List of child nodes. @@ -39,8 +48,7 @@ class PyTorchNode: def __init__(self, schema: str, node_data: Dict[str, Any]) -> None: """ - Initializes a PyTorchNode object using the node data and schema version - provided. + Initialize a PyTorchNode object using the node data and schema version provided. Args: schema (str): The schema version based on which the node will be initialized. @@ -57,9 +65,9 @@ def __init__(self, schema: str, node_data: Dict[str, Any]) -> None: def __repr__(self) -> str: """ - Provides a string representation of the PyTorchNode. + Provide a string representation of the PyTorchNode. - Returns: + Returns str: String representation of the node. """ return ( @@ -69,7 +77,7 @@ def __repr__(self) -> str: def parse_data(self, node_data: Dict[str, Any]) -> None: """ - Parses node data based on the provided schema version. + Parse node data based on the provided schema version. Args: node_data (Dict[str, Any]): The node data to be parsed. @@ -101,9 +109,9 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None: def get_op_type(self) -> PyTorchNodeType: """ - Determines the type of PyTorch operation. + Determine the type of PyTorch operation. - Returns: + Returns PyTorchNodeType: The type of the PyTorch operation. """ if self.is_gpu_op(): @@ -115,25 +123,25 @@ def get_op_type(self) -> PyTorchNodeType: def is_cpu_op(self) -> bool: """ - Checks if the node is a CPU operator. + Check if the node is a CPU operator. - Returns: + Returns bool: True if the node is a CPU operator, False otherwise. """ return self.get_op_type() == PyTorchNodeType.CPU_OP def is_gpu_op(self) -> bool: """ - Checks if the node is a GPU operator. + Check if the node is a GPU operator. - Returns: + Returns bool: True if the node is a GPU operator, False otherwise. """ return self.cat is not None def add_data_dep(self, parent_node: "PyTorchNode") -> None: """ - Adds a data-dependent parent node to this node. + Add a data-dependent parent node to this node. Args: parent_node (PyTorchNode): The parent node to be added. @@ -142,7 +150,7 @@ def add_data_dep(self, parent_node: "PyTorchNode") -> None: def add_child(self, child_node: "PyTorchNode") -> None: """ - Adds a child node to this node. + Add a child node to this node. Args: child_node (PyTorchNode): The child node to be added. @@ -151,7 +159,7 @@ def add_child(self, child_node: "PyTorchNode") -> None: def add_gpu_child(self, gpu_child_node: "PyTorchNode") -> None: """ - Adds a child GPU node for this node. + Add a child GPU node for this node. Args: gpu_child_node (Optional[PyTorchNode]): The child GPU node to be added. @@ -160,18 +168,18 @@ def add_gpu_child(self, gpu_child_node: "PyTorchNode") -> None: def is_record_param_comms_op(self) -> bool: """ - Checks if the node is a record_param_comms operator. + Check if the node is a record_param_comms operator. - Returns: + Returns bool: True if the node is a record_param_comms operator, False otherwise. """ return "record_param_comms" in self.name def is_nccl_op(self) -> bool: """ - Checks if the node is a NCCL operator. + Check if the node is a NCCL operator. 
- Returns: + Returns bool: True if the node is a NCCL operator, False otherwise. """ return "nccl:" in self.name @@ -179,9 +187,9 @@ def is_nccl_op(self) -> bool: @property def comm_size(self) -> int: """ - Calculates the communication size for the given input types and shapes. + Calculate the communication size for the given input types and shapes. - Returns: + Returns int: The calculated communication size. """ comm_size = 0 @@ -195,7 +203,7 @@ def comm_size(self) -> int: @staticmethod def get_data_type_size(data_type: str) -> int: """ - Returns the data type size of a given data type in string. + Return the data type size of a given data type in string. Args: data_type (str): The data type as a string. diff --git a/src/converter/pytorch_tensor.py b/src/converter/pytorch_tensor.py index 91b9e468..e120b175 100644 --- a/src/converter/pytorch_tensor.py +++ b/src/converter/pytorch_tensor.py @@ -5,30 +5,27 @@ class PyTorchTensor: """ Represents a tensor with its associated properties. - Attributes: - tensor_data (List[int]): Data of the tensor including tensor_id, - storage_id, offset, number of elements, and size of each - element in bytes. + Attributes + tensor_data (List[int]): Data of the tensor including tensor_id, storage_id, offset, number of elements, and + size of each element in bytes. """ def __init__(self, tensor_data: List[int]) -> None: """ - Initializes a PyTorchTensor object with the provided tensor data. + Initialize a PyTorchTensor object with the provided tensor data. Args: - tensor_data (List[int]): Data of the tensor including tensor_id, - storage_id, offset, number of elements, and size of each - element in bytes. + tensor_data (List[int]): Data of the tensor including tensor_id, storage_id, offset, number of elements, + and size of each element in bytes. """ self.tensor_data = tensor_data def is_valid(self) -> bool: """ - Checks if the tensor data is valid. + Check if the tensor data is valid. - Returns: - bool: True if tensor_data is a list of exactly six integers, - False otherwise. + Returns + bool: True if tensor_data is a list of exactly six integers, False otherwise. """ return ( isinstance(self.tensor_data, list) @@ -39,9 +36,9 @@ def is_valid(self) -> bool: @property def tensor_id(self) -> int: """ - Returns the tensor ID. + Return the tensor ID. - Returns: + Returns int: Tensor ID. """ return self.tensor_data[0] @@ -49,9 +46,9 @@ def tensor_id(self) -> int: @property def storage_id(self) -> int: """ - Returns the storage ID. + Return the storage ID. - Returns: + Returns int: Storage ID. """ return self.tensor_data[1] @@ -59,9 +56,9 @@ def storage_id(self) -> int: @property def offset(self) -> int: """ - Returns the offset. + Return the offset. - Returns: + Returns int: Offset value. """ return self.tensor_data[2] @@ -69,9 +66,9 @@ def offset(self) -> int: @property def num_elem(self) -> int: """ - Returns the number of elements in the tensor. + Return the number of elements in the tensor. - Returns: + Returns int: Number of elements. """ return self.tensor_data[3] @@ -79,18 +76,18 @@ def num_elem(self) -> int: @property def elem_bytes(self) -> int: """ - Returns the size of each element in bytes. + Return the size of each element in bytes. - Returns: + Returns int: Size of each element in bytes. """ return self.tensor_data[4] def has_valid_storage_id(self) -> bool: """ - Checks if the tensor has a valid storage ID. + Check if the tensor has a valid storage ID. - Returns: + Returns bool: True if the storage ID is greater than 0, False otherwise. 
""" return self.storage_id > 0 @@ -98,11 +95,11 @@ def has_valid_storage_id(self) -> bool: def list_to_pytorch_tensor(tensor_list: List[int]) -> PyTorchTensor: """ - Converts a list representation of a tensor into a PyTorchTensor object. + Convert a list representation of a tensor into a PyTorchTensor object. Args: - tensor_list (List[int]): Data representing a tensor, including - tensor_id, storage_id, offset, num_elem, elem_bytes. + tensor_list (List[int]): Data representing a tensor, including tensor_id, storage_id, offset, num_elem, + elem_bytes. Returns: PyTorchTensor: The PyTorchTensor object created from the data. diff --git a/src/generator/generator.py b/src/generator/generator.py index acf71b3b..7dd7e0fc 100644 --- a/src/generator/generator.py +++ b/src/generator/generator.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import argparse from ...schema.protobuf.et_def_pb2 import ( diff --git a/src/jsonizer/jsonizer.py b/src/jsonizer/jsonizer.py index 1dc20aff..9c2d4d47 100644 --- a/src/jsonizer/jsonizer.py +++ b/src/jsonizer/jsonizer.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import argparse from google.protobuf.json_format import MessageToJson diff --git a/src/timeline_visualizer/timeline_visualizer.py b/src/timeline_visualizer/timeline_visualizer.py index 4967dc5a..a7838c36 100644 --- a/src/timeline_visualizer/timeline_visualizer.py +++ b/src/timeline_visualizer/timeline_visualizer.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import argparse import json import logging @@ -10,6 +8,16 @@ class TID(IntEnum): + """ + Enum representing the types of TID (Thread ID) used for classifying different nodes in a trace. + + Attributes + LOCAL_MEMORY (int): Represents local memory nodes. + REMOTE_MEMORY (int): Represents remote memory nodes. + COMP (int): Represents compute nodes. + COMM (int): Represents communication nodes. + """ + LOCAL_MEMORY = 1 REMOTE_MEMORY = 2 COMP = 3 diff --git a/src/trace_link/kineto_operator.py b/src/trace_link/kineto_operator.py index 70bad07d..12c7228a 100644 --- a/src/trace_link/kineto_operator.py +++ b/src/trace_link/kineto_operator.py @@ -7,7 +7,7 @@ class KinetoOperator: """ Represents a single operator in a Kineto trace. - Attributes: + Attributes id (Optional[int]): Identifier of the operator. category (str): Category of the operator. name (str): Name of the operator. @@ -28,7 +28,7 @@ class KinetoOperator: def __init__(self, kineto_op: Dict[str, Any]) -> None: """ - Initializes a new instance of the KinetoOperator class. + Initialize a new instance of the KinetoOperator class. Args: kineto_op (Dict[str, Any]): The dictionary representing the @@ -55,7 +55,7 @@ def __repr__(self) -> str: """ Represent the KinetoOperator as a string. - Returns: + Returns str: A string representation of the KinetoOperator. """ return ( @@ -69,14 +69,15 @@ def __repr__(self) -> str: def is_cpu_op(self) -> bool: """ - Determines if the operator is simulatable based on its category and name. + Determine if the operator is simulatable based on its category and name. + The categories 'cpu_op' and 'user_annotation' are considered CPU operators. Notably, 'user_annotation' operators often include the duration of CPU operator launch times. Ignoring the duration measured in 'user_annotation' can lead to inaccuracies in simulation. An exception to this is 'ProfilerStep', which should be completely ignored. Ideally, a more general rule should be developed to identify such exception nodes. - Returns: + Returns bool: True if the operator is simulatable, False otherwise. 
""" simulatable_categories = {"cpu_op", "user_annotation"} @@ -87,9 +88,9 @@ def is_cpu_op(self) -> bool: def is_cuda_launch_op(self) -> bool: """ - Determines whether the operator is a kernel-launching CUDA runtime operator. + Determine whether the operator is a kernel-launching CUDA runtime operator. - Returns: + Returns bool: True if it's a launch operation, otherwise False. """ cuda_launch_categories = {"cuda_runtime", "cuda_driver"} @@ -105,9 +106,9 @@ def is_cuda_launch_op(self) -> bool: def is_gpu_op(self) -> bool: """ - Checks if the operator is a GPU-side operator based on its category. + Check if the operator is a GPU-side operator based on its category. - Returns: + Returns bool: True if it's a GPU-side operation, otherwise False. """ gpu_categories = {"kernel", "gpu_memcpy"} @@ -115,9 +116,9 @@ def is_gpu_op(self) -> bool: def is_arrow_op(self) -> bool: """ - Checks if the operator is categorized as 'ac2g', which stands for arrows from CPU to GPU. + Check if the operator is categorized as 'ac2g', which stands for arrows from CPU to GPU. - Returns: + Returns bool: True if the operator is an 'ac2g' type, otherwise False. """ return self.category == "ac2g" diff --git a/src/trace_link/trace_link.py b/src/trace_link/trace_link.py index 55a7e105..c7ea59ff 100644 --- a/src/trace_link/trace_link.py +++ b/src/trace_link/trace_link.py @@ -5,7 +5,7 @@ def main() -> None: """ - Main function to execute the trace linking process. + Execute the trace linking process. For more detailed steps on collecting traces and converting them to Chakra traces, visit the guide at: https://github.com/mlcommons/chakra/wiki/Chakra-Execution-Trace-Collection-%E2%80%90-A-Comprehensive-Guide-on-Merging-PyTorch-and-Kineto-Traces diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py index 4bbb8d36..ebf062b6 100644 --- a/src/trace_link/trace_linker.py +++ b/src/trace_link/trace_linker.py @@ -29,7 +29,7 @@ class TraceLinker: """ Links PyTorch Execution Traces (ET) and Kineto Traces to generate PyTorch ET plus. - Attributes: + Attributes pytorch_et_file (str): Path to the PyTorch execution trace file. kineto_file (str): Path to the Kineto trace file. pytorch_ops (List[PyTorchOperator]): PyTorch operators. @@ -62,7 +62,7 @@ class TraceLinker: def __init__(self, pytorch_et_file: str, kineto_file: str, log_level: str = "INFO") -> None: """ - Initializes the TraceLinker with paths to the PyTorch and Kineto trace files, and a log level. + Initialize the TraceLinker with paths to the PyTorch and Kineto trace files, and a log level. Args: pytorch_et_file (str): Path to the PyTorch execution trace file. @@ -95,21 +95,19 @@ def __init__(self, pytorch_et_file: str, kineto_file: str, log_level: str = "INF self.logger.setLevel(log_level.upper()) def load_traces(self) -> None: - """ - Loads both PyTorch Execution Traces and Kineto Traces. - """ + """Load both PyTorch Execution Traces and Kineto Traces.""" self.pytorch_ops = self.load_pytorch_et() kineto_data = self.load_kineto_trace() self.update_kineto_data(kineto_data) def load_pytorch_et(self) -> List[PyTorchOperator]: """ - Loads and processes the PyTorch Execution Trace. + Load and process the PyTorch Execution Trace. This method handles multiple iterations in the trace and extracts the nodes, considering the specified annotation for segmenting the iterations. - Returns: + Returns List[PyTorchOperator]: List of PyTorch operators. 
""" self.logger.info("Starting to load PyTorch Execution Trace.") @@ -124,7 +122,7 @@ def load_pytorch_et(self) -> List[PyTorchOperator]: def extract_pytorch_ops(self, node: PyTorchOperator) -> List[PyTorchOperator]: """ - Extracts and sorts nodes from the PyTorch execution trace recursively. + Extract and sort nodes from the PyTorch execution trace recursively. This method traverses the execution trace starting from the provided node, extracting all the operator nodes recursively, and then returns them sorted by their identifiers. @@ -147,11 +145,12 @@ def traverse(node: PyTorchOperator): def load_kineto_trace(self) -> Dict: """ - Loads and processes the Kineto Trace. + Load and process the Kineto Trace. + This method parses the Kineto trace file, creating KinetoOperator instances for each operator in the trace. It then categorizes and segments these operators for further processing and linking with PyTorch operators. - Returns: + Returns Dict: Dictionary containing various data structures needed for linking traces. """ self.logger.info("Starting to load Kineto Trace.") @@ -177,9 +176,10 @@ def load_kineto_trace(self) -> Dict: def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> Dict: """ - Constructs necessary data structures required for trace linking from the provided Kineto operators. This method - identifies process start time, end time, thread start time, and end time, and also categorizes operators into - CPU, GPU, and other relevant groups. + Construct necessary data structures required for trace linking from the provided Kineto operators. + + This method identifies process start time, end time, thread start time, and end time, and also categorizes + operators into CPU, GPU, and other relevant groups. Args: kineto_ops (List[KinetoOperator]): List of Kineto operators to categorize. @@ -256,9 +256,10 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> def calculate_exclusive_dur(self, kineto_tid_cpu_ops_map: Dict[int, List[KinetoOperator]]) -> None: """ - Calculates the exclusive duration of each operator in the Kineto traces in parallel. The exclusive duration is - defined as the total duration of the operator minus any time spent in child operators, effectively representing - the time spent exclusively in that operator. + Calculate the exclusive duration of each operator in the Kineto traces in parallel. + + The exclusive duration is defined as the total duration of the operator minus any time spent in child operators, + effectively representing the time spent exclusively in that operator. Args: kineto_tid_cpu_ops_map (Dict[int, List[KinetoOperator]]): Map of thread IDs to their corresponding Kineto @@ -316,7 +317,7 @@ def process_ops_for_thread(ops: List[KinetoOperator]) -> None: @staticmethod def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """ - Merges overlapping intervals into a single interval. + Merge overlapping intervals into a single interval. Args: intervals (List[Tuple[int, int]]): List of intervals. @@ -344,8 +345,7 @@ def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[ def update_kineto_data(self, kineto_data: Dict) -> None: """ - Updates the instance variables of the TraceLinker class using the data structures from the kineto_data - dictionary. + Update the variables of the TraceLinker class using the data structures from the kineto_data dictionary. 
Args: kineto_data (Dict): Dictionary containing categorized operators and timing boundaries. @@ -365,10 +365,12 @@ def update_kineto_data(self, kineto_data: Dict) -> None: def enforce_inter_thread_order(self, threshold: int = 1000) -> None: """ - Enforces order between groups of operators in different threads. In Kineto traces with multiple threads, - operators are executed in turns, creating groups. This function identifies these groups by detecting - significant gaps in execution within each thread. It then establishes dependencies between these groups across - different threads, ensuring the final Chakra execution traces reflect inter-thread dependencies realistically. + Enforce order between groups of operators in different threads. + + In Kineto traces with multiple threads, operators are executed in turns, creating groups. This function + identifies these groups by detecting significant gaps in execution within each thread. It then establishes + dependencies between these groups across different threads, ensuring the final Chakra execution traces reflect + inter-thread dependencies realistically. An isolated group is formed when there's a significant gap in execution within a thread. Each new group relies on the last CPU operator from other threads, enforcing order and dependency across threads. @@ -424,8 +426,9 @@ def find_last_cpu_node_before_timestamp( timestamp: int, ) -> Optional[int]: """ - Finds the last CPU node ID before a given timestamp in threads other than the excluded one. This ID is used - to establish dependencies between groups across threads. + Find the last CPU node ID before a given timestamp in threads other than the excluded one. + + This ID is used to establish dependencies between groups across threads. Args: ops_by_tid (Dict[int, List[KinetoOperator]]): Operators grouped by thread ID. @@ -456,8 +459,9 @@ def find_last_cpu_node_before_timestamp( def link_traces(self) -> None: """ - Initiates the linking process between PyTorch Execution Traces (ET) and Kineto Traces to produce an enhanced - PyTorch Execution Trace (ET+). This process relies on the assumption of an 'exact match' between these traces. + Link PyTorch Execution Traces (ET) and Kineto Traces to produce an enhanced PyTorch Execution Trace (ET+). + + This process relies on the assumption of an 'exact match' between these traces. """ self.logger.info("Starting the process of linking PyTorch and Kineto traces.") ( @@ -507,8 +511,9 @@ def add_thread_and_process_annotations( kineto_process_end_time: int, ) -> Tuple[List[KinetoOperator], List[KinetoOperator], List[int]]: """ - Adds thread and process annotations to Kineto operators based on previously tracked timing information. These - annotations are crucial for aligning Kineto operators with PyTorch ET nodes, ensuring completeness and + Add thread and process annotations to Kineto operators based on previously tracked timing information. + + These annotations are crucial for aligning Kineto operators with PyTorch ET nodes, ensuring completeness and compatibility of trace data for analysis. This method uses the process start and end times, as well as thread start and end times, collected during the categorization process to insert appropriate annotations directly into the Kineto operators list. 
@@ -576,10 +581,7 @@ def map_pytorch_to_kineto_ops( kineto_rf_id_to_kineto_op_map: Dict[int, KinetoOperator], kineto_gpu_ops: List[KinetoOperator], ) -> Tuple[Dict[int, List[KinetoOperator]], Dict[int, int], Dict[int, int], Dict[int, int], Dict[int, int]]: - """ - Maps PyTorch ET nodes to corresponding Kineto operators, ensuring each PyTorch node has a matching Kineto - operator. - """ + """Map PyTorch ET nodes to corresponding Kineto operators.""" self.logger.info("Mapping PyTorch ET nodes to Kineto operators.") cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers( kineto_gpu_ops, kineto_correlation_cuda_runtime_map @@ -631,7 +633,7 @@ def group_gpu_ops_by_cpu_launchers( self, kineto_gpu_ops: List[KinetoOperator], kineto_correlation_cuda_runtime_map: Dict[int, KinetoOperator] ) -> Dict[int, List[KinetoOperator]]: """ - Groups GPU operators based on their corresponding CPU launchers. + Group GPU operators based on their corresponding CPU launchers. This is determined by the 'ev_idx' which links GPU operators to their initiating CPU launcher events. @@ -672,9 +674,10 @@ def find_parent_cpu_op( self, kineto_gpu_op: KinetoOperator, kineto_correlation_cuda_runtime_map: Dict[int, KinetoOperator] ) -> Optional[KinetoOperator]: """ - Finds the parent CPU operator for a given GPU operator by identifying the corresponding CUDA runtime operator - through the correlation ID. It then locates the closest preceding CPU operator based on the CUDA runtime's - timestamp, considering the temporal distance between the GPU operation's start and the initiating CPU operation. + Find the parent CPU operator for a given GPU operator by identifying the corresponding CUDA runtime operator. + + It then locates the closest preceding CPU operator based on the CUDA runtime's timestamp, considering the + temporal distance between the GPU operation's start and the initiating CPU operation. Args: kineto_gpu_op (KinetoOperator): The GPU operator. @@ -715,8 +718,7 @@ def find_closest_op( self, kineto_gpu_op: KinetoOperator, kineto_ops: List[KinetoOperator], ts: int ) -> Optional[KinetoOperator]: """ - Finds the Kineto operator that is closest in start time to a given timestamp and has a duration that covers - the timestamp. + Find the Kineto operator that is closest in start time to a given timestamp and that covers the timestamp. Args: kineto_gpu_op (KinetoOperator): The GPU operator being compared. @@ -771,7 +773,7 @@ def link_ops( kineto_rf_id_to_kineto_op_map: Dict[int, KinetoOperator], ) -> Tuple[List[KinetoOperator], int, int, int, Optional[int]]: """ - Links a PyTorch operator to its corresponding Kineto operator and any associated GPU operators. + Link a PyTorch operator to its corresponding Kineto operator and any associated GPU operators. Args: pytorch_op (PyTorchOperator): PyTorch operator to link. @@ -805,7 +807,7 @@ def link_ops( def link_gpu_ops(self, pytorch_op: PyTorchOperator, kineto_gpu_ops: List[KinetoOperator]) -> None: """ - Links GPU operators to a PyTorch operator. + Link GPU operators to a PyTorch operator. Args: pytorch_op (PyTorchOperator): The PyTorch operator to link to. @@ -824,8 +826,7 @@ def construct_et_plus_data( pytorch_op_id_to_inter_thread_dep_map: Dict[int, int], ) -> Dict: """ - Constructs the enhanced PyTorch Execution Trace (ET+) data structure by integrating Kineto data into the - original PyTorch Execution Trace. + Construct the enhanced PyTorch Execution Trace (ET+) data structure. 
This method enriches the PyTorch execution trace with detailed performance data from the Kineto trace, offering a comprehensive view of the execution. @@ -878,8 +879,7 @@ def process_op_and_dependents( pytorch_op_id_to_inter_thread_dep_map: Dict[int, int], ) -> List[Dict]: """ - Processes a single operator in the PyTorch ET data, assigns a new unique ID, and processes any dependent GPU - operators. + Process a single operator in the PyTorch ET data, assign a new unique ID, and process any dependent operators. Args: op (Dict): The operator to be processed. @@ -921,8 +921,9 @@ def process_dependent_gpu_ops( self, cpu_op: Dict, orig_op_id: int, pytorch_op_id_to_kineto_ops_map: Dict[int, List[KinetoOperator]] ) -> List[Dict]: """ - Creates and returns a list of GPU operators that are dependent on a specific CPU operator, sorted by their - timestamp. The GPU operators are deep copies of the existing operators with updated IDs and other relevant + Create and return a list of GPU operators that are dependent on a specific CPU operator. + + The GPU operators are deep copies of the existing operators with updated IDs and other relevant fields from the CPU operator. Args: @@ -960,7 +961,7 @@ def process_dependent_gpu_ops( def dump_pytorch_execution_trace_plus(self, output_file: str) -> None: """ - Dumps the enhanced PyTorch Execution Trace (ET+) data to a file. + Dump the enhanced PyTorch Execution Trace (ET+) data to a file. Args: output_file (str): The file path where the ET+ data will be saved. diff --git a/src/trace_link/unique_id_assigner.py b/src/trace_link/unique_id_assigner.py index a26d0441..10f20eb5 100644 --- a/src/trace_link/unique_id_assigner.py +++ b/src/trace_link/unique_id_assigner.py @@ -9,7 +9,7 @@ class UniqueIdAssigner: It's particularly useful in scenarios where the uniqueness of IDs across different entities or iterations needs to be preserved. - Attributes: + Attributes next_id (int): The next unique ID to be assigned. original_to_new_ids (Dict[int, int]): A mapping from original IDs to their corresponding new unique IDs. This helps in retrieving already assigned unique IDs and ensures the same original ID always maps to the same @@ -17,16 +17,13 @@ class UniqueIdAssigner: """ def __init__(self) -> None: - """ - Initializes the UniqueIdAssigner with a starting ID of 0. - """ + """Initialize the UniqueIdAssigner with a starting ID of 0.""" self.next_id: int = 0 self.original_to_new_ids: Dict[int, int] = {} def assign_or_retrieve_id(self, original_id: int) -> int: """ - Assigns a new unique ID to the given original ID if it doesn't have one already; otherwise, returns the - previously assigned unique ID. + Assign a new unique ID to the given original ID if it doesn't have one already. Args: original_id (int): The original ID for which a unique ID is needed. @@ -42,11 +39,11 @@ def assign_or_retrieve_id(self, original_id: int) -> int: def generate_new_id(self) -> int: """ - Generates a new unique ID without needing an original ID. + Generate a new unique ID without needing an original ID. This is useful for cases where new entities are created that do not have an existing identifier. - Returns: + Returns int: A new unique ID. """ unique_id = self.next_id @@ -55,7 +52,7 @@ def generate_new_id(self) -> int: def lookup_new_id(self, original_id: int) -> int: """ - Retrieves the new unique ID for a given original ID, if it has been assigned. + Retrieve the new unique ID for a given original ID, if it has been assigned. 
This method is useful for checking if a unique ID has already been assigned to an original ID and retrieving it. diff --git a/src/visualizer/visualizer.py b/src/visualizer/visualizer.py index ff83e08d..b2737b80 100644 --- a/src/visualizer/visualizer.py +++ b/src/visualizer/visualizer.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import argparse import re
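Appended for reference: a minimal sketch of the docstring shape the converted files settle on under the [tool.ruff.lint] selection above — an imperative one-line summary on the line below the opening quotes, `Args:` keeping its colon, `Returns`/`Raises` headers written without one, and no dashed underlines or trailing blank lines (D407 and D413 are ignored). The module and function below are hypothetical illustrations, not part of this diff.

import json


def load_trace(input_filename: str) -> dict:
    """
    Load an execution trace from a JSON file.

    Args:
        input_filename (str): Path to the trace file.

    Returns
        dict: The parsed trace data.

    Raises
        IOError: If the file cannot be opened.
    """
    # Note: the Returns/Raises headers intentionally carry no colon, mirroring
    # the converted docstrings in this diff.
    with open(input_filename) as f:
        return json.load(f)

With this configuration, `ruff check` accepts a docstring in the shape above, while the per-file-ignores entry exempts files under **/tests/ from the same D rules until test docstrings are added.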