Skip to content

Commit

Permalink
feat(SQLChannel): add store-and-forward mode
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Jul 15, 2024
1 parent 6ad9655 commit c3cbcaa
Show file tree
Hide file tree
Showing 8 changed files with 449 additions and 332 deletions.
8 changes: 0 additions & 8 deletions Data/Data_VS90.vcproj
Original file line number Diff line number Diff line change
Expand Up @@ -651,10 +651,6 @@
RelativePath=".\include\Poco\Data\SimpleRowFormatter.h"
>
</File>
<File
RelativePath=".\include\Poco\Data\SQLChannel.h"
>
</File>
<File
RelativePath=".\include\Poco\Data\SQLParser.h"
>
Expand Down Expand Up @@ -791,10 +787,6 @@
RelativePath=".\src\SimpleRowFormatter.cpp"
>
</File>
<File
RelativePath=".\src\SQLChannel.cpp"
>
</File>
<File
RelativePath=".\src\Statement.cpp"
>
Expand Down
12 changes: 6 additions & 6 deletions Data/Data_vs170.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@
<ClInclude Include="include\Poco\Data\SimpleRowFormatter.h">
<Filter>DataCore\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Data\SQLChannel.h">
<Filter>DataCore\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Data\SQLParser.h">
<Filter>DataCore\Header Files</Filter>
</ClInclude>
Expand Down Expand Up @@ -297,6 +294,9 @@
<ClInclude Include="include\Poco\Data\ArchiveStrategy.h">
<Filter>Logging\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Data\SQLChannel.h">
<Filter>Logging\Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="src\AbstractBinder.cpp">
Expand Down Expand Up @@ -374,9 +374,6 @@
<ClCompile Include="src\SimpleRowFormatter.cpp">
<Filter>DataCore\Source Files</Filter>
</ClCompile>
<ClCompile Include="src\SQLChannel.cpp">
<Filter>DataCore\Source Files</Filter>
</ClCompile>
<ClCompile Include="src\Statement.cpp">
<Filter>DataCore\Source Files</Filter>
</ClCompile>
Expand Down Expand Up @@ -440,6 +437,9 @@
<ClCompile Include="src\ArchiveStrategy.cpp">
<Filter>Logging\Source Files</Filter>
</ClCompile>
<ClCompile Include="src\SQLChannel.cpp">
<Filter>Logging\Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\DLLVersion.rc" />
Expand Down
122 changes: 61 additions & 61 deletions Data/SQLite/testsuite/TestSuite_vs170.vcxproj

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions Data/SQLite/testsuite/src/SQLiteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2480,7 +2480,7 @@ void SQLiteTest::testSQLChannel()
{
Thread::sleep(10);
if (sw.elapsedSeconds() > 3)
fail ("SQLExecutor::sqlLogger(): SQLChannel timed out");
fail ("SQLChannel timed out");
}
// bulk binding mode is not suported by SQLite, but SQLChannel should handle it internally
pChannel->setProperty("bulk", "true");
Expand Down Expand Up @@ -2539,17 +2539,15 @@ void SQLiteTest::testSQLChannel()
assertTrue("f Warning sync message" == rs2["Text"]);

pChannel->setProperty("minBatch", "1024");
constexpr int mcount { 20000 };
constexpr int mcount { 2000 };
for (int i = 0; i < mcount; i++)
{
Message msgInfA("InformationSource", "e Informational sync message", Message::PRIO_INFORMATION);
Message msgInfA("InformationSource", "g Informational sync message", Message::PRIO_INFORMATION);
pChannel->log(msgInfA);
}
//pChannel->stop();
pChannel.reset();
RecordSet rsl(tmp, "SELECT * FROM T_POCO_LOG");
assertEquals(2+mcount, rsl.rowCount());

}


Expand Down
52 changes: 37 additions & 15 deletions Data/include/Poco/Data/SQLChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Poco/Thread.h"
#include "Poco/Mutex.h"
#include <atomic>
#include <atomic>


namespace Poco {
Expand All @@ -56,15 +57,21 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
///
/// The table name is configurable through "table" property.
/// Other than DateTime filed name used for optional time-based archiving purposes, currently the
/// field names are not mandated. However, it is recomended to use names as specified above.
/// field names are not mandated. However, it is recommended to use names as specified above.
///
/// To provide as non-intrusive operation as possbile, the log entries are cached and
/// To provide as non-intrusive operation as possible, the log entries are cached and
/// inserted into the target database asynchronously by default. The blocking, however, will occur
/// before the next entry insertion with default timeout of 1 second. The default settings can be
/// overridden (see async, timeout and throw properties for details).
/// If throw property is false, insertion timeouts are ignored, otherwise a TimeoutException is thrown.
/// To force insertion of every entry, set timeout to 0. This setting, however, introduces
/// a risk of long blocking periods in case of remote server communication delays.
///
/// A default-constructed SQLChannel operates without an active DB connection, in a store-and-forward
/// mode. For this mode of operation, a separate service is required to consume and execute the SQL
/// statements from the stored files and insert the log entries into the database. Since this setup
/// stores SQL inserts in the OS file system, it is strongly recommended to take the necessary
/// precautions to limit and secure the access to those files.
{
public:
class LogNotification : public Poco::Notification
Expand All @@ -88,17 +95,23 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable

static const int DEFAULT_MIN_BATCH_SIZE = 1;
static const int DEFAULT_MAX_BATCH_SIZE = 1000;
static const int DEFAULT_MAX_SQL_SIZE = 65536;

SQLChannel();
/// Creates SQLChannel.
/// An SQLChannel without DB connector and local file name
/// only logs SQL inserts into local files. A separate file
/// is created for each insert batch; files are named
/// <YYYYMMDDHH24MISSmmm>.<UUID>.log.sql.

SQLChannel(const std::string& connector,
const std::string& connect,
const std::string& name = "-",
const std::string& table = "T_POCO_LOG",
int timeout = 1000,
int minBatch = DEFAULT_MIN_BATCH_SIZE,
int maxBatch = DEFAULT_MAX_BATCH_SIZE);
int maxBatch = DEFAULT_MAX_BATCH_SIZE,
int maxSQL = DEFAULT_MAX_SQL_SIZE);
/// Creates an SQLChannel with the given connector, connect string, timeout, table and name.
/// The connector must be already registered.

Expand Down Expand Up @@ -166,6 +179,8 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
/// reaches this size, log entries are silently discarded.
/// Defaults to 100, can't be zero or larger than 1000.
///
/// * maxSQL: Maximum total length of the SQL statement. Defaults to 65536.
///
/// * bulk: Do bulk execute (on most DBMS systems, this can speed up things
/// drastically).
///
Expand Down Expand Up @@ -198,8 +213,10 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
static const std::string PROP_TIMEOUT;
static const std::string PROP_MIN_BATCH;
static const std::string PROP_MAX_BATCH;
static const std::string PROP_MAX_SQL;
static const std::string PROP_BULK;
static const std::string PROP_THROW;
static const std::string PROP_DIRECTORY;
static const std::string PROP_FILE;

protected:
Expand All @@ -222,17 +239,20 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
/// than minBatch, sends logs to the destination.
/// Returns true if log entry was processed.

size_t execSQL();
size_t execSQL(bool flush = false);
/// Executes the log statement.

size_t logSync();
size_t logSync(bool flush = false);
/// Inserts entries into the target database.

bool isTrue(const std::string& value) const;
/// Returns true is value is "true", "t", "yes" or "y".
/// Case insensitive.

size_t logToFile(bool clear = false);
size_t logToDB(bool flush = false);
/// Logs cached entries to the DB.

size_t logToFile(bool flush = false);
/// Logs cached entries to a file. Called in case DB insertions fail.

size_t logLocal(const std::string&, Message::Priority prio = Message::PRIO_ERROR);
Expand All @@ -257,24 +277,26 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
int _timeout;
std::atomic<int> _minBatch;
int _maxBatch;
int _maxSQL;
bool _bulk;
std::atomic<bool> _throw;

// members for log entry cache
std::vector<std::string> _source;
std::vector<long> _pid;
std::vector<std::string> _thread;
std::vector<long> _tid;
std::vector<int> _priority;
std::vector<std::string> _text;
std::vector<DateTime> _dateTime;
std::deque<std::string> _source;
std::deque<long> _pid;
std::deque<std::string> _thread;
std::deque<long> _tid;
std::deque<int> _priority;
std::deque<std::string> _text;
std::deque<DateTime> _dateTime;
Poco::NotificationQueue _logQueue;
std::unique_ptr<Poco::Thread> _pDBThread;
std::unique_ptr<Poco::Thread> _pLogThread;
std::atomic<bool> _reconnect;
std::atomic<bool> _stop;
std::atomic<size_t> _logged;
StrategyPtr _pArchiveStrategy;
std::string _file;
std::string _directory;
AutoPtr<FileChannel> _pFileChannel;
Poco::Logger& _logger = Poco::Logger::get("SQLChannel");
};
Expand All @@ -296,7 +318,7 @@ inline bool SQLChannel::isTrue(const std::string& value) const

inline bool SQLChannel::isRunning() const
{
return _pDBThread && _pDBThread->isRunning();
return _pLogThread && _pLogThread->isRunning();
}


Expand Down
Loading

0 comments on commit c3cbcaa

Please sign in to comment.