Skip to content

Commit

Permalink
async: fall back to blocking (#272)
Browse files Browse the repository at this point in the history
`read_async()` and `write_async()` now works in compat mode by falling back to the blocking implementation

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #272
  • Loading branch information
madsbk authored Sep 3, 2023
1 parent 63d76a9 commit 26dff9a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 88 deletions.
70 changes: 34 additions & 36 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ int main()
cout << "Parallel POSIX read (" << kvikio::defaults::thread_pool_nthreads()
<< " threads): " << read << endl;
}

if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) {
// Here we use the batch API to read "/tmp/test-file" into `b_dev` by
// submitting 4 batch operations.
Expand Down Expand Up @@ -195,41 +194,40 @@ int main()
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}

{
cout << "Performing async I/O using file handle" << endl;
off_t f_off{0};
off_t d_off{0};
// Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0.
ssize_t* bytes_done_p{};
check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess);
*bytes_done_p = 0;

// Let's create a new stream and submit a sync write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false);
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);

// After synchronizing `stream`, we can read the number of bytes written
check(cudaStreamSynchronize(stream) == cudaSuccess);
// Note, `*bytes_done_p` might be negative, which indicate an IO error thus we
// use `CUFILE_CHECK_STREAM_IO` to check for errors.
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async write : " << *bytes_done_p << endl;

/* Read */
*bytes_done_p = 0;
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async read : " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
} else {
cout << "The batch and stream API isn't available, requires CUDA 12.2+" << endl;
cout << "The batch API isn't available, requires CUDA 12.2+" << endl;
}
{
cout << "Performing async I/O using by-reference arguments" << endl;
off_t f_off{0};
off_t d_off{0};
// Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0.
ssize_t* bytes_done_p{};
check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess);
*bytes_done_p = 0;

// Let's create a new stream and submit an async write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/tmp/test-file", "w+");
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);

// After synchronizing `stream`, we can read the number of bytes written
check(cudaStreamSynchronize(stream) == cudaSuccess);
// Note, `*bytes_done_p` might be negative, which indicate an IO error thus we
// use `CUFILE_CHECK_STREAM_IO` to check for errors.
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async write: " << *bytes_done_p << endl;

// Let's async read the data back into device memory
*bytes_done_p = 0;
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async read: " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
}
2 changes: 1 addition & 1 deletion cpp/include/kvikio/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct CUfileException : public std::runtime_error {
#ifdef KVIKIO_CUFILE_FOUND
#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type) \
do { \
int const _nbytes = (*_nbytes_done); \
auto const _nbytes = *(_nbytes_done); \
if (_nbytes < 0) { \
throw(_exception_type){std::string{"cuFile error at: "} + __FILE__ + ":" + \
KVIKIO_STRINGIFY(__LINE__) + ": " + std::to_string(_nbytes)}; \
Expand Down
119 changes: 68 additions & 51 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstddef>
Expand Down Expand Up @@ -344,7 +345,7 @@ class FileHandle {
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write into.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from.
* This parameter should be used only with registered buffers.
* @return Size of bytes that were successfully written.
*/
Expand Down Expand Up @@ -502,45 +503,52 @@ class FileHandle {
* This is an asynchronous version of `.read()`, which will be executed in sequence
* for the specified stream.
*
* The arguments have the same meaning as in `.read()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
* When running CUDA v12.1 or older, this function falls back to use `.read()` after
* `stream` has been synchronized.
*
* The arguments have the same meaning as in `.read()` but some of them are deferred.
* That is, the values pointed to by `size_p`, `file_offset_p` and `devPtr_offset_p`
* will not be evaluated until execution time. Notice, this behavior can be changed
* using cuFile's cuFileStreamRegister API.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O
* submission, then you must set it to the maximum possible I/O size for that stream I/O. Later
* the actual size can be set prior to the stream I/O execution.
* @param file_offset Pointer to offset in the file from which to read. Unless otherwise set using
* cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset Pointer to the offset relative to the bufPtr_base pointer from which to
* write. Unless otherwise set using cuFileStreamRegister API, this value will not be evaluated
* until execution time.
* @param bytes_read Pointer to the bytes read from file. This pointer should be a non-NULL value
* and *bytes_read set to 0. The bytes_read memory should be allocated with cuMemHostAlloc/malloc/
* mmap or registered with cuMemHostRegister.
* After successful execution of the operation in the stream, the value *bytes_read will contain
* either:
* @param size_p Pointer to size in bytes to read. If the exact size is not known at the time of
* I/O submission, then you must set it to the maximum possible I/O size for that stream I/O.
* Later the actual size can be set prior to the stream I/O execution.
* @param file_offset_p Pointer to offset in the file from which to read. Unless otherwise set
* using cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset_p Pointer to the offset relative to the bufPtr_base from which to write.
* Unless otherwise set using cuFileStreamRegister API, this value will not be evaluated until
* execution time.
* @param bytes_read_p Pointer to the bytes read from file. This pointer should be a non-NULL
* value and *bytes_read_p set to 0. The bytes_read_p memory should be allocated with
* cuMemHostAlloc/malloc/mmap or registered with cuMemHostRegister. After successful execution of
* the operation in the stream, the value *bytes_read_p will contain either:
* - The number of bytes successfully read.
* - -1 on IO errors.
* - All other errors return a negative integer value of the CUfileOpError enum value.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
*/
inline void read_async(void* devPtr_base,
std::size_t* size,
off_t* file_offset,
off_t* devPtr_offset,
ssize_t* bytes_read,
CUstream stream)
void read_async(void* devPtr_base,
std::size_t* size_p,
off_t* file_offset_p,
off_t* devPtr_offset_p,
ssize_t* bytes_read_p,
CUstream stream)
{
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
return;
#else
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
return;
}
#endif

CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
*bytes_read_p =
static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
Expand All @@ -549,44 +557,53 @@ class FileHandle {
* This is an asynchronous version of `.write()`, which will be executed in sequence
* for the specified stream.
*
* The arguments have the same meaning as in `.write()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
* When running CUDA v12.1 or older, this function falls back to use `.read()` after
* `stream` has been synchronized.
*
* The arguments have the same meaning as in `.write()` but some of them are deferred.
* That is, the values pointed to by `size_p`, `file_offset_p` and `devPtr_offset_p`
* will not be evaluated until execution time. Notice, this behavior can be changed
* using cuFile's cuFileStreamRegister API.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O
* submission, then you must set it to the maximum possible I/O size for that stream I/O. Later
* the actual size can be set prior to the stream I/O execution.
* @param file_offset Pointer to offset in the file from which to read. Unless otherwise set
* @param size_p Pointer to size in bytes to read. If the exact size is not known at the time of
* I/O submission, then you must set it to the maximum possible I/O size for that stream I/O.
* Later the actual size can be set prior to the stream I/O execution.
* @param file_offset_p Pointer to offset in the file from which to read. Unless otherwise set
* using cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset Pointer to the offset relative to the bufPtr_base pointer from which to
* write.
* @param bytes_written Pointer to the bytes read from file. This pointer should be a non-NULL
* value and *bytes_written set to 0. The bytes_written memory should be allocated with
* @param devPtr_offset_p Pointer to the offset relative to the bufPtr_base from which to read.
* Unless otherwise set using cuFileStreamRegister API, this value will not be evaluated until
* execution time.
* @param bytes_written_p Pointer to the bytes read from file. This pointer should be a non-NULL
* value and *bytes_written_p set to 0. The bytes_written_p memory should be allocated with
* cuMemHostAlloc/malloc/mmap or registered with cuMemHostRegister.
* After successful execution of the operation in the stream, the value *bytes_written will
* After successful execution of the operation in the stream, the value *bytes_written_p will
* contain either:
* - The number of bytes successfully read.
* - -1 on IO errors.
* - All other errors return a negative integer value of the CUfileOpError enum value.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
*/
inline void write_async(void* devPtr_base,
std::size_t* size,
off_t* file_offset,
off_t* devPtr_offset,
ssize_t* bytes_written,
CUstream stream)
void write_async(void* devPtr_base,
std::size_t* size_p,
off_t* file_offset_p,
off_t* devPtr_offset_p,
ssize_t* bytes_written_p,
CUstream stream)
{
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
return;
#else
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
if (kvikio::is_batch_and_stream_available() && !_compat_mode) {
CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
return;
}
#endif

CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
*bytes_written_p =
static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/kvikio/shim/cuda.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class cudaAPI {
decltype(cuDeviceGet)* DeviceGet{nullptr};
decltype(cuDevicePrimaryCtxRetain)* DevicePrimaryCtxRetain{nullptr};
decltype(cuDevicePrimaryCtxRelease)* DevicePrimaryCtxRelease{nullptr};
decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};

private:
cudaAPI()
Expand All @@ -70,6 +71,7 @@ class cudaAPI {
get_symbol(DeviceGet, lib, KVIKIO_STRINGIFY(cuDeviceGet));
get_symbol(DevicePrimaryCtxRetain, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRetain));
get_symbol(DevicePrimaryCtxRelease, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRelease));
get_symbol(StreamSynchronize, lib, KVIKIO_STRINGIFY(cuStreamSynchronize));
}

public:
Expand Down

0 comments on commit 26dff9a

Please sign in to comment.