diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py
index a1ab7bd2..2371dccb 100644
--- a/src/converter/pytorch_converter.py
+++ b/src/converter/pytorch_converter.py
@@ -225,8 +225,10 @@ def convert_json_to_protobuf_nodes(
             protobuf_node_map (Dict[int, ChakraNode]): Dictionary where the converted Protobuf nodes will be stored.
         """
         for _, json_node in json_node_map.items():
-            if (json_node.get_op_type() == PyTorchNodeType.CPU_OP) or (
-                json_node.get_op_type() == PyTorchNodeType.LABEL
+            if (
+                (json_node.get_op_type() == PyTorchNodeType.CPU_OP)
+                or (json_node.get_op_type() == PyTorchNodeType.LABEL)
+                or (json_node.get_op_type() == PyTorchNodeType.METADATA)
             ):
                 chakra_node = self.convert_json_to_protobuf_node(json_node_map, protobuf_node_map, json_node)
                 protobuf_node_map[chakra_node.id] = chakra_node
@@ -242,6 +244,7 @@ def convert_json_to_protobuf_nodes(
                     [
                         ChakraAttr(name="comm_type", int64_val=collective_comm_type),
                         ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size),
+                        *([ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else []),
                     ]
                 )
 
@@ -249,6 +252,7 @@ def convert_json_to_protobuf_nodes(
                 chakra_gpu_node.attr.extend(
                     [
                         ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size),
+                        *([ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else []),
                     ]
                 )
 
diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py
index 89b29dcb..b569bd51 100644
--- a/src/converter/pytorch_node.py
+++ b/src/converter/pytorch_node.py
@@ -13,11 +13,13 @@ class PyTorchNodeType(Enum):
     CPU_OP (int): Represents a CPU operation.
     GPU_OP (int): Represents a GPU operation.
     LABEL (int): Represents a non-operator node (e.g., labels).
+    METADATA (int): Represents a metadata node (e.g., process group initialization).
     """
 
     CPU_OP = 1
     GPU_OP = 2
     LABEL = 3  # Non-operator nodes
+    METADATA = 4  # Metadata nodes
 
 
 class PyTorchNode:
@@ -42,6 +44,7 @@ class PyTorchNode:
         inter_thread_dep (Any): Inter-thread dependency of the node.
         cat (Any): Category of the node.
         stream (int): Stream associated with the node.
+        pg_name (str): Process Group name for inter-GPU communication.
     """
 
     SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4"]
@@ -109,6 +112,10 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
         self.inter_thread_dep = node_data.get("inter_thread_dep")
         self.cat = node_data.get("cat")
         self.stream = node_data.get("stream", 0)
+        # In collective comms nodes, pg_name is in node_data if it exists.
+        # In SendRecv nodes, pg_name is in the attrs if it exists.
+        # Otherwise, pg_name is not present.
+        self.pg_name = node_data.get("pg_name", "")
 
         for attr in node_data.get("attrs", []):
             setattr(self, attr["name"], attr["value"])
@@ -120,7 +127,9 @@ def get_op_type(self) -> PyTorchNodeType:
         Returns
             PyTorchNodeType: The type of the PyTorch operation.
         """
-        if self.is_gpu_op():
+        if "process_group:init" in self.name:
+            return PyTorchNodeType.METADATA
+        elif self.is_gpu_op():
             return PyTorchNodeType.GPU_OP
         elif hasattr(self, "op_schema") or hasattr(self, "outputs"):
             return PyTorchNodeType.CPU_OP
diff --git a/src/feeder/et_feeder_node.cpp b/src/feeder/et_feeder_node.cpp
index 3889fbb5..e0427e41 100644
--- a/src/feeder/et_feeder_node.cpp
+++ b/src/feeder/et_feeder_node.cpp
@@ -32,6 +32,8 @@ ETFeederNode::ETFeederNode(std::shared_ptr<ChakraProtoMsg::Node> node) {
       this->comm_dst_ = static_cast<uint32_t>(attr.int32_val());
     } else if (attr_name == "comm_tag") {
       this->comm_tag_ = static_cast<uint32_t>(attr.int32_val());
+    } else if (attr_name == "pg_name") {
+      this->pg_name_ = static_cast<string>(attr.string_val());
     } else {
       this->other_attrs_.emplace(attr_name, attr);
     }
@@ -138,3 +140,7 @@ uint32_t ETFeederNode::comm_dst() {
 uint32_t ETFeederNode::comm_tag() {
   return comm_tag_;
 }
+
+string ETFeederNode::pg_name() {
+  return pg_name_;
+}
diff --git a/src/feeder/et_feeder_node.h b/src/feeder/et_feeder_node.h
index 68fa5c8c..c0aede48 100644
--- a/src/feeder/et_feeder_node.h
+++ b/src/feeder/et_feeder_node.h
@@ -38,6 +38,7 @@ class ETFeederNode {
   uint32_t comm_src();
   uint32_t comm_dst();
   uint32_t comm_tag();
+  std::string pg_name();
 
  private:
   void assign_attr_val(
@@ -65,6 +66,7 @@ class ETFeederNode {
   uint32_t comm_src_;
   uint32_t comm_dst_;
   uint32_t comm_tag_;
+  std::string pg_name_;
 };
 
 } // namespace Chakra
diff --git a/src/trace_link/kineto_operator.py b/src/trace_link/kineto_operator.py
index 5078aeba..95bbcede 100644
--- a/src/trace_link/kineto_operator.py
+++ b/src/trace_link/kineto_operator.py
@@ -25,6 +25,7 @@ class KinetoOperator:
         stream (Optional[int]): CUDA stream identifier associated with the operator.
         rf_id (Optional[int]): Record function identifier.
         correlation (int): Identifier used to correlate CUDA runtime and GPU operations.
+        pg_name (Optional[str]): Process Group name for collective communication.
     """
 
     def __init__(self, kineto_op: Dict[str, Any]) -> None:
@@ -51,6 +52,7 @@ def __init__(self, kineto_op: Dict[str, Any]) -> None:
         self.stream: Optional[int] = kineto_op.get("args", {}).get("stream", None)
         self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id", None)
         self.correlation: int = kineto_op.get("args", {}).get("correlation", -1)
+        self.pg_name: Optional[str] = kineto_op.get("args", {}).get("Process Group Name", None)
 
     def __repr__(self) -> str:
         """
@@ -153,3 +155,14 @@ def is_gpu_op(self) -> bool:
         """
         gpu_categories = {"kernel", "gpu_memcpy"}
         return self.category in gpu_categories
+
+    def is_inter_gpu_comms_op(self) -> bool:
+        """
+        Check if the operator is an inter-GPU communication operator based on its name.
+
+        Both point-to-point send/receive primitives and collective communication primitives are considered.
+
+        Returns
+            bool: True if it's an inter-GPU communication operator, otherwise False.
+        """
+        return "ncclDevKernel" in self.name
diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py
index 1eda866d..aeeb0358 100644
--- a/src/trace_link/trace_linker.py
+++ b/src/trace_link/trace_linker.py
@@ -755,8 +755,14 @@ def process_dependent_gpu_ops(
                     "exclusive_dur": gpu_op.exclusive_dur,
                     "ts": gpu_op.timestamp,
                     "stream": gpu_op.stream,
+                    **(
+                        {"pg_name": gpu_op.pg_name}
+                        if gpu_op.is_inter_gpu_comms_op() and gpu_op.pg_name is not None
+                        else {}
+                    ),
                 }
             )
+
             updated_gpu_ops.append(new_gpu_op)
 
         return updated_gpu_ops
diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py
index 77bcc732..4d320acd 100644
--- a/tests/trace_link/test_trace_linker.py
+++ b/tests/trace_link/test_trace_linker.py
@@ -469,6 +469,7 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
         gpu_op.inclusive_dur = gpu_op_data["inclusive_dur"]
         gpu_op.exclusive_dur = gpu_op_data["exclusive_dur"]
         gpu_op.stream = gpu_op_data["stream"]
+        gpu_op.pg_name = gpu_op_data.get("pg_name", None)
         kineto_gpu_op_objects.append(gpu_op)
 
     host_op_id_to_kineto_ops_map = {orig_op_id: kineto_gpu_op_objects}
@@ -497,6 +498,8 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
         assert updated_gpu_op["exclusive_dur"] == kineto_gpu_op_objects[i].exclusive_dur
         assert updated_gpu_op["ts"] == kineto_gpu_op_objects[i].timestamp
         assert updated_gpu_op["stream"] == kineto_gpu_op_objects[i].stream
+        if kineto_gpu_op_objects[i].is_inter_gpu_comms_op() and kineto_gpu_op_objects[i].pg_name is not None:
+            assert updated_gpu_op["pg_name"] == kineto_gpu_op_objects[i].pg_name
 
 
 @patch("builtins.open", new_callable=MagicMock)
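
A minimal usage sketch (not part of the patch) of how the new pg_name plumbing behaves on the trace-link side. It assumes the patched class is importable as src.trace_link.kineto_operator.KinetoOperator; the Kineto event payload below is invented for illustration, and only its "name" and "args" fields matter for this check.

    from src.trace_link.kineto_operator import KinetoOperator

    # Hypothetical Kineto GPU kernel event carrying a "Process Group Name" arg.
    event = {
        "name": "ncclDevKernel_AllReduce_Sum_f32_RING_LL",
        "cat": "kernel",
        "ph": "X",
        "ts": 1715000000000,
        "dur": 42,
        "args": {"stream": 7, "correlation": 1001, "Process Group Name": "0"},
    }

    op = KinetoOperator(event)
    assert op.is_inter_gpu_comms_op()  # name contains "ncclDevKernel"
    assert op.pg_name == "0"           # read from args["Process Group Name"]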