Skip to content

Commit

Permalink
Feat: track queue depth per appId (#320)
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 authored Oct 7, 2024
1 parent 6190b97 commit 698d7be
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 82 deletions.
11 changes: 5 additions & 6 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue)
d_virtualStorageCatalog.setQueue(queue);

// Update queue stats if a queue has been associated with the storage.

if (queue) {
const bsls::Types::Int64 numMessage = numMessages(
mqbu::StorageKey::k_NULL_KEY);
Expand Down Expand Up @@ -381,8 +380,8 @@ FileBackedStorage::confirm(const bmqt::MessageGUID& msgGUID,
return mqbi::StorageResult::e_GUID_NOT_FOUND; // RETURN
}

mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.confirm(msgGUID,
appKey);
const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.confirm(msgGUID, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
return rc; // RETURN
}
Expand All @@ -391,7 +390,7 @@ FileBackedStorage::confirm(const bmqt::MessageGUID& msgGUID,
BSLS_ASSERT_SAFE(!handles.empty());

DataStoreRecordHandle handle;
int writeResult = d_store_p->writeConfirmRecord(
const int writeResult = d_store_p->writeConfirmRecord(
&handle,
msgGUID,
d_queueKey,
Expand Down Expand Up @@ -830,8 +829,8 @@ void FileBackedStorage::processConfirmRecord(
--it->second.d_refCount; // Update outstanding refCount

if (!appKey.isNull()) {
mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.confirm(guid,
appKey);
const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.confirm(guid, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
BALL_LOG_ERROR << "#STORAGE_INVALID_CONFIRM " << "Partition ["
<< partitionId() << "]"
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ InMemoryStorage::confirm(const bmqt::MessageGUID& msgGUID,
}

if (!appKey.isNull()) {
mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.confirm(msgGUID,
appKey);
const mqbi::StorageResult::Enum rc =
d_virtualStorageCatalog.confirm(msgGUID, appKey);
if (mqbi::StorageResult::e_SUCCESS != rc) {
return rc; // RETURN
}
Expand Down
19 changes: 17 additions & 2 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include <mqbs_virtualstoragecatalog.h>

#include <mqbscm_version.h>

// MQB
#include <mqbi_queueengine.h>
#include <mqbstat_queuestats.h>

#include <mwctsk_alarmlog.h>

Expand Down Expand Up @@ -115,7 +117,6 @@ VirtualStorageCatalog::~VirtualStorageCatalog()
}

// MANIPULATORS

VirtualStorageCatalog::DataStreamIterator
VirtualStorageCatalog::begin(const bmqt::MessageGUID& where)
{
Expand Down Expand Up @@ -270,8 +271,15 @@ VirtualStorageCatalog::confirm(const bmqt::MessageGUID& msgGUID,
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());

setup(&data->second);
const mqbi::StorageResult::Enum rc = it->value()->confirm(&data->second);
if (queue() && mqbi::StorageResult::Enum::e_SUCCESS == rc) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE,
data->second.d_size,
it->key1());
}

return it->value()->confirm(&data->second);
return rc;
}

mqbi::StorageResult::Enum
Expand Down Expand Up @@ -365,6 +373,13 @@ VirtualStorageCatalog::removeAll(const mqbu::StorageKey& appKey)
++itData;
}
}

if (queue()) {
queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_PURGE,
0,
itVs->key1());
}
}
else {
for (VirtualStoragesIter it = d_virtualStorages.begin();
Expand Down
29 changes: 13 additions & 16 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
// storages associated with a queue.

// MQB

#include <mqbi_storage.h>
#include <mqbs_virtualstorage.h>
#include <mqbu_storagekey.h>
Expand Down Expand Up @@ -107,37 +106,36 @@ class VirtualStorageCatalog {

private:
// DATA
mqbi::Storage* d_storage_p; // Physical storage underlying all
// virtual storages known to this
// object
/// Physical storage underlying all virtual storages known to this object
mqbi::Storage* d_storage_p;

/// Map of appKey to corresponding virtual storage
VirtualStorages d_virtualStorages;
// Map of appKey to corresponding
// virtual storage

/// Available ordinal values for virtual storages.
AvailableOrdinals d_availableOrdinals;
// available ordinal values for Virtual Storages.

/// Monotonically increasing value to generate new ordinal.
Ordinal d_nextOrdinal;
// Monotonically increasing value to generate new ordinal.

/// The DataStream tracking all Apps states.
VirtualStorage::DataStream d_dataStream;
// The DataStream tracking all Apps states.

/// Cumulative count of all bytes.
bsls::Types::Int64 d_totalBytes;
// Cumulative count of all bytes.

/// Cumulative count of all messages.
bsls::Types::Int64 d_numMessages;
// Cumulative count of all messages.

/// The default App state
mqbi::AppMessage d_defaultAppMessage;
// The default App state

/// This could be null if a local or remote
/// queue instance has not been created.
mqbi::Queue* d_queue_p;
// This could be null if a local or remote
// queue instance has not been created.

bslma::Allocator* d_allocator_p; // Allocator to use
/// Allocator to use
bslma::Allocator* d_allocator_p;

private:
// NOT IMPLEMENTED
Expand All @@ -161,7 +159,6 @@ class VirtualStorageCatalog {
~VirtualStorageCatalog();

// MANIPULATORS

/// If the specified 'where' is unset, return reference to the beginning of
/// the DataStream. Otherwise, return reference to the corresponding item
/// in the DataStream.
Expand Down
46 changes: 42 additions & 4 deletions src/groups/mqb/mqbstat/mqbstat_jsonprinter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,51 @@ struct ConversionUtils {

populateMetric(&values, ctx, Stat::e_NB_PRODUCER);
populateMetric(&values, ctx, Stat::e_NB_CONSUMER);

populateMetric(&values, ctx, Stat::e_MESSAGES_CURRENT);
populateMetric(&values, ctx, Stat::e_MESSAGES_MAX);
populateMetric(&values, ctx, Stat::e_BYTES_CURRENT);
populateMetric(&values, ctx, Stat::e_BYTES_MAX);

populateMetric(&values, ctx, Stat::e_PUT_MESSAGES_DELTA);
populateMetric(&values, ctx, Stat::e_PUT_BYTES_DELTA);
populateMetric(&values, ctx, Stat::e_PUT_MESSAGES_ABS);
populateMetric(&values, ctx, Stat::e_PUT_BYTES_ABS);

populateMetric(&values, ctx, Stat::e_PUSH_MESSAGES_DELTA);
populateMetric(&values, ctx, Stat::e_PUSH_BYTES_DELTA);
populateMetric(&values, ctx, Stat::e_PUSH_MESSAGES_ABS);
populateMetric(&values, ctx, Stat::e_PUSH_BYTES_ABS);

populateMetric(&values, ctx, Stat::e_ACK_DELTA);
populateMetric(&values, ctx, Stat::e_ACK_ABS);
populateMetric(&values, ctx, Stat::e_ACK_TIME_AVG);
populateMetric(&values, ctx, Stat::e_ACK_TIME_MAX);

populateMetric(&values, ctx, Stat::e_NACK_DELTA);
populateMetric(&values, ctx, Stat::e_NACK_ABS);

populateMetric(&values, ctx, Stat::e_CONFIRM_DELTA);
populateMetric(&values, ctx, Stat::e_CONFIRM_ABS);
populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_AVG);
populateMetric(&values, ctx, Stat::e_CONFIRM_TIME_MAX);

populateMetric(&values, ctx, Stat::e_REJECT_ABS);
populateMetric(&values, ctx, Stat::e_REJECT_DELTA);

populateMetric(&values, ctx, Stat::e_QUEUE_TIME_AVG);
populateMetric(&values, ctx, Stat::e_QUEUE_TIME_MAX);

populateMetric(&values, ctx, Stat::e_GC_MSGS_DELTA);
populateMetric(&values, ctx, Stat::e_GC_MSGS_ABS);

populateMetric(&values, ctx, Stat::e_ROLE);

populateMetric(&values, ctx, Stat::e_CFG_MSGS);
populateMetric(&values, ctx, Stat::e_CFG_BYTES);

populateMetric(&values, ctx, Stat::e_NO_SC_MSGS_DELTA);
populateMetric(&values, ctx, Stat::e_NO_SC_MSGS_ABS);
}

inline static void populateOneDomainStats(bdljsn::JsonObject* domainObject,
Expand Down Expand Up @@ -196,10 +230,14 @@ class JsonPrinter::JsonPrinterImpl {
inline JsonPrinter::JsonPrinterImpl::JsonPrinterImpl(
const StatContextsMap& statContextsMap,
bslma::Allocator* allocator)
: d_opsCompact(bdljsn::WriteOptions().setSpacesPerLevel(0).setStyle(
bdljsn::WriteStyle::e_COMPACT))
, d_opsPretty(bdljsn::WriteOptions().setSpacesPerLevel(4).setStyle(
bdljsn::WriteStyle::e_PRETTY))
: d_opsCompact(bdljsn::WriteOptions()
.setSpacesPerLevel(0)
.setStyle(bdljsn::WriteStyle::e_COMPACT)
.setSortMembers(true))
, d_opsPretty(bdljsn::WriteOptions()
.setSpacesPerLevel(4)
.setStyle(bdljsn::WriteStyle::e_PRETTY)
.setSortMembers(true))
, d_contexts(statContextsMap, allocator)
{
// NOTHING
Expand Down
36 changes: 32 additions & 4 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,15 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value)
case EventType::e_ADD_MESSAGE: {
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, value);
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1);
if (!d_subContextsHolder.empty()) {
bsl::list<StatSubContextMp>::iterator it =
d_subContextsHolder.begin();
while (it != d_subContextsHolder.end()) {
it->get()->adjustValue(DomainQueueStats::e_STAT_BYTES, value);
it->get()->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1);
++it;
}
}
} break;
case EventType::e_DEL_MESSAGE: {
d_statContext_mp->adjustValue(DomainQueueStats::e_STAT_BYTES, -value);
Expand All @@ -558,6 +567,15 @@ void QueueStatsDomain::onEvent(EventType::Enum type, bsls::Types::Int64 value)
// the stat to get rates
d_statContext_mp->setValue(DomainQueueStats::e_STAT_BYTES, 0);
d_statContext_mp->setValue(DomainQueueStats::e_STAT_MESSAGES, 0);
if (!d_subContextsHolder.empty()) {
bsl::list<StatSubContextMp>::iterator it =
d_subContextsHolder.begin();
while (it != d_subContextsHolder.end()) {
it->get()->setValue(DomainQueueStats::e_STAT_BYTES, 0);
it->get()->setValue(DomainQueueStats::e_STAT_MESSAGES, 0);
++it;
}
}
} break;
case EventType::e_CHANGE_ROLE: {
d_statContext_mp->setValue(DomainQueueStats::e_STAT_ROLE, value);
Expand Down Expand Up @@ -614,10 +632,23 @@ void QueueStatsDomain::onEvent(EventType::Enum type,
appIdContext->reportValue(DomainQueueStats::e_STAT_CONFIRM_TIME,
value);
} break;

case EventType::e_QUEUE_TIME: {
appIdContext->reportValue(DomainQueueStats::e_STAT_QUEUE_TIME, value);
} break;
case EventType::e_ADD_MESSAGE: {
appIdContext->adjustValue(DomainQueueStats::e_STAT_BYTES, value);
appIdContext->adjustValue(DomainQueueStats::e_STAT_MESSAGES, 1);
} break;
case EventType::e_DEL_MESSAGE: {
appIdContext->adjustValue(DomainQueueStats::e_STAT_BYTES, -value);
appIdContext->adjustValue(DomainQueueStats::e_STAT_MESSAGES, -1);
} break;
case EventType::e_PURGE: {
// NOTE: Setting the value like that will cause weird results if using
// the stat to get rates
appIdContext->setValue(DomainQueueStats::e_STAT_BYTES, 0);
appIdContext->setValue(DomainQueueStats::e_STAT_MESSAGES, 0);
} break;

// Some of these event types make no sense per appId and should be reported
// per entire queue instead
Expand All @@ -628,10 +659,7 @@ void QueueStatsDomain::onEvent(EventType::Enum type,
case EventType::e_REJECT: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_PUSH: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_PUT: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_ADD_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_DEL_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_GC_MESSAGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_PURGE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CHANGE_ROLE: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_MSGS: BSLS_ANNOTATION_FALLTHROUGH;
case EventType::e_CFG_BYTES: BSLS_ANNOTATION_FALLTHROUGH;
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
// 'mqbstat::QueueStatsUtil' is a utility namespace exposing methods to
// initialize the stat contexts and associated objects.

// MQB

// BMQ
#include <bmqt_uri.h>

Expand Down
20 changes: 18 additions & 2 deletions src/integration-tests/test_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ def test_queue_stats(single_node: Cluster) -> None:
- Confirm a portion of messages for each consumer
- Verify stats acquired via admin command with the expected stats
Stage 3: check too-often stats safeguard
Stage 3: check stats after purging an appId
- Purge one appId
- Check that message/byte stats for this appId set to 0, and
the queue stats in general correctly changed
Stage 4: check too-often stats safeguard
- Send several 'stat show' requests
- Verify that the admin session complains about too often stat request
Expand Down Expand Up @@ -225,11 +230,22 @@ def test_queue_stats(single_node: Cluster) -> None:

expect_same_structure(queue_stats, dt.TEST_QUEUE_STATS_AFTER_CONFIRM)

# Stage 3: check stats after purging an appId
res = admin.send_admin(
f"DOMAINS DOMAIN {task.domain} QUEUE {task.queue_name} PURGE baz"
)
assert f"Purged 21 message(s)" in res

stats = extract_stats(admin.send_admin("encoding json_pretty stat show"))
queue_stats = stats["domainQueues"]["domains"][tc.DOMAIN_FANOUT][task.uri]

expect_same_structure(queue_stats, dt.TEST_QUEUE_STATS_AFTER_PURGE)

consumer_foo.close(f"{task.uri}?id=foo")
consumer_bar.close(f"{task.uri}?id=bar")
consumer_baz.close(f"{task.uri}?id=baz")

# Stage 3: check too-often stats safeguard
# Stage 4: check too-often stats safeguard
for i in range(5):
admin.send_admin("encoding json_pretty stat show")
res = admin.send_admin("encoding json_pretty stat show")
Expand Down
Loading

0 comments on commit 698d7be

Please sign in to comment.