From 35d004ec308e0bbdeca24c2626735725c7a62dc6 Mon Sep 17 00:00:00 2001
From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com>
Date: Tue, 23 Jul 2024 11:44:14 -0400
Subject: [PATCH 1/5] Identify process group init nodes as METADATA nodes

---
 src/converter/pytorch_converter.py | 6 ++++--
 src/converter/pytorch_node.py      | 6 +++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py
index a1ab7bd2..1ae2a29c 100644
--- a/src/converter/pytorch_converter.py
+++ b/src/converter/pytorch_converter.py
@@ -225,8 +225,10 @@ def convert_json_to_protobuf_nodes(
             protobuf_node_map (Dict[int, ChakraNode]): Dictionary where the converted Protobuf nodes will be stored.
         """
         for _, json_node in json_node_map.items():
-            if (json_node.get_op_type() == PyTorchNodeType.CPU_OP) or (
-                json_node.get_op_type() == PyTorchNodeType.LABEL
+            if (
+                (json_node.get_op_type() == PyTorchNodeType.CPU_OP)
+                or (json_node.get_op_type() == PyTorchNodeType.LABEL)
+                or (json_node.get_op_type() == PyTorchNodeType.METADATA)
             ):
                 chakra_node = self.convert_json_to_protobuf_node(json_node_map, protobuf_node_map, json_node)
                 protobuf_node_map[chakra_node.id] = chakra_node
diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py
index 89b29dcb..b906265d 100644
--- a/src/converter/pytorch_node.py
+++ b/src/converter/pytorch_node.py
@@ -13,11 +13,13 @@ class PyTorchNodeType(Enum):
         CPU_OP (int): Represents a CPU operation.
         GPU_OP (int): Represents a GPU operation.
         LABEL (int): Represents a non-operator node (e.g., labels).
+        METADATA (int): Represents a metadata node (e.g., process group initialization).
     """

     CPU_OP = 1
     GPU_OP = 2
     LABEL = 3  # Non-operator nodes
+    METADATA = 4  # Metadata nodes


 class PyTorchNode:
@@ -120,7 +122,9 @@ def get_op_type(self) -> PyTorchNodeType:
         Returns
             PyTorchNodeType: The type of the PyTorch operation.
         """
-        if self.is_gpu_op():
+        if "process_group:init" in self.name:
+            return PyTorchNodeType.METADATA
+        elif self.is_gpu_op():
             return PyTorchNodeType.GPU_OP
         elif hasattr(self, "op_schema") or hasattr(self, "outputs"):
             return PyTorchNodeType.CPU_OP

From 5107e96c0b1d566e9cca50a41943ead7e30a064f Mon Sep 17 00:00:00 2001
From: JoongunPark <8554137+JoongunPark@users.noreply.github.com>
Date: Wed, 24 Jul 2024 13:52:23 -0400
Subject: [PATCH 2/5] Update trace_linker to encode pg_name for collectives

---
 src/trace_link/kineto_operator.py | 13 +++++++++++++
 src/trace_link/trace_linker.py    |  6 ++++++
 2 files changed, 19 insertions(+)

diff --git a/src/trace_link/kineto_operator.py b/src/trace_link/kineto_operator.py
index 5078aeba..95bbcede 100644
--- a/src/trace_link/kineto_operator.py
+++ b/src/trace_link/kineto_operator.py
@@ -25,6 +25,7 @@ class KinetoOperator:
         stream (Optional[int]): CUDA stream identifier associated with the operator.
         rf_id (Optional[int]): Record function identifier.
         correlation (int): Identifier used to correlate CUDA runtime and GPU operations.
+        pg_name (Optional[str]): Process group name for collective communication.
""" def __init__(self, kineto_op: Dict[str, Any]) -> None: @@ -51,6 +52,7 @@ def __init__(self, kineto_op: Dict[str, Any]) -> None: self.stream: Optional[int] = kineto_op.get("args", {}).get("stream", None) self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id", None) self.correlation: int = kineto_op.get("args", {}).get("correlation", -1) + self.pg_name: Optional[str] = kineto_op.get("args", {}).get("Process Group Name", None) def __repr__(self) -> str: """ @@ -153,3 +155,14 @@ def is_gpu_op(self) -> bool: """ gpu_categories = {"kernel", "gpu_memcpy"} return self.category in gpu_categories + + def is_inter_gpu_comms_op(self) -> bool: + """ + Check if the operator is a inter-GPU communication operator based on its name. + + Both point-to-point send/receive primitives and collective communication primitives are considered. + + Returns + bool: True if it's a inter-GPU communication, otherwise False. + """ + return "ncclDevKernel" in self.name diff --git a/src/trace_link/trace_linker.py b/src/trace_link/trace_linker.py index 1eda866d..aeeb0358 100644 --- a/src/trace_link/trace_linker.py +++ b/src/trace_link/trace_linker.py @@ -755,8 +755,14 @@ def process_dependent_gpu_ops( "exclusive_dur": gpu_op.exclusive_dur, "ts": gpu_op.timestamp, "stream": gpu_op.stream, + **( + {"pg_name": gpu_op.pg_name} + if gpu_op.is_inter_gpu_comms_op() and gpu_op.pg_name is not None + else {} + ), } ) + updated_gpu_ops.append(new_gpu_op) return updated_gpu_ops From dd62ef3e95230945484ee86f4d96af69b891704e Mon Sep 17 00:00:00 2001 From: JoongunPark <8554137+JoongunPark@users.noreply.github.com> Date: Wed, 24 Jul 2024 13:53:58 -0400 Subject: [PATCH 3/5] Update converter to encode pg_name in attr field --- src/converter/pytorch_converter.py | 2 ++ src/converter/pytorch_node.py | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index 1ae2a29c..2371dccb 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -244,6 +244,7 @@ def convert_json_to_protobuf_nodes( [ ChakraAttr(name="comm_type", int64_val=collective_comm_type), ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size), + *( [ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else [] ), ] ) @@ -251,6 +252,7 @@ def convert_json_to_protobuf_nodes( chakra_gpu_node.attr.extend( [ ChakraAttr(name="comm_size", int64_val=pytorch_gpu_node.comm_size), + *( [ChakraAttr(name="pg_name", string_val=pytorch_gpu_node.pg_name)] if pytorch_gpu_node.pg_name != "" else [] ), ] ) diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py index b906265d..b569bd51 100644 --- a/src/converter/pytorch_node.py +++ b/src/converter/pytorch_node.py @@ -44,6 +44,7 @@ class PyTorchNode: inter_thread_dep (Any): Inter-thread dependency of the node. cat (Any): Category of the node. stream (int): Stream associated with the node. + pg_name (str): Process Group name for the inter-GPU communication. """ SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4", "1.1.0-chakra.0.0.4"] @@ -111,6 +112,10 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None: self.inter_thread_dep = node_data.get("inter_thread_dep") self.cat = node_data.get("cat") self.stream = node_data.get("stream", 0) + # In Colletive comms nodes, pg_name is in node_data if exists. + # In SendRecv nodes, pg_name is in the attrs if exists. + # Otherwise, pg_name is not present. 
+        self.pg_name = node_data.get("pg_name", "")

         for attr in node_data.get("attrs", []):
             setattr(self, attr["name"], attr["value"])

From b5dcd00c03b2f4f3bd8ebdade60f06e525696fa8 Mon Sep 17 00:00:00 2001
From: JoongunPark <8554137+JoongunPark@users.noreply.github.com>
Date: Wed, 24 Jul 2024 13:55:15 -0400
Subject: [PATCH 4/5] Update ETFeeder to read pg_name and provide a getter

---
 src/feeder/et_feeder_node.cpp | 6 ++++++
 src/feeder/et_feeder_node.h   | 2 ++
 2 files changed, 8 insertions(+)

diff --git a/src/feeder/et_feeder_node.cpp b/src/feeder/et_feeder_node.cpp
index 3cb2ced0..2d89b93c 100644
--- a/src/feeder/et_feeder_node.cpp
+++ b/src/feeder/et_feeder_node.cpp
@@ -32,6 +32,8 @@ ETFeederNode::ETFeederNode(std::shared_ptr<ChakraProtoMsg::Node> node) {
       this->comm_dst_ = static_cast<uint32_t>(attr.int32_val());
     } else if (attr_name == "comm_tag") {
       this->comm_tag_ = static_cast<uint32_t>(attr.int32_val());
+    } else if (attr_name == "pg_name") {
+      this->pg_name_ = static_cast<string>(attr.string_val());
     } else {
       this->other_attrs_.emplace(attr_name, attr);
     }
@@ -138,3 +140,7 @@ uint32_t ETFeederNode::comm_dst() {
 uint32_t ETFeederNode::comm_tag() {
   return comm_tag_;
 }
+
+string ETFeederNode::pg_name() {
+  return pg_name_;
+}
diff --git a/src/feeder/et_feeder_node.h b/src/feeder/et_feeder_node.h
index 5d4689cd..9d2008a1 100644
--- a/src/feeder/et_feeder_node.h
+++ b/src/feeder/et_feeder_node.h
@@ -37,6 +37,7 @@ class ETFeederNode {
   uint32_t comm_src();
   uint32_t comm_dst();
   uint32_t comm_tag();
+  std::string pg_name();

  private:
   void assign_attr_val(
@@ -64,6 +65,7 @@ class ETFeederNode {
   uint32_t comm_src_;
   uint32_t comm_dst_;
   uint32_t comm_tag_;
+  std::string pg_name_;
 };

 } // namespace Chakra

From 52de625b8a0b04c7a142f5fff3162f01d3ac071b Mon Sep 17 00:00:00 2001
From: JoongunPark <8554137+JoongunPark@users.noreply.github.com>
Date: Wed, 24 Jul 2024 14:31:48 -0400
Subject: [PATCH 5/5] Add 'pg_name' attribute to Mock objects in test

---
 tests/trace_link/test_trace_linker.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py
index 77bcc732..4d320acd 100644
--- a/tests/trace_link/test_trace_linker.py
+++ b/tests/trace_link/test_trace_linker.py
@@ -469,6 +469,7 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
         gpu_op.inclusive_dur = gpu_op_data["inclusive_dur"]
         gpu_op.exclusive_dur = gpu_op_data["exclusive_dur"]
         gpu_op.stream = gpu_op_data["stream"]
+        gpu_op.pg_name = gpu_op_data.get("pg_name", None)
         kineto_gpu_op_objects.append(gpu_op)

     host_op_id_to_kineto_ops_map = {orig_op_id: kineto_gpu_op_objects}
@@ -497,6 +498,8 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
         assert updated_gpu_op["exclusive_dur"] == kineto_gpu_op_objects[i].exclusive_dur
         assert updated_gpu_op["ts"] == kineto_gpu_op_objects[i].timestamp
         assert updated_gpu_op["stream"] == kineto_gpu_op_objects[i].stream
+        if kineto_gpu_op_objects[i].is_inter_gpu_comms_op() and kineto_gpu_op_objects[i].pg_name is not None:
+            assert updated_gpu_op["pg_name"] == kineto_gpu_op_objects[i].pg_name


 @patch("builtins.open", new_callable=MagicMock)
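---

A note on the idiom shared by patches 2 and 3: both splice the process group field in conditionally, once as a list splat (`*([...] if pytorch_gpu_node.pg_name != "" else [])`) and once as a dict splat (`**({...} if ... else {})`), so ops without a process group gain no `pg_name` field at all. The sketch below is a minimal, self-contained illustration of why that works; the plain dicts stand in for `ChakraAttr` objects and the trace linker's GPU-op dictionaries, and `build_attrs`/`build_gpu_op` are hypothetical names used only here, not part of the project.

    # conditional_field_sketch.py -- minimal illustration of the splat-based
    # conditional-field idiom used in patches 2 and 3 (assumption: plain dicts
    # stand in for ChakraAttr objects and the linker's GPU-op dictionaries).

    def build_attrs(comm_size: int, pg_name: str = "") -> list:
        # Mirrors patch 3: the pg_name entry is spliced into the attr list
        # only when it is non-empty, so other nodes are left unchanged.
        return [
            {"name": "comm_size", "int64_val": comm_size},
            *([{"name": "pg_name", "string_val": pg_name}] if pg_name != "" else []),
        ]

    def build_gpu_op(stream: int, pg_name=None, is_comms: bool = False) -> dict:
        # Mirrors patch 2: the key is emitted only for inter-GPU communication
        # ops that actually carry a process group name.
        return {
            "stream": stream,
            **({"pg_name": pg_name} if is_comms and pg_name is not None else {}),
        }

    if __name__ == "__main__":
        assert "pg_name" not in build_gpu_op(stream=7)
        assert build_gpu_op(stream=7, pg_name="0", is_comms=True)["pg_name"] == "0"
        assert len(build_attrs(1024)) == 1
        assert build_attrs(1024, pg_name="0")[1]["name"] == "pg_name"
        print("conditional-field sketch OK")

The defaults line up across the series: `KinetoOperator.pg_name` defaults to `None` (patch 2) and the linker filters on `None`, while `PyTorchNode.pg_name` defaults to `""` (patch 3) and the converter filters on `""`, so a missing process group never produces an empty `pg_name` attribute in the output trace.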