diff --git a/example/main.cpp b/example/main.cpp index 9b7c5e48..0468b051 100644 --- a/example/main.cpp +++ b/example/main.cpp @@ -68,7 +68,7 @@ async_simple::coro::Lazy chunked_upload1(coro_http_client &client) { create_file(filename, 1010); coro_io::coro_file file{}; - co_await file.async_open(filename, coro_io::flags::read_only); + file.open(filename, std::ios::in); std::string buf; cinatra::detail::resize(buf, 100); diff --git a/include/cinatra/coro_http_client.hpp b/include/cinatra/coro_http_client.hpp index fbc09d1a..336ef0cc 100644 --- a/include/cinatra/coro_http_client.hpp +++ b/include/cinatra/coro_http_client.hpp @@ -778,7 +778,7 @@ class coro_http_client : public std::enable_shared_from_this { std::string range = "") { resp_data data{}; coro_io::coro_file file; - co_await file.async_open(filename, coro_io::flags::create_write); + file.open(filename, std::ios::trunc | std::ios::out); if (!file.is_open()) { data.net_err = std::make_error_code(std::errc::no_such_file_or_directory); data.status = 404; @@ -849,12 +849,12 @@ class coro_http_client : public std::enable_shared_from_this { private: async_simple::coro::Lazy send_file_chunked_with_copy( - std::string source, std::error_code &ec) { + std::string_view source, std::error_code &ec) { std::string file_data; detail::resize(file_data, max_single_part_size_); coro_io::coro_file file{}; - bool ok = co_await file.async_open(source, coro_io::flags::read_only); - if (!ok) { + file.open(source, std::ios::in); + if (!file.is_open()) { ec = std::make_error_code(std::errc::bad_file_descriptor); co_return; } @@ -873,15 +873,15 @@ class coro_http_client : public std::enable_shared_from_this { } async_simple::coro::Lazy send_file_no_chunked_with_copy( - std::string source, std::error_code &ec, std::size_t length) { + std::string_view source, std::error_code &ec, std::size_t length) { if (length <= 0) { co_return; } std::string file_data; detail::resize(file_data, std::min(max_single_part_size_, length)); coro_io::coro_file file{}; - bool ok = co_await file.async_open(source, coro_io::flags::read_only); - if (!ok) { + file.open(source, std::ios::in); + if (!file.is_open()) { ec = std::make_error_code(std::errc::bad_file_descriptor); co_return; } @@ -1868,8 +1868,8 @@ class coro_http_client : public std::enable_shared_from_this { if (is_ranges) { if (ctx.resp_body_stream) { - auto ec = - co_await ctx.resp_body_stream->async_write(data_ptr, content_len); + auto [ec, size] = co_await ctx.resp_body_stream->async_write( + {data_ptr, content_len}); if (ec) { data.net_err = ec; co_return; @@ -1955,8 +1955,9 @@ class coro_http_client : public std::enable_shared_from_this { auto part_body = co_await multipart.read_part_body(boundary); if (ctx.resp_body_stream) { - ec = co_await ctx.resp_body_stream->async_write(part_body.data.data(), - part_body.data.size()); + size_t size; + std::tie(ec, size) = + co_await ctx.resp_body_stream->async_write(part_body.data); } else { resp_chunk_str_.append(part_body.data.data(), part_body.data.size()); @@ -2035,7 +2036,8 @@ class coro_http_client : public std::enable_shared_from_this { data_ptr = asio::buffer_cast(chunked_buf_.data()); if (ctx.resp_body_stream) { - ec = co_await ctx.resp_body_stream->async_write(data_ptr, chunk_size); + std::tie(ec, size) = co_await ctx.resp_body_stream->async_write( + {data_ptr, (size_t)chunk_size}); } else { resp_chunk_str_.append(data_ptr, chunk_size); @@ -2141,7 +2143,7 @@ class coro_http_client : public std::enable_shared_from_this { if (is_file) { coro_io::coro_file file{}; - co_await file.async_open(part.filename, coro_io::flags::read_only); + file.open(part.filename, std::ios::in); assert(file.is_open()); std::string file_data; detail::resize(file_data, max_single_part_size_); diff --git a/include/cinatra/coro_http_server.hpp b/include/cinatra/coro_http_server.hpp index d34050e1..bbdfbead 100644 --- a/include/cinatra/coro_http_server.hpp +++ b/include/cinatra/coro_http_server.hpp @@ -413,7 +413,7 @@ class coro_http_server { detail::resize(content, chunked_size_); coro_io::coro_file in_file{}; - co_await in_file.async_open(file_name, coro_io::flags::read_only); + in_file.open(file_name, std::ios::in); if (!in_file.is_open()) { resp.set_status_and_content(status_type::not_found, file_name + "not found"); @@ -468,7 +468,13 @@ class coro_http_server { if (ranges.size() == 1) { // single part auto [start, end] = ranges[0]; - in_file.seek(start, SEEK_SET); + bool ok = in_file.seek(start, std::ios::beg); + if (!ok) { + resp.set_status_and_content(status_type::bad_request, + "invalid range"); + co_await resp.get_conn()->reply(); + co_return; + } size_t part_size = end + 1 - start; int status = (part_size == file_size) ? 200 : 206; std::string content_range = "Content-Range: bytes "; @@ -511,7 +517,13 @@ class coro_http_server { } auto [start, end] = ranges[i]; - in_file.seek(start, SEEK_SET); + bool ok = in_file.seek(start, std::ios::beg); + if (!ok) { + resp.set_status_and_content(status_type::bad_request, + "invalid range"); + co_await resp.get_conn()->reply(); + co_return; + } size_t part_size = end + 1 - start; std::string_view more = CRCF; diff --git a/include/cinatra/ylt/coro_io/coro_file.hpp b/include/cinatra/ylt/coro_io/coro_file.hpp index be35cb71..aaaf98ae 100644 --- a/include/cinatra/ylt/coro_io/coro_file.hpp +++ b/include/cinatra/ylt/coro_io/coro_file.hpp @@ -22,13 +22,16 @@ #include #include #include +#include +#include "async_simple/coro/SyncAwait.h" #include "io_context_pool.hpp" -#if defined(ENABLE_FILE_IO_URING) +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) #include #include #endif #include +#include #include #include @@ -42,9 +45,7 @@ #include #include "coro_io.hpp" - #if defined(ASIO_WINDOWS) -#include #include #endif @@ -97,70 +98,247 @@ enum flags { #endif // defined(ASIO_WINDOWS) }; -enum class read_type { - init, -#if defined(ENABLE_FILE_IO_URING) - uring, - uring_random, -#else - fread, +constexpr inline flags to_flags(std::ios::ios_base::openmode mode) { + flags access = flags::read_write; + + if (mode == std::ios::in) + access = flags::read_only; + else if (mode == std::ios::out) + access = flags::write_only; + else if (mode == std::ios::app) + access = flags::append; + else if (mode == std::ios::trunc) + access = flags::truncate; + else if (mode == (std::ios::in | std::ios::out)) + access = flags::read_write; + else if (mode == (std::ios::trunc | std::ios::out)) + access = flags::create_write_trunc; + if (mode == (std::ios::in | std::ios::out | std::ios::trunc)) + access = create_read_write_trunc; + else if (mode == (std::ios::in | std::ios::out | std::ios::app)) + access = create_read_write_append; + + return access; +} + +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) +template +inline bool open_native_async_file(File &file, Executor &executor, + std::string_view filepath, + flags open_flags) { + if (file && file->is_open()) { + return true; + } + + try { + if constexpr (seq) { + file = std::make_shared( + executor.get_asio_executor(), std::string(filepath), + static_cast(open_flags)); + } + else { + file = std::make_shared( + executor.get_asio_executor(), std::string(filepath), + static_cast(open_flags)); + } + } catch (std::exception &ex) { + std::cout << "line " << __LINE__ << " coro_file open failed" << ex.what() + << "\n"; + return false; + } + + return true; +} #endif - pread, -}; -class coro_file { +enum class execution_type { none, native_async, thread_pool }; + +template +class basic_seq_coro_file { public: -#if defined(ENABLE_FILE_IO_URING) - coro_file( - coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) - : coro_file(executor->get_asio_executor()) {} + basic_seq_coro_file(coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) + : basic_seq_coro_file(executor->get_asio_executor()) {} - coro_file(asio::io_context::executor_type executor) + basic_seq_coro_file(asio::io_context::executor_type executor) : executor_wrapper_(executor) {} -#else - coro_file(coro_io::ExecutorWrapper<> *executor = - coro_io::get_global_block_executor()) - : coro_file(executor->get_asio_executor()) {} + basic_seq_coro_file(std::string_view filepath, + std::ios::ios_base::openmode open_flags, + coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) + : basic_seq_coro_file(filepath, open_flags, + executor->get_asio_executor()) {} + + basic_seq_coro_file(std::string_view filepath, + std::ios::ios_base::openmode open_flags, + asio::io_context::executor_type executor) + : executor_wrapper_(executor) { + open(filepath, open_flags); + } - coro_file(asio::io_context::executor_type executor) - : executor_wrapper_(executor) {} + bool open(std::string_view filepath, + std::ios::ios_base::openmode open_flags) { + file_path_ = std::string{filepath}; + if constexpr (execute_type == execution_type::thread_pool) { + return open_stream_file_in_pool(filepath, open_flags); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + return open_native_async_file(async_seq_file_, executor_wrapper_, + filepath, to_flags(open_flags)); +#else + return open_stream_file_in_pool(filepath, open_flags); #endif + } + } - bool is_open() const { - if (type_ == read_type::pread) { - return fd_file_ != nullptr; + async_simple::coro::Lazy> async_read( + char *buf, size_t size) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_read_write({buf, size}); } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_ == nullptr) { + co_return std::make_pair( + std::make_error_code(std::errc::invalid_argument), 0); + } + auto [ec, read_size] = co_await coro_io::async_read( + *async_seq_file_, asio::buffer(buf, size)); + if (ec == asio::error::eof) { + eof_ = true; + co_return std::make_pair(std::error_code{}, read_size); + } - return stream_file_ != nullptr; + co_return std::make_pair(ec, read_size); +#else + co_return co_await async_read_write({buf, size}); +#endif + } } - void flush() { -#if defined(ENABLE_FILE_IO_URING) + template + async_simple::coro::Lazy> async_read_write( + std::span buf) { + auto result = co_await coro_io::post( + [this, buf]() -> std::pair { + if constexpr (is_read) { + if (frw_seq_file_.read(buf.data(), buf.size())) { + return std::make_pair(std::error_code{}, frw_seq_file_.gcount()); + } + } + else { + if (frw_seq_file_.write(buf.data(), buf.size())) { + return std::make_pair(std::error_code{}, buf.size()); + } + } + + if (frw_seq_file_.eof()) { + eof_ = true; + return std::make_pair(std::error_code{}, frw_seq_file_.gcount()); + } + + return std::make_pair(std::make_error_code(std::errc::io_error), 0); + }, + &executor_wrapper_); + co_return result.value(); + } + + async_simple::coro::Lazy> async_write( + std::string_view buf) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_read_write( + std::span(const_cast(buf.data()), buf.size())); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_ == nullptr) { + co_return std::make_pair( + std::make_error_code(std::errc::invalid_argument), 0); + } + auto [ec, size] = + co_await coro_io::async_write(*async_seq_file_, asio::buffer(buf)); + co_return std::make_pair(ec, size); #else - if (fd_file_) { -#if defined(__GNUC__) - fsync(*fd_file_); + co_return co_await async_read_write( + std::span(const_cast(buf.data()), buf.size())); #endif } - else if (stream_file_) { - fflush(stream_file_.get()); + } + +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + std::shared_ptr get_async_stream_file() { + return async_seq_file_; + } +#endif + + std::fstream &get_stream_file() { return frw_seq_file_; } + + bool is_open() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_ && async_seq_file_->is_open()) { + return true; } #endif + return frw_seq_file_.is_open(); } - bool eof() const { return eof_; } + bool eof() { return eof_; } void close() { - if (stream_file_) { - stream_file_.reset(); +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_ && async_seq_file_->is_open()) { + std::error_code ec; + async_seq_file_->close(ec); } - else if (fd_file_) { - fd_file_.reset(); +#endif + if (frw_seq_file_.is_open()) { + frw_seq_file_.close(); } } + bool seek(size_t offset, std::ios_base::seekdir dir) { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_ && async_seq_file_->is_open()) { + int whence = SEEK_SET; + if (dir == std::ios_base::cur) + whence = SEEK_CUR; + else if (dir == std::ios_base::end) + whence = SEEK_END; + + std::error_code seek_ec; + async_seq_file_->seek( + offset, static_cast(whence), seek_ec); + if (seek_ec) { + return false; + } + return true; + } +#endif + if (frw_seq_file_.is_open()) { + if (frw_seq_file_.seekg(offset, dir)) { + return true; + } + } + + return false; + } + + execution_type get_execution_type() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_seq_file_ && async_seq_file_->is_open()) { + return execution_type::native_async; + } +#endif + if (frw_seq_file_.is_open()) { + return execution_type::thread_pool; + } + + return execution_type::none; + } + size_t file_size(std::error_code ec) const noexcept { return std::filesystem::file_size(file_path_, ec); } @@ -169,287 +347,253 @@ class coro_file { std::string_view file_path() const { return file_path_; } - async_simple::coro::Lazy> async_pread( - size_t offset, char *data, size_t size) { - if (type_ != read_type::pread) { - co_return std::make_pair( - std::make_error_code(std::errc::bad_file_descriptor), 0); + private: + bool open_stream_file_in_pool(std::string_view filepath, + std::ios::ios_base::openmode flags) { + if (frw_seq_file_.is_open()) { + return true; } -#if defined(ASIO_WINDOWS) - auto pread = [](int fd, void *buf, uint64_t count, - uint64_t offset) -> int64_t { - DWORD bytes_read = 0; - OVERLAPPED overlapped; - memset(&overlapped, 0, sizeof(OVERLAPPED)); - overlapped.Offset = offset & 0xFFFFFFFF; - overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; - - BOOL ok = ReadFile(reinterpret_cast(_get_osfhandle(fd)), buf, - count, &bytes_read, &overlapped); - if (!ok && (errno = GetLastError()) != ERROR_HANDLE_EOF) { - return -1; - } + auto coro_func = coro_io::post( + [this, flags, filepath]() mutable { + frw_seq_file_.open(filepath.data(), flags); + if (!frw_seq_file_.is_open()) { + std::cout << "line " << __LINE__ << " coro_file open failed " + << filepath << "\n"; + std::cerr << "Error: " << strerror(errno); + return false; + } + return true; + }, + &executor_wrapper_); + auto result = async_simple::coro::syncAwait(coro_func); + return result.value(); + } - return bytes_read; - }; + coro_io::ExecutorWrapper<> executor_wrapper_; +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + std::shared_ptr async_seq_file_; // seq #endif - co_return co_await async_prw(pread, true, offset, data, size); - } + std::fstream frw_seq_file_; // fread/fwrite seq file + std::string file_path_; + bool eof_ = false; +}; - async_simple::coro::Lazy async_pwrite(size_t offset, - const char *data, - size_t size) { - if (type_ != read_type::pread) { - co_return std::make_error_code(std::errc::bad_file_descriptor); - } -#if defined(ASIO_WINDOWS) - auto pwrite = [](int fd, const void *buf, uint64_t count, - uint64_t offset) -> int64_t { - DWORD bytes_write = 0; - OVERLAPPED overlapped; - memset(&overlapped, 0, sizeof(OVERLAPPED)); - overlapped.Offset = offset & 0xFFFFFFFF; - overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; +using coro_file = basic_seq_coro_file<>; - BOOL ok = WriteFile(reinterpret_cast(_get_osfhandle(fd)), buf, - count, &bytes_write, &overlapped); - if (!ok) { - return -1; - } +template +class basic_random_coro_file { + public: + basic_random_coro_file(coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) + : basic_random_coro_file(executor->get_asio_executor()) {} - return bytes_write; - }; -#endif - auto result = co_await async_prw(pwrite, false, offset, (char *)data, size); - co_return result.first; + basic_random_coro_file(asio::io_context::executor_type executor) + : executor_wrapper_(executor) {} + + basic_random_coro_file(std::string_view filepath, + std::ios::ios_base::openmode open_flags, + coro_io::ExecutorWrapper<> *executor = + coro_io::get_global_block_executor()) + : basic_random_coro_file(filepath, open_flags, + executor->get_asio_executor()) {} + + basic_random_coro_file(std::string_view filepath, + std::ios::ios_base::openmode open_flags, + asio::io_context::executor_type executor) + : executor_wrapper_(executor) { + open(filepath, open_flags); } -#if defined(ENABLE_FILE_IO_URING) - async_simple::coro::Lazy async_open(std::string_view filepath, - int open_mode = flags::read_write, - read_type type = read_type::uring) { - type_ = type; - if (type_ == read_type::pread) { - co_return open_fd(filepath, open_mode); + bool open(std::string_view filepath, + std::ios::ios_base::openmode open_flags) { + file_path_ = std::string{filepath}; + if constexpr (execute_type == execution_type::thread_pool) { + return open_fd(filepath, to_flags(open_flags)); + } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + return open_native_async_file(async_random_file_, + executor_wrapper_, filepath, + to_flags(open_flags)); +#else + return open_fd(filepath, to_flags(open_flags)); +#endif } + } - try { - if (type_ == read_type::uring) { - stream_file_ = std::make_shared( - executor_wrapper_.get_asio_executor()); - } - else { - stream_file_ = std::make_shared( - executor_wrapper_.get_asio_executor()); - } - } catch (std::exception &ex) { - stream_file_ = nullptr; - std::cout << "line " << __LINE__ << " coro_file create failed" - << ex.what() << "\n"; - co_return false; + async_simple::coro::Lazy> async_read_at( + uint64_t offset, char *buf, size_t size) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_pread(offset, buf, size); } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_random_file_ == nullptr) { + co_return std::make_pair( + std::make_error_code(std::errc::invalid_argument), 0); + } + auto [ec, read_size] = co_await coro_io::async_read_at( + offset, *async_random_file_, asio::buffer(buf, size)); - std::error_code ec; - stream_file_->open(filepath.data(), - static_cast(open_mode), ec); + if (ec == asio::error::eof) { + eof_ = true; + co_return std::make_pair(std::error_code{}, read_size); + } - if (ec) { - stream_file_ = nullptr; - std::cout << "line " << __LINE__ << " coro_file open failed" - << ec.message() << "\n"; - co_return false; + co_return std::make_pair(ec, read_size); +#else + co_return co_await async_pread(offset, buf, size); +#endif } - - co_return true; } - bool seek(long offset, int whence) { - if (type_ != read_type::uring) { - return false; + async_simple::coro::Lazy> async_write_at( + uint64_t offset, std::string_view buf) { + if constexpr (execute_type == execution_type::thread_pool) { + co_return co_await async_pwrite(offset, buf.data(), buf.size()); } + else { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_random_file_ == nullptr) { + co_return std::make_pair( + std::make_error_code(std::errc::invalid_argument), 0); + } + auto [ec, write_size] = co_await coro_io::async_write_at( + offset, *async_random_file_, asio::buffer(buf)); - std::error_code seek_ec; - reinterpret_cast(stream_file_.get()) - ->seek(offset, static_cast(whence), - seek_ec); - if (seek_ec) { - return false; + co_return std::make_pair(ec, write_size); +#else + co_return co_await async_pwrite(offset, buf.data(), buf.size()); +#endif } - return true; } - async_simple::coro::Lazy> async_read_at( - uint64_t offset, char *data, size_t size) { - if (type_ != read_type::uring_random) { - co_return std::make_pair( - std::make_error_code(std::errc::bad_file_descriptor), 0); - } +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + std::shared_ptr get_async_stream_file() { + return async_random_file_; + } +#endif - auto [ec, read_size] = co_await coro_io::async_read_at( - offset, - *reinterpret_cast(stream_file_.get()), - asio::buffer(data, size)); + std::shared_ptr get_pread_file() { return prw_random_file_; } - if (ec == asio::error::eof) { - eof_ = true; - co_return std::make_pair(std::error_code{}, read_size); + bool is_open() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_random_file_ && async_random_file_->is_open()) { + return true; } - - co_return std::make_pair(std::error_code{}, read_size); +#endif + return prw_random_file_ != nullptr; } - async_simple::coro::Lazy async_write_at(uint64_t offset, - const char *data, - size_t size) { - if (type_ != read_type::uring_random) { - co_return std::make_error_code(std::errc::bad_file_descriptor); + bool eof() { return eof_; } + + execution_type get_execution_type() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (async_random_file_ && async_random_file_->is_open()) { + return execution_type::native_async; + } +#endif + if (prw_random_file_ != nullptr) { + return execution_type::thread_pool; } - auto [ec, write_size] = co_await coro_io::async_write_at( - offset, - *reinterpret_cast(stream_file_.get()), - asio::buffer(data, size)); - co_return ec; + return execution_type::none; } - async_simple::coro::Lazy> async_read( - char *data, size_t size) { - if (type_ != read_type::uring) { - co_return std::make_pair( - std::make_error_code(std::errc::bad_file_descriptor), 0); - } - - auto [ec, read_size] = co_await coro_io::async_read( - *reinterpret_cast(stream_file_.get()), - asio::buffer(data, size)); - if (ec == asio::error::eof) { - eof_ = true; - co_return std::make_pair(std::error_code{}, read_size); + void close() { +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + std::error_code ec; + if (async_random_file_) { + async_random_file_->close(ec); } +#endif + prw_random_file_ = nullptr; + } - co_return std::make_pair(std::error_code{}, read_size); + size_t file_size(std::error_code ec) const noexcept { + return std::filesystem::file_size(file_path_, ec); } - async_simple::coro::Lazy async_write(const char *data, - size_t size) { - if (type_ != read_type::uring) { - co_return std::make_error_code(std::errc::bad_file_descriptor); - } + size_t file_size() const { return std::filesystem::file_size(file_path_); } - auto [ec, write_size] = co_await coro_io::async_write( - *reinterpret_cast(stream_file_.get()), - asio::buffer(data, size)); + std::string_view file_path() const { return file_path_; } - co_return ec; - } -#else - std::string str_mode(int open_mode) { - switch (open_mode) { - case flags::read_only: - return "rb"; - case flags::create_write: - case flags::write_only: - return "wb+"; - case flags::read_write: - return "rb+"; - case flags::append: - return "ab+"; - case flags::create_read_write_append: - return "ab+"; - case flags::truncate: - return "w+"; - default: - return "rb+"; + private: + bool open_fd(std::string_view filepath, int open_flags) { + if (prw_random_file_) { + return true; } - } - bool seek(long offset, int whence) { - if (stream_file_ == nullptr) { +#if defined(ASIO_WINDOWS) + int fd = _open(filepath.data(), adjust_flags(open_flags)); +#else + int fd = ::open(filepath.data(), open_flags); +#endif + if (fd < 0) { return false; } - return fseek(stream_file_.get(), offset, whence) == 0; + prw_random_file_ = std::shared_ptr(new int(fd), [](int *ptr) { +#if defined(ASIO_WINDOWS) + _close(*ptr); +#else + ::close(*ptr); +#endif + delete ptr; + }); + return true; } - async_simple::coro::Lazy async_open(std::string filepath, - int open_mode = flags::read_write, - read_type type = read_type::fread) { - file_path_ = std::move(filepath); - type_ = type; - if (type_ == read_type::pread) { - co_return open_fd(file_path_, open_mode); - } + async_simple::coro::Lazy> async_pread( + size_t offset, char *data, size_t size) { +#if defined(ASIO_WINDOWS) + auto pread = [](int fd, void *buf, uint64_t count, + uint64_t offset) -> int64_t { + DWORD bytes_read = 0; + OVERLAPPED overlapped; + memset(&overlapped, 0, sizeof(OVERLAPPED)); + overlapped.Offset = offset & 0xFFFFFFFF; + overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; - if (stream_file_ != nullptr) { - co_return true; - } + BOOL ok = ReadFile(reinterpret_cast(_get_osfhandle(fd)), buf, + count, &bytes_read, &overlapped); + if (!ok && (errno = GetLastError()) != ERROR_HANDLE_EOF) { + return -1; + } - auto result = co_await coro_io::post( - [this, open_mode] { - auto fptr = - fopen(this->file_path_.data(), str_mode(open_mode).data()); - if (fptr == nullptr) { - std::cout << "line " << __LINE__ << " coro_file open failed " - << this->file_path_ << "\n"; - return false; - } - stream_file_ = std::shared_ptr(fptr, [](FILE *ptr) { - fclose(ptr); - }); - return true; - }, - &executor_wrapper_); - co_return result.value(); + return bytes_read; + }; +#endif + co_return co_await async_prw(pread, true, offset, data, size); } - async_simple::coro::Lazy> async_read( - char *data, size_t size) { - if (type_ != read_type::fread) { - co_return std::make_pair( - std::make_error_code(std::errc::bad_file_descriptor), 0); - } - auto result = co_await coro_io::post( - [this, data, size] { - auto fptr = stream_file_.get(); - size_t read_size = fread(data, sizeof(char), size, fptr); - if (ferror(fptr)) { - return std::pair( - std::make_error_code(std::errc::io_error), 0); - } - eof_ = feof(fptr); - return std::pair(std::error_code{}, - read_size); - }, - &executor_wrapper_); - - co_return result.value(); - } + async_simple::coro::Lazy> async_pwrite( + size_t offset, const char *data, size_t size) { +#if defined(ASIO_WINDOWS) + auto pwrite = [](int fd, const void *buf, uint64_t count, + uint64_t offset) -> int64_t { + DWORD bytes_write = 0; + OVERLAPPED overlapped; + memset(&overlapped, 0, sizeof(OVERLAPPED)); + overlapped.Offset = offset & 0xFFFFFFFF; + overlapped.OffsetHigh = (offset >> 32) & 0xFFFFFFFF; - async_simple::coro::Lazy async_write(const char *data, - size_t size) { - if (type_ != read_type::fread) { - co_return std::make_error_code(std::errc::bad_file_descriptor); - } - auto result = co_await coro_io::post( - [this, data, size] { - auto fptr = stream_file_.get(); - fwrite(data, sizeof(char), size, fptr); - if (ferror(fptr)) { - return std::make_error_code(std::errc::io_error); - } - return std::error_code{}; - }, - &executor_wrapper_); + BOOL ok = WriteFile(reinterpret_cast(_get_osfhandle(fd)), buf, + count, &bytes_write, &overlapped); + if (!ok) { + return -1; + } - co_return result.value(); - } + return bytes_write; + }; #endif + co_return co_await async_prw(pwrite, false, offset, (char *)data, size); + } - private: async_simple::coro::Lazy> async_prw( auto io_func, bool is_read, size_t offset, char *buf, size_t size) { std::function func = [=, this] { - int fd = *fd_file_; + int fd = *prw_random_file_; return io_func(fd, buf, size, offset); }; @@ -468,38 +612,14 @@ class coro_file { } else { ec = std::make_error_code(std::errc::io_error); + op_size = len; } co_return std::make_pair(ec, op_size); } - bool open_fd(std::string_view filepath, int open_mode = flags::read_write) { - if (fd_file_) { - return true; - } - -#if defined(ASIO_WINDOWS) - int fd = _open(filepath.data(), adjust_open_mode(open_mode)); -#else - int fd = open(filepath.data(), open_mode); -#endif - if (fd < 0) { - return false; - } - - fd_file_ = std::shared_ptr(new int(fd), [](int *ptr) { -#if defined(ASIO_WINDOWS) - _close(*ptr); -#else - ::close(*ptr); -#endif - delete ptr; - }); - return true; - } - #if defined(ASIO_WINDOWS) - static int adjust_open_mode(int open_mode) { + static int adjust_flags(int open_mode) { switch (open_mode) { case flags::read_only: return _O_RDONLY; @@ -531,16 +651,15 @@ class coro_file { return open_mode; } #endif - private: - read_type type_ = read_type::init; -#if defined(ENABLE_FILE_IO_URING) - std::shared_ptr> stream_file_; -#else - std::shared_ptr stream_file_; -#endif + coro_io::ExecutorWrapper<> executor_wrapper_; - std::shared_ptr fd_file_; +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + std::shared_ptr async_random_file_; // random file +#endif + std::shared_ptr prw_random_file_ = nullptr; // pread/pwrite random file std::string file_path_; - std::atomic eof_ = false; + bool eof_ = false; }; + +using random_coro_file = basic_random_coro_file<>; } // namespace coro_io diff --git a/lang/coroutine_based_http_lib.md b/lang/coroutine_based_http_lib.md index 2de668a8..12d328f3 100644 --- a/lang/coroutine_based_http_lib.md +++ b/lang/coroutine_based_http_lib.md @@ -444,7 +444,7 @@ coro_http_client client{}; create_file(filename, 1010); coro_io::coro_file file{}; - co_await file.async_open(filename, coro_io::flags::read_only); + file.open(filename, std::ios::in); std::string buf; detail::resize(buf, 100); @@ -550,7 +550,7 @@ async_simple::coro::Lazy byte_ranges_download() { } std::cout << filename << "\n"; - co_await file->async_open(filename, coro_io::flags::create_write); + file->open(filename, std::ios::trunc|std::ios::out); if (!file->is_open()) { resp.set_status_and_content(status_type::internal_server_error, "file open failed"); @@ -564,8 +564,7 @@ async_simple::coro::Lazy byte_ranges_download() { } if (!filename.empty()) { - auto ec = co_await file->async_write(part_body.data.data(), - part_body.data.size()); + auto ec = co_await file->async_write(part_body.data); if (ec) { co_return; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d9962e4d..cce3397c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -44,6 +44,12 @@ if(ENABLE_FILE_IO_URING) add_definitions(-DASIO_HAS_IO_URING) endif() +if(WIN32) + message(STATUS "windows has file") + add_definitions(-DASIO_HAS_FILE) + add_definitions(-DASIO_HAS_IOCP) +endif() + option(USE_PREAD_WRITE "enable pread and pwrite" OFF) if(USE_PREAD_WRITE) message(STATUS "use pread and pwrite") diff --git a/tests/test_cinatra.cpp b/tests/test_cinatra.cpp index d169c89c..2abf545a 100644 --- a/tests/test_cinatra.cpp +++ b/tests/test_cinatra.cpp @@ -876,7 +876,7 @@ TEST_CASE("test upload file") { } std::cout << filename << "\n"; - co_await file->async_open(filename, coro_io::flags::create_write); + file->open(filename, std::ios::trunc | std::ios::out); if (!file->is_open()) { resp.set_status_and_content(status_type::internal_server_error, "file open failed"); @@ -890,8 +890,7 @@ TEST_CASE("test upload file") { } if (!filename.empty()) { - auto ec = co_await file->async_write(part_body.data.data(), - part_body.data.size()); + auto [ec, sz] = co_await file->async_write(part_body.data); if (ec) { co_return; } @@ -1086,7 +1085,7 @@ TEST_CASE("test coro_http_client multipart upload") { } std::cout << filename << "\n"; - co_await file->async_open(filename, coro_io::flags::create_write); + file->open(filename, std::ios::trunc | std::ios::out); if (!file->is_open()) { resp.set_status_and_content(status_type::internal_server_error, "file open failed"); @@ -1100,8 +1099,7 @@ TEST_CASE("test coro_http_client multipart upload") { } if (!filename.empty()) { - auto ec = co_await file->async_write(part_body.data.data(), - part_body.data.size()); + auto [ec, sz] = co_await file->async_write(part_body.data); if (ec) { co_return; } @@ -1185,9 +1183,8 @@ TEST_CASE("test coro_http_client upload") { client.add_header("filesize", std::to_string(r_size)); std::string uri = "http://127.0.0.1:8090/upload"; coro_io::coro_file file; - auto res = async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); - CHECK(res); + file.open(filename, std::ios::in); + CHECK(file.is_open()); std::string buf; buf.resize(1'000'000); auto async_read = diff --git a/tests/test_coro_http_server.cpp b/tests/test_coro_http_server.cpp index 62d4dd10..6c49fc78 100644 --- a/tests/test_coro_http_server.cpp +++ b/tests/test_coro_http_server.cpp @@ -696,7 +696,7 @@ async_simple::coro::Lazy chunked_upload1(coro_http_client &client) { create_file(filename, 1010); coro_io::coro_file file{}; - co_await file.async_open(filename, coro_io::flags::read_only); + file.open(filename, std::ios::in); std::string buf; detail::resize(buf, 100); diff --git a/tests/test_corofile.cpp b/tests/test_corofile.cpp index 02fda594..e2548a21 100644 --- a/tests/test_corofile.cpp +++ b/tests/test_corofile.cpp @@ -15,6 +15,7 @@ #include "doctest/doctest.h" namespace fs = std::filesystem; +using namespace coro_io; constexpr uint64_t KB = 1024; constexpr uint64_t MB = 1024 * KB; @@ -24,7 +25,8 @@ std::vector create_filled_vec(std::string fill_with, size_t size = block_size) { if (fill_with.empty() || size == 0) return std::vector{}; - std::vector ret(size); + std::vector ret; + ret.resize(size); size_t fill_with_size = fill_with.size(); int cnt = size / fill_with_size; int remain = size % fill_with_size; @@ -69,119 +71,171 @@ void create_files(const std::vector &files, size_t file_size) { } } -TEST_CASE("validate corofile") { - std::string filename = "validate.tmp"; - create_files({filename}, 190); - { - coro_io::coro_file file{}; - async_simple::coro::syncAwait(file.async_open( - filename.data(), coro_io::flags::read_only, coro_io::read_type::pread)); - CHECK(file.is_open()); +template +void test_random_read_write(std::string_view filename) { + create_files({std::string(filename)}, 190); + coro_io::basic_random_coro_file file(filename, std::ios::in); + CHECK(file.is_open()); +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (execute_type == coro_io::execution_type::native_async) { + CHECK(file.get_execution_type() == coro_io::execution_type::native_async); + } +#else + CHECK(file.get_execution_type() == coro_io::execution_type::thread_pool); +#endif - char buf[100]; - std::error_code ec; - size_t size; - std::tie(ec, size) = - async_simple::coro::syncAwait(file.async_read(buf, 10)); - CHECK(ec == std::make_error_code(std::errc::bad_file_descriptor)); - CHECK(size == 0); + char buf[100]; + auto pair = async_simple::coro::syncAwait(file.async_read_at(0, buf, 10)); + CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA"); + CHECK(!file.eof()); - auto write_ec = async_simple::coro::syncAwait(file.async_write(buf, 10)); - CHECK(write_ec == std::make_error_code(std::errc::bad_file_descriptor)); - } -#if defined(ENABLE_FILE_IO_URING) - { - coro_io::coro_file file{}; - async_simple::coro::syncAwait( - file.async_open(filename.data(), coro_io::flags::read_only, - coro_io::read_type::uring_random)); - CHECK(file.is_open()); + pair = async_simple::coro::syncAwait(file.async_read_at(10, buf, 100)); + CHECK(!file.eof()); + CHECK(pair.second == 100); - char buf[100]; - std::error_code ec; - size_t size; - std::tie(ec, size) = - async_simple::coro::syncAwait(file.async_read(buf, 10)); - CHECK(ec == std::make_error_code(std::errc::bad_file_descriptor)); - CHECK(size == 0); + pair = async_simple::coro::syncAwait(file.async_read_at(110, buf, 100)); + CHECK(pair.second == 80); + + // only read size equal 0 is eof. + pair = async_simple::coro::syncAwait(file.async_read_at(200, buf, 100)); + CHECK(file.eof()); + CHECK(pair.second == 0); - ec = async_simple::coro::syncAwait(file.async_write(buf, 10)); - CHECK(ec == std::make_error_code(std::errc::bad_file_descriptor)); + coro_io::basic_random_coro_file file1; + file1.open(filename, std::ios::out); + CHECK(file1.is_open()); + std::string buf1 = "cccccccccc"; + async_simple::coro::syncAwait(file1.async_write_at(0, buf1)); + + std::string buf2 = "dddddddddd"; + async_simple::coro::syncAwait(file1.async_write_at(10, buf2)); +} + +template +void test_seq_read_write(std::string_view filename) { + create_files({std::string(filename)}, 190); + coro_io::basic_seq_coro_file file(filename, + std::ios::in | std::ios::out); + CHECK(file.is_open()); +#if defined(ENABLE_FILE_IO_URING) || defined(ASIO_WINDOWS) + if (execute_type == coro_io::execution_type::native_async) { + CHECK(file.get_execution_type() == coro_io::execution_type::native_async); } +#else + CHECK(file.get_execution_type() == coro_io::execution_type::thread_pool); +#endif + char buf[100]; + std::error_code ec; + size_t size; + std::tie(ec, size) = async_simple::coro::syncAwait(file.async_read(buf, 10)); + CHECK(size == 10); + + std::string str = "test"; + std::tie(ec, size) = async_simple::coro::syncAwait(file.async_write(str)); + CHECK(size == 4); +} +TEST_CASE("test seq and random") { + std::string filename = "validate.tmp"; { - coro_io::coro_file file{}; - async_simple::coro::syncAwait(file.async_open( - filename.data(), coro_io::flags::read_only, coro_io::read_type::uring)); - CHECK(file.is_open()); + test_random_read_write(filename); + test_random_read_write(filename); + } + { + test_seq_read_write(filename); + test_seq_read_write(filename); + } +} - char buf[100]; - std::error_code ec; - size_t size; - std::tie(ec, size) = - async_simple::coro::syncAwait(file.async_read_at(0, buf, 10)); - CHECK(ec == std::make_error_code(std::errc::bad_file_descriptor)); - CHECK(size == 0); +async_simple::coro::Lazy read_seek(std::string filename) { + coro_io::coro_file file{}; + file.open(filename, std::ios::in); + CHECK(file.is_open()); + std::string str; + str.resize(200); - ec = async_simple::coro::syncAwait(file.async_write_at(0, buf, 10)); - CHECK(ec == std::make_error_code(std::errc::bad_file_descriptor)); + { + auto pair = co_await file.async_read(str.data(), 10); + CHECK(pair.second == 10); + CHECK(!file.eof()); } -#else { - coro_io::coro_file file{}; - async_simple::coro::syncAwait(file.async_open( - filename.data(), coro_io::flags::read_only, coro_io::read_type::fread)); - CHECK(file.is_open()); + bool ok = file.seek(10, std::ios::beg); + CHECK(ok); + } + { + auto pair = co_await file.async_read(str.data(), str.size()); + CHECK(pair.second == 5); + CHECK(file.eof()); + } - char buf[100]; - std::error_code ec; - size_t size; - std::tie(ec, size) = - async_simple::coro::syncAwait(file.async_pread(0, buf, 10)); - CHECK(ec == std::make_error_code(std::errc::bad_file_descriptor)); - CHECK(size == 0); + // bool ok = file.seek(100, std::ios::beg); + // CHECK(!ok); +} - auto write_ec = - async_simple::coro::syncAwait(file.async_pwrite(0, buf, 10)); - CHECK(write_ec == std::make_error_code(std::errc::bad_file_descriptor)); +async_simple::coro::Lazy write_seek(std::string filename) { + coro_io::coro_file file{}; + file.open(filename, std::ios::in | std::ios::out | std::ios::trunc); + CHECK(file.is_open()); + std::string str = "hello"; + + { + co_await file.async_write(str); + std::string result; + result.resize(10); + CHECK(file.seek(0, std::ios::beg)); + auto [rd_ec, size] = co_await file.async_read(result.data(), 5); + std::string_view s(result.data(), size); + CHECK(s == "hello"); } -#endif + { + bool ok = file.seek(10, std::ios::beg); + CHECK(ok); + co_await file.async_write(str); + CHECK(file.seek(10, std::ios::beg)); + std::string result; + result.resize(10); + auto [rd_ec, size] = co_await file.async_read(result.data(), 5); + std::string_view s(result.data(), size); + CHECK(s == "hello"); + } +} + +TEST_CASE("coro_file seek read and write") { + async_simple::coro::syncAwait(write_seek("seek_file.txt")); + async_simple::coro::syncAwait(read_seek("seek_file.txt")); } TEST_CASE("coro_file pread and pwrite basic test") { std::string filename = "test.tmp"; create_files({filename}, 190); { - coro_io::coro_file file{}; - async_simple::coro::syncAwait(file.async_open( - filename.data(), coro_io::flags::read_only, coro_io::read_type::pread)); + basic_random_coro_file file(filename, + std::ios::in); CHECK(file.is_open()); char buf[100]; - auto pair = async_simple::coro::syncAwait(file.async_pread(0, buf, 10)); + auto pair = async_simple::coro::syncAwait(file.async_read_at(0, buf, 10)); CHECK(std::string_view(buf, pair.second) == "AAAAAAAAAA"); CHECK(!file.eof()); - pair = async_simple::coro::syncAwait(file.async_pread(10, buf, 100)); + pair = async_simple::coro::syncAwait(file.async_read_at(10, buf, 100)); CHECK(!file.eof()); CHECK(pair.second == 100); - pair = async_simple::coro::syncAwait(file.async_pread(110, buf, 100)); + pair = async_simple::coro::syncAwait(file.async_read_at(110, buf, 100)); CHECK(!file.eof()); CHECK(pair.second == 80); // only read size equal 0 is eof. - pair = async_simple::coro::syncAwait(file.async_pread(200, buf, 100)); + pair = async_simple::coro::syncAwait(file.async_read_at(200, buf, 100)); CHECK(file.eof()); CHECK(pair.second == 0); } #if defined(ENABLE_FILE_IO_URING) { - coro_io::coro_file file{}; - async_simple::coro::syncAwait( - file.async_open(filename.data(), coro_io::flags::read_only, - coro_io::read_type::uring_random)); + random_coro_file file(filename, std::ios::in); CHECK(file.is_open()); char buf[100]; @@ -203,21 +257,14 @@ TEST_CASE("coro_file pread and pwrite basic test") { } { - coro_io::coro_file file{}; - async_simple::coro::syncAwait( - file.async_open(filename.data(), coro_io::flags::read_write, - coro_io::read_type::uring_random)); + random_coro_file file(filename, std::ios::in | std::ios::out); CHECK(file.is_open()); std::string buf = "cccccccccc"; - auto ec = async_simple::coro::syncAwait( - file.async_write_at(0, buf.data(), buf.size())); - CHECK(!ec); + async_simple::coro::syncAwait(file.async_write_at(0, buf)); std::string buf1 = "dddddddddd"; - ec = async_simple::coro::syncAwait( - file.async_write_at(10, buf1.data(), buf1.size())); - CHECK(!ec); + async_simple::coro::syncAwait(file.async_write_at(10, buf1)); char buf2[100]; auto pair = async_simple::coro::syncAwait(file.async_read_at(0, buf2, 10)); @@ -231,81 +278,29 @@ TEST_CASE("coro_file pread and pwrite basic test") { #endif { - coro_io::coro_file file{}; - async_simple::coro::syncAwait(file.async_open(filename.data(), - coro_io::flags::read_write, - coro_io::read_type::pread)); + basic_random_coro_file file( + filename, std::ios::in | std::ios::out); CHECK(file.is_open()); std::string buf = "cccccccccc"; - auto ec = async_simple::coro::syncAwait( - file.async_pwrite(0, buf.data(), buf.size())); - CHECK(!ec); + auto pair = async_simple::coro::syncAwait(file.async_write_at(0, buf)); + CHECK(!pair.first); std::string buf1 = "dddddddddd"; - ec = async_simple::coro::syncAwait( - file.async_pwrite(10, buf1.data(), buf1.size())); - CHECK(!ec); + pair = async_simple::coro::syncAwait(file.async_write_at(10, buf1)); + CHECK(!pair.first); char buf2[100]; - auto pair = async_simple::coro::syncAwait(file.async_pread(0, buf2, 10)); + pair = async_simple::coro::syncAwait(file.async_read_at(0, buf2, 10)); CHECK(!file.eof()); CHECK(std::string_view(buf2, pair.second) == "cccccccccc"); - pair = async_simple::coro::syncAwait(file.async_pread(10, buf2, 10)); + pair = async_simple::coro::syncAwait(file.async_read_at(10, buf2, 10)); CHECK(!file.eof()); CHECK(std::string_view(buf2, pair.second) == "dddddddddd"); } } -async_simple::coro::Lazy test_basic_read(std::string filename) { - coro_io::coro_file file{}; - co_await file.async_open(filename.data(), coro_io::flags::read_only); - std::string str; - str.resize(200); - - { - auto [ec, size] = co_await file.async_read(str.data(), 10); - std::cout << size << ", " << file.eof() << "\n"; - } - { - bool ok = file.seek(10, SEEK_CUR); - std::cout << ok << "\n"; - } - { - auto [ec, size] = co_await file.async_read(str.data(), str.size()); - std::cout << size << ", " << file.eof() << "\n"; - } -} - -async_simple::coro::Lazy test_basic_write(std::string filename) { - coro_io::coro_file file{}; - co_await file.async_open(filename.data(), coro_io::flags::read_write); - std::string str = "hello"; - - { - auto ec = co_await file.async_write(str.data(), str.size()); - std::string result; - result.resize(10); - file.seek(0, SEEK_SET); - auto [rd_ec, size] = co_await file.async_read(result.data(), 5); - std::string_view s(result.data(), size); - CHECK(s == "hello"); - } - { - bool ok = file.seek(10, SEEK_SET); - auto ec = co_await file.async_write(str.data(), str.size()); - file.seek(10, SEEK_SET); - std::string result; - result.resize(10); - auto [rd_ec, size] = co_await file.async_read(result.data(), 5); - std::string_view s(result.data(), size); - CHECK(s == "hello"); - - std::cout << ec << "\n"; - } -} - TEST_CASE("multithread for balance") { size_t total = 100; std::vector filenames; @@ -326,15 +321,12 @@ TEST_CASE("multithread for balance") { int index) mutable -> async_simple::coro::Lazy { coro_io::coro_file file(coro_io::get_global_block_executor< coro_io::multithread_context_pool>()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::out | std::ios::trunc); CHECK(file.is_open()); size_t id = index % write_str_vec.size(); auto &str = write_str_vec[id]; - auto ec = co_await file.async_write(str.data(), str.size()); - CHECK(!ec); - co_return; + co_await file.async_write(str); }; for (size_t i = 0; i < total; ++i) { @@ -357,8 +349,7 @@ TEST_CASE("multithread for balance") { int index) mutable -> async_simple::coro::Lazy { coro_io::coro_file file(coro_io::get_global_block_executor< coro_io::multithread_context_pool>()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::in); CHECK(file.is_open()); size_t id = index % write_str_vec.size(); @@ -417,15 +408,12 @@ TEST_CASE("read write 100 small files") { std::string filename, int index) mutable -> async_simple::coro::Lazy { coro_io::coro_file file(pool.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::trunc | std::ios::out); CHECK(file.is_open()); size_t id = index % write_str_vec.size(); auto &str = write_str_vec[id]; - auto ec = co_await file.async_write(str.data(), str.size()); - CHECK(!ec); - co_return; + co_await file.async_write(str); }; for (size_t i = 0; i < total; ++i) { @@ -448,8 +436,7 @@ TEST_CASE("read write 100 small files") { std::string filename, int index) mutable -> async_simple::coro::Lazy { coro_io::coro_file file(pool.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::in); CHECK(file.is_open()); size_t id = index % write_str_vec.size(); @@ -497,8 +484,7 @@ TEST_CASE("small_file_read_test") { }); coro_io::coro_file file(ioc.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::binary | std::ios::in); CHECK(file.is_open()); char buf[block_size]{}; @@ -534,24 +520,28 @@ TEST_CASE("large_file_read_test") { }); coro_io::coro_file file(ioc.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::in); CHECK(file.is_open()); - char buf[block_size]{}; size_t total_size = 0; std::error_code ec; size_t read_size; + while (!file.eof()) { + char buf[block_size]{}; std::tie(ec, read_size) = async_simple::coro::syncAwait(file.async_read(buf, block_size)); if (ec) { std::cout << ec.message() << "\n"; break; } + total_size += read_size; - CHECK(std::string_view(block_vec.data(), read_size) == - std::string_view(buf, read_size)); + CHECK(read_size <= block_size); + auto s1 = std::string_view(block_vec.data(), read_size); + auto s2 = std::string_view(buf, read_size); + + CHECK(s1 == s2); } CHECK(total_size == file_size); work.reset(); @@ -571,8 +561,7 @@ TEST_CASE("empty_file_read_test") { }); coro_io::coro_file file(ioc.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::in); CHECK(file.is_open()); char buf[block_size]{}; @@ -604,8 +593,7 @@ TEST_CASE("small_file_read_with_pool_test") { }); coro_io::coro_file file(pool.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::in); CHECK(file.is_open()); char buf[block_size]{}; @@ -640,8 +628,7 @@ TEST_CASE("large_file_read_with_pool_test") { }); coro_io::coro_file file(pool.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::read_only)); + file.open(filename, std::ios::in); CHECK(file.is_open()); char buf[block_size]{}; @@ -673,23 +660,19 @@ TEST_CASE("small_file_write_test") { ioc.run(); }); + std::string file_content_0 = "small_file_write_test_0"; + coro_io::coro_file file(ioc.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::trunc | std::ios::out); CHECK(file.is_open()); + async_simple::coro::syncAwait(file.async_write(file_content_0)); - char buf[512]{}; - - std::string file_content_0 = "small_file_write_test_0"; - - auto ec = async_simple::coro::syncAwait( - file.async_write(file_content_0.data(), file_content_0.size())); - if (ec) { - std::cout << ec.message() << "\n"; + auto &stream = file.get_stream_file(); + if (stream) { + stream.flush(); } - file.flush(); - + char buf[512]{}; std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -707,12 +690,13 @@ TEST_CASE("small_file_write_test") { std::string file_content_1 = "small_file_write_test_1"; - ec = async_simple::coro::syncAwait( - file.async_write(file_content_1.data(), file_content_1.size())); - if (ec) { - std::cout << ec.message() << "\n"; + async_simple::coro::syncAwait(file.async_write(file_content_1)); + + auto &stream1 = file.get_stream_file(); + if (stream1) { + stream1.flush(); } - file.flush(); + is.open(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -743,29 +727,24 @@ TEST_CASE("large_file_write_test") { }); coro_io::coro_file file(ioc.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::trunc | std::ios::out); CHECK(file.is_open()); auto block_vec = create_filled_vec("large_file_write_test"); int cnt = file_size / block_size; int remain = file_size % block_size; while (cnt--) { - auto ec = async_simple::coro::syncAwait( - file.async_write(block_vec.data(), block_size)); - if (ec) { - std::cout << ec.message() << "\n"; - break; - } + async_simple::coro::syncAwait( + file.async_write({block_vec.data(), block_size})); } if (remain > 0) { - auto ec = async_simple::coro::syncAwait( - file.async_write(block_vec.data(), remain)); - if (ec) { - std::cout << ec.message() << "\n"; - } + async_simple::coro::syncAwait( + file.async_write({block_vec.data(), (size_t)remain})); + } + auto &stream = file.get_stream_file(); + if (stream) { + stream.flush(); } - file.flush(); CHECK(fs::file_size(filename) == file_size); std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { @@ -798,20 +777,15 @@ TEST_CASE("empty_file_write_test") { }); coro_io::coro_file file(ioc.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::trunc | std::ios::out); CHECK(file.is_open()); char buf[512]{}; std::string file_content_0 = "small_file_write_test_0"; - auto ec = - async_simple::coro::syncAwait(file.async_write(file_content_0.data(), 0)); - if (ec) { - std::cout << ec.message() << "\n"; - } - file.flush(); + async_simple::coro::syncAwait(file.async_write({file_content_0.data(), 0})); + std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -834,20 +808,21 @@ TEST_CASE("small_file_write_with_pool_test") { }); coro_io::coro_file file(pool.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::trunc | std::ios::out); CHECK(file.is_open()); char buf[512]{}; std::string file_content_0 = "small_file_write_with_pool_test_0"; - auto ec = async_simple::coro::syncAwait( - file.async_write(file_content_0.data(), file_content_0.size())); - if (ec) { - std::cout << ec.message() << "\n"; + async_simple::coro::syncAwait(file.async_write(file_content_0)); + + { + auto &stream1 = file.get_stream_file(); + if (stream1) { + stream1.flush(); + } } - file.flush(); std::ifstream is(filename, std::ios::binary); if (!is.is_open()) { @@ -866,12 +841,15 @@ TEST_CASE("small_file_write_with_pool_test") { std::string file_content_1 = "small_file_write_with_pool_test_1"; - ec = async_simple::coro::syncAwait( - file.async_write(file_content_1.data(), file_content_1.size())); - if (ec) { - std::cout << ec.message() << "\n"; + async_simple::coro::syncAwait(file.async_write(file_content_1)); + + { + auto &stream1 = file.get_stream_file(); + if (stream1) { + stream1.flush(); + } } - file.flush(); + is.open(filename, std::ios::binary); if (!is.is_open()) { std::cout << "Failed to open file: " << filename << "\n"; @@ -901,29 +879,26 @@ TEST_CASE("large_file_write_with_pool_test") { }); coro_io::coro_file file(pool.get_executor()); - async_simple::coro::syncAwait( - file.async_open(filename, coro_io::flags::create_write)); + file.open(filename, std::ios::trunc | std::ios::out); CHECK(file.is_open()); auto block_vec = create_filled_vec("large_file_write_with_pool_test"); int cnt = file_size / block_size; int remain = file_size % block_size; while (cnt--) { - auto ec = async_simple::coro::syncAwait( - file.async_write(block_vec.data(), block_size)); - if (ec) { - std::cout << ec.message() << "\n"; - break; - } + async_simple::coro::syncAwait( + file.async_write({block_vec.data(), block_size})); } if (remain > 0) { - auto ec = async_simple::coro::syncAwait( - file.async_write(block_vec.data(), remain)); - if (ec) { - std::cout << ec.message() << "\n"; - } + async_simple::coro::syncAwait( + file.async_write({block_vec.data(), (size_t)remain})); } - file.flush(); + + auto &stream = file.get_stream_file(); + if (stream) { + stream.flush(); + } + size_t sz = fs::file_size(filename); CHECK(sz == file_size); std::ifstream is(filename, std::ios::binary);