From c3cbcaae45f12cdc75f2f16af4abac5cffa27fe7 Mon Sep 17 00:00:00 2001 From: Aleksandar Fabijanic Date: Mon, 15 Jul 2024 23:45:43 +0200 Subject: [PATCH] feat(SQLChannel): add store-and-forward mode --- Data/Data_VS90.vcproj | 8 - Data/Data_vs170.vcxproj.filters | 12 +- Data/SQLite/testsuite/TestSuite_vs170.vcxproj | 122 ++--- Data/SQLite/testsuite/src/SQLiteTest.cpp | 8 +- Data/include/Poco/Data/SQLChannel.h | 52 +- Data/src/SQLChannel.cpp | 473 ++++++++++-------- Data/testsuite/src/DataTest.cpp | 105 ++-- Data/testsuite/src/DataTest.h | 1 + 8 files changed, 449 insertions(+), 332 deletions(-) diff --git a/Data/Data_VS90.vcproj b/Data/Data_VS90.vcproj index 233d67d0a8..53e09ff50b 100644 --- a/Data/Data_VS90.vcproj +++ b/Data/Data_VS90.vcproj @@ -651,10 +651,6 @@ RelativePath=".\include\Poco\Data\SimpleRowFormatter.h" > - - @@ -791,10 +787,6 @@ RelativePath=".\src\SimpleRowFormatter.cpp" > - - diff --git a/Data/Data_vs170.vcxproj.filters b/Data/Data_vs170.vcxproj.filters index b49c64254d..1885396c65 100644 --- a/Data/Data_vs170.vcxproj.filters +++ b/Data/Data_vs170.vcxproj.filters @@ -180,9 +180,6 @@ DataCore\Header Files - - DataCore\Header Files - DataCore\Header Files @@ -297,6 +294,9 @@ Logging\Header Files + + Logging\Header Files + @@ -374,9 +374,6 @@ DataCore\Source Files - - DataCore\Source Files - DataCore\Source Files @@ -440,6 +437,9 @@ Logging\Source Files + + Logging\Source Files + diff --git a/Data/SQLite/testsuite/TestSuite_vs170.vcxproj b/Data/SQLite/testsuite/TestSuite_vs170.vcxproj index 401b89f379..877fe23b13 100644 --- a/Data/SQLite/testsuite/TestSuite_vs170.vcxproj +++ b/Data/SQLite/testsuite/TestSuite_vs170.vcxproj @@ -1,4 +1,4 @@ - + @@ -81,7 +81,7 @@ TestSuite Win32Proj - + Application MultiByte @@ -172,63 +172,63 @@ MultiByte v143 - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + <_ProjectFileVersion>17.0.34322.80 TestSuited @@ -352,7 +352,7 @@ true true true - + Level3 ProgramDatabase Default @@ -387,9 +387,9 @@ true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -419,7 +419,7 @@ true true true - + Level3 ProgramDatabase Default @@ -454,9 +454,9 @@ true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -486,7 +486,7 @@ true true true - + Level3 ProgramDatabase Default @@ -521,9 +521,9 @@ true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -553,7 +553,7 @@ true true true - + Level3 ProgramDatabase Default @@ -588,9 +588,9 @@ true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -620,7 +620,7 @@ true true true - + Level3 ProgramDatabase Default @@ -655,9 +655,9 @@ true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -687,7 +687,7 @@ true true true - + Level3 ProgramDatabase Default @@ -722,9 +722,9 @@ true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -746,7 +746,7 @@ Disabled ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories) - WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions) + WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions) true EnableFastChecks MultiThreadedDebugDLL @@ -754,7 +754,7 @@ true true true - + Level3 ProgramDatabase Default @@ -782,16 +782,16 @@ Speed true ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions) + WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions) true MultiThreadedDLL false true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -813,7 +813,7 @@ Disabled ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories) - WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) + WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) true EnableFastChecks MultiThreadedDebug @@ -821,7 +821,7 @@ true true true - + Level3 ProgramDatabase Default @@ -849,16 +849,16 @@ Speed true ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) + WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) true MultiThreaded false true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -880,7 +880,7 @@ Disabled ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories) - WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) + WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) true EnableFastChecks MultiThreadedDebugDLL @@ -888,7 +888,7 @@ true true true - + Level3 ProgramDatabase Default @@ -916,16 +916,16 @@ Speed true ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories) - WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) + WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions) true MultiThreadedDLL false true true true - + Level3 - + Default $(OutDir)$(TargetName).pdb true @@ -944,8 +944,8 @@ - - + + @@ -964,6 +964,6 @@ stdc11 - - - + + + \ No newline at end of file diff --git a/Data/SQLite/testsuite/src/SQLiteTest.cpp b/Data/SQLite/testsuite/src/SQLiteTest.cpp index 6d49533aa7..871ac3c045 100755 --- a/Data/SQLite/testsuite/src/SQLiteTest.cpp +++ b/Data/SQLite/testsuite/src/SQLiteTest.cpp @@ -2480,7 +2480,7 @@ void SQLiteTest::testSQLChannel() { Thread::sleep(10); if (sw.elapsedSeconds() > 3) - fail ("SQLExecutor::sqlLogger(): SQLChannel timed out"); + fail ("SQLChannel timed out"); } // bulk binding mode is not suported by SQLite, but SQLChannel should handle it internally pChannel->setProperty("bulk", "true"); @@ -2539,17 +2539,15 @@ void SQLiteTest::testSQLChannel() assertTrue("f Warning sync message" == rs2["Text"]); pChannel->setProperty("minBatch", "1024"); - constexpr int mcount { 20000 }; + constexpr int mcount { 2000 }; for (int i = 0; i < mcount; i++) { - Message msgInfA("InformationSource", "e Informational sync message", Message::PRIO_INFORMATION); + Message msgInfA("InformationSource", "g Informational sync message", Message::PRIO_INFORMATION); pChannel->log(msgInfA); } - //pChannel->stop(); pChannel.reset(); RecordSet rsl(tmp, "SELECT * FROM T_POCO_LOG"); assertEquals(2+mcount, rsl.rowCount()); - } diff --git a/Data/include/Poco/Data/SQLChannel.h b/Data/include/Poco/Data/SQLChannel.h index b916fdc636..7f9ac98af6 100644 --- a/Data/include/Poco/Data/SQLChannel.h +++ b/Data/include/Poco/Data/SQLChannel.h @@ -33,6 +33,7 @@ #include "Poco/Thread.h" #include "Poco/Mutex.h" #include +#include namespace Poco { @@ -56,15 +57,21 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable /// /// The table name is configurable through "table" property. /// Other than DateTime filed name used for optional time-based archiving purposes, currently the - /// field names are not mandated. However, it is recomended to use names as specified above. + /// field names are not mandated. However, it is recommended to use names as specified above. /// - /// To provide as non-intrusive operation as possbile, the log entries are cached and + /// To provide as non-intrusive operation as possible, the log entries are cached and /// inserted into the target database asynchronously by default. The blocking, however, will occur /// before the next entry insertion with default timeout of 1 second. The default settings can be /// overridden (see async, timeout and throw properties for details). /// If throw property is false, insertion timeouts are ignored, otherwise a TimeoutException is thrown. /// To force insertion of every entry, set timeout to 0. This setting, however, introduces /// a risk of long blocking periods in case of remote server communication delays. + /// + /// A default-constructed SQLChannel operates without an active DB connection, in a store-and-forward + /// mode. For this mode of operation, a separate service is required to consume and execute the SQL + /// statements from the stored files and insert the log entries into the database. Since this setup + /// stores SQL inserts in the OS file system, it is strongly recommended to take the necessary + /// precautions to limit and secure the access to those files. { public: class LogNotification : public Poco::Notification @@ -88,9 +95,14 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable static const int DEFAULT_MIN_BATCH_SIZE = 1; static const int DEFAULT_MAX_BATCH_SIZE = 1000; + static const int DEFAULT_MAX_SQL_SIZE = 65536; SQLChannel(); /// Creates SQLChannel. + /// An SQLChannel without DB connector and local file name + /// only logs SQL inserts into local files. A separate file + /// is created for each insert batch; files are named + /// ..log.sql. SQLChannel(const std::string& connector, const std::string& connect, @@ -98,7 +110,8 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable const std::string& table = "T_POCO_LOG", int timeout = 1000, int minBatch = DEFAULT_MIN_BATCH_SIZE, - int maxBatch = DEFAULT_MAX_BATCH_SIZE); + int maxBatch = DEFAULT_MAX_BATCH_SIZE, + int maxSQL = DEFAULT_MAX_SQL_SIZE); /// Creates an SQLChannel with the given connector, connect string, timeout, table and name. /// The connector must be already registered. @@ -166,6 +179,8 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable /// reaches this size, log entries are silently discarded. /// Defaults to 100, can't be zero or larger than 1000. /// + /// * maxSQL: Maximum total length of the SQL statement. Defaults to 65536. + /// /// * bulk: Do bulk execute (on most DBMS systems, this can speed up things /// drastically). /// @@ -198,8 +213,10 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable static const std::string PROP_TIMEOUT; static const std::string PROP_MIN_BATCH; static const std::string PROP_MAX_BATCH; + static const std::string PROP_MAX_SQL; static const std::string PROP_BULK; static const std::string PROP_THROW; + static const std::string PROP_DIRECTORY; static const std::string PROP_FILE; protected: @@ -222,17 +239,20 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable /// than minBatch, sends logs to the destination. /// Returns true if log entry was processed. - size_t execSQL(); + size_t execSQL(bool flush = false); /// Executes the log statement. - size_t logSync(); + size_t logSync(bool flush = false); /// Inserts entries into the target database. bool isTrue(const std::string& value) const; /// Returns true is value is "true", "t", "yes" or "y". /// Case insensitive. - size_t logToFile(bool clear = false); + size_t logToDB(bool flush = false); + /// Logs cached entries to the DB. + + size_t logToFile(bool flush = false); /// Logs cached entries to a file. Called in case DB insertions fail. size_t logLocal(const std::string&, Message::Priority prio = Message::PRIO_ERROR); @@ -257,24 +277,26 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable int _timeout; std::atomic _minBatch; int _maxBatch; + int _maxSQL; bool _bulk; std::atomic _throw; // members for log entry cache - std::vector _source; - std::vector _pid; - std::vector _thread; - std::vector _tid; - std::vector _priority; - std::vector _text; - std::vector _dateTime; + std::deque _source; + std::deque _pid; + std::deque _thread; + std::deque _tid; + std::deque _priority; + std::deque _text; + std::deque _dateTime; Poco::NotificationQueue _logQueue; - std::unique_ptr _pDBThread; + std::unique_ptr _pLogThread; std::atomic _reconnect; std::atomic _stop; std::atomic _logged; StrategyPtr _pArchiveStrategy; std::string _file; + std::string _directory; AutoPtr _pFileChannel; Poco::Logger& _logger = Poco::Logger::get("SQLChannel"); }; @@ -296,7 +318,7 @@ inline bool SQLChannel::isTrue(const std::string& value) const inline bool SQLChannel::isRunning() const { - return _pDBThread && _pDBThread->isRunning(); + return _pLogThread && _pLogThread->isRunning(); } diff --git a/Data/src/SQLChannel.cpp b/Data/src/SQLChannel.cpp index 18e30e159a..68195cfa54 100644 --- a/Data/src/SQLChannel.cpp +++ b/Data/src/SQLChannel.cpp @@ -24,7 +24,10 @@ #include "Poco/NumberFormatter.h" #include "Poco/Stopwatch.h" #include "Poco/Format.h" +#include "Poco/Path.h" #include "Poco/File.h" +#include "Poco/UUID.h" +#include "Poco/UUIDGenerator.h" #include #include @@ -46,8 +49,10 @@ const std::string SQLChannel::PROP_ASYNC("async"); const std::string SQLChannel::PROP_TIMEOUT("timeout"); const std::string SQLChannel::PROP_MIN_BATCH("minBatch"); const std::string SQLChannel::PROP_MAX_BATCH("maxBatch"); +const std::string SQLChannel::PROP_MAX_SQL("maxSQL"); const std::string SQLChannel::PROP_BULK("bulk"); const std::string SQLChannel::PROP_THROW("throw"); +const std::string SQLChannel::PROP_DIRECTORY("directory"); const std::string SQLChannel::PROP_FILE("file"); @@ -57,21 +62,23 @@ const std::string SQLChannel::SQL_INSERT_STMT = "INSERT INTO %s " \ SQLChannel::SQLChannel(): - _name("-"), - _table("T_POCO_LOG"), + _name("SQLChannel"), _tableChanged(true), _timeout(1000), _minBatch(DEFAULT_MIN_BATCH_SIZE), _maxBatch(DEFAULT_MAX_BATCH_SIZE), + _maxSQL(DEFAULT_MAX_SQL_SIZE), _bulk(true), _throw(false), _pid(), _tid(), _priority(), + _pLogThread(new Thread), _reconnect(false), _stop(false), _logged(0) { + _pLogThread->start(*this); } @@ -81,7 +88,8 @@ SQLChannel::SQLChannel(const std::string& connector, const std::string& table, int timeout, int minBatch, - int maxBatch) : + int maxBatch, + int maxSQL) : _connector(connector), _connect(connect), _name(name), @@ -90,17 +98,18 @@ SQLChannel::SQLChannel(const std::string& connector, _timeout(timeout), _minBatch(minBatch), _maxBatch(maxBatch), + _maxSQL(maxSQL), _bulk(false), _throw(false), _pid(), _tid(), _priority(), - _pDBThread(new Thread), + _pLogThread(new Thread), _reconnect(true), _stop(false), _logged(0) { - _pDBThread->start(*this); + _pLogThread->start(*this); } @@ -185,7 +194,7 @@ size_t SQLChannel::logLocal(const std::string& message, Message::Priority prio) { Message msg("SQLChannel"s, message, prio); log(msg); - return logToFile(_pFileChannel); + return logToFile(); } @@ -195,11 +204,11 @@ void SQLChannel::log(const Message& msg) } -size_t SQLChannel::logSync() +size_t SQLChannel::logSync(bool flush) { try { - return execSQL(); + return execSQL(flush); } catch (...) { @@ -212,8 +221,8 @@ size_t SQLChannel::logSync() bool SQLChannel::processOne(int minBatch) { - bool ret = false; - if (_logQueue.size()) + bool ret = false, flush = (minBatch == 0); + while (_logQueue.size()) { Notification::Ptr pN = _logQueue.dequeueNotification(); LogNotification::Ptr pLN = pN.cast(); @@ -231,10 +240,12 @@ bool SQLChannel::processOne(int minBatch) _text.push_back(msg.getText()); Poco::replaceInPlace(_text.back(), "'", "''"); _dateTime.emplace_back(msg.getTime()); + if ((_source.size() >= minBatch) || flush) + logSync(flush); + ret = true; } - ret = true; } - if (_source.size() >= minBatch) logSync(); + if (flush) logSync(flush); return ret; } @@ -251,16 +262,14 @@ void SQLChannel::run() { close(); open(); - _reconnect = _pSession.isNull(); + _reconnect = !_connector.empty() && _pSession.isNull(); if (_reconnect && sleepTime < 12800) sleepTime *= 2; } - if (_stop && _reconnect) - std::cout << "Request to stop and reconnect!" << std::endl; if (!_reconnect) { - processOne(_minBatch); + if (_logQueue.size()) processOne(_minBatch); sleepTime = 100; } } @@ -291,20 +300,21 @@ void SQLChannel::run() } Thread::sleep(sleepTime); } - while (_logQueue.size()) + while (_logQueue.size() || _source.size()) processOne(); } void SQLChannel::stop() { - if (_pDBThread) + if (_pLogThread) { _reconnect = false; _stop = true; - while (_pDBThread->isRunning()) + while (_pLogThread->isRunning()) Thread::sleep(10); - _pDBThread->join(); + _pLogThread->join(); + _pLogThread.reset(); } } @@ -312,11 +322,229 @@ void SQLChannel::stop() void SQLChannel::reconnect() { _reconnect = true; - if (!_pDBThread) + if (!_pLogThread) + { + _pLogThread = std::make_unique(); + _pLogThread->start(*this); + } +} + + +size_t SQLChannel::logToFile(bool flush) +{ + if (_source.empty()) return 0u; + + static std::vector names; + if (names.size() != _source.size()) + names.resize(_source.size(), Poco::replace(_name, "'", "''")); + + std::size_t n = 0; + + AutoPtr pFileChannel; + std::string file = _file; + if (!_pFileChannel && !_file.empty()) + { + if (!Path(File(file).path()).isAbsolute()) + file = _directory + file; + + _pFileChannel = new FileChannel(file); + pFileChannel = _pFileChannel; + } + else + { + UUID uuid = UUIDGenerator::defaultGenerator().createRandom(); + std::string filename(_directory); + filename.append(DateTimeFormatter::format(LocalDateTime(), "%Y%m%d%H%M%S%i.").append(uuid.toString()).append(".log.sql"s)); + pFileChannel = new FileChannel(filename); + } + + if (pFileChannel) + { + std::string sql; + Poco::format(sql, SQL_INSERT_STMT, _table, std::string()); + std::stringstream os, tmp; + os << sql << '\n'; + auto it = _source.begin(); + auto end = _source.end(); + int idx = 0, batch = 0; + for (; it != end; ++idx) + { + std::string dt = DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i"); + tmp.str(""); + tmp << "('" << *it << "','" << + names[idx] << "'," << + _pid[idx] << ",'" << + _thread[idx] << "'," << + _tid[idx] << ',' << + _priority[idx] << ",'" << + _text[idx] << "','" << + dt << "')"; + + if (++batch == _maxBatch || (os.str().length() + tmp.str().length()) >= _maxSQL) + { + os << ";\n"; + Message msg(_source[0], os.str(), Message::PRIO_FATAL); + pFileChannel->log(msg); + n += batch; + _logged += batch; + _source.erase(_source.begin(), _source.begin() + batch); + _pid.erase(_pid.begin(), _pid.begin() + batch); + _thread.erase(_thread.begin(), _thread.begin() + batch); + _tid.erase(_tid.begin(), _tid.begin() + batch); + _priority.erase(_priority.begin(), _priority.begin() + batch); + _text.erase(_text.begin(), _text.begin() + batch); + _dateTime.erase(_dateTime.begin(), _dateTime.begin() + batch); + os.str(""); sql.clear(); + Poco::format(sql, SQL_INSERT_STMT, _table, std::string()); + os << sql << '\n' << tmp.str(); + batch = 0; + } + + os << tmp.str(); + + if (++it == end) + { + os << ";\n"; + break; + } + os << ",\n"; + } + + if ((batch >= _minBatch) || flush) + { + Message msg(_source[0], os.str(), Message::PRIO_FATAL); + pFileChannel->log(msg); + n += batch; + _logged += batch; + _source.erase(_source.begin(), _source.begin()+batch); + _pid.erase(_pid.begin(), _pid.begin() + batch); + _thread.erase(_thread.begin(), _thread.begin() + batch); + _tid.erase(_tid.begin(), _tid.begin() + batch); + _priority.erase(_priority.begin(), _priority.begin() + batch); + _text.erase(_text.begin(), _text.begin() + batch); + _dateTime.erase(_dateTime.begin(), _dateTime.begin() + batch); + } + } + + return n; +} + + +size_t SQLChannel::logToDB(bool flush) +{ + if (_source.empty()) return 0u; + + static std::vector names; + if (names.size() != _source.size()) + names.resize(_source.size(), Poco::replace(_name, "'", "''")); + static std::string placeholders = "(?,?,?,?,?,?,?,?)"; + + Poco::FastMutex::ScopedLock l(_mutex); + + if (_tableChanged) { - _pDBThread = std::make_unique(); - _pDBThread->start(*this); + Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders); + _tableChanged = false; } + + std::size_t n = 0; + + try + { + if (_bulk) + { + try + { + (*_pSession) << _sql, + use(_source, bulk), + use(names, bulk), + use(_pid, bulk), + use(_thread, bulk), + use(_tid, bulk), + use(_priority, bulk), + use(_text, bulk), + use(_dateTime, bulk), now; + } + // most likely bulk mode not supported, try again + catch (const Poco::InvalidAccessException&) + { + (*_pSession) << _sql, + use(_source), + use(names), + use(_pid), + use(_thread), + use(_tid), + use(_priority), + use(_text), + use(_dateTime), now; + _bulk = false; + } + } + else + { + (*_pSession) << _sql, + use(_source), + use(names), + use(_pid), + use(_thread), + use(_tid), + use(_priority), + use(_text), + use(_dateTime), now; + } + n = _source.size(); + } + catch (const Poco::Exception& ex) + { + logLocal(ex.displayText()); + close(); + _reconnect = true; + } + catch (const std::exception& ex) + { + logLocal(ex.what()); + close(); + _reconnect = true; + } + + if (n) + { + _logged += n; + _source.erase(_source.begin(), _source.begin() + n); + _pid.erase(_pid.begin(), _pid.begin() + n); + _thread.erase(_thread.begin(), _thread.begin() + n); + _tid.erase(_tid.begin(), _tid.begin() + n); + _priority.erase(_priority.begin(), _priority.begin() + n); + _text.erase(_text.begin(), _text.begin() + n); + _dateTime.erase(_dateTime.begin(), _dateTime.begin() + n); + } + + return n; +} + + +size_t SQLChannel::execSQL(bool flush) +{ + if (!_connector.empty() && (!_pSession || !_pSession->isConnected())) open(); + if (_pArchiveStrategy) _pArchiveStrategy->archive(); + + size_t n = _pSession ? logToDB(flush) : logToFile(flush); + + return n; +} + + +std::size_t SQLChannel::wait(int ms) +{ + Stopwatch sw; + sw.start(); + while (_logQueue.size()) + { + Thread::sleep(10); + if (ms && sw.elapsed() * 1000 > ms) + break; + } + return _logQueue.size(); } @@ -405,6 +633,11 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value) throw Poco::InvalidArgumentException(Poco::format("SQLChannel::setProperty(%s,%s)", name, value)); _maxBatch = maxBatch; } + else if (name == PROP_MAX_SQL) + { + int maxSQL = NumberParser::parse(value); + _maxSQL = maxSQL; + } else if (name == PROP_BULK) { _bulk = isTrue(value); @@ -413,6 +646,16 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value) { _throw = isTrue(value); } + else if (name == PROP_DIRECTORY) + { + std::string dir = value; + if (!Path(File(dir).path()).isAbsolute()) + { + Path d(dir); + dir = d.makeDirectory().makeAbsolute().toString(); + } + _directory = dir; + } else if (name == PROP_FILE) { _file = value; @@ -465,6 +708,10 @@ std::string SQLChannel::getProperty(const std::string& name) const { return std::to_string(_maxBatch); } + else if (name == PROP_MAX_SQL) + { + return std::to_string(_maxSQL); + } else if (name == PROP_BULK) { if (_bulk) return "true"; @@ -475,6 +722,10 @@ std::string SQLChannel::getProperty(const std::string& name) const if (_throw) return "true"; else return "false"; } + else if (name == PROP_DIRECTORY) + { + return _directory; + } else if (name == PROP_FILE) { return _file; @@ -486,182 +737,6 @@ std::string SQLChannel::getProperty(const std::string& name) const } -size_t SQLChannel::logToFile(bool clear) -{ - static std::vector names; - if (names.size() != _source.size()) - names.resize(_source.size(), Poco::replace(_name, "'", "''")); - - std::size_t n = 0; - - if (!_pFileChannel && !_file.empty()) _pFileChannel = new FileChannel(_file); - if (_pFileChannel) - { - std::string sql; - Poco::format(sql, SQL_INSERT_STMT, _table, std::string()); - std::stringstream os; - os << sql << '\n'; - auto it = _source.begin(); - auto end = _source.end(); - int idx = 0, batch = 0; - for (; it != end; ++idx) - { - std::string dt = Poco::DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i"); - os << "('" << *it << "','" << - names[idx] << "'," << - _pid[idx] << ",'" << - _thread[idx] << "'," << - _tid[idx] << ',' << - _priority[idx] << ",'" << - _text[idx] << "','" << - dt << "')"; - if (++batch == _maxBatch) - { - os << ";\n"; - Message msg(_source[0], os.str(), Message::PRIO_ERROR); - _pFileChannel->log(msg); - os.str(""); sql.clear(); - Poco::format(sql, SQL_INSERT_STMT, _table, std::string()); - batch = 0; - } - if (++it == end) - { - os << ";\n"; - break; - } - os << ",\n"; - } - Message msg(_source[0], os.str(), Message::PRIO_ERROR); - _pFileChannel->log(msg); - n = _source.size(); - if (clear && n) - { - _source.clear(); - _pid.clear(); - _thread.clear(); - _tid.clear(); - _priority.clear(); - _text.clear(); - _dateTime.clear(); - } - } - return n; -} - - -size_t SQLChannel::execSQL() -{ - static std::vector names; - if (names.size() != _source.size()) - names.resize(_source.size(), Poco::replace(_name, "'", "''")); - static std::string placeholders = "(?,?,?,?,?,?,?,?)"; - - Poco::FastMutex::ScopedLock l(_mutex); - - if (_tableChanged) - { - Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders); - _tableChanged = false; - } - - if (!_pSession || !_pSession->isConnected()) open(); - if (_pArchiveStrategy) _pArchiveStrategy->archive(); - - size_t n = 0; - if (_pSession) - { - try - { - if (_bulk) - { - try - { - (*_pSession) << _sql, - use(_source, bulk), - use(names, bulk), - use(_pid, bulk), - use(_thread, bulk), - use(_tid, bulk), - use(_priority, bulk), - use(_text, bulk), - use(_dateTime, bulk), now; - } - // most likely bulk mode not supported, try again - catch (const Poco::InvalidAccessException&) - { - (*_pSession) << _sql, - use(_source), - use(names), - use(_pid), - use(_thread), - use(_tid), - use(_priority), - use(_text), - use(_dateTime), now; - _bulk = false; - } - } - else - { - (*_pSession) << _sql, - use(_source), - use(names), - use(_pid), - use(_thread), - use(_tid), - use(_priority), - use(_text), - use(_dateTime), now; - } - n = _source.size(); - } - catch (const Poco::Exception& ex) - { - logLocal(ex.displayText()); - close(); - _reconnect = true; - } - catch (const std::exception& ex) - { - logLocal(ex.what()); - close(); - _reconnect = true; - } - } - else - { - n = logToFile(_pFileChannel); - } - - if (n) - { - _logged += n; - _source.clear(); - _pid.clear(); - _thread.clear(); - _tid.clear(); - _priority.clear(); - _text.clear(); - _dateTime.clear(); - } - return n; -} - - -std::size_t SQLChannel::wait(int ms) -{ - Stopwatch sw; - sw.start(); - while (_logQueue.size()) - { - Thread::sleep(10); - if (ms && sw.elapsed() * 1000 > ms) - break; - } - return _logQueue.size(); -} - - void SQLChannel::registerChannel() { Poco::LoggingFactory::defaultFactory().registerChannelClass("SQLChannel", diff --git a/Data/testsuite/src/DataTest.cpp b/Data/testsuite/src/DataTest.cpp index fef722b547..4750fa53bf 100644 --- a/Data/testsuite/src/DataTest.cpp +++ b/Data/testsuite/src/DataTest.cpp @@ -20,6 +20,7 @@ #include "Poco/Data/Column.h" #include "Poco/Data/Date.h" #include "Poco/Data/Time.h" +#include "Poco/Data/SQLChannel.h" #include "Poco/Data/SimpleRowFormatter.h" #include "Poco/Data/JSONRowFormatter.h" #include "Poco/Data/DataException.h" @@ -27,12 +28,17 @@ #include "Poco/BinaryReader.h" #include "Poco/BinaryWriter.h" #include "Poco/DateTime.h" +#include "Poco/Stopwatch.h" #include "Poco/Types.h" #include "Poco/Dynamic/Var.h" #include "Poco/Data/DynamicLOB.h" #include "Poco/Data/DynamicDateTime.h" #include "Poco/Latin1Encoding.h" #include "Poco/Exception.h" +#include "Poco/DirectoryIterator.h" +#include "Poco/Glob.h" +#include "Poco/File.h" +#include #include #include #include @@ -40,46 +46,13 @@ using namespace Poco; +using namespace Poco::Dynamic; using namespace Poco::Data; using namespace Poco::Data::Keywords; - - -using Poco::BinaryReader; -using Poco::BinaryWriter; -using Poco::UInt32; -using Poco::Int64; -using Poco::UInt64; -using Poco::DateTime; -using Poco::Latin1Encoding; -using Poco::Dynamic::Var; -using Poco::InvalidAccessException; -using Poco::IllegalStateException; -using Poco::RangeException; -using Poco::NotFoundException; -using Poco::InvalidArgumentException; -using Poco::NotImplementedException; -using Poco::Data::Session; -using Poco::Data::SessionFactory; -using Poco::Data::Statement; -using Poco::Data::NotSupportedException; -using Poco::Data::CLOB; -using Poco::Data::CLOBInputStream; -using Poco::Data::CLOBOutputStream; -using Poco::Data::MetaColumn; -using Poco::Data::Column; -using Poco::Data::Row; -using Poco::Data::RowFormatter; -using Poco::Data::SimpleRowFormatter; -using Poco::Data::JSONRowFormatter; -using Poco::Data::Date; -using Poco::Data::Time; -using Poco::Data::AbstractExtractor; -using Poco::Data::AbstractExtraction; -using Poco::Data::AbstractExtractionVec; -using Poco::Data::AbstractExtractionVecVec; -using Poco::Data::AbstractBinding; -using Poco::Data::AbstractBindingVec; -using Poco::Data::NotConnectedException; +using Poco::DirectoryIterator; +using Poco::Glob; +using Poco::File; +using namespace std::string_literals; DataTest::DataTest(const std::string& name): CppUnit::TestCase(name) @@ -1570,6 +1543,61 @@ void DataTest::testSQLParse() } +void DataTest::testSQLChannel() +{ + AutoPtr pChannel = new SQLChannel(); + Stopwatch sw; sw.start(); + while (!pChannel->isRunning()) + { + Thread::sleep(10); + if (sw.elapsedSeconds() > 3) + fail("SQLChannel timed out"); + } + + Glob g("*.log.sql"); + { + DirectoryIterator it(""s); + DirectoryIterator end; + while (it != end) + { + if (g.match(it->path())) + { + File(it->path()).remove(); + } + ++it; + } + } + + constexpr int mcount{10}; + constexpr int batch{3}; + pChannel->setProperty("minBatch", std::to_string(batch)); + for (int i = 0; i < mcount; i++) + { + Message msgInfA("InformationSource", Poco::format("%d Informational sync message", i), Message::PRIO_INFORMATION); + pChannel->log(msgInfA); + } + Thread::sleep(1000); // give it time to log + pChannel->stop(); + auto logged = pChannel->logged(); + assertEqual(mcount, logged); + pChannel.reset(); + + int count = 0; + DirectoryIterator it(""s); + DirectoryIterator end; + while (it != end) + { + if (g.match(it->path())) + { + ++count; + File(it->path()).remove(); + } + ++it; + } + assertEqual(count, (mcount / batch) + (mcount % batch)); +} + + void DataTest::setUp() { } @@ -1603,6 +1631,7 @@ CppUnit::Test* DataTest::suite() CppUnit_addTest(pSuite, DataTest, testExternalBindingAndExtraction); CppUnit_addTest(pSuite, DataTest, testTranscode); CppUnit_addTest(pSuite, DataTest, testSQLParse); + CppUnit_addTest(pSuite, DataTest, testSQLChannel); return pSuite; } diff --git a/Data/testsuite/src/DataTest.h b/Data/testsuite/src/DataTest.h index 8d9825030c..5e34dbdc40 100644 --- a/Data/testsuite/src/DataTest.h +++ b/Data/testsuite/src/DataTest.h @@ -46,6 +46,7 @@ class DataTest: public CppUnit::TestCase void testExternalBindingAndExtraction(); void testTranscode(); void testSQLParse(); + void testSQLChannel(); void setUp(); void tearDown();