From f53fa7b15962c6c9af78bbaabfb99ba69a000276 Mon Sep 17 00:00:00 2001 From: Danial Javady Date: Sun, 18 Feb 2024 00:32:48 +0000 Subject: [PATCH] draft initial --- CMakeLists.txt | 2 +- include/nvexec/stream/scan.cuh | 172 ++++++++++++++++++++++++++++++ include/nvexec/stream_context.cuh | 1 + test/nvexec/reduce.cpp | 33 ++++++ 4 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 include/nvexec/stream/scan.cuh diff --git a/CMakeLists.txt b/CMakeLists.txt index 0ec0ecec6..6ea8bc17a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -323,7 +323,7 @@ CHECK_INCLUDE_FILE_CXX("linux/io_uring.h" STDEXEC_FOUND_IO_URING) option (STDEXEC_ENABLE_IO_URING_TESTS "Enable io_uring tests" ${STDEXEC_FOUND_IO_URING}) option(STDEXEC_BUILD_DOCS "Build stdexec documentation" OFF) -option(STDEXEC_BUILD_EXAMPLES "Build stdexec examples" ON) +option(STDEXEC_BUILD_EXAMPLES "Build stdexec examples" OFF) option(STDEXEC_BUILD_TESTS "Build stdexec tests" ON) option(BUILD_TESTING "" ${STDEXEC_BUILD_TESTS}) diff --git a/include/nvexec/stream/scan.cuh b/include/nvexec/stream/scan.cuh new file mode 100644 index 000000000..107f3c62d --- /dev/null +++ b/include/nvexec/stream/scan.cuh @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2024 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "../../stdexec/execution.hpp" +#include +#include + +#include + +#include + +#include "algorithm_base.cuh" +#include "common.cuh" +#include "../detail/throw_on_cuda_error.cuh" + +namespace nvexec { + namespace STDEXEC_STREAM_DETAIL_NS { + namespace scan_ { + + + template + struct receiver_t + : public __algo_range_init_fun::receiver_t< + SenderId, + ReceiverId, + InitT, + Fun, + receiver_t> { + using base = __algo_range_init_fun:: + receiver_t>; + + template + using result_t = typename __algo_range_init_fun::binary_invoke_result_t; + + template + static void set_value_impl(base::__t&& self, Range&& range) noexcept { + cudaError_t status{cudaSuccess}; + cudaStream_t stream = self.op_state_.get_stream(); + + // `range` is produced asynchronously, so we need to wait for it to be ready + if (status = STDEXEC_DBG_ERR(cudaStreamSynchronize(stream)); status != cudaSuccess) { + self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status)); + return; + } + + using value_t = result_t; + value_t* d_out = static_cast(self.op_state_.temp_storage_); + + void* d_temp_storage{}; + std::size_t temp_storage_size{}; + + auto first = begin(range); + auto last = end(range); + + std::size_t num_items = std::distance(first, last); + + if (status = STDEXEC_DBG_ERR(cub::DeviceScan::ExclusiveScan( + d_temp_storage, + temp_storage_size, + first, + d_out, + self.fun_, + self.init_, + num_items, + stream)); + status != cudaSuccess) { + self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status)); + return; + } + + if (status = STDEXEC_DBG_ERR( // + cudaMallocAsync(&d_temp_storage, temp_storage_size, stream)); + status != cudaSuccess) { + self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status)); + return; + } + + if (status = STDEXEC_DBG_ERR(cub::DeviceScan::ExclusiveScan( + d_temp_storage, + temp_storage_size, + first, + d_out, + self.fun_, + self.init_, + num_items, + stream)); + status != 
cudaSuccess) { + self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status)); + return; + } + + status = STDEXEC_DBG_ERR(cudaFreeAsync(d_temp_storage, stream)); + self.op_state_.defer_temp_storage_destruction(d_out); + if (status == cudaSuccess) { + int* host_out = new int[10]; + cudaMemcpy(host_out, d_out, 10 * sizeof(int), cudaMemcpyDeviceToHost); + for (size_t i = 0; i < 10; ++i) { + std::cout << host_out[i] << std::endl; + } + self.op_state_.propagate_completion_signal(stdexec::set_value, d_out); + } else { + self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status)); + } + } + }; + + template + struct sender_t + : public __algo_range_init_fun:: + sender_t> { + template + using receiver_t = + stdexec::__t, InitT, Fun>>; + + template + using _set_value_t = completion_signatures>)>; + + // template + // using _set_value_t = completion_signatures>)>; + + // // template + // using _set_value_t_spanc = + // completion_signatures*)>; + }; + } // namespace scan_ + + struct scan_t { + template + using __sender = stdexec::__t>, InitT, Fun>>; + + template + __sender operator()(Sender&& sndr, InitT init, Fun fun) const { + return __sender{{}, (Sender&&) sndr, (InitT&&) init, (Fun&&) fun}; + } + + template + __binder_back operator()(InitT init, Fun fun = {}) const { + return { + {}, + {}, + {(InitT&&) init, (Fun&&) fun} + }; + } + }; + } // namespace STDEXEC_STREAM_DETAIL_NS + + inline constexpr STDEXEC_STREAM_DETAIL_NS::scan_t scan{}; +} // namespace nvexec + +namespace stdexec::__detail { + template + extern __mconst< + nvexec::STDEXEC_STREAM_DETAIL_NS::scan_::sender_t<__name_of<__t>, Init, Fun>> + __name_of_v>; +} // namespace stdexec::__detail diff --git a/include/nvexec/stream_context.cuh b/include/nvexec/stream_context.cuh index 7f4ff4510..bd6b0ffe3 100644 --- a/include/nvexec/stream_context.cuh +++ b/include/nvexec/stream_context.cuh @@ -35,6 +35,7 @@ #include "stream/upon_stopped.cuh" #include "stream/when_all.cuh" 
#include "stream/reduce.cuh" +#include "stream/scan.cuh" #include "stream/ensure_started.cuh" #include "stream/common.cuh" diff --git a/test/nvexec/reduce.cpp b/test/nvexec/reduce.cpp index 81de4747f..7aaa46a5a 100644 --- a/test/nvexec/reduce.cpp +++ b/test/nvexec/reduce.cpp @@ -15,6 +15,9 @@ namespace ex = stdexec; namespace { + [[deprecated]] + void printer(auto elem) {} + TEST_CASE( "nvexec reduce returns a sender with single input", "[cuda][stream][adaptors][reduce]") { @@ -44,6 +47,36 @@ namespace { (void) snd; } + TEST_CASE("hi im scan", "[cuda][stream][adaptors][reduce]") { + constexpr int N = 2048; + int input[N] = {}; + std::fill_n(input, N, 1); + + nvexec::stream_context stream{}; + auto snd = ex::transfer_just(stream.get_scheduler(), std::span{input}) + | nvexec::scan(0); + + // STATIC_REQUIRE(ex::sender_of); + + (void) snd; + } + TEST_CASE("nvexec scan uses sum as default", "[cuda][stream][adaptors][reduce]") { + constexpr int N = 2048; + constexpr int init = 42; + + thrust::device_vector input(N, 1); + int* first = thrust::raw_pointer_cast(input.data()); + int* last = thrust::raw_pointer_cast(input.data()) + input.size(); + + nvexec::stream_context stream{}; + auto snd = ex::transfer_just(stream.get_scheduler(), std::span{first, last}) + | nvexec::scan(init); + + auto [result] = ex::sync_wait(std::move(snd)).value(); + // REQUIRE(result == N + init); + } + + TEST_CASE("nvexec reduce uses sum as default", "[cuda][stream][adaptors][reduce]") { constexpr int N = 2048; constexpr int init = 42;