From f6884f9080e0d17dfcc66303ec618cf798903ac1 Mon Sep 17 00:00:00 2001 From: Christopher Beard Date: Fri, 20 Sep 2024 17:37:38 +0100 Subject: [PATCH] [WIP] Feat: Add queue history size metric This adds a new queue metric that counts the number of GUIDs in that queue's history. This is useful for identifying excessive memory utilization from history and potential history garbage collection issues (where history is filled up faster than it's cleaned up). Signed-off-by: Christopher Beard --- .../mqb/mqbs/mqbs_filebackedstorage.cpp | 33 +++++++++++++++++-- src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp | 30 +++++++++++++++-- src/groups/mqb/mqbstat/mqbstat_queuestats.cpp | 16 ++++++++- src/groups/mqb/mqbstat/mqbstat_queuestats.h | 6 ++-- .../mqb/mqbstat/mqbstat_queuestats.t.cpp | 9 +++++ .../mwc/mwcc/mwcc_orderedhashmapwithhistory.h | 10 ++++++ 6 files changed, 97 insertions(+), 7 deletions(-) diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index 31a48a145..3550e3e5e 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -94,6 +94,9 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey) queue()->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_PURGE, 0); + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); } } } @@ -524,6 +527,9 @@ FileBackedStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize) queue()->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, msgLen); + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); if (msgSize) { *msgSize = msgLen; @@ -578,6 +584,12 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) d_isEmpty.storeRelaxed(1); } + if (queue()) { + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); + } + return mqbi::StorageResult::e_SUCCESS; } @@ -694,6 +706,9 @@ int FileBackedStorage::gcExpiredMessages( mqbstat::QueueStatsDomain::EventType::e_NO_SC_MESSAGE, numMsgsUnreceipted); } + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); } if (d_handles.empty()) { @@ -705,8 +720,16 @@ int FileBackedStorage::gcExpiredMessages( bool FileBackedStorage::gcHistory() { - return d_handles.gc(mwcsys::Time::highResolutionTimer(), - k_GC_MESSAGES_BATCH_SIZE); + bool hasMoreToGc = d_handles.gc(mwcsys::Time::highResolutionTimer(), + k_GC_MESSAGES_BATCH_SIZE); + + if (queue()) { + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); + } + + return hasMoreToGc; } void FileBackedStorage::processMessageRecord( @@ -904,6 +927,12 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid) if (d_handles.empty()) { d_isEmpty.storeRelaxed(1); } + + if (queue()) { + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); + } } void FileBackedStorage::addQueueOpRecordHandle( diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 75b3f6a57..44f042f92 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -310,6 +310,12 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid) d_items.erase(it); + if (queue()) { + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_items.historySize()); + } + return mqbi::StorageResult::e_ZERO_REFERENCES; // RETURN } @@ -337,6 +343,9 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize) queue()->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, msgLen); + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_items.historySize()); } if (msgSize) { @@ -390,6 +399,12 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey) d_isEmpty.storeRelaxed(1); } + if (queue()) { + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_items.historySize()); + } + return mqbi::StorageResult::e_SUCCESS; } @@ -442,6 +457,9 @@ int InMemoryStorage::gcExpiredMessages( queue()->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE, numMsgsDeleted); + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_items.historySize()); } if (d_items.empty()) { @@ -453,8 +471,16 @@ int InMemoryStorage::gcExpiredMessages( bool InMemoryStorage::gcHistory() { - return d_items.gc(mwcsys::Time::highResolutionTimer(), - k_GC_MESSAGES_BATCH_SIZE); + bool hasMoreToGc = d_items.gc(mwcsys::Time::highResolutionTimer(), + k_GC_MESSAGES_BATCH_SIZE); + + if (queue()) { + queue()->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_items.historySize()); + } + + return hasMoreToGc; } void InMemoryStorage::selectForAutoConfirming(const bmqt::MessageGUID& msgGUID) diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp index 9c5445d42..83283700f 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp @@ -148,11 +148,17 @@ struct DomainQueueStats { , e_CFG_BYTES // Value: The configured queue bytes capacity + , e_STAT_NO_SC_MSGS // Value: Accumulated number of messages in the strong // consistency queue expired before receiving quorum // Receipts + + , + e_STAT_HISTORY + // Value: Current number of GUIDs stored in queue's history + // (does not include messages in the queue) }; }; @@ -242,6 +248,7 @@ const char* QueueStatsDomain::Stat::toString(Stat::Enum value) MQBSTAT_CASE(e_CFG_BYTES, "queue_cfg_bytes") MQBSTAT_CASE(e_NO_SC_MSGS_DELTA, "queue_nack_noquorum_msgs") MQBSTAT_CASE(e_NO_SC_MSGS_ABS, "queue_nack_noquorum_msgs_abs") + MQBSTAT_CASE(e_HISTORY_ABS, "queue_history_abs") } BSLS_ASSERT(!"invalid enumerator"); @@ -405,6 +412,9 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context, return STAT_RANGE(valueDifference, DomainQueueStats::e_STAT_NO_SC_MSGS); } + case QueueStatsDomain::Stat::e_HISTORY_ABS: { + return STAT_SINGLE(value, DomainQueueStats::e_STAT_HISTORY); + } default: { BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat"); } @@ -571,6 +581,9 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value) d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NO_SC_MSGS, value); } break; + case EventType::e_UPDATE_HISTORY: { + d_statContext_mp->setValue(DomainQueueStats::e_STAT_HISTORY, value); + } break; default: { BSLS_ASSERT_SAFE(false && "Unknown event type"); } break; @@ -634,7 +647,8 @@ void QueueStatsDomain::onEvent(EventType::Enum type, case EventType::e_CHANGE_ROLE: BSLS_ANNOTATION_FALLTHROUGH; case EventType::e_CFG_MSGS: BSLS_ANNOTATION_FALLTHROUGH; case EventType::e_CFG_BYTES: BSLS_ANNOTATION_FALLTHROUGH; - case EventType::e_NO_SC_MESSAGE: { + case EventType::e_NO_SC_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH; + case EventType::e_UPDATE_HISTORY: { BSLS_ASSERT_SAFE(false && "Unexpected event type for appId metric"); } break; diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.h b/src/groups/mqb/mqbstat/mqbstat_queuestats.h index 75a4c6ab6..4e8eed66b 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.h +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.h @@ -99,7 +99,8 @@ class QueueStatsDomain { e_CHANGE_ROLE, e_CFG_MSGS, e_CFG_BYTES, - e_NO_SC_MESSAGE + e_NO_SC_MESSAGE, + e_UPDATE_HISTORY }; }; @@ -142,7 +143,8 @@ class QueueStatsDomain { e_CFG_MSGS, e_CFG_BYTES, e_NO_SC_MSGS_DELTA, - e_NO_SC_MSGS_ABS + e_NO_SC_MSGS_ABS, + e_HISTORY_ABS }; /// Return the non-modifiable string description corresponding to diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp index bc7974e2c..4c8e716a4 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp @@ -311,6 +311,9 @@ static void test3_queueStatsDomain() // 1 add message : 15 bytes queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ADD_MESSAGE, 15); + + // 1 GUID in history + queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 1); domain->snapshot(); // The following stats are not range based, and therefore always return the @@ -325,6 +328,7 @@ static void test3_queueStatsDomain() ASSERT_EQ_DOMAINSTAT(e_PUSH_BYTES_ABS, 0, 9); ASSERT_EQ_DOMAINSTAT(e_PUT_MESSAGES_ABS, 0, 3); ASSERT_EQ_DOMAINSTAT(e_PUT_BYTES_ABS, 0, 33); + ASSERT_EQ_DOMAINSTAT(e_HISTORY_ABS, 0, 1); ASSERT_EQ_DOMAINSTAT(e_ACK_DELTA, 1, 2); ASSERT_EQ_DOMAINSTAT(e_CONFIRM_DELTA, 1, 1); @@ -359,6 +363,10 @@ static void test3_queueStatsDomain() // del 1 message queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_DEL_MESSAGE, 15); + + // 3 GUIDs in history (first 5, then gc results in 3) + queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 5); + queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 3); domain->snapshot(); // The following stats are not range based, and therefore always return the @@ -373,6 +381,7 @@ static void test3_queueStatsDomain() ASSERT_EQ_DOMAINSTAT(e_PUSH_BYTES_ABS, 0, 20); ASSERT_EQ_DOMAINSTAT(e_PUT_MESSAGES_ABS, 0, 5); ASSERT_EQ_DOMAINSTAT(e_PUT_BYTES_ABS, 0, 55); + ASSERT_EQ_DOMAINSTAT(e_HISTORY_ABS, 0, 3); // Compare now and previous snapshot ASSERT_EQ_DOMAINSTAT(e_ACK_DELTA, 1, 4); diff --git a/src/groups/mwc/mwcc/mwcc_orderedhashmapwithhistory.h b/src/groups/mwc/mwcc/mwcc_orderedhashmapwithhistory.h index bf3087260..bf26c3128 100644 --- a/src/groups/mwc/mwcc/mwcc_orderedhashmapwithhistory.h +++ b/src/groups/mwc/mwcc/mwcc_orderedhashmapwithhistory.h @@ -356,6 +356,9 @@ class OrderedHashMapWithHistory { /// Return the number of elements in this container. size_t size() const; + + /// Return the number of elements in this container's history. + size_t historySize() const; }; // ============================================================================ @@ -763,6 +766,13 @@ OrderedHashMapWithHistory::size() const return d_impl.size() - d_historySize; } +template +inline size_t +OrderedHashMapWithHistory::historySize() const +{ + return d_historySize; +} + } // close package namespace } // close enterprise namespace