Add tests for TraceLinker methods to improve coverage #86

Merged Jun 11, 2024 · 1 commit
tests/trace_link/test_trace_linker.py: 106 additions & 9 deletions
```diff
@@ -4,6 +4,10 @@
 from chakra.src.trace_link.kineto_operator import KinetoOperator
 from chakra.src.trace_link.trace_linker import TraceLinker
 from chakra.src.trace_link.unique_id_assigner import UniqueIdAssigner
+from param_bench.train.compute.python.tools.execution_trace import (
+    EXECUTION_TRACE_PROCESS_ANNOTATION,
+    EXECUTION_TRACE_THREAD_ANNOTATION,
+)
 from param_bench.train.compute.python.tools.execution_trace import (
     Node as PyTorchOperator,
 )
```
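A stylistic aside on this hunk: both blocks import from the same `execution_trace` module, so they could be collapsed into one statement. An equivalent combined form (purely illustrative; not what the PR actually does):

```python
from param_bench.train.compute.python.tools.execution_trace import (
    EXECUTION_TRACE_PROCESS_ANNOTATION,
    EXECUTION_TRACE_THREAD_ANNOTATION,
    Node as PyTorchOperator,
)
```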
```diff
@@ -239,24 +243,24 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_
 @patch("json.load")
 def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_dependents, trace_linker):
     mock_json_load.return_value = {"nodes": [{"id": 1}, {"id": 2}]}
-    mock_process_op_and_dependents.side_effect = lambda op, *args: [{"id": op["id"] + 2}]
+    mock_process_op_and_dependents.side_effect = lambda x, *args: [{"id": x["id"] + 2}]
 
-    pytorch_op_id_to_kineto_ops_map = {}
-    pytorch_op_id_to_inclusive_dur_map = {}
-    pytorch_op_id_to_exclusive_dur_map = {}
-    pytorch_op_id_to_timestamp_map = {}
-    pytorch_op_id_to_inter_thread_dep_map = {}
+    pytorch_op_id_to_kineto_ops_map = {1: [], 2: []}
+    pytorch_op_id_to_inclusive_dur_map = {1: 100, 2: 200}
+    pytorch_op_id_to_exclusive_dur_map = {1: 50, 2: 150}
+    pytorch_op_id_to_timestamp_map = {1: 1000, 2: 2000}
+    pytorch_op_id_to_inter_thread_dep_map = {1: None, 2: None}
 
-    result = trace_linker.construct_et_plus_data(
-        "path/to/pytorch_et.json",
+    trace_linker.pytorch_et_plus_data = trace_linker.construct_et_plus_data(
+        trace_linker.pytorch_et_file,
         pytorch_op_id_to_kineto_ops_map,
         pytorch_op_id_to_inclusive_dur_map,
         pytorch_op_id_to_exclusive_dur_map,
         pytorch_op_id_to_timestamp_map,
         pytorch_op_id_to_inter_thread_dep_map,
     )
 
-    assert result["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]
+    assert trace_linker.pytorch_et_plus_data["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]
 
 
 @patch("builtins.open", new_callable=MagicMock)
```
```diff
@@ -271,3 +275,96 @@ def test_dump_pytorch_execution_trace_plus(mock_json_dump, mock_open, trace_link
     mock_json_dump.assert_called_once_with(
         {"nodes": [{"id": 1}, {"id": 2}]}, mock_open.return_value.__enter__(), indent=4
     )
+
+
+def test_add_thread_and_process_annotations(trace_linker):
+    kineto_cpu_ops = []
+    sorted_kineto_cpu_ops = []
+    sorted_kineto_cpu_op_ts = []
+    kineto_thread_info = {1: (100, 200), 2: (150, 250)}
+    kineto_process_start_time = 50
+    kineto_process_end_time = 300
+
+    kineto_cpu_ops, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts = trace_linker.add_thread_and_process_annotations(
+        kineto_cpu_ops,
+        sorted_kineto_cpu_ops,
+        sorted_kineto_cpu_op_ts,
+        kineto_thread_info,
+        kineto_process_start_time,
+        kineto_process_end_time,
+    )
+
+    assert len(kineto_cpu_ops) == 3
+    assert kineto_cpu_ops[0].name == EXECUTION_TRACE_PROCESS_ANNOTATION
+    assert kineto_cpu_ops[1].name == EXECUTION_TRACE_THREAD_ANNOTATION
+    assert kineto_cpu_ops[2].name == EXECUTION_TRACE_THREAD_ANNOTATION
+
+    assert len(sorted_kineto_cpu_ops) == 3
+    assert sorted_kineto_cpu_ops[0].timestamp == kineto_cpu_ops[0].timestamp
+    assert sorted_kineto_cpu_ops[1].timestamp == kineto_cpu_ops[1].timestamp
+    assert sorted_kineto_cpu_ops[2].timestamp == kineto_cpu_ops[2].timestamp
+
+    assert len(sorted_kineto_cpu_op_ts) == 3
+    assert sorted_kineto_cpu_op_ts[0] == kineto_cpu_ops[0].timestamp
+    assert sorted_kineto_cpu_op_ts[1] == kineto_cpu_ops[1].timestamp
+    assert sorted_kineto_cpu_op_ts[2] == kineto_cpu_ops[2].timestamp
+
+
+@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_closest_op")
+def test_find_parent_cpu_op(mock_find_closest_op, trace_linker):
+    kineto_gpu_op = MagicMock(spec=KinetoOperator)
+    kineto_gpu_op.correlation = 123
+    kineto_gpu_op.name = "gpu_op"
+
+    kineto_runtime_op = MagicMock(spec=KinetoOperator)
+    kineto_runtime_op.timestamp = 100
+    kineto_runtime_op.tid = 1
+    kineto_runtime_op.name = "runtime_op"
+
+    trace_linker.kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op}
+
+    mock_find_closest_op.return_value = kineto_runtime_op
+
+    result = trace_linker.find_parent_cpu_op(kineto_gpu_op, trace_linker.kineto_correlation_cuda_runtime_map)
+
+    assert result == kineto_runtime_op
+    mock_find_closest_op.assert_called_once_with(
+        kineto_gpu_op, trace_linker.sorted_kineto_cpu_ops, kineto_runtime_op.timestamp
+    )
+
+
+def test_group_gpu_ops_by_cpu_launchers(trace_linker):
+    kineto_gpu_op1 = MagicMock(spec=KinetoOperator)
+    kineto_gpu_op1.correlation = 123
+    kineto_gpu_op1.name = "gpu_op1"
+    kineto_gpu_op1.timestamp = 150
+    kineto_gpu_op1.tid = 1
+
+    kineto_gpu_op2 = MagicMock(spec=KinetoOperator)
+    kineto_gpu_op2.correlation = 456
+    kineto_gpu_op2.name = "gpu_op2"
+    kineto_gpu_op2.timestamp = 250
+    kineto_gpu_op2.tid = 2
+
+    kineto_runtime_op1 = MagicMock(spec=KinetoOperator)
+    kineto_runtime_op1.ev_idx = "cpu_op1"
+    kineto_runtime_op1.timestamp = 100
+    kineto_runtime_op1.tid = 1
+    kineto_runtime_op1.name = "runtime_op1"
+    kineto_runtime_op1.correlation = 123
+
+    kineto_runtime_op2 = MagicMock(spec=KinetoOperator)
+    kineto_runtime_op2.ev_idx = "cpu_op2"
+    kineto_runtime_op2.timestamp = 200
+    kineto_runtime_op2.tid = 2
+    kineto_runtime_op2.name = "runtime_op2"
+    kineto_runtime_op2.correlation = 456
+
+    trace_linker.kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op1, 456: kineto_runtime_op2}
+
+    with patch.object(trace_linker, "find_parent_cpu_op", side_effect=[kineto_runtime_op1, kineto_runtime_op2]):
+        result = trace_linker.group_gpu_ops_by_cpu_launchers(
+            [kineto_gpu_op1, kineto_gpu_op2], trace_linker.kineto_correlation_cuda_runtime_map
+        )
+
+    assert result == {"cpu_op1": [kineto_gpu_op1], "cpu_op2": [kineto_gpu_op2]}
```
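To exercise the new tests locally, a plain pytest run over this file should be enough (assuming pytest is installed and the `chakra` and `param_bench` packages are importable from the working directory; the repo's actual test setup may differ):

```python
# Programmatic equivalent of `python -m pytest tests/trace_link/test_trace_linker.py -v`.
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main(["tests/trace_link/test_trace_linker.py", "-v"]))
```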