[DRAFT] CUDA Scan implementation #1250

Open · wants to merge 1 commit into base: main
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -323,7 +323,7 @@ CHECK_INCLUDE_FILE_CXX("linux/io_uring.h" STDEXEC_FOUND_IO_URING)
option (STDEXEC_ENABLE_IO_URING_TESTS "Enable io_uring tests" ${STDEXEC_FOUND_IO_URING})

option(STDEXEC_BUILD_DOCS "Build stdexec documentation" OFF)
-option(STDEXEC_BUILD_EXAMPLES "Build stdexec examples" ON)
+option(STDEXEC_BUILD_EXAMPLES "Build stdexec examples" OFF)
option(STDEXEC_BUILD_TESTS "Build stdexec tests" ON)
option(BUILD_TESTING "" ${STDEXEC_BUILD_TESTS})

172 changes: 172 additions & 0 deletions include/nvexec/stream/scan.cuh
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2024 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "../../stdexec/execution.hpp"
#include <type_traits>
#include <ranges>
#include <iostream>  // for the temporary debug output in set_value_impl
#include <algorithm> // std::min

#include <cuda/std/type_traits>

#include <cub/device/device_scan.cuh>

#include "algorithm_base.cuh"
#include "common.cuh"
#include "../detail/throw_on_cuda_error.cuh"

namespace nvexec {
namespace STDEXEC_STREAM_DETAIL_NS {
namespace scan_ {


template <class SenderId, class ReceiverId, class InitT, class Fun>
struct receiver_t
: public __algo_range_init_fun::receiver_t<
@ZelboK (Author) commented on Feb 18, 2024:

This reuses algorithm_base.cuh. The ExclusiveScan API I used is the overload that takes an initial value, so I could easily reuse this base. Nearly all of scan.cuh is identical to reduce except for the CUB API it calls and the final return type: reduce completes with a single value, whereas scan produces an array of data.
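For context, a minimal standalone sketch of the CUB call this wraps (d_in, d_out, num_items, and stream are placeholder names, not part of this PR; with input {1, 1, 1, 1} and init 0, an exclusive scan writes {0, 1, 2, 3}):

// Size query: with d_temp == nullptr, CUB only fills in temp_bytes.
void* d_temp = nullptr;
std::size_t temp_bytes = 0;
cub::DeviceScan::ExclusiveScan(d_temp, temp_bytes, d_in, d_out, cub::Sum{}, 0, num_items, stream);
cudaMallocAsync(&d_temp, temp_bytes, stream);
// Second call performs the actual scan on `stream`.
cub::DeviceScan::ExclusiveScan(d_temp, temp_bytes, d_in, d_out, cub::Sum{}, 0, num_items, stream);
cudaFreeAsync(d_temp, stream);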

SenderId,
ReceiverId,
InitT,
Fun,
receiver_t<SenderId, ReceiverId, InitT, Fun>> {
using base = __algo_range_init_fun::
receiver_t<SenderId, ReceiverId, InitT, Fun, receiver_t<SenderId, ReceiverId, InitT, Fun>>;

template <class Range>
using result_t = typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>;

template <class Range>
static void set_value_impl(base::__t&& self, Range&& range) noexcept {
cudaError_t status{cudaSuccess};
cudaStream_t stream = self.op_state_.get_stream();

// `range` is produced asynchronously, so we need to wait for it to be ready
if (status = STDEXEC_DBG_ERR(cudaStreamSynchronize(stream)); status != cudaSuccess) {
self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
return;
}

using value_t = result_t<Range>;
value_t* d_out = static_cast<value_t*>(self.op_state_.temp_storage_);

void* d_temp_storage{};
std::size_t temp_storage_size{};

auto first = begin(range);
auto last = end(range);

std::size_t num_items = std::distance(first, last);

if (status = STDEXEC_DBG_ERR(cub::DeviceScan::ExclusiveScan(
d_temp_storage,
temp_storage_size,
first,
d_out,
self.fun_,
self.init_,
num_items,
stream));
status != cudaSuccess) {
self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
return;
}

if (status = STDEXEC_DBG_ERR( //
cudaMallocAsync(&d_temp_storage, temp_storage_size, stream));
status != cudaSuccess) {
self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
return;
}

if (status = STDEXEC_DBG_ERR(cub::DeviceScan::ExclusiveScan(
d_temp_storage,
temp_storage_size,
first,
d_out,
self.fun_,
self.init_,
num_items,
stream));
status != cudaSuccess) {
self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
return;
}

status = STDEXEC_DBG_ERR(cudaFreeAsync(d_temp_storage, stream));
self.op_state_.defer_temp_storage_destruction(d_out);
if (status == cudaSuccess) {
// Temporary debug output: copy the first few results back and print them.
std::size_t const n_dbg = std::min<std::size_t>(num_items, 10);
value_t* host_out = new value_t[n_dbg];
cudaMemcpy(host_out, d_out, n_dbg * sizeof(value_t), cudaMemcpyDeviceToHost);
for (std::size_t i = 0; i < n_dbg; ++i) {
std::cout << host_out[i] << '\n';
}
delete[] host_out;
self.op_state_.propagate_completion_signal(stdexec::set_value, d_out);
@ZelboK (Author) commented:

If you change d_out to *d_out you can confirm that the data is scanning properly, but then it won't compile, I imagine because the completion signatures are wrong.
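For reference, a hedged sketch of that verification hack (debug-only, and it assumes temp_storage_ is host-accessible managed memory, which this PR does not guarantee): the dereference would need a matching by-value signature:

// Hypothetical debug-only change: send the first scanned element by value...
// self.op_state_.propagate_completion_signal(stdexec::set_value, *d_out);
// ...and declare the matching completion signature in sender_t:
// template <class Range>
// using _set_value_t = completion_signatures<set_value_t(
//   typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>)>;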

} else {
self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
}
}
};

template <class SenderId, class InitT, class Fun>
struct sender_t
: public __algo_range_init_fun::
sender_t<SenderId, InitT, Fun, sender_t<SenderId, InitT, Fun>> {
template <class Receiver>
using receiver_t =
stdexec::__t<scan_::receiver_t<SenderId, stdexec::__id<Receiver>, InitT, Fun>>;

template <class Range>
using _set_value_t = completion_signatures<set_value_t(
::std::add_lvalue_reference_t<
typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;

// template <class Range>
// using _set_value_t = completion_signatures<set_value_t(
// std::vector<typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;

@ZelboK (Author) commented:

Not sure how to get the completion signatures right. Hoping to get some guidance.
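One possible direction, as an untested sketch (it assumes libcu++'s <cuda/std/span> is available): complete with a device-side span, which bundles the pointer with the element count that set_value_impl already computes:

// Hypothetical alternative: a span carries both d_out and num_items.
template <class Range>
using _set_value_t = completion_signatures<set_value_t(
  ::cuda::std::span<typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;
// set_value_impl would then send ::cuda::std::span{d_out, num_items} instead of d_out.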

// template <class Range>
// using _set_value_t_span =
//   completion_signatures<set_value_t(binary_invoke_result_t<Range, InitT, Fun>*)>;
};
} // namespace scan_

struct scan_t {
template <class Sender, class InitT, class Fun>
using __sender = stdexec::__t<scan_::sender_t<stdexec::__id<__decay_t<Sender>>, InitT, Fun>>;

template <sender Sender, __movable_value InitT, __movable_value Fun = cub::Sum>
__sender<Sender, InitT, Fun> operator()(Sender&& sndr, InitT init, Fun fun) const {
return __sender<Sender, InitT, Fun>{{}, (Sender&&) sndr, (InitT&&) init, (Fun&&) fun};
}

template <class InitT, class Fun = cub::Sum>
__binder_back<scan_t, InitT, Fun> operator()(InitT init, Fun fun = {}) const {
return {
{},
{},
{(InitT&&) init, (Fun&&) fun}
};
}
};
} // namespace STDEXEC_STREAM_DETAIL_NS

inline constexpr STDEXEC_STREAM_DETAIL_NS::scan_t scan{};
} // namespace nvexec

namespace stdexec::__detail {
template <class SenderId, class Init, class Fun>
extern __mconst<
nvexec::STDEXEC_STREAM_DETAIL_NS::scan_::sender_t<__name_of<__t<SenderId>>, Init, Fun>>
__name_of_v<nvexec::STDEXEC_STREAM_DETAIL_NS::scan_::sender_t<SenderId, Init, Fun>>;
} // namespace stdexec::__detail
1 change: 1 addition & 0 deletions include/nvexec/stream_context.cuh
@@ -35,6 +35,7 @@
#include "stream/upon_stopped.cuh"
#include "stream/when_all.cuh"
#include "stream/reduce.cuh"
#include "stream/scan.cuh"
#include "stream/ensure_started.cuh"

#include "stream/common.cuh"
33 changes: 33 additions & 0 deletions test/nvexec/reduce.cpp
@@ -15,6 +15,9 @@ namespace ex = stdexec;

namespace {

[[deprecated]]
void printer(auto elem) {}

TEST_CASE(
"nvexec reduce returns a sender with single input",
"[cuda][stream][adaptors][reduce]") {
@@ -44,6 +47,36 @@ namespace {
(void) snd;
}

TEST_CASE("hi im scan", "[cuda][stream][adaptors][reduce]") {
constexpr int N = 2048;
int input[N] = {};
std::fill_n(input, N, 1);

nvexec::stream_context stream{};
auto snd = ex::transfer_just(stream.get_scheduler(), std::span{input})
| nvexec::scan(0);

// STATIC_REQUIRE(ex::sender_of<decltype(snd), ex::set_value_t(int&)>);

(void) snd;
}

TEST_CASE("nvexec scan uses sum as default", "[cuda][stream][adaptors][scan]") {
constexpr int N = 2048;
constexpr int init = 42;

thrust::device_vector<int> input(N, 1);
int* first = thrust::raw_pointer_cast(input.data());
int* last = thrust::raw_pointer_cast(input.data()) + input.size();

nvexec::stream_context stream{};
auto snd = ex::transfer_just(stream.get_scheduler(), std::span{first, last})
| nvexec::scan(init);

auto [result] = ex::sync_wait(std::move(snd)).value();
(void) result;
// REQUIRE(result == N + init);
}


TEST_CASE("nvexec reduce uses sum as default", "[cuda][stream][adaptors][reduce]") {
constexpr int N = 2048;
constexpr int init = 42;