diff --git a/tests/trace_link/test_trace_linker.py b/tests/trace_link/test_trace_linker.py index 7482b657..d361d729 100644 --- a/tests/trace_link/test_trace_linker.py +++ b/tests/trace_link/test_trace_linker.py @@ -20,6 +20,44 @@ def test_initialization(trace_linker): assert isinstance(trace_linker.id_assigner, UniqueIdAssigner) +@patch("chakra.src.trace_link.trace_linker.TraceLinker.construct_et_plus_data") +@patch("chakra.src.trace_link.trace_linker.TraceLinker.add_thread_and_process_annotations") +@patch("chakra.src.trace_link.trace_linker.TraceLinker.map_pytorch_to_kineto_ops") +def test_link_traces(mock_map_ops, mock_add_annotations, mock_construct_et_plus, trace_linker): + mock_add_annotations.return_value = ([], [], []) + mock_map_ops.return_value = ({}, {}, {}, {}, {}) + mock_construct_et_plus.return_value = {} + + pytorch_ops = [MagicMock(spec=PyTorchOperator)] + kineto_cpu_ops = [MagicMock(spec=KinetoOperator)] + sorted_kineto_cpu_ops = [MagicMock(spec=KinetoOperator)] + sorted_kineto_cpu_op_ts = [100] + kineto_correlation_cuda_runtime_map = {1: MagicMock(spec=KinetoOperator)} + kineto_rf_id_to_kineto_op_map = {1: MagicMock(spec=KinetoOperator)} + kineto_gpu_ops = [MagicMock(spec=KinetoOperator)] + kineto_thread_info = {1: (100, 200)} + kineto_process_start_time = 50 + kineto_process_end_time = 300 + + trace_linker.link_traces( + "pytorch_et_file", + pytorch_ops, + kineto_cpu_ops, + sorted_kineto_cpu_ops, + sorted_kineto_cpu_op_ts, + kineto_correlation_cuda_runtime_map, + kineto_rf_id_to_kineto_op_map, + kineto_gpu_ops, + kineto_thread_info, + kineto_process_start_time, + kineto_process_end_time, + ) + + mock_add_annotations.assert_called_once() + mock_map_ops.assert_called_once() + mock_construct_et_plus.assert_called_once() + + @patch("chakra.src.trace_link.trace_linker.TraceLinker.load_pytorch_et") @patch("chakra.src.trace_link.trace_linker.TraceLinker.load_kineto_trace") def test_load_traces(mock_load_kineto_trace, mock_load_pytorch_et, trace_linker): @@ -29,44 +67,50 @@ def test_load_traces(mock_load_kineto_trace, mock_load_pytorch_et, trace_linker) mock_load_kineto_trace.assert_called_once() -@patch("chakra.src.trace_link.trace_linker.TraceLinker.load_traces") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.update_kineto_data") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.enforce_inter_thread_order") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.link_traces") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.dump_pytorch_execution_trace_plus") -def test_link(mock_dump, mock_link_traces, mock_enforce_order, mock_update_data, mock_load_traces, trace_linker): - mock_load_traces.return_value = ([], {}) - mock_update_data.return_value = ([], {}, {}, [], {}, {}, 0, 0, {}, {}, [], []) - mock_enforce_order.return_value = {} +def test_extract_pytorch_ops(trace_linker): + mock_root_node = MagicMock(spec=PyTorchOperator) + mock_child_node = MagicMock(spec=PyTorchOperator) - trace_linker.link("pytorch_et_file", "kineto_file", "output_file") + mock_root_node.children = [mock_child_node] + mock_child_node.children = [] - mock_load_traces.assert_called_once() - mock_update_data.assert_called_once() - mock_enforce_order.assert_called_once() - mock_link_traces.assert_called_once() - mock_dump.assert_called_once() + mock_root_node.id = 1 + mock_child_node.id = 2 + + result = trace_linker.extract_pytorch_ops(mock_root_node) + assert len(result) == 2 + assert result[0].id == 1 + assert result[1].id == 2 -def test_construct_kineto_data_structures(trace_linker): - mock_kineto_op1 = MagicMock(spec=KinetoOperator) - mock_kineto_op1.is_cpu_op.return_value = True - mock_kineto_op1.timestamp = 100 - mock_kineto_op1.inclusive_dur = 50 - mock_kineto_op1.tid = 1 - mock_kineto_op1.name = "op1" - mock_kineto_op1.rf_id = 1 +@pytest.mark.parametrize( + "kineto_ops, expected_exclusive_durs", + [ + ( + [ + {"ts": 100, "dur": 10, "inclusive_dur": 10}, + {"ts": 105, "dur": 3, "inclusive_dur": 3}, + {"ts": 108, "dur": 1, "inclusive_dur": 1}, + ], + [6, 3, 1], # Expected exclusive durations + ), + ( + [ + {"ts": 100, "dur": 20, "inclusive_dur": 20}, + {"ts": 105, "dur": 5, "inclusive_dur": 5}, + {"ts": 110, "dur": 5, "inclusive_dur": 5}, + ], + [10, 5, 5], # Expected exclusive durations + ), + ], +) +def test_calculate_exclusive_dur(trace_linker, kineto_ops, expected_exclusive_durs): + kineto_tid_cpu_ops_map = {1: [KinetoOperator(op) for op in kineto_ops]} - mock_kineto_op2 = MagicMock(spec=KinetoOperator) - mock_kineto_op2.is_cpu_op.return_value = True - mock_kineto_op2.timestamp = 200 - mock_kineto_op2.inclusive_dur = 50 - mock_kineto_op2.tid = 1 - mock_kineto_op2.name = "op2" - mock_kineto_op2.rf_id = 2 + trace_linker.calculate_exclusive_dur(kineto_tid_cpu_ops_map) - kineto_data = trace_linker.construct_kineto_data_structures([mock_kineto_op1, mock_kineto_op2]) - assert kineto_data["kineto_tid_cpu_ops_map"][1] == [mock_kineto_op1, mock_kineto_op2] + for i, op in enumerate(kineto_tid_cpu_ops_map[1]): + assert op.exclusive_dur == expected_exclusive_durs[i] @pytest.mark.parametrize( @@ -84,6 +128,38 @@ def test_merge_overlapping_intervals(intervals, expected_result): assert result == expected_result +@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_last_cpu_node_before_timestamp") +def test_process_thread_inter_thread_order(mock_find_last, trace_linker): + mock_op1 = MagicMock(spec=KinetoOperator) + mock_op1.timestamp = 100 + mock_op1.inclusive_dur = 50 + mock_op1.tid = 1 + mock_op1.name = "op1" + mock_op1.rf_id = 1 + + mock_op2 = MagicMock(spec=KinetoOperator) + mock_op2.timestamp = 200 + mock_op2.inclusive_dur = 50 + mock_op2.tid = 1 + mock_op2.name = "op2" + mock_op2.rf_id = 2 + + ops_by_tid = {1: [mock_op1, mock_op2]} + trace_linker.process_thread_inter_thread_order(1, [mock_op1, mock_op2], ops_by_tid, 1000) + + mock_find_last.assert_called() + + +@patch("concurrent.futures.Future.result") +@patch("chakra.src.trace_link.trace_linker.TraceLinker.process_thread_inter_thread_order") +def test_enforce_inter_thread_order_exception(mock_process_thread, mock_future_result, trace_linker): + mock_future_result.side_effect = Exception("Test Exception") + kineto_tid_cpu_ops_map = {1: [MagicMock(spec=KinetoOperator)]} + + with pytest.raises(Exception, match="Test Exception"): + trace_linker.enforce_inter_thread_order(kineto_tid_cpu_ops_map) + + @pytest.mark.parametrize( "ops_by_tid, exclude_tid, timestamp, expected_result", [ @@ -142,6 +218,198 @@ def test_find_last_cpu_node_before_timestamp(ops_by_tid, exclude_tid, timestamp, assert result == expected_result +def test_add_thread_and_process_annotations(trace_linker): + kineto_cpu_ops = [] + sorted_kineto_cpu_ops = [] + sorted_kineto_cpu_op_ts = [] + kineto_thread_info = {1: (100, 200), 2: (150, 250)} + kineto_process_start_time = 50 + kineto_process_end_time = 300 + + kineto_cpu_ops, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts = trace_linker.add_thread_and_process_annotations( + kineto_cpu_ops, + sorted_kineto_cpu_ops, + sorted_kineto_cpu_op_ts, + kineto_thread_info, + kineto_process_start_time, + kineto_process_end_time, + ) + + assert len(kineto_cpu_ops) == 3 + assert kineto_cpu_ops[0].name == EXECUTION_TRACE_PROCESS_ANNOTATION + assert kineto_cpu_ops[1].name == EXECUTION_TRACE_THREAD_ANNOTATION + assert kineto_cpu_ops[2].name == EXECUTION_TRACE_THREAD_ANNOTATION + + assert len(sorted_kineto_cpu_ops) == 3 + assert sorted_kineto_cpu_ops[0].timestamp == kineto_cpu_ops[0].timestamp + assert sorted_kineto_cpu_ops[1].timestamp == kineto_cpu_ops[1].timestamp + assert sorted_kineto_cpu_ops[2].timestamp == kineto_cpu_ops[2].timestamp + + assert len(sorted_kineto_cpu_op_ts) == 3 + assert sorted_kineto_cpu_op_ts[0] == kineto_cpu_ops[0].timestamp + assert sorted_kineto_cpu_op_ts[1] == kineto_cpu_ops[1].timestamp + assert sorted_kineto_cpu_op_ts[2] == kineto_cpu_ops[2].timestamp + + +def test_group_gpu_ops_by_cpu_launchers(trace_linker): + kineto_gpu_op1 = MagicMock(spec=KinetoOperator) + kineto_gpu_op1.correlation = 123 + kineto_gpu_op1.name = "gpu_op1" + kineto_gpu_op1.timestamp = 150 + kineto_gpu_op1.tid = 1 + + kineto_gpu_op2 = MagicMock(spec=KinetoOperator) + kineto_gpu_op2.correlation = 456 + kineto_gpu_op2.name = "gpu_op2" + kineto_gpu_op2.timestamp = 250 + kineto_gpu_op2.tid = 2 + + kineto_runtime_op1 = MagicMock(spec=KinetoOperator) + kineto_runtime_op1.ev_idx = "cpu_op1" + kineto_runtime_op1.timestamp = 100 + kineto_runtime_op1.tid = 1 + kineto_runtime_op1.name = "runtime_op1" + kineto_runtime_op1.correlation = 123 + + kineto_runtime_op2 = MagicMock(spec=KinetoOperator) + kineto_runtime_op2.ev_idx = "cpu_op2" + kineto_runtime_op2.timestamp = 200 + kineto_runtime_op2.tid = 2 + kineto_runtime_op2.name = "runtime_op2" + kineto_runtime_op2.correlation = 456 + + kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op1, 456: kineto_runtime_op2} + sorted_kineto_cpu_ops = [kineto_runtime_op1, kineto_runtime_op2] + sorted_kineto_cpu_op_ts = [kineto_runtime_op1.timestamp, kineto_runtime_op2.timestamp] + + with patch.object(trace_linker, "find_parent_cpu_op", side_effect=[kineto_runtime_op1, kineto_runtime_op2]): + result = trace_linker.group_gpu_ops_by_cpu_launchers( + [kineto_gpu_op1, kineto_gpu_op2], + kineto_correlation_cuda_runtime_map, + sorted_kineto_cpu_ops, + sorted_kineto_cpu_op_ts, + ) + + assert result == {"cpu_op1": [kineto_gpu_op1], "cpu_op2": [kineto_gpu_op2]} + + +@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_closest_op") +def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): + kineto_gpu_op = MagicMock(spec=KinetoOperator) + kineto_gpu_op.correlation = 123 + kineto_gpu_op.name = "gpu_op" + + kineto_runtime_op = MagicMock(spec=KinetoOperator) + kineto_runtime_op.timestamp = 100 + kineto_runtime_op.tid = 1 + kineto_runtime_op.name = "runtime_op" + + kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op} + + mock_find_closest_op.return_value = kineto_runtime_op + + sorted_kineto_cpu_ops = [MagicMock(spec=KinetoOperator)] + sorted_kineto_cpu_op_ts = [100] + + result = trace_linker.find_parent_cpu_op( + kineto_gpu_op, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts + ) + + assert result == kineto_runtime_op + mock_find_closest_op.assert_called_once_with( + kineto_gpu_op, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts, kineto_runtime_op.timestamp + ) + + +@pytest.mark.parametrize( + "pytorch_op, kineto_op, expected_linked_gpu_ops, expected_inclusive_dur, expected_exclusive_dur, " + "expected_timestamp, expected_inter_thread_dep", + [ + ( + MagicMock(spec=PyTorchOperator, id=1), + MagicMock( + spec=KinetoOperator, ev_idx="1", inclusive_dur=100, exclusive_dur=50, timestamp=123456, pytorch_op=None + ), + [MagicMock(spec=KinetoOperator)], + 100, + 50, + 123456, + None, + ), + ( + MagicMock(spec=PyTorchOperator, id=2), + MagicMock( + spec=KinetoOperator, ev_idx="2", inclusive_dur=200, exclusive_dur=150, timestamp=223456, pytorch_op=None + ), + [MagicMock(spec=KinetoOperator), MagicMock(spec=KinetoOperator)], + 200, + 150, + 223456, + 42, + ), + ], +) +@patch.object(TraceLinker, "get_inter_thread_dep") +@patch.object(TraceLinker, "link_gpu_ops") +def test_link_ops( + mock_link_gpu_ops, + mock_get_inter_thread_dep, + trace_linker, + pytorch_op, + kineto_op, + expected_linked_gpu_ops, + expected_inclusive_dur, + expected_exclusive_dur, + expected_timestamp, + expected_inter_thread_dep, +): + mock_get_inter_thread_dep.return_value = expected_inter_thread_dep + + cpu_ev_idx_to_gpu_ops_map = {kineto_op.ev_idx: expected_linked_gpu_ops} + kineto_rf_id_to_kineto_op_map = {1: MagicMock(spec=KinetoOperator, pytorch_op=MagicMock(id=42))} + + result = trace_linker.link_ops( + pytorch_op, + kineto_op, + cpu_ev_idx_to_gpu_ops_map, + kineto_rf_id_to_kineto_op_map, + ) + + assert result == ( + expected_linked_gpu_ops, + expected_inclusive_dur, + expected_exclusive_dur, + expected_timestamp, + expected_inter_thread_dep, + ) + mock_link_gpu_ops.assert_called_once_with(pytorch_op, expected_linked_gpu_ops) + + +def test_link_ops_with_no_gpu_ops(trace_linker): + pytorch_op = MagicMock(spec=PyTorchOperator, id=1) + kineto_op = MagicMock( + spec=KinetoOperator, + ev_idx="1", + inclusive_dur=100, + exclusive_dur=50, + timestamp=123456, + pytorch_op=None, + inter_thread_dep=None, + ) + + cpu_ev_idx_to_gpu_ops_map = {} + kineto_rf_id_to_kineto_op_map = {} + + result = trace_linker.link_ops( + pytorch_op, + kineto_op, + cpu_ev_idx_to_gpu_ops_map, + kineto_rf_id_to_kineto_op_map, + ) + + assert result == ([], 100, 50, 123456, None) + + def test_get_inter_thread_dep(trace_linker): kineto_op = MagicMock(spec=KinetoOperator) kineto_op.inter_thread_dep = 1 @@ -177,9 +445,30 @@ def test_link_gpu_ops(trace_linker): # Call the method trace_linker.link_gpu_ops(pytorch_op, kineto_gpu_ops) - # Assert that the parent_pytorch_op_id is set correctly - for gpu_op in kineto_gpu_ops: - assert gpu_op.parent_pytorch_op_id == pytorch_op.id + +@patch("chakra.src.trace_link.trace_linker.TraceLinker.process_op_and_dependents") +@patch("builtins.open", new_callable=MagicMock) +@patch("json.load") +def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_dependents, trace_linker): + mock_json_load.return_value = {"nodes": [{"id": 1}, {"id": 2}]} + mock_process_op_and_dependents.side_effect = lambda x, *args: [{"id": x["id"] + 2}] + + pytorch_op_id_to_kineto_ops_map = {1: [], 2: []} + pytorch_op_id_to_inclusive_dur_map = {1: 100, 2: 200} + pytorch_op_id_to_exclusive_dur_map = {1: 50, 2: 150} + pytorch_op_id_to_timestamp_map = {1: 1000, 2: 2000} + pytorch_op_id_to_inter_thread_dep_map = {1: None, 2: None} + + pytorch_et_plus_data = trace_linker.construct_et_plus_data( + "path/to/pytorch_et_file", + pytorch_op_id_to_kineto_ops_map, + pytorch_op_id_to_inclusive_dur_map, + pytorch_op_id_to_exclusive_dur_map, + pytorch_op_id_to_timestamp_map, + pytorch_op_id_to_inter_thread_dep_map, + ) + + assert pytorch_et_plus_data["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] @pytest.mark.parametrize( @@ -270,31 +559,6 @@ def test_process_dependent_gpu_ops(trace_linker, orig_op_id, cpu_op, kineto_gpu_ assert updated_gpu_op["stream"] == kineto_gpu_op_objects[i].stream -@patch("chakra.src.trace_link.trace_linker.TraceLinker.process_op_and_dependents") -@patch("builtins.open", new_callable=MagicMock) -@patch("json.load") -def test_construct_et_plus_data(mock_json_load, mock_open, mock_process_op_and_dependents, trace_linker): - mock_json_load.return_value = {"nodes": [{"id": 1}, {"id": 2}]} - mock_process_op_and_dependents.side_effect = lambda x, *args: [{"id": x["id"] + 2}] - - pytorch_op_id_to_kineto_ops_map = {1: [], 2: []} - pytorch_op_id_to_inclusive_dur_map = {1: 100, 2: 200} - pytorch_op_id_to_exclusive_dur_map = {1: 50, 2: 150} - pytorch_op_id_to_timestamp_map = {1: 1000, 2: 2000} - pytorch_op_id_to_inter_thread_dep_map = {1: None, 2: None} - - pytorch_et_plus_data = trace_linker.construct_et_plus_data( - "path/to/pytorch_et_file", - pytorch_op_id_to_kineto_ops_map, - pytorch_op_id_to_inclusive_dur_map, - pytorch_op_id_to_exclusive_dur_map, - pytorch_op_id_to_timestamp_map, - pytorch_op_id_to_inter_thread_dep_map, - ) - - assert pytorch_et_plus_data["nodes"] == [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] - - @patch("builtins.open", new_callable=MagicMock) @patch("json.dump") def test_dump_pytorch_execution_trace_plus(mock_json_dump, mock_open, trace_linker): @@ -307,205 +571,3 @@ def test_dump_pytorch_execution_trace_plus(mock_json_dump, mock_open, trace_link mock_json_dump.assert_called_once_with( {"nodes": [{"id": 1}, {"id": 2}]}, mock_open.return_value.__enter__(), indent=4 ) - - -def test_add_thread_and_process_annotations(trace_linker): - kineto_cpu_ops = [] - sorted_kineto_cpu_ops = [] - sorted_kineto_cpu_op_ts = [] - kineto_thread_info = {1: (100, 200), 2: (150, 250)} - kineto_process_start_time = 50 - kineto_process_end_time = 300 - - kineto_cpu_ops, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts = trace_linker.add_thread_and_process_annotations( - kineto_cpu_ops, - sorted_kineto_cpu_ops, - sorted_kineto_cpu_op_ts, - kineto_thread_info, - kineto_process_start_time, - kineto_process_end_time, - ) - - assert len(kineto_cpu_ops) == 3 - assert kineto_cpu_ops[0].name == EXECUTION_TRACE_PROCESS_ANNOTATION - assert kineto_cpu_ops[1].name == EXECUTION_TRACE_THREAD_ANNOTATION - assert kineto_cpu_ops[2].name == EXECUTION_TRACE_THREAD_ANNOTATION - - assert len(sorted_kineto_cpu_ops) == 3 - assert sorted_kineto_cpu_ops[0].timestamp == kineto_cpu_ops[0].timestamp - assert sorted_kineto_cpu_ops[1].timestamp == kineto_cpu_ops[1].timestamp - assert sorted_kineto_cpu_ops[2].timestamp == kineto_cpu_ops[2].timestamp - - assert len(sorted_kineto_cpu_op_ts) == 3 - assert sorted_kineto_cpu_op_ts[0] == kineto_cpu_ops[0].timestamp - assert sorted_kineto_cpu_op_ts[1] == kineto_cpu_ops[1].timestamp - assert sorted_kineto_cpu_op_ts[2] == kineto_cpu_ops[2].timestamp - - -@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_closest_op") -def test_find_parent_cpu_op(mock_find_closest_op, trace_linker): - kineto_gpu_op = MagicMock(spec=KinetoOperator) - kineto_gpu_op.correlation = 123 - kineto_gpu_op.name = "gpu_op" - - kineto_runtime_op = MagicMock(spec=KinetoOperator) - kineto_runtime_op.timestamp = 100 - kineto_runtime_op.tid = 1 - kineto_runtime_op.name = "runtime_op" - - kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op} - - mock_find_closest_op.return_value = kineto_runtime_op - - sorted_kineto_cpu_ops = [MagicMock(spec=KinetoOperator)] - sorted_kineto_cpu_op_ts = [100] - - result = trace_linker.find_parent_cpu_op( - kineto_gpu_op, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts - ) - - assert result == kineto_runtime_op - mock_find_closest_op.assert_called_once_with( - kineto_gpu_op, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts, kineto_runtime_op.timestamp - ) - - -def test_group_gpu_ops_by_cpu_launchers(trace_linker): - kineto_gpu_op1 = MagicMock(spec=KinetoOperator) - kineto_gpu_op1.correlation = 123 - kineto_gpu_op1.name = "gpu_op1" - kineto_gpu_op1.timestamp = 150 - kineto_gpu_op1.tid = 1 - - kineto_gpu_op2 = MagicMock(spec=KinetoOperator) - kineto_gpu_op2.correlation = 456 - kineto_gpu_op2.name = "gpu_op2" - kineto_gpu_op2.timestamp = 250 - kineto_gpu_op2.tid = 2 - - kineto_runtime_op1 = MagicMock(spec=KinetoOperator) - kineto_runtime_op1.ev_idx = "cpu_op1" - kineto_runtime_op1.timestamp = 100 - kineto_runtime_op1.tid = 1 - kineto_runtime_op1.name = "runtime_op1" - kineto_runtime_op1.correlation = 123 - - kineto_runtime_op2 = MagicMock(spec=KinetoOperator) - kineto_runtime_op2.ev_idx = "cpu_op2" - kineto_runtime_op2.timestamp = 200 - kineto_runtime_op2.tid = 2 - kineto_runtime_op2.name = "runtime_op2" - kineto_runtime_op2.correlation = 456 - - kineto_correlation_cuda_runtime_map = {123: kineto_runtime_op1, 456: kineto_runtime_op2} - sorted_kineto_cpu_ops = [kineto_runtime_op1, kineto_runtime_op2] - sorted_kineto_cpu_op_ts = [kineto_runtime_op1.timestamp, kineto_runtime_op2.timestamp] - - with patch.object(trace_linker, "find_parent_cpu_op", side_effect=[kineto_runtime_op1, kineto_runtime_op2]): - result = trace_linker.group_gpu_ops_by_cpu_launchers( - [kineto_gpu_op1, kineto_gpu_op2], - kineto_correlation_cuda_runtime_map, - sorted_kineto_cpu_ops, - sorted_kineto_cpu_op_ts, - ) - - assert result == {"cpu_op1": [kineto_gpu_op1], "cpu_op2": [kineto_gpu_op2]} - - -def test_calculate_exclusive_dur(trace_linker): - kineto_op1 = KinetoOperator({"ts": 100, "dur": 10, "inclusive_dur": 10}) - kineto_op2 = KinetoOperator({"ts": 105, "dur": 3, "inclusive_dur": 3}) - kineto_op3 = KinetoOperator({"ts": 108, "dur": 1, "inclusive_dur": 1}) - kineto_tid_cpu_ops_map = {1: [kineto_op1, kineto_op2, kineto_op3]} - - trace_linker.calculate_exclusive_dur(kineto_tid_cpu_ops_map) - - assert kineto_op1.exclusive_dur == 6 # 10 - (3 + 1) - assert kineto_op2.exclusive_dur == 3 - assert kineto_op3.exclusive_dur == 1 - - -@patch("chakra.src.trace_link.trace_linker.TraceLinker.find_last_cpu_node_before_timestamp") -def test_process_thread_inter_thread_order(mock_find_last, trace_linker): - mock_op1 = MagicMock(spec=KinetoOperator) - mock_op1.timestamp = 100 - mock_op1.inclusive_dur = 50 - mock_op1.tid = 1 - mock_op1.name = "op1" - mock_op1.rf_id = 1 - - mock_op2 = MagicMock(spec=KinetoOperator) - mock_op2.timestamp = 200 - mock_op2.inclusive_dur = 50 - mock_op2.tid = 1 - mock_op2.name = "op2" - mock_op2.rf_id = 2 - - ops_by_tid = {1: [mock_op1, mock_op2]} - trace_linker.process_thread_inter_thread_order(1, [mock_op1, mock_op2], ops_by_tid, 1000) - - mock_find_last.assert_called() - - -@patch("concurrent.futures.Future.result") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.process_thread_inter_thread_order") -def test_enforce_inter_thread_order_exception(mock_process_thread, mock_future_result, trace_linker): - mock_future_result.side_effect = Exception("Test Exception") - kineto_tid_cpu_ops_map = {1: [MagicMock(spec=KinetoOperator)]} - - with pytest.raises(Exception, match="Test Exception"): - trace_linker.enforce_inter_thread_order(kineto_tid_cpu_ops_map) - - -def test_extract_pytorch_ops(trace_linker): - mock_root_node = MagicMock(spec=PyTorchOperator) - mock_child_node = MagicMock(spec=PyTorchOperator) - - mock_root_node.children = [mock_child_node] - mock_child_node.children = [] - - mock_root_node.id = 1 - mock_child_node.id = 2 - - result = trace_linker.extract_pytorch_ops(mock_root_node) - assert len(result) == 2 - assert result[0].id == 1 - assert result[1].id == 2 - - -@patch("chakra.src.trace_link.trace_linker.TraceLinker.construct_et_plus_data") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.add_thread_and_process_annotations") -@patch("chakra.src.trace_link.trace_linker.TraceLinker.map_pytorch_to_kineto_ops") -def test_link_traces(mock_map_ops, mock_add_annotations, mock_construct_et_plus, trace_linker): - mock_add_annotations.return_value = ([], [], []) - mock_map_ops.return_value = ({}, {}, {}, {}, {}) - mock_construct_et_plus.return_value = {} - - pytorch_ops = [MagicMock(spec=PyTorchOperator)] - kineto_cpu_ops = [MagicMock(spec=KinetoOperator)] - sorted_kineto_cpu_ops = [MagicMock(spec=KinetoOperator)] - sorted_kineto_cpu_op_ts = [100] - kineto_correlation_cuda_runtime_map = {1: MagicMock(spec=KinetoOperator)} - kineto_rf_id_to_kineto_op_map = {1: MagicMock(spec=KinetoOperator)} - kineto_gpu_ops = [MagicMock(spec=KinetoOperator)] - kineto_thread_info = {1: (100, 200)} - kineto_process_start_time = 50 - kineto_process_end_time = 300 - - trace_linker.link_traces( - "pytorch_et_file", - pytorch_ops, - kineto_cpu_ops, - sorted_kineto_cpu_ops, - sorted_kineto_cpu_op_ts, - kineto_correlation_cuda_runtime_map, - kineto_rf_id_to_kineto_op_map, - kineto_gpu_ops, - kineto_thread_info, - kineto_process_start_time, - kineto_process_end_time, - ) - - mock_add_annotations.assert_called_once() - mock_map_ops.assert_called_once() - mock_construct_et_plus.assert_called_once()