Skip to content

Commit

Permalink
Fix summary (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Aug 21, 2024
1 parent 6323967 commit f36ebc5
Showing 1 changed file with 36 additions and 34 deletions.
70 changes: 36 additions & 34 deletions include/cinatra/ylt/metric/summary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ REFLECTION(json_summary_t, name, help, type, metrics);
#endif

struct block_t {
std::atomic<bool> is_coro_started_ = false;
std::atomic<bool> stop_ = false;
ylt::detail::moodycamel::ConcurrentQueue<double> sample_queue_;
std::shared_ptr<TimeWindowQuantiles> quantile_values_;
Expand Down Expand Up @@ -75,7 +76,7 @@ class summary_t : public static_metric {
block_->sample_queue_.enqueue(value);

bool expected = false;
if (is_coro_started_.compare_exchange_strong(expected, true)) {
if (block_->is_coro_started_.compare_exchange_strong(expected, true)) {
start(block_).via(excutor_->get_executor()).start([](auto &&) {
});
}
Expand Down Expand Up @@ -220,13 +221,13 @@ class summary_t : public static_metric {
}

if (block->sample_queue_.size_approx() == 0) {
is_coro_started_ = false;
block_->is_coro_started_ = false;
if (block->sample_queue_.size_approx() == 0) {
break;
}

bool expected = false;
if (!is_coro_started_.compare_exchange_strong(expected, true)) {
if (!block_->is_coro_started_.compare_exchange_strong(expected, true)) {
break;
}

Expand All @@ -243,7 +244,6 @@ class summary_t : public static_metric {
std::shared_ptr<block_t> block_;
static inline std::shared_ptr<coro_io::io_context_pool> excutor_ =
coro_io::create_io_context_pool(1);
std::atomic<bool> is_coro_started_ = false;
bool has_observe_ = false;
};

Expand All @@ -260,6 +260,10 @@ struct sum_and_count_t {

template <uint8_t N>
struct labels_block_t {
summary_t::Quantiles quantiles_; // readonly
std::chrono::milliseconds max_age_;
uint16_t age_buckets_;
std::atomic<bool> is_coro_started_ = false;
std::atomic<bool> stop_ = false;
ylt::detail::moodycamel::ConcurrentQueue<summary_label_sample<N>>
sample_queue_;
Expand All @@ -280,12 +284,16 @@ class basic_dynamic_summary : public dynamic_metric {
std::array<std::string, N> labels_name,
std::chrono::milliseconds max_age = std::chrono::seconds{60},
uint16_t age_buckets = 5)
: quantiles_{std::move(quantiles)},
dynamic_metric(MetricType::Summary, std::move(name), std::move(help),
std::move(labels_name)),
max_age_(max_age),
age_buckets_(age_buckets) {
init_block(labels_block_);
: dynamic_metric(MetricType::Summary, std::move(name), std::move(help),
std::move(labels_name)) {
labels_block_ = std::make_shared<labels_block_t<N>>();
labels_block_->quantiles_ = std::move(quantiles);
labels_block_->max_age_ = max_age;
labels_block_->age_buckets_ = age_buckets;

start(labels_block_).via(excutor_->get_executor()).start([](auto &&) {
});

g_user_metric_count++;
}

Expand All @@ -307,7 +315,8 @@ class basic_dynamic_summary : public dynamic_metric {
labels_block_->sample_queue_.enqueue({std::move(labels_value), value});

bool expected = false;
if (is_coro_started_.compare_exchange_strong(expected, true)) {
if (labels_block_->is_coro_started_.compare_exchange_strong(expected,
true)) {
start(labels_block_).via(excutor_->get_executor()).start([](auto &&) {
});
}
Expand All @@ -327,7 +336,7 @@ class basic_dynamic_summary : public dynamic_metric {
const std::array<std::string, N> &labels_value, double &sum,
uint64_t &count) {
std::vector<double> vec;
if (quantiles_.empty()) {
if (labels_block_->quantiles_.empty()) {
co_return std::vector<double>{};
}

Expand All @@ -339,7 +348,7 @@ class basic_dynamic_summary : public dynamic_metric {
}
sum = labels_block_->sum_and_count_[labels_value].sum;
count = labels_block_->sum_and_count_[labels_value].count;
for (const auto &quantile : quantiles_) {
for (const auto &quantile : labels_block_->quantiles_) {
vec.push_back(it->second->get(quantile.quantile));
}
},
Expand All @@ -359,13 +368,6 @@ class basic_dynamic_summary : public dynamic_metric {
}
#endif
private:
template <typename T>
void init_block(std::shared_ptr<T> &block) {
block = std::make_shared<T>();
start(block).via(excutor_->get_executor()).start([](auto &&) {
});
}

async_simple::coro::Lazy<void> start(
std::shared_ptr<labels_block_t<N>> label_block) {
summary_label_sample<N> sample;
Expand All @@ -376,8 +378,9 @@ class basic_dynamic_summary : public dynamic_metric {
auto &ptr = label_block->label_quantile_values_[sample.labels_value];

if (ptr == nullptr) {
ptr = std::make_shared<TimeWindowQuantiles>(quantiles_, max_age_,
age_buckets_);
ptr = std::make_shared<TimeWindowQuantiles>(
label_block->quantiles_, label_block->max_age_,
label_block->age_buckets_);
}

ptr->insert(sample.value);
Expand All @@ -393,13 +396,14 @@ class basic_dynamic_summary : public dynamic_metric {
co_await async_simple::coro::Yield{};

if (label_block->sample_queue_.size_approx() == 0) {
is_coro_started_ = false;
label_block->is_coro_started_ = false;
if (label_block->sample_queue_.size_approx() == 0) {
break;
}

bool expected = false;
if (!is_coro_started_.compare_exchange_strong(expected, true)) {
if (!label_block->is_coro_started_.compare_exchange_strong(expected,
true)) {
break;
}

Expand All @@ -412,7 +416,7 @@ class basic_dynamic_summary : public dynamic_metric {
}

async_simple::coro::Lazy<void> serialize_async_with_label(std::string &str) {
if (quantiles_.empty()) {
if (labels_block_->quantiles_.empty()) {
co_return;
}

Expand All @@ -432,13 +436,14 @@ class basic_dynamic_summary : public dynamic_metric {
double sum = 0;
uint64_t count = 0;
auto rates = co_await get_rates(labels_value, sum, count);
for (size_t i = 0; i < quantiles_.size(); i++) {
for (size_t i = 0; i < labels_block_->quantiles_.size(); i++) {
str.append(name_);
str.append("{");
build_label_string(str, labels_name_, labels_value);
str.append(",");
str.append("quantile=\"");
str.append(std::to_string(quantiles_[i].quantile)).append("\"} ");
str.append(std::to_string(labels_block_->quantiles_[i].quantile))
.append("\"} ");
str.append(std::to_string(rates[i])).append("\n");
}

Expand All @@ -459,7 +464,7 @@ class basic_dynamic_summary : public dynamic_metric {
#ifdef CINATRA_ENABLE_METRIC_JSON
async_simple::coro::Lazy<void> serialize_to_json_with_label_async(
std::string &str) {
if (quantiles_.empty()) {
if (labels_block_->quantiles_.empty()) {
co_return;
}

Expand All @@ -482,11 +487,12 @@ class basic_dynamic_summary : public dynamic_metric {
auto rates = co_await get_rates(labels_value, sum, count);
metric.count = count;
metric.sum = sum;
for (size_t i = 0; i < quantiles_.size(); i++) {
for (size_t i = 0; i < labels_block_->quantiles_.size(); i++) {
for (size_t i = 0; i < labels_value.size(); i++) {
metric.labels[labels_name_[i]] = labels_value[i];
}
metric.quantiles.emplace(quantiles_[i].quantile, rates[i]);
metric.quantiles.emplace(labels_block_->quantiles_[i].quantile,
rates[i]);
}

summary.metrics.push_back(std::move(metric));
Expand All @@ -495,13 +501,9 @@ class basic_dynamic_summary : public dynamic_metric {
}
#endif

Quantiles quantiles_; // readonly
std::shared_ptr<labels_block_t<N>> labels_block_;
static inline std::shared_ptr<coro_io::io_context_pool> excutor_ =
coro_io::create_io_context_pool(1);
std::chrono::milliseconds max_age_;
uint16_t age_buckets_;
std::atomic<bool> is_coro_started_ = false;
bool has_observe_ = false;
};

Expand Down

0 comments on commit f36ebc5

Please sign in to comment.