Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/asolovev add comm profiling #2957

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
#include "oneapi/dal/backend/primitives/reduction.hpp"
#include "oneapi/dal/backend/primitives/stat.hpp"
#include "oneapi/dal/backend/primitives/blas.hpp"
#include <iostream>
#include "oneapi/dal/backend/common.hpp"
#include "oneapi/dal/detail/cpu_info_impl.hpp"
#include "oneapi/dal/detail/error_messages.hpp"
#include "oneapi/dal/detail/parameters/system_parameters_impl.hpp"
#include <daal/src/services/service_defines.h>
#include <daal/include/services/internal/daal_kernel_defines.h>

#ifdef ONEDAL_DATA_PARALLEL

Expand All @@ -48,6 +55,10 @@ template <typename Float>
result_t compute_kernel_dense_impl<Float>::operator()(const descriptor_t& desc,
const parameters_t& params,
const input_t& input) {
using daal::services::Environment;
Environment* env = Environment::getInstance();
std::cerr << "number of threads = " << static_cast<std::uint32_t>(env->getNumberOfThreads()) << std::endl;
ONEDAL_PROFILER_TASK(compute_covariance_kernel_dense);
ONEDAL_ASSERT(input.get_data().has_data());

const auto data = input.get_data();
Expand All @@ -57,14 +68,21 @@ result_t compute_kernel_dense_impl<Float>::operator()(const descriptor_t& desc,
auto rows_count_global = row_count;
const std::int64_t column_count = data.get_column_count();
ONEDAL_ASSERT(column_count > 0);
// int flag = 100;
// comm_.is_init(&flag);
// std::cout << "MPI_initialized = " << flag << std::endl;

auto bias = desc.get_bias();
auto assume_centered = desc.get_assume_centered();

auto result = compute_result<task_t>{}.set_result_options(desc.get_result_options());

const auto data_nd = pr::table2ndarray<Float>(q_, data, alloc::device);

{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}

auto [sums, sums_event] = compute_sums(q_, data_nd, assume_centered, {});

{
Expand All @@ -85,10 +103,6 @@ result_t compute_kernel_dense_impl<Float>::operator()(const descriptor_t& desc,
comm_.allreduce(xtx.flatten(q_, { gemm_event }), spmd::reduce_op::sum).wait();
}

{
ONEDAL_PROFILER_TASK(allreduce_rows_count_global);
comm_.allreduce(rows_count_global, spmd::reduce_op::sum).wait();
}

if (desc.get_result_options().test(result_options::cov_matrix)) {
auto [cov, cov_event] = compute_covariance(q_,
Expand All @@ -98,28 +112,43 @@ result_t compute_kernel_dense_impl<Float>::operator()(const descriptor_t& desc,
bias,
assume_centered,
{ gemm_event });
result.set_cov_matrix(
(homogen_table::wrap(cov.flatten(q_, { cov_event }), column_count, column_count)));
{
ONEDAL_PROFILER_TASK(cov_flatten, q_);
result.set_cov_matrix(
(homogen_table::wrap(cov.flatten(q_, { cov_event }), column_count, column_count)));
}
}
if (desc.get_result_options().test(result_options::cor_matrix)) {
auto [corr, corr_event] =
compute_correlation(q_, rows_count_global, xtx, sums, { gemm_event });
result.set_cor_matrix(
(homogen_table::wrap(corr.flatten(q_, { corr_event }), column_count, column_count)));
{
ONEDAL_PROFILER_TASK(corr_flatten, q_);
result.set_cor_matrix(
(homogen_table::wrap(corr.flatten(q_, { corr_event }), column_count, column_count)));
}
}
if (desc.get_result_options().test(result_options::means)) {
if (!assume_centered) {
auto [means, means_event] = compute_means(q_, sums, rows_count_global, { gemm_event });
result.set_means(
homogen_table::wrap(means.flatten(q_, { means_event }), 1, column_count));
{
ONEDAL_PROFILER_TASK(means_flatten, q_);
result.set_means(
homogen_table::wrap(means.flatten(q_, { means_event }), 1, column_count));
}
}
else {
auto [zero_means, zeros_event] =
pr::ndarray<Float, 1>::zeros(q_, { column_count }, sycl::usm::alloc::device);
result.set_means(
homogen_table::wrap(zero_means.flatten(q_, { zeros_event }), 1, column_count));
{
ONEDAL_PROFILER_TASK(zero_means_flatten, q_);
result.set_means(
homogen_table::wrap(zero_means.flatten(q_, { zeros_event }), 1, column_count));
}
}
}

// comm_.is_init(&flag);
// std::cout << "MPI_initialized = " << flag << std::endl;
return result;
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/oneapi/dal/backend/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class fake_spmd_communicator_host_impl : public spmd::communicator_iface_base {

void barrier() override {}

// void is_init(int * flag) override {}

request_t* bcast(byte_t* send_buf,
std::int64_t count,
const data_type& dtype,
Expand Down Expand Up @@ -167,6 +169,8 @@ class fake_spmd_communicator_device_impl : public spmd::communicator_iface {

void barrier() override {}

// void is_init(int * flag) override {}

request_t* bcast(byte_t* send_buf,
std::int64_t count,
const data_type& dtype,
Expand Down
4 changes: 4 additions & 0 deletions cpp/oneapi/dal/backend/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class communicator {
public_comm_.barrier();
}

// void is_init(int * flag) const {
// public_comm_.is_init(flag);
// }

template <typename... Args>
communicator_event bcast(Args&&... args) const {
return public_comm_.bcast(std::forward<Args>(args)...);
Expand Down
3 changes: 3 additions & 0 deletions cpp/oneapi/dal/backend/primitives/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "oneapi/dal/table/common.hpp"
#include "oneapi/dal/table/row_accessor.hpp"
#include "oneapi/dal/detail/profiler.hpp"

namespace oneapi::dal::backend::primitives {

Expand Down Expand Up @@ -119,6 +120,7 @@ template <typename Type, ndorder order = ndorder::c>
inline ndarray<Type, 2, order> table2ndarray(sycl::queue& q,
const table& table,
sycl::usm::alloc alloc = sycl::usm::alloc::shared) {
ONEDAL_PROFILER_TASK(table2ndarray, q);
[[maybe_unused]] const auto layout = table.get_data_layout();
if constexpr (order == ndorder::c) {
ONEDAL_ASSERT(layout == decltype(layout)::row_major);
Expand Down Expand Up @@ -150,6 +152,7 @@ template <typename Type>
inline ndarray<Type, 1> table2ndarray_1d(sycl::queue& q,
const table& table,
sycl::usm::alloc alloc = sycl::usm::alloc::shared) {
ONEDAL_PROFILER_TASK(table2ndarray_1d, q);
row_accessor<const Type> accessor{ table };
const auto data = accessor.pull(q, { 0, -1 }, alloc);
return ndarray<Type, 1>::wrap(data, { data.get_count() });
Expand Down
61 changes: 31 additions & 30 deletions cpp/oneapi/dal/backend/transfer_dpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,41 +95,42 @@ sycl::event scatter_host2device(sycl::queue& q,
ONEDAL_ASSERT(is_known_usm(q, dst_device));
ONEDAL_ASSERT_MUL_OVERFLOW(std::int64_t, block_count, block_size_in_bytes);

const auto gathered_device_unique =
make_unique_usm_device(q, block_count * block_size_in_bytes);
// const auto gathered_device_unique =
// make_unique_usm_device(q, block_count * block_size_in_bytes);

auto copy_event = memcpy_host2usm(q,
gathered_device_unique.get(),
dst_device,
src_host,
block_count * block_size_in_bytes,
deps);

auto scatter_event = q.submit([&](sycl::handler& cgh) {
cgh.depends_on(copy_event);

const byte_t* const gathered_byte =
reinterpret_cast<const byte_t*>(gathered_device_unique.get());
byte_t* const dst_byte = reinterpret_cast<byte_t*>(dst_device);

const std::int64_t required_local_size = bk::device_max_wg_size(q);
const std::int64_t local_size = std::min(down_pow2(block_count), required_local_size);
const auto range = make_multiple_nd_range_1d(block_count, local_size);

cgh.parallel_for(range, [=](sycl::nd_item<1> id) {
const auto i = id.get_global_id();
if (i < block_count) {
// TODO: Unroll for optimization
for (std::int64_t j = 0; j < block_size_in_bytes; ++j) {
dst_byte[i * dst_stride_in_bytes + j] =
gathered_byte[i * block_size_in_bytes + j];
}
}
});
});

// We need to wait until scatter kernel is completed to deallocate
// `gathered_device_unique`
scatter_event.wait_and_throw();
copy_event.wait_and_throw();
// auto scatter_event = q.submit([&](sycl::handler& cgh) {
// cgh.depends_on(copy_event);

// const byte_t* const gathered_byte =
// reinterpret_cast<const byte_t*>(gathered_device_unique.get());
// byte_t* const dst_byte = reinterpret_cast<byte_t*>(dst_device);

// const std::int64_t required_local_size = bk::device_max_wg_size(q);
// const std::int64_t local_size = std::min(down_pow2(block_count), required_local_size);
// const auto range = make_multiple_nd_range_1d(block_count, local_size);

// cgh.parallel_for(range, [=](sycl::nd_item<1> id) {
// const auto i = id.get_global_id();
// if (i < block_count) {
// // TODO: Unroll for optimization
// //#pragma unroll
// for (std::int64_t j = 0; j < block_size_in_bytes; ++j) {
// dst_byte[i * dst_stride_in_bytes + j] =
// gathered_byte[i * block_size_in_bytes + j];
// }
// }
// });
// });

// // We need to wait until scatter kernel is completed to deallocate
// // `gathered_device_unique`
// scatter_event.wait_and_throw();

return sycl::event{};
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/oneapi/dal/detail/ccl/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ class ccl_communicator_impl : public ccl_interface_selector<MemoryAccessKind>::t
ccl::barrier(host_comm_->get_ref()).wait();
}

// void is_init(int * flag) override {
// flag = 0;
// ccl::barrier(host_comm_->get_ref()).wait();
// }

spmd::request_iface* bcast(byte_t* send_buf,
std::int64_t count,
const data_type& dtype,
Expand Down
4 changes: 4 additions & 0 deletions cpp/oneapi/dal/detail/mpi/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ class mpi_communicator_impl : public via_host_interface_selector<MemoryAccessKin
mpi_call(MPI_Barrier(mpi_comm_));
}

// void is_init(int * flag) override {
// mpi_call(MPI_Initialized(flag));
// }

spmd::request_iface* bcast(byte_t* send_buf,
std::int64_t count,
const data_type& dtype,
Expand Down
82 changes: 78 additions & 4 deletions cpp/oneapi/dal/detail/profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,101 @@
*******************************************************************************/

#include "oneapi/dal/detail/profiler.hpp"
#include <iostream>

namespace oneapi::dal::detail {

profiler::profiler() {
// Record the construction timestamp; ~profiler() reports the elapsed total.
start_time = get_time();
}

profiler::~profiler() {
    // Report the profiler's whole lifetime on teardown, converted from
    // nanoseconds (get_time() resolution) to milliseconds.
    const auto total_time = get_time() - start_time;
    std::cerr << "KERNEL_PROFILER: total time " << total_time / 1e6 << std::endl;
}

// Returns a monotonic timestamp in nanoseconds.
// CLOCK_MONOTONIC is unaffected by wall-clock adjustments (NTP, DST),
// which would otherwise corrupt interval measurements.
std::uint64_t profiler::get_time() {
    struct timespec t;
    clock_gettime(CLOCK_MONOTONIC, &t);
    // Widen before multiplying: tv_sec is time_t (possibly 32-bit), and
    // seconds * 1000000000 in time_t/int arithmetic can overflow (signed
    // overflow is UB). Do the whole computation in std::uint64_t.
    return static_cast<std::uint64_t>(t.tv_sec) * 1000000000ULL +
           static_cast<std::uint64_t>(t.tv_nsec);
}

profiler* profiler::get_instance() {
    // Meyers singleton: constructed on first use, thread-safe since C++11.
    static profiler global_profiler;
    return &global_profiler;
}

task& profiler::get_task() {
// Mutable access to the task bookkeeping: start_task()/end_task() use it
// for the per-kernel start-time slots (indexed by current_kernel) and the
// accumulated per-name timings map.
return task_;
}

#ifdef ONEDAL_DATA_PARALLEL
sycl::queue& profiler::get_queue() {
// Last queue registered via set_queue(); end_task() waits on it so that
// asynchronously submitted kernels are included in the measured interval.
return queue_;
}

void profiler::set_queue(const sycl::queue& q) {
// Remember the queue associated with the currently profiled task so that
// end_task() can synchronize on it before taking the end timestamp.
queue_ = q;
}
#endif

// Opens a profiling scope: stores the start timestamp in the next free
// slot and returns an RAII guard whose destructor closes the scope.
profiler_task profiler::start_task(const char* task_name) {
    const std::uint64_t start_ns = get_time();
    auto& info = get_instance()->get_task();
    // Push the start time; end_task() pops it to compute the duration.
    // NOTE(review): no bounds check on current_kernel — assumes the nesting
    // depth never exceeds the time_kernels capacity; confirm upstream.
    info.time_kernels[info.current_kernel] = start_ns;
    ++info.current_kernel;
    return profiler_task(task_name);
}

void profiler::end_task(const char* task_name) {}
void profiler::end_task(const char* task_name) {
const std::uint64_t ns_end = get_time();
auto& tasks_info = get_instance()->get_task();
#ifdef ONEDAL_DATA_PARALLEL
auto& queue = get_instance()->get_queue();
queue.wait_and_throw();
#endif
tasks_info.current_kernel--;
const std::uint64_t times = ns_end - tasks_info.time_kernels[tasks_info.current_kernel];

profiler_task::profiler_task(const char* task_name) : task_name_(task_name) {}
auto it = tasks_info.kernels.find(task_name);
if (it == tasks_info.kernels.end()) {
tasks_info.kernels.insert({ task_name, times });
}
else {
it->second += times;
}
std::cerr << "KERNEL_PROFILER: " << std::string(task_name) << " " << times / 1e6 << std::endl;
}

#ifdef ONEDAL_DATA_PARALLEL
profiler_task profiler::start_task(const char* task_name, const sycl::queue& task_queue) {
profiler_task profiler::start_task(const char* task_name, sycl::queue& task_queue) {
task_queue.wait_and_throw();
get_instance()->set_queue(task_queue);
auto ns_start = get_time();
auto& tasks_info = get_instance()->get_task();
tasks_info.time_kernels[tasks_info.current_kernel] = ns_start;
tasks_info.current_kernel++;
return profiler_task(task_name, task_queue);
}



profiler_task::profiler_task(const char* task_name, const sycl::queue& task_queue)
: task_name_(task_name),
task_queue_(task_queue) {}
task_queue_(task_queue),
has_queue_(true) {}

#endif

profiler_task::profiler_task(const char* task_name)
: task_name_(task_name) {}

// Closes the profiling scope opened by profiler::start_task().
profiler_task::~profiler_task() {
#ifdef ONEDAL_DATA_PARALLEL
    // Drain the attached queue first so asynchronously submitted kernels
    // are accounted for inside the measured interval.
    if (has_queue_) {
        task_queue_.wait_and_throw();
    }
#endif // ONEDAL_DATA_PARALLEL
    profiler::end_task(task_name_);
}

Expand Down
Loading
Loading