Skip to content

Commit

Permalink
[WIP] Feat: Add queue history size metric
Browse files Browse the repository at this point in the history
This adds a new queue metric that counts the number of GUIDs in that
queue's history. This is useful for identifying excessive memory
utilization from history and potential history garbage collection issues
(where history is filled up faster than it's cleaned up).

Signed-off-by: Christopher Beard <[email protected]>
  • Loading branch information
chrisbeard committed Oct 3, 2024
1 parent 9b9785e commit f6884f9
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 7 deletions.
33 changes: 31 additions & 2 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey)
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_PURGE,
0);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}
}
}
Expand Down Expand Up @@ -524,6 +527,9 @@ FileBackedStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize)
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
msgLen);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());

if (msgSize) {
*msgSize = msgLen;
Expand Down Expand Up @@ -578,6 +584,12 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey)
d_isEmpty.storeRelaxed(1);
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

return mqbi::StorageResult::e_SUCCESS;
}

Expand Down Expand Up @@ -694,6 +706,9 @@ int FileBackedStorage::gcExpiredMessages(
mqbstat::QueueStatsDomain::EventType::e_NO_SC_MESSAGE,
numMsgsUnreceipted);
}
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

if (d_handles.empty()) {
Expand All @@ -705,8 +720,16 @@ int FileBackedStorage::gcExpiredMessages(

bool FileBackedStorage::gcHistory()
{
return d_handles.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);
bool hasMoreToGc = d_handles.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

return hasMoreToGc;
}

void FileBackedStorage::processMessageRecord(
Expand Down Expand Up @@ -904,6 +927,12 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid)
if (d_handles.empty()) {
d_isEmpty.storeRelaxed(1);
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}
}

void FileBackedStorage::addQueueOpRecordHandle(
Expand Down
30 changes: 28 additions & 2 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid)

d_items.erase(it);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

return mqbi::StorageResult::e_ZERO_REFERENCES; // RETURN
}

Expand Down Expand Up @@ -337,6 +343,9 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize)
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
msgLen);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

if (msgSize) {
Expand Down Expand Up @@ -390,6 +399,12 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
d_isEmpty.storeRelaxed(1);
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

return mqbi::StorageResult::e_SUCCESS;
}

Expand Down Expand Up @@ -442,6 +457,9 @@ int InMemoryStorage::gcExpiredMessages(
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE,
numMsgsDeleted);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

if (d_items.empty()) {
Expand All @@ -453,8 +471,16 @@ int InMemoryStorage::gcExpiredMessages(

bool InMemoryStorage::gcHistory()
{
return d_items.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);
bool hasMoreToGc = d_items.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

return hasMoreToGc;
}

void InMemoryStorage::selectForAutoConfirming(const bmqt::MessageGUID& msgGUID)
Expand Down
16 changes: 15 additions & 1 deletion src/groups/mqb/mqbstat/mqbstat_queuestats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,17 @@ struct DomainQueueStats {
,
e_CFG_BYTES
// Value: The configured queue bytes capacity

,
e_STAT_NO_SC_MSGS
// Value: Accumulated number of messages in the strong
// consistency queue expired before receiving quorum
// Receipts

,
e_STAT_HISTORY
// Value: Current number of GUIDs stored in queue's history
// (does not include messages in the queue)
};
};

Expand Down Expand Up @@ -242,6 +248,7 @@ const char* QueueStatsDomain::Stat::toString(Stat::Enum value)
MQBSTAT_CASE(e_CFG_BYTES, "queue_cfg_bytes")
MQBSTAT_CASE(e_NO_SC_MSGS_DELTA, "queue_nack_noquorum_msgs")
MQBSTAT_CASE(e_NO_SC_MSGS_ABS, "queue_nack_noquorum_msgs_abs")
MQBSTAT_CASE(e_HISTORY_ABS, "queue_history_abs")
}

BSLS_ASSERT(!"invalid enumerator");
Expand Down Expand Up @@ -405,6 +412,9 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context,
return STAT_RANGE(valueDifference,
DomainQueueStats::e_STAT_NO_SC_MSGS);
}
case QueueStatsDomain::Stat::e_HISTORY_ABS: {
return STAT_SINGLE(value, DomainQueueStats::e_STAT_HISTORY);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
Expand Down Expand Up @@ -571,6 +581,9 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value)
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NO_SC_MSGS,
value);
} break;
case EventType::e_UPDATE_HISTORY: {
d_statContext_mp->setValue(DomainQueueStats::e_STAT_HISTORY, value);
} break;
default: {
BSLS_ASSERT_SAFE(false && "Unknown event type");
} break;
Expand Down Expand Up @@ -634,7 +647,8 @@ void QueueStatsDomain::onEvent(EventType::Enum type,
case EventType::e_CHANGE_ROLE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_MSGS: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_BYTES: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_NO_SC_MESSAGE: {
case EventType::e_NO_SC_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_UPDATE_HISTORY: {
BSLS_ASSERT_SAFE(false && "Unexpected event type for appId metric");
} break;

Expand Down
6 changes: 4 additions & 2 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class QueueStatsDomain {
e_CHANGE_ROLE,
e_CFG_MSGS,
e_CFG_BYTES,
e_NO_SC_MESSAGE
e_NO_SC_MESSAGE,
e_UPDATE_HISTORY
};
};

Expand Down Expand Up @@ -142,7 +143,8 @@ class QueueStatsDomain {
e_CFG_MSGS,
e_CFG_BYTES,
e_NO_SC_MSGS_DELTA,
e_NO_SC_MSGS_ABS
e_NO_SC_MSGS_ABS,
e_HISTORY_ABS
};

/// Return the non-modifiable string description corresponding to
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ static void test3_queueStatsDomain()

// 1 add message : 15 bytes
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ADD_MESSAGE, 15);

// 1 GUID in history
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 1);
domain->snapshot();

// The following stats are not range based, and therefore always return the
Expand All @@ -325,6 +328,7 @@ static void test3_queueStatsDomain()
ASSERT_EQ_DOMAINSTAT(e_PUSH_BYTES_ABS, 0, 9);
ASSERT_EQ_DOMAINSTAT(e_PUT_MESSAGES_ABS, 0, 3);
ASSERT_EQ_DOMAINSTAT(e_PUT_BYTES_ABS, 0, 33);
ASSERT_EQ_DOMAINSTAT(e_HISTORY_ABS, 0, 1);

ASSERT_EQ_DOMAINSTAT(e_ACK_DELTA, 1, 2);
ASSERT_EQ_DOMAINSTAT(e_CONFIRM_DELTA, 1, 1);
Expand Down Expand Up @@ -359,6 +363,10 @@ static void test3_queueStatsDomain()

// del 1 message
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_DEL_MESSAGE, 15);

// 3 GUIDs in history (first 5, then gc results in 3)
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 5);
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 3);
domain->snapshot();

// The following stats are not range based, and therefore always return the
Expand All @@ -373,6 +381,7 @@ static void test3_queueStatsDomain()
ASSERT_EQ_DOMAINSTAT(e_PUSH_BYTES_ABS, 0, 20);
ASSERT_EQ_DOMAINSTAT(e_PUT_MESSAGES_ABS, 0, 5);
ASSERT_EQ_DOMAINSTAT(e_PUT_BYTES_ABS, 0, 55);
ASSERT_EQ_DOMAINSTAT(e_HISTORY_ABS, 0, 3);

// Compare now and previous snapshot
ASSERT_EQ_DOMAINSTAT(e_ACK_DELTA, 1, 4);
Expand Down
10 changes: 10 additions & 0 deletions src/groups/mwc/mwcc/mwcc_orderedhashmapwithhistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ class OrderedHashMapWithHistory {

/// Return the number of elements in this container.
size_t size() const;

/// Return the number of elements in this container's history.
size_t historySize() const;
};

// ============================================================================
Expand Down Expand Up @@ -763,6 +766,13 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::size() const
return d_impl.size() - d_historySize;
}

template <class KEY, class VALUE, class HASH, class VALUE_TYPE>
inline size_t
OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::historySize() const
{
return d_historySize;
}

} // close package namespace
} // close enterprise namespace

Expand Down

0 comments on commit f6884f9

Please sign in to comment.