diff --git a/cpp/oneapi/dal/algo/covariance/backend/gpu/compute_kernel_dense_impl_dpc.cpp b/cpp/oneapi/dal/algo/covariance/backend/gpu/compute_kernel_dense_impl_dpc.cpp index 7fdea5cc019..1f5a92e3934 100644 --- a/cpp/oneapi/dal/algo/covariance/backend/gpu/compute_kernel_dense_impl_dpc.cpp +++ b/cpp/oneapi/dal/algo/covariance/backend/gpu/compute_kernel_dense_impl_dpc.cpp @@ -27,6 +27,13 @@ #include "oneapi/dal/backend/primitives/reduction.hpp" #include "oneapi/dal/backend/primitives/stat.hpp" #include "oneapi/dal/backend/primitives/blas.hpp" +#include +#include "oneapi/dal/backend/common.hpp" +#include "oneapi/dal/detail/cpu_info_impl.hpp" +#include "oneapi/dal/detail/error_messages.hpp" +#include "oneapi/dal/detail/parameters/system_parameters_impl.hpp" +#include +#include #ifdef ONEDAL_DATA_PARALLEL @@ -48,6 +55,10 @@ template result_t compute_kernel_dense_impl::operator()(const descriptor_t& desc, const parameters_t& params, const input_t& input) { + using daal::services::Environment; + Environment* env = Environment::getInstance(); + std::cerr << "number of threads = " << static_cast(env->getNumberOfThreads()) << std::endl; + ONEDAL_PROFILER_TASK(compute_covariance_kernel_dense); ONEDAL_ASSERT(input.get_data().has_data()); const auto data = input.get_data(); @@ -57,14 +68,21 @@ result_t compute_kernel_dense_impl::operator()(const descriptor_t& desc, auto rows_count_global = row_count; const std::int64_t column_count = data.get_column_count(); ONEDAL_ASSERT(column_count > 0); + // int flag = 100; + // comm_.is_init(&flag); + // std::cout << "MPI_initialized = " << flag << std::endl; auto bias = desc.get_bias(); auto assume_centered = desc.get_assume_centered(); auto result = compute_result{}.set_result_options(desc.get_result_options()); - const auto data_nd = pr::table2ndarray(q_, data, alloc::device); + { + ONEDAL_PROFILER_TASK(allreduce_rows_count_global); + comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait(); + } + auto [sums, sums_event] = compute_sums(q_, data_nd, assume_centered, {}); { @@ -85,10 +103,6 @@ result_t compute_kernel_dense_impl::operator()(const descriptor_t& desc, comm_.allreduce(xtx.flatten(q_, { gemm_event }), spmd::reduce_op::sum).wait(); } - { - ONEDAL_PROFILER_TASK(allreduce_rows_count_global); - comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait(); - } if (desc.get_result_options().test(result_options::cov_matrix)) { auto [cov, cov_event] = compute_covariance(q_, @@ -98,28 +112,43 @@ result_t compute_kernel_dense_impl::operator()(const descriptor_t& desc, bias, assume_centered, { gemm_event }); - result.set_cov_matrix( - (homogen_table::wrap(cov.flatten(q_, { cov_event }), column_count, column_count))); + { + ONEDAL_PROFILER_TASK(cov_flatten, q_); + result.set_cov_matrix( + (homogen_table::wrap(cov.flatten(q_, { cov_event }), column_count, column_count))); + } } if (desc.get_result_options().test(result_options::cor_matrix)) { auto [corr, corr_event] = compute_correlation(q_, rows_count_global, xtx, sums, { gemm_event }); - result.set_cor_matrix( - (homogen_table::wrap(corr.flatten(q_, { corr_event }), column_count, column_count))); + { + ONEDAL_PROFILER_TASK(corr_flatten, q_); + result.set_cor_matrix( + (homogen_table::wrap(corr.flatten(q_, { corr_event }), column_count, column_count))); + } } if (desc.get_result_options().test(result_options::means)) { if (!assume_centered) { auto [means, means_event] = compute_means(q_, sums, rows_count_global, { gemm_event }); - result.set_means( - homogen_table::wrap(means.flatten(q_, { means_event }), 1, column_count)); + { + ONEDAL_PROFILER_TASK(means_flatten, q_); + result.set_means( + homogen_table::wrap(means.flatten(q_, { means_event }), 1, column_count)); + } } else { auto [zero_means, zeros_event] = pr::ndarray::zeros(q_, { column_count }, sycl::usm::alloc::device); - result.set_means( - homogen_table::wrap(zero_means.flatten(q_, { zeros_event }), 1, column_count)); + { + ONEDAL_PROFILER_TASK(zero_means_flatten, q_); + result.set_means( + homogen_table::wrap(zero_means.flatten(q_, { zeros_event }), 1, column_count)); + } } } + + // comm_.is_init(&flag); + // std::cout << "MPI_initialized = " << flag << std::endl; return result; } diff --git a/cpp/oneapi/dal/backend/communicator.cpp b/cpp/oneapi/dal/backend/communicator.cpp index d0f7924864f..b959e99b026 100644 --- a/cpp/oneapi/dal/backend/communicator.cpp +++ b/cpp/oneapi/dal/backend/communicator.cpp @@ -96,6 +96,8 @@ class fake_spmd_communicator_host_impl : public spmd::communicator_iface_base { void barrier() override {} + // void is_init(int * flag) override {} + request_t* bcast(byte_t* send_buf, std::int64_t count, const data_type& dtype, @@ -167,6 +169,8 @@ class fake_spmd_communicator_device_impl : public spmd::communicator_iface { void barrier() override {} + // void is_init(int * flag) override {} + request_t* bcast(byte_t* send_buf, std::int64_t count, const data_type& dtype, diff --git a/cpp/oneapi/dal/backend/communicator.hpp b/cpp/oneapi/dal/backend/communicator.hpp index b4fe473efe0..f182190362c 100644 --- a/cpp/oneapi/dal/backend/communicator.hpp +++ b/cpp/oneapi/dal/backend/communicator.hpp @@ -92,6 +92,10 @@ class communicator { public_comm_.barrier(); } + // void is_init(int * flag) const { + // public_comm_.is_init(flag); + // } + template communicator_event bcast(Args&&... args) const { return public_comm_.bcast(std::forward(args)...); diff --git a/cpp/oneapi/dal/backend/primitives/utils.hpp b/cpp/oneapi/dal/backend/primitives/utils.hpp index 54e7c18d951..299ad7f7700 100644 --- a/cpp/oneapi/dal/backend/primitives/utils.hpp +++ b/cpp/oneapi/dal/backend/primitives/utils.hpp @@ -22,6 +22,7 @@ #include "oneapi/dal/table/common.hpp" #include "oneapi/dal/table/row_accessor.hpp" +#include "oneapi/dal/detail/profiler.hpp" namespace oneapi::dal::backend::primitives { @@ -119,6 +120,7 @@ template inline ndarray table2ndarray(sycl::queue& q, const table& table, sycl::usm::alloc alloc = sycl::usm::alloc::shared) { + ONEDAL_PROFILER_TASK(table2ndarray, q); [[maybe_unused]] const auto layout = table.get_data_layout(); if constexpr (order == ndorder::c) { ONEDAL_ASSERT(layout == decltype(layout)::row_major); @@ -150,6 +152,7 @@ template inline ndarray table2ndarray_1d(sycl::queue& q, const table& table, sycl::usm::alloc alloc = sycl::usm::alloc::shared) { + ONEDAL_PROFILER_TASK(table2ndarray_1d, q); row_accessor accessor{ table }; const auto data = accessor.pull(q, { 0, -1 }, alloc); return ndarray::wrap(data, { data.get_count() }); diff --git a/cpp/oneapi/dal/backend/transfer_dpc.cpp b/cpp/oneapi/dal/backend/transfer_dpc.cpp index 6f772e96c56..2a751be64ab 100644 --- a/cpp/oneapi/dal/backend/transfer_dpc.cpp +++ b/cpp/oneapi/dal/backend/transfer_dpc.cpp @@ -95,41 +95,42 @@ sycl::event scatter_host2device(sycl::queue& q, ONEDAL_ASSERT(is_known_usm(q, dst_device)); ONEDAL_ASSERT_MUL_OVERFLOW(std::int64_t, block_count, block_size_in_bytes); - const auto gathered_device_unique = - make_unique_usm_device(q, block_count * block_size_in_bytes); + // const auto gathered_device_unique = + // make_unique_usm_device(q, block_count * block_size_in_bytes); auto copy_event = memcpy_host2usm(q, - gathered_device_unique.get(), + dst_device, src_host, block_count * block_size_in_bytes, deps); - - auto scatter_event = q.submit([&](sycl::handler& cgh) { - cgh.depends_on(copy_event); - - const byte_t* const gathered_byte = - reinterpret_cast(gathered_device_unique.get()); - byte_t* const dst_byte = reinterpret_cast(dst_device); - - const std::int64_t required_local_size = bk::device_max_wg_size(q); - const std::int64_t local_size = std::min(down_pow2(block_count), required_local_size); - const auto range = make_multiple_nd_range_1d(block_count, local_size); - - cgh.parallel_for(range, [=](sycl::nd_item<1> id) { - const auto i = id.get_global_id(); - if (i < block_count) { - // TODO: Unroll for optimization - for (std::int64_t j = 0; j < block_size_in_bytes; ++j) { - dst_byte[i * dst_stride_in_bytes + j] = - gathered_byte[i * block_size_in_bytes + j]; - } - } - }); - }); - - // We need to wait until scatter kernel is completed to deallocate - // `gathered_device_unique` - scatter_event.wait_and_throw(); + copy_event.wait_and_throw(); + // auto scatter_event = q.submit([&](sycl::handler& cgh) { + // cgh.depends_on(copy_event); + + // const byte_t* const gathered_byte = + // reinterpret_cast(gathered_device_unique.get()); + // byte_t* const dst_byte = reinterpret_cast(dst_device); + + // const std::int64_t required_local_size = bk::device_max_wg_size(q); + // const std::int64_t local_size = std::min(down_pow2(block_count), required_local_size); + // const auto range = make_multiple_nd_range_1d(block_count, local_size); + + // cgh.parallel_for(range, [=](sycl::nd_item<1> id) { + // const auto i = id.get_global_id(); + // if (i < block_count) { + // // TODO: Unroll for optimization + // //#pragma unroll + // for (std::int64_t j = 0; j < block_size_in_bytes; ++j) { + // dst_byte[i * dst_stride_in_bytes + j] = + // gathered_byte[i * block_size_in_bytes + j]; + // } + // } + // }); + // }); + + // // We need to wait until scatter kernel is completed to deallocate + // // `gathered_device_unique` + // scatter_event.wait_and_throw(); return sycl::event{}; } diff --git a/cpp/oneapi/dal/detail/ccl/communicator.hpp b/cpp/oneapi/dal/detail/ccl/communicator.hpp index d78d7fc8ddf..27e74dc1b0b 100644 --- a/cpp/oneapi/dal/detail/ccl/communicator.hpp +++ b/cpp/oneapi/dal/detail/ccl/communicator.hpp @@ -341,6 +341,11 @@ class ccl_communicator_impl : public ccl_interface_selector::t ccl::barrier(host_comm_->get_ref()).wait(); } + // void is_init(int * flag) override { + // flag = 0; + // ccl::barrier(host_comm_->get_ref()).wait(); + // } + spmd::request_iface* bcast(byte_t* send_buf, std::int64_t count, const data_type& dtype, diff --git a/cpp/oneapi/dal/detail/mpi/communicator.hpp b/cpp/oneapi/dal/detail/mpi/communicator.hpp index 28f6f7589ba..23274e3f0f7 100644 --- a/cpp/oneapi/dal/detail/mpi/communicator.hpp +++ b/cpp/oneapi/dal/detail/mpi/communicator.hpp @@ -213,6 +213,10 @@ class mpi_communicator_impl : public via_host_interface_selector namespace oneapi::dal::detail { + +profiler::profiler() { + start_time = get_time(); +} + +profiler::~profiler() { + auto end_time = get_time(); + auto total_time = end_time - start_time; + std::cerr << "KERNEL_PROFILER: total time " << total_time / 1e6 << std::endl; +} + +std::uint64_t profiler::get_time() { + struct timespec t; + clock_gettime(CLOCK_MONOTONIC, &t); + return t.tv_sec * 1000000000 + t.tv_nsec; +} + +profiler* profiler::get_instance() { + static profiler instance; + return &instance; +} + +task& profiler::get_task() { + return task_; +} + +#ifdef ONEDAL_DATA_PARALLEL +sycl::queue& profiler::get_queue() { + return queue_; +} + +void profiler::set_queue(const sycl::queue& q) { + queue_ = q; +} +#endif + profiler_task profiler::start_task(const char* task_name) { + auto ns_start = get_time(); + auto& tasks_info = get_instance()->get_task(); + tasks_info.time_kernels[tasks_info.current_kernel] = ns_start; + tasks_info.current_kernel++; return profiler_task(task_name); } -void profiler::end_task(const char* task_name) {} +void profiler::end_task(const char* task_name) { + const std::uint64_t ns_end = get_time(); + auto& tasks_info = get_instance()->get_task(); +#ifdef ONEDAL_DATA_PARALLEL + auto& queue = get_instance()->get_queue(); + queue.wait_and_throw(); +#endif + tasks_info.current_kernel--; + const std::uint64_t times = ns_end - tasks_info.time_kernels[tasks_info.current_kernel]; -profiler_task::profiler_task(const char* task_name) : task_name_(task_name) {} + auto it = tasks_info.kernels.find(task_name); + if (it == tasks_info.kernels.end()) { + tasks_info.kernels.insert({ task_name, times }); + } + else { + it->second += times; + } + std::cerr << "KERNEL_PROFILER: " << std::string(task_name) << " " << times / 1e6 << std::endl; +} #ifdef ONEDAL_DATA_PARALLEL -profiler_task profiler::start_task(const char* task_name, const sycl::queue& task_queue) { +profiler_task profiler::start_task(const char* task_name, sycl::queue& task_queue) { + task_queue.wait_and_throw(); + get_instance()->set_queue(task_queue); + auto ns_start = get_time(); + auto& tasks_info = get_instance()->get_task(); + tasks_info.time_kernels[tasks_info.current_kernel] = ns_start; + tasks_info.current_kernel++; return profiler_task(task_name, task_queue); } + + profiler_task::profiler_task(const char* task_name, const sycl::queue& task_queue) : task_name_(task_name), - task_queue_(task_queue) {} + task_queue_(task_queue), + has_queue_(true) {} + #endif +profiler_task::profiler_task(const char* task_name) + : task_name_(task_name) {} + profiler_task::~profiler_task() { + #ifdef ONEDAL_DATA_PARALLEL + if (has_queue_) + task_queue_.wait_and_throw(); + #endif // ONEDAL_DATA_PARALLEL profiler::end_task(task_name_); } diff --git a/cpp/oneapi/dal/detail/profiler.hpp b/cpp/oneapi/dal/detail/profiler.hpp index cfda588d547..3eacba0ee63 100644 --- a/cpp/oneapi/dal/detail/profiler.hpp +++ b/cpp/oneapi/dal/detail/profiler.hpp @@ -19,6 +19,14 @@ #ifdef ONEDAL_DATA_PARALLEL #include #endif + + +#include +#include +#include +#include +#include +#include #define ONEDAL_PROFILER_CONCAT2(x, y) x##y #define ONEDAL_PROFILER_CONCAT(x, y) ONEDAL_PROFILER_CONCAT2(x, y) @@ -39,6 +47,16 @@ namespace oneapi::dal::detail { + + +struct task { + static const std::uint64_t MAX_KERNELS = 256; + std::map kernels; + std::uint64_t current_kernel = 0; + std::uint64_t time_kernels[MAX_KERNELS]; + void clear(); +}; + class profiler_task { public: profiler_task(const char* task_name); @@ -51,16 +69,34 @@ class profiler_task { const char* task_name_; #ifdef ONEDAL_DATA_PARALLEL sycl::queue task_queue_; + bool has_queue_; #endif }; class profiler { public: + profiler(); + ~profiler(); static profiler_task start_task(const char* task_name); + static std::uint64_t get_time(); + static profiler* get_instance(); + task& get_task(); + #ifdef ONEDAL_DATA_PARALLEL - static profiler_task start_task(const char* task_name, const sycl::queue& task_queue); + sycl::queue& get_queue(); + void set_queue(const sycl::queue& q); + + + static profiler_task start_task(const char* task_name, sycl::queue& task_queue); #endif static void end_task(const char* task_name); + +private: + std::uint64_t start_time; + task task_; +#ifdef ONEDAL_DATA_PARALLEL + sycl::queue queue_; +#endif }; } // namespace oneapi::dal::detail diff --git a/cpp/oneapi/dal/spmd/communicator.hpp b/cpp/oneapi/dal/spmd/communicator.hpp index b5eebf27ef7..29fc99509a2 100644 --- a/cpp/oneapi/dal/spmd/communicator.hpp +++ b/cpp/oneapi/dal/spmd/communicator.hpp @@ -71,6 +71,8 @@ class communicator_iface_base { virtual void barrier() = 0; + // virtual void is_init(int * flag) = 0; + virtual request_iface* bcast(byte_t* send_buf, std::int64_t count, const data_type& dtype, @@ -107,6 +109,7 @@ class communicator_iface : public communicator_iface_base { using base_t::get_rank_count; using base_t::get_default_root_rank; using base_t::barrier; + // using base_t::is_init; using base_t::bcast; using base_t::allgatherv; using base_t::allreduce; @@ -190,6 +193,12 @@ class communicator : public base { impl_->barrier(); } + + // void is_init(int * flag) const { + // wait_for_exception_handling(); + // impl_->is_init(flag); + // } + /// Broadcasts a message from the `root` rank to all other ranks /// /// @param buf The buffer which content is broadcasted diff --git a/cpp/oneapi/dal/table/backend/convert.cpp b/cpp/oneapi/dal/table/backend/convert.cpp index 1830d00c432..b17fcca3281 100644 --- a/cpp/oneapi/dal/table/backend/convert.cpp +++ b/cpp/oneapi/dal/table/backend/convert.cpp @@ -20,7 +20,8 @@ #include "oneapi/dal/backend/dispatcher.hpp" #include "oneapi/dal/backend/transfer.hpp" #include "oneapi/dal/backend/interop/data_conversion.hpp" - +#include +#include "oneapi/dal/detail/profiler.hpp" namespace oneapi::dal::backend { static void convert_vector(const void* src, @@ -30,6 +31,7 @@ static void convert_vector(const void* src, std::int64_t src_stride, std::int64_t dst_stride, std::int64_t element_count) { + std::cout<<"here1"<(src_stride); const int dst_stride_int = dal::detail::integral_cast(dst_stride); const int element_count_int = dal::detail::integral_cast(element_count); @@ -193,6 +199,7 @@ sycl::event convert_vector_device2device(sycl::queue& q, std::int64_t dst_stride, std::int64_t element_count, const event_vector& deps) { + std::cout<<"here6"< 0); @@ -265,6 +273,7 @@ sycl::event convert_vector_host2device(sycl::queue& q, std::int64_t dst_stride, std::int64_t element_count, const std::vector& deps) { + std::cout<<"here8"< 0); @@ -294,6 +303,7 @@ sycl::event convert_vector_host2device(sycl::queue& q, const std::int64_t max_loop_range = std::numeric_limits::max(); sycl::event scatter_event; if (element_count > max_loop_range) { + std::cout<<"here elem max"< Device convert_vector_device2device(q, src, @@ -350,6 +364,7 @@ void convert_vector(const detail::data_parallel_policy& policy, .wait_and_throw(); } else if (src_device_friendly) { + std::cout<<"here22"< Host convert_vector_device2host(q, src, @@ -362,6 +377,7 @@ void convert_vector(const detail::data_parallel_policy& policy, .wait_and_throw(); } else if (dst_device_friendly) { + std::cout<<"here23"< Device convert_vector_host2device(q, src, @@ -374,6 +390,7 @@ void convert_vector(const detail::data_parallel_policy& policy, .wait_and_throw(); } else { + std::cout<<"here24"< Host convert_vector(detail::default_host_policy{}, src, @@ -398,6 +415,7 @@ sycl::event convert_matrix_host2device(sycl::queue& q, const std::int64_t dst_col_stride, const std::int64_t dst_row_count, const std::int64_t dst_col_count) { + std::cout<<"here12"< 0); @@ -439,6 +457,7 @@ void convert_matrix(const detail::data_parallel_policy& policy, const std::int64_t dst_col_stride, const std::int64_t dst_row_count, const std::int64_t dst_col_count) { + std::cout<<"here13"< namespace oneapi::dal::backend { struct block_info { @@ -95,6 +96,7 @@ static void pull_row_major_impl(const Policy& policy, array& block_data, alloc_kind requested_alloc_kind, bool preserve_mutability) { + ONEDAL_PROFILER_TASK(pull_row_major_impl); constexpr std::int64_t block_dtype_size = sizeof(BlockData); const auto origin_dtype_size = origin_info.get_data_type_size(); const auto block_dtype = detail::make_data_type(); @@ -117,6 +119,7 @@ static void pull_row_major_impl(const Policy& policy, const bool block_has_mutable_data = block_data.has_mutable_data(); if (contiguous_block_requested && same_data_type && nocopy_alloc_kind) { + std::cout<<"if &&& line 122"< 1) { + std::cout<<"if line 139"<& block_data, alloc_kind requested_alloc_kind, bool preserve_mutability) { + ONEDAL_PROFILER_TASK(pull_column_major_impl); constexpr std::int64_t block_dtype_size = sizeof(BlockData); const auto origin_dtype_size = origin_info.get_data_type_size(); const auto block_dtype = detail::make_data_type(); @@ -186,6 +193,7 @@ static void pull_column_major_impl(const Policy& policy, const bool block_has_mutable_data = block_data.has_mutable_data(); if (!block_has_enough_space || !block_has_mutable_data || !nocopy_alloc_kind) { + std::cout<<"if &&& line 196"< +#include "oneapi/dal/detail/profiler.hpp" namespace oneapi::dal::backend { @@ -106,6 +108,7 @@ class homogen_table_impl : public detail::homogen_table_template& block, const range& rows, sycl::usm::alloc alloc) const { + ONEDAL_PROFILER_TASK(pull_rows_template, policy.get_queue()); homogen_pull_rows(policy, get_info(), data_, block, rows, alloc_kind_from_sycl(alloc)); } #endif diff --git a/cpp/oneapi/dal/table/row_accessor.hpp b/cpp/oneapi/dal/table/row_accessor.hpp index e7d89f685d9..3270eaabfd4 100644 --- a/cpp/oneapi/dal/table/row_accessor.hpp +++ b/cpp/oneapi/dal/table/row_accessor.hpp @@ -18,6 +18,7 @@ #include "oneapi/dal/table/detail/table_utils.hpp" #include "oneapi/dal/table/detail/table_builder.hpp" +#include "oneapi/dal/detail/profiler.hpp" namespace oneapi::dal { namespace v1 { @@ -88,6 +89,7 @@ class row_accessor { dal::array pull(sycl::queue& queue, const range& row_range = { 0, -1 }, const sycl::usm::alloc& alloc = sycl::usm::alloc::shared) const { + ONEDAL_PROFILER_TASK(pull_1, queue); dal::array block; pull(queue, block, row_range, alloc); return block; @@ -136,6 +138,7 @@ class row_accessor { dal::array& block, const range& row_range = { 0, -1 }, const sycl::usm::alloc& alloc = sycl::usm::alloc::shared) const { + ONEDAL_PROFILER_TASK(pull_4args, queue); pull_iface_->pull_rows(detail::data_parallel_policy{ queue }, block, row_range, alloc); return get_block_data(block); } diff --git a/examples/oneapi/dpc/source/covariance/cor_dense_batch.cpp b/examples/oneapi/dpc/source/covariance/cor_dense_batch.cpp index 1f8eaf81cca..ef2f5fef6a9 100644 --- a/examples/oneapi/dpc/source/covariance/cor_dense_batch.cpp +++ b/examples/oneapi/dpc/source/covariance/cor_dense_batch.cpp @@ -30,7 +30,7 @@ namespace dal = oneapi::dal; void run(sycl::queue &q) { const auto input_file_name = get_data_path("covcormoments_dense.csv"); - const auto input = dal::read(q, dal::csv::data_source{ input_file_name }); + const auto input = dal::read(dal::csv::data_source{ input_file_name }); const auto cov_desc = dal::covariance::descriptor{}.set_result_options( dal::covariance::result_options::cor_matrix | dal::covariance::result_options::means);