Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add queue history size metric #436

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey)
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_PURGE,
0);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}
}
}
Expand Down Expand Up @@ -476,6 +479,12 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& guid)
d_capacityMeter.remove(1, msgLen);
d_handles.erase(it);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

return mqbi::StorageResult::e_ZERO_REFERENCES;
}
else {
Expand Down Expand Up @@ -523,6 +532,9 @@ FileBackedStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize)
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
msgLen);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());

if (msgSize) {
*msgSize = msgLen;
Expand Down Expand Up @@ -577,6 +589,12 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey)
d_isEmpty.storeRelaxed(1);
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

return mqbi::StorageResult::e_SUCCESS;
}

Expand Down Expand Up @@ -693,6 +711,9 @@ int FileBackedStorage::gcExpiredMessages(
mqbstat::QueueStatsDomain::EventType::e_NO_SC_MESSAGE,
numMsgsUnreceipted);
}
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

if (d_handles.empty()) {
Expand All @@ -704,8 +725,16 @@ int FileBackedStorage::gcExpiredMessages(

bool FileBackedStorage::gcHistory()
{
return d_handles.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);
bool hasMoreToGc = d_handles.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}

return hasMoreToGc;
}

void FileBackedStorage::processMessageRecord(
Expand Down Expand Up @@ -903,6 +932,12 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid)
if (d_handles.empty()) {
d_isEmpty.storeRelaxed(1);
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_handles.historySize());
}
}

void FileBackedStorage::addQueueOpRecordHandle(
Expand Down
30 changes: 28 additions & 2 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid)

d_items.erase(it);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

return mqbi::StorageResult::e_ZERO_REFERENCES; // RETURN
}

Expand Down Expand Up @@ -337,6 +343,9 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize)
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
msgLen);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

if (msgSize) {
Expand Down Expand Up @@ -390,6 +399,12 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
d_isEmpty.storeRelaxed(1);
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

return mqbi::StorageResult::e_SUCCESS;
}

Expand Down Expand Up @@ -442,6 +457,9 @@ int InMemoryStorage::gcExpiredMessages(
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE,
numMsgsDeleted);
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

if (d_items.empty()) {
Expand All @@ -453,8 +471,16 @@ int InMemoryStorage::gcExpiredMessages(

bool InMemoryStorage::gcHistory()
{
return d_items.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);
bool hasMoreToGc = d_items.gc(mwcsys::Time::highResolutionTimer(),
k_GC_MESSAGES_BATCH_SIZE);

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY,
d_items.historySize());
}

return hasMoreToGc;
}

void InMemoryStorage::selectForAutoConfirming(const bmqt::MessageGUID& msgGUID)
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ struct ConversionUtils {

populateMetric(&values, ctx, Stat::e_NO_SC_MSGS_DELTA);
populateMetric(&values, ctx, Stat::e_NO_SC_MSGS_ABS);

populateMetric(&values, ctx, Stat::e_HISTORY_ABS);
}

inline static void populateOneDomainStats(bdljsn::JsonObject* domainObject,
Expand Down
25 changes: 23 additions & 2 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ struct DomainQueueStats {
/// consistency queue expired before receiving quorum
/// Receipts
e_STAT_NO_SC_MSGS

,
// Value: Current number of GUIDs stored in queue's history
// (does not include messages in the queue)
e_STAT_HISTORY
};
};

Expand Down Expand Up @@ -243,6 +248,7 @@ const char* QueueStatsDomain::Stat::toString(Stat::Enum value)
MQBSTAT_CASE(e_CFG_BYTES, "queue_cfg_bytes")
MQBSTAT_CASE(e_NO_SC_MSGS_DELTA, "queue_nack_noquorum_msgs")
MQBSTAT_CASE(e_NO_SC_MSGS_ABS, "queue_nack_noquorum_msgs_abs")
MQBSTAT_CASE(e_HISTORY_ABS, "queue_history_abs")
}

BSLS_ASSERT(!"invalid enumerator");
Expand Down Expand Up @@ -406,6 +412,9 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context,
return STAT_RANGE(valueDifference,
DomainQueueStats::e_STAT_NO_SC_MSGS);
}
case QueueStatsDomain::Stat::e_HISTORY_ABS: {
return STAT_SINGLE(value, DomainQueueStats::e_STAT_HISTORY);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
Expand Down Expand Up @@ -590,6 +599,9 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value)
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_NO_SC_MSGS,
value);
} break;
case EventType::e_UPDATE_HISTORY: {
d_statContext_mp->setValue(DomainQueueStats::e_STAT_HISTORY, value);
} break;
default: {
BSLS_ASSERT_SAFE(false && "Unknown event type");
} break;
Expand Down Expand Up @@ -663,7 +675,8 @@ void QueueStatsDomain::onEvent(EventType::Enum type,
case EventType::e_CHANGE_ROLE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_MSGS: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_BYTES: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_NO_SC_MESSAGE: {
case EventType::e_NO_SC_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_UPDATE_HISTORY: {
BSLS_ASSERT_SAFE(false && "Unexpected event type for appId metric");
} break;

Expand Down Expand Up @@ -925,7 +938,8 @@ QueueStatsUtil::initializeStatContextDomains(int historySize,
.value("cfg_msgs")
.value("cfg_bytes")
.value("content_msgs")
.value("content_bytes");
.value("content_bytes")
.value("history_size");
// NOTE: If the stats are using too much memory, we could reconsider
// nb_producer, nb_consumer, messages and bytes to be using atomic
// int and not stat value.
Expand Down Expand Up @@ -990,6 +1004,10 @@ void QueueStatsUtil::initializeTableAndTipDomains(
DomainQueueStats::e_STAT_BYTES,
mwcst::StatUtil::value,
start);
schema.addColumn("history_size",
DomainQueueStats::e_STAT_HISTORY,
mwcst::StatUtil::value,
start);

schema.addColumn("put_msgs_delta",
DomainQueueStats::e_STAT_PUT,
Expand Down Expand Up @@ -1190,6 +1208,9 @@ void QueueStatsUtil::initializeTableAndTipDomains(
tip->setColumnGroup("GC");
tip->addColumn("gc_msgs_delta", "delta").zeroString("");
tip->addColumn("gc_msgs_abs", "abs").zeroString("");

tip->setColumnGroup("History");
tip->addColumn("history_size", "# GUIDs").zeroString("");
}

void QueueStatsUtil::initializeTableAndTipClients(
Expand Down
6 changes: 4 additions & 2 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class QueueStatsDomain {
e_CHANGE_ROLE,
e_CFG_MSGS,
e_CFG_BYTES,
e_NO_SC_MESSAGE
e_NO_SC_MESSAGE,
e_UPDATE_HISTORY
};
};

Expand Down Expand Up @@ -140,7 +141,8 @@ class QueueStatsDomain {
e_CFG_MSGS,
e_CFG_BYTES,
e_NO_SC_MSGS_DELTA,
e_NO_SC_MSGS_ABS
e_NO_SC_MSGS_ABS,
e_HISTORY_ABS
};

/// Return the non-modifiable string description corresponding to
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ static void test3_queueStatsDomain()

// 1 add message : 15 bytes
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_ADD_MESSAGE, 15);

// 1 GUID in history
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 1);
domain->snapshot();

// The following stats are not range based, and therefore always return the
Expand All @@ -325,6 +328,7 @@ static void test3_queueStatsDomain()
ASSERT_EQ_DOMAINSTAT(e_PUSH_BYTES_ABS, 0, 9);
ASSERT_EQ_DOMAINSTAT(e_PUT_MESSAGES_ABS, 0, 3);
ASSERT_EQ_DOMAINSTAT(e_PUT_BYTES_ABS, 0, 33);
ASSERT_EQ_DOMAINSTAT(e_HISTORY_ABS, 0, 1);

ASSERT_EQ_DOMAINSTAT(e_ACK_DELTA, 1, 2);
ASSERT_EQ_DOMAINSTAT(e_CONFIRM_DELTA, 1, 1);
Expand Down Expand Up @@ -359,6 +363,10 @@ static void test3_queueStatsDomain()

// del 1 message
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_DEL_MESSAGE, 15);

// 3 GUIDs in history (first 5, then gc results in 3)
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 5);
queueStatsDomain.onEvent(QueueStatsDomain::EventType::e_UPDATE_HISTORY, 3);
domain->snapshot();

// The following stats are not range based, and therefore always return the
Expand All @@ -373,6 +381,7 @@ static void test3_queueStatsDomain()
ASSERT_EQ_DOMAINSTAT(e_PUSH_BYTES_ABS, 0, 20);
ASSERT_EQ_DOMAINSTAT(e_PUT_MESSAGES_ABS, 0, 5);
ASSERT_EQ_DOMAINSTAT(e_PUT_BYTES_ABS, 0, 55);
ASSERT_EQ_DOMAINSTAT(e_HISTORY_ABS, 0, 3);

// Compare now and previous snapshot
ASSERT_EQ_DOMAINSTAT(e_ACK_DELTA, 1, 4);
Expand Down
10 changes: 10 additions & 0 deletions src/groups/mwc/mwcc/mwcc_orderedhashmapwithhistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ class OrderedHashMapWithHistory {

/// Return the number of elements in this container.
size_t size() const;

/// Return the number of elements in this container's history.
size_t historySize() const;
};

// ============================================================================
Expand Down Expand Up @@ -763,6 +766,13 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::size() const
return d_impl.size() - d_historySize;
}

template <class KEY, class VALUE, class HASH, class VALUE_TYPE>
inline size_t
OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::historySize() const
{
return d_historySize;
}

} // close package namespace
} // close enterprise namespace

Expand Down
Loading
Loading