diff --git a/Data/Data_VS90.vcproj b/Data/Data_VS90.vcproj
index 233d67d0a8..53e09ff50b 100644
--- a/Data/Data_VS90.vcproj
+++ b/Data/Data_VS90.vcproj
@@ -651,10 +651,6 @@
RelativePath=".\include\Poco\Data\SimpleRowFormatter.h"
>
-
-
@@ -791,10 +787,6 @@
RelativePath=".\src\SimpleRowFormatter.cpp"
>
-
-
diff --git a/Data/Data_vs170.vcxproj.filters b/Data/Data_vs170.vcxproj.filters
index b49c64254d..1885396c65 100644
--- a/Data/Data_vs170.vcxproj.filters
+++ b/Data/Data_vs170.vcxproj.filters
@@ -180,9 +180,6 @@
DataCore\Header Files
-
- DataCore\Header Files
-
DataCore\Header Files
@@ -297,6 +294,9 @@
Logging\Header Files
+
+ Logging\Header Files
+
@@ -374,9 +374,6 @@
DataCore\Source Files
-
- DataCore\Source Files
-
DataCore\Source Files
@@ -440,6 +437,9 @@
Logging\Source Files
+
+ Logging\Source Files
+
diff --git a/Data/SQLite/testsuite/TestSuite_vs170.vcxproj b/Data/SQLite/testsuite/TestSuite_vs170.vcxproj
index 401b89f379..877fe23b13 100644
--- a/Data/SQLite/testsuite/TestSuite_vs170.vcxproj
+++ b/Data/SQLite/testsuite/TestSuite_vs170.vcxproj
@@ -1,4 +1,4 @@
-
+
@@ -81,7 +81,7 @@
TestSuite
Win32Proj
-
+
Application
MultiByte
@@ -172,63 +172,63 @@
MultiByte
v143
-
-
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
<_ProjectFileVersion>17.0.34322.80
TestSuited
@@ -352,7 +352,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -387,9 +387,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -419,7 +419,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -454,9 +454,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -486,7 +486,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -521,9 +521,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -553,7 +553,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -588,9 +588,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -620,7 +620,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -655,9 +655,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -687,7 +687,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -722,9 +722,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -746,7 +746,7 @@
Disabled
..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
- WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
+ WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
true
EnableFastChecks
MultiThreadedDebugDLL
@@ -754,7 +754,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -782,16 +782,16 @@
Speed
true
..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
- WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
+ WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
true
MultiThreadedDLL
false
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -813,7 +813,7 @@
Disabled
..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
- WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
EnableFastChecks
MultiThreadedDebug
@@ -821,7 +821,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -849,16 +849,16 @@
Speed
true
..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
- WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
MultiThreaded
false
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -880,7 +880,7 @@
Disabled
..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
- WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
EnableFastChecks
MultiThreadedDebugDLL
@@ -888,7 +888,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -916,16 +916,16 @@
Speed
true
..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
- WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
MultiThreadedDLL
false
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
true
@@ -944,8 +944,8 @@
-
-
+
+
@@ -964,6 +964,6 @@
stdc11
-
-
-
+
+
+
\ No newline at end of file
diff --git a/Data/SQLite/testsuite/src/SQLiteTest.cpp b/Data/SQLite/testsuite/src/SQLiteTest.cpp
index 6d49533aa7..871ac3c045 100755
--- a/Data/SQLite/testsuite/src/SQLiteTest.cpp
+++ b/Data/SQLite/testsuite/src/SQLiteTest.cpp
@@ -2480,7 +2480,7 @@ void SQLiteTest::testSQLChannel()
{
Thread::sleep(10);
if (sw.elapsedSeconds() > 3)
- fail ("SQLExecutor::sqlLogger(): SQLChannel timed out");
+ fail ("SQLChannel timed out");
}
// bulk binding mode is not suported by SQLite, but SQLChannel should handle it internally
pChannel->setProperty("bulk", "true");
@@ -2539,17 +2539,15 @@ void SQLiteTest::testSQLChannel()
assertTrue("f Warning sync message" == rs2["Text"]);
pChannel->setProperty("minBatch", "1024");
- constexpr int mcount { 20000 };
+ constexpr int mcount { 2000 };
for (int i = 0; i < mcount; i++)
{
- Message msgInfA("InformationSource", "e Informational sync message", Message::PRIO_INFORMATION);
+ Message msgInfA("InformationSource", "g Informational sync message", Message::PRIO_INFORMATION);
pChannel->log(msgInfA);
}
- //pChannel->stop();
pChannel.reset();
RecordSet rsl(tmp, "SELECT * FROM T_POCO_LOG");
assertEquals(2+mcount, rsl.rowCount());
-
}
diff --git a/Data/include/Poco/Data/SQLChannel.h b/Data/include/Poco/Data/SQLChannel.h
index b916fdc636..7f9ac98af6 100644
--- a/Data/include/Poco/Data/SQLChannel.h
+++ b/Data/include/Poco/Data/SQLChannel.h
@@ -33,6 +33,7 @@
#include "Poco/Thread.h"
#include "Poco/Mutex.h"
#include
+#include
namespace Poco {
@@ -56,15 +57,21 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
///
/// The table name is configurable through "table" property.
/// Other than DateTime filed name used for optional time-based archiving purposes, currently the
- /// field names are not mandated. However, it is recomended to use names as specified above.
+ /// field names are not mandated. However, it is recommended to use names as specified above.
///
- /// To provide as non-intrusive operation as possbile, the log entries are cached and
+ /// To provide as non-intrusive operation as possible, the log entries are cached and
/// inserted into the target database asynchronously by default. The blocking, however, will occur
/// before the next entry insertion with default timeout of 1 second. The default settings can be
/// overridden (see async, timeout and throw properties for details).
/// If throw property is false, insertion timeouts are ignored, otherwise a TimeoutException is thrown.
/// To force insertion of every entry, set timeout to 0. This setting, however, introduces
/// a risk of long blocking periods in case of remote server communication delays.
+ ///
+ /// A default-constructed SQLChannel operates without an active DB connection, in a store-and-forward
+ /// mode. For this mode of operation, a separate service is required to consume and execute the SQL
+ /// statements from the stored files and insert the log entries into the database. Since this setup
+ /// stores SQL inserts in the OS file system, it is strongly recommended to take the necessary
+ /// precautions to limit and secure the access to those files.
{
public:
class LogNotification : public Poco::Notification
@@ -88,9 +95,14 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
static const int DEFAULT_MIN_BATCH_SIZE = 1;
static const int DEFAULT_MAX_BATCH_SIZE = 1000;
+ static const int DEFAULT_MAX_SQL_SIZE = 65536;
SQLChannel();
/// Creates SQLChannel.
+ /// An SQLChannel without DB connector and local file name
+ /// only logs SQL inserts into local files. A separate file
+ /// is created for each insert batch; files are named
+ /// ..log.sql.
SQLChannel(const std::string& connector,
const std::string& connect,
@@ -98,7 +110,8 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
const std::string& table = "T_POCO_LOG",
int timeout = 1000,
int minBatch = DEFAULT_MIN_BATCH_SIZE,
- int maxBatch = DEFAULT_MAX_BATCH_SIZE);
+ int maxBatch = DEFAULT_MAX_BATCH_SIZE,
+ int maxSQL = DEFAULT_MAX_SQL_SIZE);
/// Creates an SQLChannel with the given connector, connect string, timeout, table and name.
/// The connector must be already registered.
@@ -166,6 +179,8 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
/// reaches this size, log entries are silently discarded.
/// Defaults to 100, can't be zero or larger than 1000.
///
+ /// * maxSQL: Maximum total length of the SQL statement. Defaults to 65536.
+ ///
/// * bulk: Do bulk execute (on most DBMS systems, this can speed up things
/// drastically).
///
@@ -198,8 +213,10 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
static const std::string PROP_TIMEOUT;
static const std::string PROP_MIN_BATCH;
static const std::string PROP_MAX_BATCH;
+ static const std::string PROP_MAX_SQL;
static const std::string PROP_BULK;
static const std::string PROP_THROW;
+ static const std::string PROP_DIRECTORY;
static const std::string PROP_FILE;
protected:
@@ -222,17 +239,20 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
/// than minBatch, sends logs to the destination.
/// Returns true if log entry was processed.
- size_t execSQL();
+ size_t execSQL(bool flush = false);
/// Executes the log statement.
- size_t logSync();
+ size_t logSync(bool flush = false);
/// Inserts entries into the target database.
bool isTrue(const std::string& value) const;
/// Returns true is value is "true", "t", "yes" or "y".
/// Case insensitive.
- size_t logToFile(bool clear = false);
+ size_t logToDB(bool flush = false);
+ /// Logs cached entries to the DB.
+
+ size_t logToFile(bool flush = false);
/// Logs cached entries to a file. Called in case DB insertions fail.
size_t logLocal(const std::string&, Message::Priority prio = Message::PRIO_ERROR);
@@ -257,24 +277,26 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
int _timeout;
std::atomic _minBatch;
int _maxBatch;
+ int _maxSQL;
bool _bulk;
std::atomic _throw;
// members for log entry cache
- std::vector _source;
- std::vector _pid;
- std::vector _thread;
- std::vector _tid;
- std::vector _priority;
- std::vector _text;
- std::vector _dateTime;
+ std::deque _source;
+ std::deque _pid;
+ std::deque _thread;
+ std::deque _tid;
+ std::deque _priority;
+ std::deque _text;
+ std::deque _dateTime;
Poco::NotificationQueue _logQueue;
- std::unique_ptr _pDBThread;
+ std::unique_ptr _pLogThread;
std::atomic _reconnect;
std::atomic _stop;
std::atomic _logged;
StrategyPtr _pArchiveStrategy;
std::string _file;
+ std::string _directory;
AutoPtr _pFileChannel;
Poco::Logger& _logger = Poco::Logger::get("SQLChannel");
};
@@ -296,7 +318,7 @@ inline bool SQLChannel::isTrue(const std::string& value) const
inline bool SQLChannel::isRunning() const
{
- return _pDBThread && _pDBThread->isRunning();
+ return _pLogThread && _pLogThread->isRunning();
}
diff --git a/Data/src/SQLChannel.cpp b/Data/src/SQLChannel.cpp
index 18e30e159a..68195cfa54 100644
--- a/Data/src/SQLChannel.cpp
+++ b/Data/src/SQLChannel.cpp
@@ -24,7 +24,10 @@
#include "Poco/NumberFormatter.h"
#include "Poco/Stopwatch.h"
#include "Poco/Format.h"
+#include "Poco/Path.h"
#include "Poco/File.h"
+#include "Poco/UUID.h"
+#include "Poco/UUIDGenerator.h"
#include
#include
@@ -46,8 +49,10 @@ const std::string SQLChannel::PROP_ASYNC("async");
const std::string SQLChannel::PROP_TIMEOUT("timeout");
const std::string SQLChannel::PROP_MIN_BATCH("minBatch");
const std::string SQLChannel::PROP_MAX_BATCH("maxBatch");
+const std::string SQLChannel::PROP_MAX_SQL("maxSQL");
const std::string SQLChannel::PROP_BULK("bulk");
const std::string SQLChannel::PROP_THROW("throw");
+const std::string SQLChannel::PROP_DIRECTORY("directory");
const std::string SQLChannel::PROP_FILE("file");
@@ -57,21 +62,23 @@ const std::string SQLChannel::SQL_INSERT_STMT = "INSERT INTO %s " \
SQLChannel::SQLChannel():
- _name("-"),
- _table("T_POCO_LOG"),
+ _name("SQLChannel"),
_tableChanged(true),
_timeout(1000),
_minBatch(DEFAULT_MIN_BATCH_SIZE),
_maxBatch(DEFAULT_MAX_BATCH_SIZE),
+ _maxSQL(DEFAULT_MAX_SQL_SIZE),
_bulk(true),
_throw(false),
_pid(),
_tid(),
_priority(),
+ _pLogThread(new Thread),
_reconnect(false),
_stop(false),
_logged(0)
{
+ _pLogThread->start(*this);
}
@@ -81,7 +88,8 @@ SQLChannel::SQLChannel(const std::string& connector,
const std::string& table,
int timeout,
int minBatch,
- int maxBatch) :
+ int maxBatch,
+ int maxSQL) :
_connector(connector),
_connect(connect),
_name(name),
@@ -90,17 +98,18 @@ SQLChannel::SQLChannel(const std::string& connector,
_timeout(timeout),
_minBatch(minBatch),
_maxBatch(maxBatch),
+ _maxSQL(maxSQL),
_bulk(false),
_throw(false),
_pid(),
_tid(),
_priority(),
- _pDBThread(new Thread),
+ _pLogThread(new Thread),
_reconnect(true),
_stop(false),
_logged(0)
{
- _pDBThread->start(*this);
+ _pLogThread->start(*this);
}
@@ -185,7 +194,7 @@ size_t SQLChannel::logLocal(const std::string& message, Message::Priority prio)
{
Message msg("SQLChannel"s, message, prio);
log(msg);
- return logToFile(_pFileChannel);
+ return logToFile();
}
@@ -195,11 +204,11 @@ void SQLChannel::log(const Message& msg)
}
-size_t SQLChannel::logSync()
+size_t SQLChannel::logSync(bool flush)
{
try
{
- return execSQL();
+ return execSQL(flush);
}
catch (...)
{
@@ -212,8 +221,8 @@ size_t SQLChannel::logSync()
bool SQLChannel::processOne(int minBatch)
{
- bool ret = false;
- if (_logQueue.size())
+ bool ret = false, flush = (minBatch == 0);
+ while (_logQueue.size())
{
Notification::Ptr pN = _logQueue.dequeueNotification();
LogNotification::Ptr pLN = pN.cast();
@@ -231,10 +240,12 @@ bool SQLChannel::processOne(int minBatch)
_text.push_back(msg.getText());
Poco::replaceInPlace(_text.back(), "'", "''");
_dateTime.emplace_back(msg.getTime());
+ if ((_source.size() >= minBatch) || flush)
+ logSync(flush);
+ ret = true;
}
- ret = true;
}
- if (_source.size() >= minBatch) logSync();
+ if (flush) logSync(flush);
return ret;
}
@@ -251,16 +262,14 @@ void SQLChannel::run()
{
close();
open();
- _reconnect = _pSession.isNull();
+ _reconnect = !_connector.empty() && _pSession.isNull();
if (_reconnect && sleepTime < 12800)
sleepTime *= 2;
}
- if (_stop && _reconnect)
- std::cout << "Request to stop and reconnect!" << std::endl;
if (!_reconnect)
{
- processOne(_minBatch);
+ if (_logQueue.size()) processOne(_minBatch);
sleepTime = 100;
}
}
@@ -291,20 +300,21 @@ void SQLChannel::run()
}
Thread::sleep(sleepTime);
}
- while (_logQueue.size())
+ while (_logQueue.size() || _source.size())
processOne();
}
void SQLChannel::stop()
{
- if (_pDBThread)
+ if (_pLogThread)
{
_reconnect = false;
_stop = true;
- while (_pDBThread->isRunning())
+ while (_pLogThread->isRunning())
Thread::sleep(10);
- _pDBThread->join();
+ _pLogThread->join();
+ _pLogThread.reset();
}
}
@@ -312,11 +322,229 @@ void SQLChannel::stop()
void SQLChannel::reconnect()
{
_reconnect = true;
- if (!_pDBThread)
+ if (!_pLogThread)
+ {
+ _pLogThread = std::make_unique();
+ _pLogThread->start(*this);
+ }
+}
+
+
+size_t SQLChannel::logToFile(bool flush)
+{
+ if (_source.empty()) return 0u;
+
+ static std::vector names;
+ if (names.size() != _source.size())
+ names.resize(_source.size(), Poco::replace(_name, "'", "''"));
+
+ std::size_t n = 0;
+
+ AutoPtr pFileChannel;
+ std::string file = _file;
+ if (!_pFileChannel && !_file.empty())
+ {
+ if (!Path(File(file).path()).isAbsolute())
+ file = _directory + file;
+
+ _pFileChannel = new FileChannel(file);
+ pFileChannel = _pFileChannel;
+ }
+ else
+ {
+ UUID uuid = UUIDGenerator::defaultGenerator().createRandom();
+ std::string filename(_directory);
+ filename.append(DateTimeFormatter::format(LocalDateTime(), "%Y%m%d%H%M%S%i.").append(uuid.toString()).append(".log.sql"s));
+ pFileChannel = new FileChannel(filename);
+ }
+
+ if (pFileChannel)
+ {
+ std::string sql;
+ Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
+ std::stringstream os, tmp;
+ os << sql << '\n';
+ auto it = _source.begin();
+ auto end = _source.end();
+ int idx = 0, batch = 0;
+ for (; it != end; ++idx)
+ {
+ std::string dt = DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i");
+ tmp.str("");
+ tmp << "('" << *it << "','" <<
+ names[idx] << "'," <<
+ _pid[idx] << ",'" <<
+ _thread[idx] << "'," <<
+ _tid[idx] << ',' <<
+ _priority[idx] << ",'" <<
+ _text[idx] << "','" <<
+ dt << "')";
+
+ if (++batch == _maxBatch || (os.str().length() + tmp.str().length()) >= _maxSQL)
+ {
+ os << ";\n";
+ Message msg(_source[0], os.str(), Message::PRIO_FATAL);
+ pFileChannel->log(msg);
+ n += batch;
+ _logged += batch;
+ _source.erase(_source.begin(), _source.begin() + batch);
+ _pid.erase(_pid.begin(), _pid.begin() + batch);
+ _thread.erase(_thread.begin(), _thread.begin() + batch);
+ _tid.erase(_tid.begin(), _tid.begin() + batch);
+ _priority.erase(_priority.begin(), _priority.begin() + batch);
+ _text.erase(_text.begin(), _text.begin() + batch);
+ _dateTime.erase(_dateTime.begin(), _dateTime.begin() + batch);
+ os.str(""); sql.clear();
+ Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
+ os << sql << '\n' << tmp.str();
+ batch = 0;
+ }
+
+ os << tmp.str();
+
+ if (++it == end)
+ {
+ os << ";\n";
+ break;
+ }
+ os << ",\n";
+ }
+
+ if ((batch >= _minBatch) || flush)
+ {
+ Message msg(_source[0], os.str(), Message::PRIO_FATAL);
+ pFileChannel->log(msg);
+ n += batch;
+ _logged += batch;
+ _source.erase(_source.begin(), _source.begin()+batch);
+ _pid.erase(_pid.begin(), _pid.begin() + batch);
+ _thread.erase(_thread.begin(), _thread.begin() + batch);
+ _tid.erase(_tid.begin(), _tid.begin() + batch);
+ _priority.erase(_priority.begin(), _priority.begin() + batch);
+ _text.erase(_text.begin(), _text.begin() + batch);
+ _dateTime.erase(_dateTime.begin(), _dateTime.begin() + batch);
+ }
+ }
+
+ return n;
+}
+
+
+size_t SQLChannel::logToDB(bool flush)
+{
+ if (_source.empty()) return 0u;
+
+ static std::vector names;
+ if (names.size() != _source.size())
+ names.resize(_source.size(), Poco::replace(_name, "'", "''"));
+ static std::string placeholders = "(?,?,?,?,?,?,?,?)";
+
+ Poco::FastMutex::ScopedLock l(_mutex);
+
+ if (_tableChanged)
{
- _pDBThread = std::make_unique();
- _pDBThread->start(*this);
+ Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders);
+ _tableChanged = false;
}
+
+ std::size_t n = 0;
+
+ try
+ {
+ if (_bulk)
+ {
+ try
+ {
+ (*_pSession) << _sql,
+ use(_source, bulk),
+ use(names, bulk),
+ use(_pid, bulk),
+ use(_thread, bulk),
+ use(_tid, bulk),
+ use(_priority, bulk),
+ use(_text, bulk),
+ use(_dateTime, bulk), now;
+ }
+ // most likely bulk mode not supported, try again
+ catch (const Poco::InvalidAccessException&)
+ {
+ (*_pSession) << _sql,
+ use(_source),
+ use(names),
+ use(_pid),
+ use(_thread),
+ use(_tid),
+ use(_priority),
+ use(_text),
+ use(_dateTime), now;
+ _bulk = false;
+ }
+ }
+ else
+ {
+ (*_pSession) << _sql,
+ use(_source),
+ use(names),
+ use(_pid),
+ use(_thread),
+ use(_tid),
+ use(_priority),
+ use(_text),
+ use(_dateTime), now;
+ }
+ n = _source.size();
+ }
+ catch (const Poco::Exception& ex)
+ {
+ logLocal(ex.displayText());
+ close();
+ _reconnect = true;
+ }
+ catch (const std::exception& ex)
+ {
+ logLocal(ex.what());
+ close();
+ _reconnect = true;
+ }
+
+ if (n)
+ {
+ _logged += n;
+ _source.erase(_source.begin(), _source.begin() + n);
+ _pid.erase(_pid.begin(), _pid.begin() + n);
+ _thread.erase(_thread.begin(), _thread.begin() + n);
+ _tid.erase(_tid.begin(), _tid.begin() + n);
+ _priority.erase(_priority.begin(), _priority.begin() + n);
+ _text.erase(_text.begin(), _text.begin() + n);
+ _dateTime.erase(_dateTime.begin(), _dateTime.begin() + n);
+ }
+
+ return n;
+}
+
+
+size_t SQLChannel::execSQL(bool flush)
+{
+ if (!_connector.empty() && (!_pSession || !_pSession->isConnected())) open();
+ if (_pArchiveStrategy) _pArchiveStrategy->archive();
+
+ size_t n = _pSession ? logToDB(flush) : logToFile(flush);
+
+ return n;
+}
+
+
+std::size_t SQLChannel::wait(int ms)
+{
+ Stopwatch sw;
+ sw.start();
+ while (_logQueue.size())
+ {
+ Thread::sleep(10);
+ if (ms && sw.elapsed() * 1000 > ms)
+ break;
+ }
+ return _logQueue.size();
}
@@ -405,6 +633,11 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
throw Poco::InvalidArgumentException(Poco::format("SQLChannel::setProperty(%s,%s)", name, value));
_maxBatch = maxBatch;
}
+ else if (name == PROP_MAX_SQL)
+ {
+ int maxSQL = NumberParser::parse(value);
+ _maxSQL = maxSQL;
+ }
else if (name == PROP_BULK)
{
_bulk = isTrue(value);
@@ -413,6 +646,16 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
_throw = isTrue(value);
}
+ else if (name == PROP_DIRECTORY)
+ {
+ std::string dir = value;
+ if (!Path(File(dir).path()).isAbsolute())
+ {
+ Path d(dir);
+ dir = d.makeDirectory().makeAbsolute().toString();
+ }
+ _directory = dir;
+ }
else if (name == PROP_FILE)
{
_file = value;
@@ -465,6 +708,10 @@ std::string SQLChannel::getProperty(const std::string& name) const
{
return std::to_string(_maxBatch);
}
+ else if (name == PROP_MAX_SQL)
+ {
+ return std::to_string(_maxSQL);
+ }
else if (name == PROP_BULK)
{
if (_bulk) return "true";
@@ -475,6 +722,10 @@ std::string SQLChannel::getProperty(const std::string& name) const
if (_throw) return "true";
else return "false";
}
+ else if (name == PROP_DIRECTORY)
+ {
+ return _directory;
+ }
else if (name == PROP_FILE)
{
return _file;
@@ -486,182 +737,6 @@ std::string SQLChannel::getProperty(const std::string& name) const
}
-size_t SQLChannel::logToFile(bool clear)
-{
- static std::vector names;
- if (names.size() != _source.size())
- names.resize(_source.size(), Poco::replace(_name, "'", "''"));
-
- std::size_t n = 0;
-
- if (!_pFileChannel && !_file.empty()) _pFileChannel = new FileChannel(_file);
- if (_pFileChannel)
- {
- std::string sql;
- Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
- std::stringstream os;
- os << sql << '\n';
- auto it = _source.begin();
- auto end = _source.end();
- int idx = 0, batch = 0;
- for (; it != end; ++idx)
- {
- std::string dt = Poco::DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i");
- os << "('" << *it << "','" <<
- names[idx] << "'," <<
- _pid[idx] << ",'" <<
- _thread[idx] << "'," <<
- _tid[idx] << ',' <<
- _priority[idx] << ",'" <<
- _text[idx] << "','" <<
- dt << "')";
- if (++batch == _maxBatch)
- {
- os << ";\n";
- Message msg(_source[0], os.str(), Message::PRIO_ERROR);
- _pFileChannel->log(msg);
- os.str(""); sql.clear();
- Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
- batch = 0;
- }
- if (++it == end)
- {
- os << ";\n";
- break;
- }
- os << ",\n";
- }
- Message msg(_source[0], os.str(), Message::PRIO_ERROR);
- _pFileChannel->log(msg);
- n = _source.size();
- if (clear && n)
- {
- _source.clear();
- _pid.clear();
- _thread.clear();
- _tid.clear();
- _priority.clear();
- _text.clear();
- _dateTime.clear();
- }
- }
- return n;
-}
-
-
-size_t SQLChannel::execSQL()
-{
- static std::vector names;
- if (names.size() != _source.size())
- names.resize(_source.size(), Poco::replace(_name, "'", "''"));
- static std::string placeholders = "(?,?,?,?,?,?,?,?)";
-
- Poco::FastMutex::ScopedLock l(_mutex);
-
- if (_tableChanged)
- {
- Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders);
- _tableChanged = false;
- }
-
- if (!_pSession || !_pSession->isConnected()) open();
- if (_pArchiveStrategy) _pArchiveStrategy->archive();
-
- size_t n = 0;
- if (_pSession)
- {
- try
- {
- if (_bulk)
- {
- try
- {
- (*_pSession) << _sql,
- use(_source, bulk),
- use(names, bulk),
- use(_pid, bulk),
- use(_thread, bulk),
- use(_tid, bulk),
- use(_priority, bulk),
- use(_text, bulk),
- use(_dateTime, bulk), now;
- }
- // most likely bulk mode not supported, try again
- catch (const Poco::InvalidAccessException&)
- {
- (*_pSession) << _sql,
- use(_source),
- use(names),
- use(_pid),
- use(_thread),
- use(_tid),
- use(_priority),
- use(_text),
- use(_dateTime), now;
- _bulk = false;
- }
- }
- else
- {
- (*_pSession) << _sql,
- use(_source),
- use(names),
- use(_pid),
- use(_thread),
- use(_tid),
- use(_priority),
- use(_text),
- use(_dateTime), now;
- }
- n = _source.size();
- }
- catch (const Poco::Exception& ex)
- {
- logLocal(ex.displayText());
- close();
- _reconnect = true;
- }
- catch (const std::exception& ex)
- {
- logLocal(ex.what());
- close();
- _reconnect = true;
- }
- }
- else
- {
- n = logToFile(_pFileChannel);
- }
-
- if (n)
- {
- _logged += n;
- _source.clear();
- _pid.clear();
- _thread.clear();
- _tid.clear();
- _priority.clear();
- _text.clear();
- _dateTime.clear();
- }
- return n;
-}
-
-
-std::size_t SQLChannel::wait(int ms)
-{
- Stopwatch sw;
- sw.start();
- while (_logQueue.size())
- {
- Thread::sleep(10);
- if (ms && sw.elapsed() * 1000 > ms)
- break;
- }
- return _logQueue.size();
-}
-
-
void SQLChannel::registerChannel()
{
Poco::LoggingFactory::defaultFactory().registerChannelClass("SQLChannel",
diff --git a/Data/testsuite/src/DataTest.cpp b/Data/testsuite/src/DataTest.cpp
index fef722b547..4750fa53bf 100644
--- a/Data/testsuite/src/DataTest.cpp
+++ b/Data/testsuite/src/DataTest.cpp
@@ -20,6 +20,7 @@
#include "Poco/Data/Column.h"
#include "Poco/Data/Date.h"
#include "Poco/Data/Time.h"
+#include "Poco/Data/SQLChannel.h"
#include "Poco/Data/SimpleRowFormatter.h"
#include "Poco/Data/JSONRowFormatter.h"
#include "Poco/Data/DataException.h"
@@ -27,12 +28,17 @@
#include "Poco/BinaryReader.h"
#include "Poco/BinaryWriter.h"
#include "Poco/DateTime.h"
+#include "Poco/Stopwatch.h"
#include "Poco/Types.h"
#include "Poco/Dynamic/Var.h"
#include "Poco/Data/DynamicLOB.h"
#include "Poco/Data/DynamicDateTime.h"
#include "Poco/Latin1Encoding.h"
#include "Poco/Exception.h"
+#include "Poco/DirectoryIterator.h"
+#include "Poco/Glob.h"
+#include "Poco/File.h"
+#include
#include
#include
#include
@@ -40,46 +46,13 @@
using namespace Poco;
+using namespace Poco::Dynamic;
using namespace Poco::Data;
using namespace Poco::Data::Keywords;
-
-
-using Poco::BinaryReader;
-using Poco::BinaryWriter;
-using Poco::UInt32;
-using Poco::Int64;
-using Poco::UInt64;
-using Poco::DateTime;
-using Poco::Latin1Encoding;
-using Poco::Dynamic::Var;
-using Poco::InvalidAccessException;
-using Poco::IllegalStateException;
-using Poco::RangeException;
-using Poco::NotFoundException;
-using Poco::InvalidArgumentException;
-using Poco::NotImplementedException;
-using Poco::Data::Session;
-using Poco::Data::SessionFactory;
-using Poco::Data::Statement;
-using Poco::Data::NotSupportedException;
-using Poco::Data::CLOB;
-using Poco::Data::CLOBInputStream;
-using Poco::Data::CLOBOutputStream;
-using Poco::Data::MetaColumn;
-using Poco::Data::Column;
-using Poco::Data::Row;
-using Poco::Data::RowFormatter;
-using Poco::Data::SimpleRowFormatter;
-using Poco::Data::JSONRowFormatter;
-using Poco::Data::Date;
-using Poco::Data::Time;
-using Poco::Data::AbstractExtractor;
-using Poco::Data::AbstractExtraction;
-using Poco::Data::AbstractExtractionVec;
-using Poco::Data::AbstractExtractionVecVec;
-using Poco::Data::AbstractBinding;
-using Poco::Data::AbstractBindingVec;
-using Poco::Data::NotConnectedException;
+using Poco::DirectoryIterator;
+using Poco::Glob;
+using Poco::File;
+using namespace std::string_literals;
DataTest::DataTest(const std::string& name): CppUnit::TestCase(name)
@@ -1570,6 +1543,61 @@ void DataTest::testSQLParse()
}
+void DataTest::testSQLChannel()
+{
+ AutoPtr pChannel = new SQLChannel();
+ Stopwatch sw; sw.start();
+ while (!pChannel->isRunning())
+ {
+ Thread::sleep(10);
+ if (sw.elapsedSeconds() > 3)
+ fail("SQLChannel timed out");
+ }
+
+ Glob g("*.log.sql");
+ {
+ DirectoryIterator it(""s);
+ DirectoryIterator end;
+ while (it != end)
+ {
+ if (g.match(it->path()))
+ {
+ File(it->path()).remove();
+ }
+ ++it;
+ }
+ }
+
+ constexpr int mcount{10};
+ constexpr int batch{3};
+ pChannel->setProperty("minBatch", std::to_string(batch));
+ for (int i = 0; i < mcount; i++)
+ {
+ Message msgInfA("InformationSource", Poco::format("%d Informational sync message", i), Message::PRIO_INFORMATION);
+ pChannel->log(msgInfA);
+ }
+ Thread::sleep(1000); // give it time to log
+ pChannel->stop();
+ auto logged = pChannel->logged();
+ assertEqual(mcount, logged);
+ pChannel.reset();
+
+ int count = 0;
+ DirectoryIterator it(""s);
+ DirectoryIterator end;
+ while (it != end)
+ {
+ if (g.match(it->path()))
+ {
+ ++count;
+ File(it->path()).remove();
+ }
+ ++it;
+ }
+ assertEqual(count, (mcount / batch) + (mcount % batch));
+}
+
+
void DataTest::setUp()
{
}
@@ -1603,6 +1631,7 @@ CppUnit::Test* DataTest::suite()
CppUnit_addTest(pSuite, DataTest, testExternalBindingAndExtraction);
CppUnit_addTest(pSuite, DataTest, testTranscode);
CppUnit_addTest(pSuite, DataTest, testSQLParse);
+ CppUnit_addTest(pSuite, DataTest, testSQLChannel);
return pSuite;
}
diff --git a/Data/testsuite/src/DataTest.h b/Data/testsuite/src/DataTest.h
index 8d9825030c..5e34dbdc40 100644
--- a/Data/testsuite/src/DataTest.h
+++ b/Data/testsuite/src/DataTest.h
@@ -46,6 +46,7 @@ class DataTest: public CppUnit::TestCase
void testExternalBindingAndExtraction();
void testTranscode();
void testSQLParse();
+ void testSQLChannel();
void setUp();
void tearDown();