diff --git a/Data/samples/DBLogger/CMakeLists.txt b/Data/samples/DBLogger/CMakeLists.txt
index 6c6f6fe7af..aaae453d27 100644
--- a/Data/samples/DBLogger/CMakeLists.txt
+++ b/Data/samples/DBLogger/CMakeLists.txt
@@ -1,2 +1,12 @@
-add_executable(DBLogger src/DBLogger.cpp)
+
+# Sources
+file(GLOB SRCS_G "src/*.cpp")
+POCO_SOURCES_AUTO(DBLOGGER_SRCS ${SRCS_G})
+
+# Headers
+file(GLOB_RECURSE HDRS_G "src/*.h")
+POCO_HEADERS_AUTO(DBLOGGER_SRCS ${HDRS_G})
+
+add_executable(DBLogger ${DBLOGGER_SRCS})
+
 target_link_libraries(DBLogger PUBLIC Poco::Util Poco::DataSQLite)
diff --git a/Data/samples/DBLogger/Makefile b/Data/samples/DBLogger/Makefile
index 9ca550da9d..385d386797 100644
--- a/Data/samples/DBLogger/Makefile
+++ b/Data/samples/DBLogger/Makefile
@@ -12,7 +12,7 @@ ifndef POCO_DATA_NO_SQL_PARSER
 	target_includes = $(POCO_BASE)/Data/src
 endif

-target = DBLogger
+target = DBLogger SQLLogInserter
 target_version = 1
 target_libs = PocoDataSQLite PocoData PocoFoundation PocoUtil

diff --git a/Data/samples/DBLogger/src/DBLogger.cpp b/Data/samples/DBLogger/src/DBLogger.cpp
index b5aee7393e..56303b94d9 100644
--- a/Data/samples/DBLogger/src/DBLogger.cpp
+++ b/Data/samples/DBLogger/src/DBLogger.cpp
@@ -18,13 +18,7 @@
 #include "Poco/Util/HelpFormatter.h"
 #include "Poco/Util/ServerApplication.h"

-#include <condition_variable>
-#include <filesystem>
-#include <fstream>
-#include <iostream>
-#include <mutex>
-#include <thread>
-#include <unordered_set>
+#include "SQLLogInserter.h"

 using namespace Poco::Data::Keywords;
 using Poco::Util::ServerApplication;
@@ -64,34 +58,28 @@ class DBLogger: public ServerApplication

 		Poco::Data::SQLite::Connector::registerConnector();

-		logger().information("Database connector: %s, cs: %s", _connector, _connectionString);
-		logger().information("Directory: %s", _directory);
-		logger().information("Number of workers: %z", _numWorkers);
+		// TODO: Only delete and create table when creating demo messages?
+		logger().information("Database connector: %s, cs: %s",
+			_inserter.connector(), _inserter.connectionString());
+		logger().information("Directory: %s", _inserter.directory());
+		logger().information("Number of workers: %z", _inserter.numWorkers());

-		_dataSession = std::make_shared<Poco::Data::Session>(_connector, _connectionString);
+		Poco::Data::Session session (_inserter.connector(), _inserter.connectionString());

-		(*_dataSession) << ("DROP TABLE IF EXISTS "s + _tableName), now;
+		session << ("DROP TABLE IF EXISTS "s + _tableName), now;

 		const auto create {
 			"CREATE TABLE "s + _tableName +
 			" (Source VARCHAR, Name VARCHAR, ProcessId INTEGER, Thread VARCHAR,"s +
 			" ThreadId INTEGER, Priority INTEGER, Text VARCHAR, DateTime DATE)"s
 		};

-		(*_dataSession) << create, now;
+		session << create, now;

+		_active = true;
 		_startTime.update();

-		_workSet.reserve(MAX_WORKSET_SIZE * 2);
-
-		// TODO: Create worker threads
-		for (std::size_t i = 0; i < _numWorkers; i++)
-		{
-			_workers.emplace_back(&DBLogger::processFiles, this);
-		}
-
-		// Thread to scan the directory
-		_dirScanThread = std::move(std::thread(&DBLogger::runDirectoryScan, this));
+		_inserter.start();

 		logger().information("Started directory scanning.");

@@ -99,7 +87,7 @@ class DBLogger: public ServerApplication
 		{
 			// SQL channel to generate SQL files
 			_sqlChannel = new Poco::Data::SQLChannel();
-			_sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_DIRECTORY, _directory);
+			_sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_DIRECTORY, _inserter.directory());
 			_sqlChannel->setProperty(Poco::Data::SQLChannel::PROP_TABLE, _tableName);

 			_sqlSourceThread = std::move(std::thread(&DBLogger::createMessages, this));
@@ -117,25 +105,11 @@ class DBLogger: public ServerApplication
 		{
 			_sqlSourceThread.join();
 		}
-		_dirScanThread.join();
-
-		while (!_workSet.empty())
-		{
-			logger().information("Waiting for workers to stop. Work %z", _workSet.size());
-			Poco::Thread::sleep(200);
-		}
-
-		// stop all worker threads
-		for (auto& w: _workers)
-		{
-			if (w.joinable()) {
-				w.join();
-			}
-		}
+		_inserter.stop();

 		logger().information(
 			"Created %z messages, processed %z messages in %Ld ms.",
-			_created, _processed, (_startTime.elapsed() / 1000)
+			_created, _inserter.totalProcessed(), (_startTime.elapsed() / 1000)
 		);

 		Application::uninitialize();
@@ -214,27 +188,30 @@ class DBLogger: public ServerApplication
 	{
 		if (value.empty())
 			throw Poco::Util::IncompatibleOptionsException("Expected directory name");
-		_directory = value;
+
+		_inserter.setDirectory(value);
 	}

 	void handleDbConnector(const std::string& name, const std::string& value)
 	{
 		if (value.empty())
 			throw Poco::Util::IncompatibleOptionsException("Expected database connector name");
-		_connector = value;
+
+		_inserter.setConnector(value);
 	}

 	void handleDbConnectionString(const std::string& name, const std::string& value)
 	{
 		if (value.empty())
 			throw Poco::Util::IncompatibleOptionsException("Expected database connection string");
-		_connectionString = value;
+
+		_inserter.setConnectionString(value);
 	}

 	void handleNumberOfWorkers(const std::string& name, const std::string& value)
 	{
 		const auto num = Poco::NumberParser::parseUnsigned(value);
-		this->_numWorkers = std::min(1U, num);
+		_inserter.setNumWorkers( std::max(1U, num) ); // at least one worker, otherwise queued files are never processed
 	}

 	void displayHelp()
@@ -245,149 +222,6 @@ class DBLogger: public ServerApplication
 		helpFormatter.format(std::cout);
 	}

-	std::size_t insertEntries(std::vector<std::string>& entries)
-	{
-		std::unique_lock<std::mutex> l(_workMutex);
-
-		while (_workSet.size() > MAX_WORKSET_SIZE && _active)
-		{
-			// Prevent creating too large work set
-			_underflowCondition.wait_for(l, std::chrono::milliseconds(200));
-		}
-
-		const auto wss = _workSet.size();
-		// Do not re-insert entries that are being processed.
-		entries.erase(
-			std::remove_if(
-				entries.begin(),
-				entries.end(),
-				[this](const std::string& e) {
-					return this->_processingSet.find(e) != this->_processingSet.end();
-				}
-			),
-			entries.end()
-		);
-
-		logger().information("Enqueued new entries: %z", entries.size());
-		_workSet.insert(entries.begin(), entries.end());
-		_workCondition.notify_all();
-		return _workSet.size() - wss;
-	}
-
-	std::string popEntry()
-	{
-		std::unique_lock<std::mutex> l(_workMutex);
-		while (_workSet.empty() && _active)
-		{
-			_workCondition.wait_for(l, std::chrono::milliseconds(200));
-		}
-		if (_workSet.empty())
-		{
-			// Exited loop because of !_active
-			return {};
-		}
-		auto entry = (*_workSet.begin());
-		_processingSet.insert(entry);
-		_workSet.erase(_workSet.begin());
-		if (_workSet.size() < MAX_WORKSET_SIZE)
-		{
-			_underflowCondition.notify_all();
-		}
-		return entry;
-	}
-
-	void removeEntry(std::string entry)
-	{
-		std::unique_lock<std::mutex> l(_workMutex);
-		auto i = _processingSet.find(entry);
-		if (i != _processingSet.end())
-		{
-			_processingSet.erase(i);
-		}
-	}
-
-	void processFile(std::string& entry)
-	{
-		if (entry.empty())
-		{
-			return;
-		}
-		if (!std::filesystem::exists(entry))
-		{
-			// Directory iterator can still pick up files that were already processed
-			removeEntry(entry);
-			return;
-		}
-		bool success {false};
-		try
-		{
-			std::ifstream is(entry);
-			std::stringstream buffer;
-			buffer << is.rdbuf();
-			const auto& sql { buffer.str() };
-
-			if (!sql.empty())
-			{
-				auto& s = (*_dataSession);
-
-				s << sql, now;
-
-				++_processed;
-				success = true;
-			}
-		}
-		catch (const Poco::Exception& e)
-		{
-			logger().warning("Failed to insert to database %s: %s", entry, e.displayText());
-		}
-		if (success)
-		{
-			std::filesystem::remove(entry);
-		}
-		else
-		{
-			std::filesystem::path newPath {entry};
-			newPath.replace_extension("err"s);
-			std::filesystem::rename(entry, newPath);
-		}
-		removeEntry(entry);
-	}
-
-	std::size_t scanDirectory()
-	{
-		std::vector<std::string> newEntries;
-		newEntries.reserve(1000);
-		std::filesystem::directory_iterator diriter(_directory, std::filesystem::directory_options::skip_permission_denied);
-		for (auto& entry: diriter)
-		{
-			if (!_active)
-			{
-				return 0;
-			}
-			if (_dataSession == nullptr || !_dataSession->isGood())
-			{
-				// Do not process files if database session is not healthy.
-				// Files will be processed later.
-				return 0;
-			}
-
-			if (!std::filesystem::exists(entry))
-			{
-				continue;
-			}
-			if (!entry.is_regular_file())
-			{
-				continue;
-			}
-			if (entry.path().extension() != ".sql"s)
-			{
-				continue;
-			}
-			newEntries.push_back(entry.path());
-		}
-		return insertEntries(newEntries);
-	}
-
 	void createMessages()
 	{
 		int i {0};
@@ -404,34 +238,11 @@ class DBLogger: public ServerApplication
 		}
 	}

-	void runDirectoryScan()
-	{
-		while (_active)
-		{
-			const auto scheduled = scanDirectory();
-			if (scheduled == 0)
-			{
-				// No new files to be scheduled. Wait a bit.
-				std::this_thread::sleep_for(std::chrono::milliseconds(500));
-			}
-		}
-	}
-
-	void processFiles()
-	{
-		while (!_workSet.empty() || _active)
-		{
-			auto entry = popEntry();
-			processFile(entry);
-		}
-	}
-
 	int main(const std::vector<std::string>& args) override
 	{
 		if (!_helpRequested)
 		{
-			logger().information("Press any key to stop scanning.");
-			std::cin.get();
+			waitForTerminationRequest();
 			_active = false;
 		}

@@ -439,37 +250,18 @@ class DBLogger: public ServerApplication
 	}

 private:
-	static constexpr std::size_t MAX_WORKSET_SIZE {1000};

 	bool _helpRequested {false};
 	bool _demoMessagesRequested {false};

-	std::string _directory;
-	std::string _connector;
-	std::string _connectionString;
-	std::size_t _numWorkers {2};
-
-	const std::string _tableName{"T_POCO_LOG"};
+	SQLLogInserter _inserter;

+	Poco::Timestamp _startTime;
 	bool _active {false};
 	std::size_t _created{0};
-	std::size_t _processed{0};
-
-	Poco::Timestamp _startTime;
-
 	std::thread _sqlSourceThread;
-	std::thread _dirScanThread;
-
-	WorkSet _workSet;
-	WorkSet _processingSet;
-	std::mutex _workMutex;
-	std::condition_variable _workCondition;
-	std::condition_variable _underflowCondition;
-
-	std::vector<std::thread> _workers;
-
+	const std::string _tableName{"T_POCO_LOG"};

 	Poco::AutoPtr<Poco::Data::SQLChannel> _sqlChannel;
-	std::shared_ptr<Poco::Data::Session> _dataSession;
 };

diff --git a/Data/samples/DBLogger/src/SQLLogInserter.cpp b/Data/samples/DBLogger/src/SQLLogInserter.cpp
new file mode 100644
index 0000000000..26c2b4227b
--- /dev/null
+++ b/Data/samples/DBLogger/src/SQLLogInserter.cpp
@@ -0,0 +1,251 @@
+//
+// SQLLogInserter.cpp
+//
+// This sample demonstrates the Data library recordset row formatting
+// and streaming capabilities.
+//
+// Copyright (c) 2024, Applied Informatics Software Engineering GmbH.
+// and Contributors.
+//
+// SPDX-License-Identifier: BSL-1.0
+
+#include "SQLLogInserter.h"
+
+#include "Poco/Util/Application.h"
+
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+
+using namespace Poco::Data::Keywords;
+
+using namespace std::string_literals;
+
+void SQLLogInserter::start()
+{
+	_dataSession = std::make_shared<Poco::Data::Session>(_connector, _connectionString);
+
+	_active = true;
+
+	_workSet.reserve(MAX_WORKSET_SIZE * 2);
+
+	for (std::size_t i = 0; i < _numWorkers; i++)
+	{
+		_workers.emplace_back(&SQLLogInserter::runProcessingEntries, this);
+	}
+
+	// Thread to scan the directory
+	_dirScanThread = std::thread(&SQLLogInserter::runDirectoryScan, this);
+}
+
+void SQLLogInserter::stop()
+{
+	_active = false;
+	// Wake up threads blocked in popEntry() and insertEntries() so they notice the stop request.
+	_workCondition.notify_all();
+	_underflowCondition.notify_all();
+
+	{
+		// Wait until queued and in-flight files have been handled.
+		std::unique_lock<std::mutex> l(_workMutex);
+		while (!_workSet.empty() || !_processingSet.empty())
+		{
+			_completedCondition.wait_for(l, std::chrono::milliseconds(5000));
+		}
+	}
+
+	if (_dirScanThread.joinable())
+	{
+		_dirScanThread.join();
+	}
+
+	// stop all worker threads
+	for (auto& w: _workers)
+	{
+		if (w.joinable()) {
+			w.join();
+		}
+	}
+}
+
+
+std::size_t SQLLogInserter::insertEntries(std::vector<std::string>& entries)
+{
+	std::unique_lock<std::mutex> l(_workMutex);
+
+	while (_workSet.size() > MAX_WORKSET_SIZE && _active)
+	{
+		// Prevent creating too large work set
+		_underflowCondition.wait_for(l, std::chrono::milliseconds(200));
+	}
+	if (!_active)
+	{
+		return 0;
+	}
+
+	const auto wss = _workSet.size();
+	// Do not re-insert entries that are being processed.
+	entries.erase(
+		std::remove_if(
+			entries.begin(),
+			entries.end(),
+			[this](const std::string& e) {
+				return this->_processingSet.find(e) != this->_processingSet.end();
+			}
+		),
+		entries.end()
+	);
+
+	_workSet.insert(entries.begin(), entries.end());
+	_workCondition.notify_all();
+	return _workSet.size() - wss;
+}
+
+
+std::string SQLLogInserter::popEntry()
+{
+	std::unique_lock<std::mutex> l(_workMutex);
+	while (_workSet.empty() && _active)
+	{
+		_workCondition.wait_for(l, std::chrono::milliseconds(200));
+	}
+	if (_workSet.empty())
+	{
+		// Exited loop because of !_active
+		return {};
+	}
+	auto entry = (*_workSet.begin());
+	// Mark as in-flight so that a directory rescan does not enqueue it again.
+	_processingSet.insert(entry);
+	_workSet.erase(_workSet.begin());
+	if (_workSet.size() < MAX_WORKSET_SIZE)
+	{
+		_underflowCondition.notify_all();
+	}
+	return entry;
+}
+
+
+void SQLLogInserter::removeEntry(std::string entry)
+{
+	std::unique_lock<std::mutex> l(_workMutex);
+	auto i = _processingSet.find(entry);
+	if (i != _processingSet.end())
+	{
+		_processingSet.erase(i);
+	}
+}
+
+
+void SQLLogInserter::processFile(std::string& entry)
+{
+	if (entry.empty())
+	{
+		return;
+	}
+	if (!std::filesystem::exists(entry))
+	{
+		// Directory iterator can pick up files that were already processed
+		// Ignore such entries.
+		removeEntry(entry);
+		return;
+	}
+	bool success {false};
+	try
+	{
+		std::ifstream is(entry);
+		std::stringstream buffer;
+		buffer << is.rdbuf();
+		const auto& sql { buffer.str() };
+
+		if (!sql.empty())
+		{
+			auto& s = (*_dataSession);
+
+			s << sql, now;
+
+			++_processed;
+			success = true;
+		}
+	}
+	catch (const Poco::Exception&)
+	{
+		// Failed inserting into the database
+	}
+	if (success)
+	{
+		std::filesystem::remove(entry);
+	}
+	else
+	{
+		// Rename failed files to *.err so that the *.sql scan does not pick them up again.
+		std::filesystem::path newPath {entry};
+		newPath.replace_extension("err"s);
+		std::filesystem::rename(entry, newPath);
+	}
+	removeEntry(entry);
+}
+
+
+std::size_t SQLLogInserter::scanDirectory()
+{
+	std::vector<std::string> newEntries;
+	newEntries.reserve(1000);
+	std::filesystem::directory_iterator diriter(_directory, std::filesystem::directory_options::skip_permission_denied);
+	for (auto& entry: diriter)
+	{
+		if (!_active)
+		{
+			return 0;
+		}
+		if (_dataSession == nullptr || !_dataSession->isGood())
+		{
+			// Do not process files if database session is not healthy.
+			// Files will be processed later.
+			return 0;
+		}
+
+		if (!std::filesystem::exists(entry))
+		{
+			continue;
+		}
+		if (!entry.is_regular_file())
+		{
+			continue;
+		}
+		if (entry.path().extension() != ".sql"s)
+		{
+			continue;
+		}
+		newEntries.push_back(entry.path());
+	}
+	return insertEntries(newEntries);
+}
+
+
+void SQLLogInserter::runDirectoryScan()
+{
+	while (_active)
+	{
+		const auto scheduled = scanDirectory();
+		if (scheduled == 0)
+		{
+			// No new files to be scheduled. Wait a bit.
+			std::this_thread::sleep_for(std::chrono::milliseconds(500));
+		}
+	}
+}
+
+
+void SQLLogInserter::runProcessingEntries()
+{
+	while (!_workSet.empty() || _active)
+	{
+		auto entry = popEntry();
+		processFile(entry);
+	}
+	// Let stop() know that this worker is done.
+	_completedCondition.notify_all();
+}
+
+
diff --git a/Data/samples/DBLogger/src/SQLLogInserter.h b/Data/samples/DBLogger/src/SQLLogInserter.h
new file mode 100644
index 0000000000..116712967d
--- /dev/null
+++ b/Data/samples/DBLogger/src/SQLLogInserter.h
@@ -0,0 +1,144 @@
+//
+// SQLLogInserter.h
+//
+// This sample demonstrates the Data library recordset row formatting
+// and streaming capabilities.
+//
+// Copyright (c) 2024, Applied Informatics Software Engineering GmbH.
+// and Contributors.
+//
+// SPDX-License-Identifier: BSL-1.0
+
+#ifndef SQLLogInserter_INCLUDED
+#define SQLLogInserter_INCLUDED
+
+#include "Poco/Data/SQLChannel.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <unordered_set>
+
+using namespace Poco::Data::Keywords;
+using namespace std::string_literals;
+
+class SQLLogInserter
+	/// Scans a directory for regular files with the .sql extension and executes
+	/// their contents against a database session using a set of worker threads.
+	/// Successfully executed files are removed, failed ones are renamed to .err.
+{
+public:
+
+	using WorkSet = std::unordered_set<std::string>;
+
+	SQLLogInserter()
+	{
+	}
+
+	~SQLLogInserter()
+	{
+	}
+
+	void start();
+		/// Opens the database session and launches the worker and directory scan threads.
+	void stop();
+		/// Requests all threads to finish, waits until queued work is drained and joins the threads.
+
+	void setDirectory(const std::string& directory);
+	std::string directory() const;
+
+	void setConnector(const std::string& connector);
+	std::string connector() const;
+
+	void setConnectionString(const std::string& cs);
+	std::string connectionString() const;
+
+	void setNumWorkers(std::size_t number);
+	std::size_t numWorkers() const;
+
+	std::size_t totalProcessed() const;
+
+private:
+
+	std::size_t insertEntries(std::vector<std::string>& entries);
+	std::string popEntry();
+	void removeEntry(std::string entry);
+	void processFile(std::string& entry);
+	std::size_t scanDirectory();
+
+	void runDirectoryScan();
+	void runProcessingEntries();
+
+	static constexpr std::size_t MAX_WORKSET_SIZE {1000};
+
+	std::string _directory;
+	std::string _connector;
+	std::string _connectionString;
+	std::size_t _numWorkers {2};
+
+	std::atomic<bool> _active {false}; // written by start()/stop(), read by the scan and worker threads
+
+	std::atomic<std::size_t> _processed{0}; // incremented concurrently by the worker threads
+
+	std::thread _dirScanThread;
+
+	WorkSet _workSet;
+	WorkSet _processingSet;
+	std::mutex _workMutex;
+	std::condition_variable _workCondition;
+	std::condition_variable _underflowCondition;
+	std::condition_variable _completedCondition;
+
+	std::vector<std::thread> _workers;
+
+	Poco::AutoPtr<Poco::Data::SQLChannel> _sqlChannel;
+	std::shared_ptr<Poco::Data::Session> _dataSession;
+};
+
+inline void SQLLogInserter::setDirectory(const std::string& directory)
+{
+	_directory = directory;
+}
+
+inline std::string SQLLogInserter::directory() const
+{
+	return _directory;
+}
+
+inline void SQLLogInserter::setConnector(const std::string& connector)
+{
+	_connector = connector;
+}
+
+inline std::string SQLLogInserter::connector() const
+{
+	return _connector;
+}
+
+inline void SQLLogInserter::setConnectionString(const std::string& cs)
+{
+	_connectionString = cs;
+}
+
+inline std::string SQLLogInserter::connectionString() const
+{
+	return _connectionString;
+}
+
+inline void SQLLogInserter::setNumWorkers(std::size_t number)
+{
+	_numWorkers = number;
+}
+
+inline std::size_t SQLLogInserter::numWorkers() const
+{
+	return _numWorkers;
+}
+
+inline std::size_t SQLLogInserter::totalProcessed() const
+{
+	return _processed;
+}
+
+#endif // SQLLogInserter_INCLUDED
diff --git a/Util/src/ServerApplication.cpp b/Util/src/ServerApplication.cpp
index 1a20db58b1..b9560d2190 100644
--- a/Util/src/ServerApplication.cpp
+++ b/Util/src/ServerApplication.cpp
@@ -523,7 +523,7 @@ void ServerApplication::waitForTerminationRequest()
 	}
 	sigaddset(&sset, SIGQUIT);
 	sigaddset(&sset, SIGTERM);
-	sigprocmask(SIG_BLOCK, &sset, NULL);
+	sigprocmask(SIG_BLOCK, &sset, nullptr);
 	int sig;
 	sigwait(&sset, &sig);
 	terminateCallback();