Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] allow client skips the non-continued deleted ranges stats #18263

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,8 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean authoritative,
boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {
CompletableFuture<Void> future;

if (topicName.isGlobal()) {
Expand All @@ -1275,7 +1276,7 @@ protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean
.thenComposeAsync(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange));
}

protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalStatsAsync(boolean authoritative,
Expand Down Expand Up @@ -1411,7 +1412,8 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {

protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
Expand All @@ -1435,14 +1437,15 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
.thenCompose(owned -> {
if (owned) {
return getTopicReferenceAsync(partition)
.thenApply(ref ->
ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
.thenApply(ref ->
ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
leizhiyuan marked this conversation as resolved.
Show resolved Hide resolved
getEarliestTimeInBacklog,
getTotalNonContiguousDeletedMessagesRange));
} else {
try {
return pulsar().getAdminClient().topics().getStatsAsync(
partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog);
partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange);
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,12 @@ public void getStats(
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetStatsAsync(authoritative, getPreciseBacklog, false, false)
internalGetStatsAsync(authoritative, getPreciseBacklog, false,
false, getTotalNonContiguousDeletedMessagesRange)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand Down Expand Up @@ -501,7 +504,8 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false, false);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false,
false, false, true);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange")
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
if (topicName.isGlobal()) {
Expand All @@ -238,7 +241,8 @@ public void getPartitionedStats(
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize, getEarliestTimeInBacklog));
subscriptionBacklogSize, getEarliestTimeInBacklog,
getTotalNonContiguousDeletedMessagesRange));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,9 +1212,13 @@ public void getStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange")
leizhiyuan marked this conversation as resolved.
Show resolved Hide resolved
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)
internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog,
getTotalNonContiguousDeletedMessagesRange)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand Down Expand Up @@ -1314,11 +1318,14 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange")
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
subscriptionBacklogSize, getEarliestTimeInBacklog);
subscriptionBacklogSize, getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ protected static Summary.Child.Value getRgQuotaCalculationTime() {
protected void aggregateResourceGroupLocalUsages() {
final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer();
BrokerService bs = this.pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats(false);

for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
final String topicName = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
topicMap.forEach((name, topic) -> {
topicStatsMap.put(name,
topic.getStats(false, false, false));
topic.getStats(false, false, false, true));
});
return topicStatsMap;
}
Expand Down Expand Up @@ -2342,10 +2342,11 @@ public String generateUniqueProducerName() {
return producerNameGenerator.getNextId();
}

public Map<String, TopicStatsImpl> getTopicStats() {
public Map<String, TopicStatsImpl> getTopicStats(boolean getTotalNonContiguousDeletedMessagesRange) {
HashMap<String, TopicStatsImpl> stats = new HashMap<>();

forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false, false)));
forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false,
false, false, getTotalNonContiguousDeletedMessagesRange)));

return stats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
boolean getEarliestTimeInBacklog, boolean getTotalNonContiguousDeletedMessagesRange);

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange);

CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,11 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

@Override
public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {
try {
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog).get();
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog,
getTotalNonContiguousDeletedMessagesRange).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Fail to get stats", topic, e);
return null;
Expand All @@ -858,7 +860,8 @@ public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean s
@Override
public CompletableFuture<NonPersistentTopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {
CompletableFuture<NonPersistentTopicStatsImpl> future = new CompletableFuture<>();
NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,8 @@ public long estimateBacklogSize() {
}

public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {
SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Expand Down Expand Up @@ -1164,9 +1165,11 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
});
}
}
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
if (getTotalNonContiguousDeletedMessagesRange){
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
}
return subStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,8 @@ void removeSubscription(String subscriptionName) {
PersistentSubscription sub = subscriptions.remove(subscriptionName);
if (sub != null) {
// preserve accumulative stats form removed subscription
SubscriptionStatsImpl stats = sub.getStats(false, false, false);
SubscriptionStatsImpl stats = sub.getStats(false, false,
false, true);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}
Expand Down Expand Up @@ -2025,9 +2026,11 @@ public double getLastUpdatedAvgPublishRateInByte() {

@Override
public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {
try {
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog).get();
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog,
getTotalNonContiguousDeletedMessagesRange).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Fail to get stats", topic, e);
return null;
Expand All @@ -2036,7 +2039,8 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa

@Override
public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
boolean getEarliestTimeInBacklog,
boolean getTotalNonContiguousDeletedMessagesRange) {

CompletableFuture<TopicStatsImpl> statsFuture = new CompletableFuture<>();
TopicStatsImpl stats = new TopicStatsImpl();
Expand Down Expand Up @@ -2070,7 +2074,8 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog

subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats =
subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);
subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog,
getTotalNonContiguousDeletedMessagesRange);

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
}
TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize,
false, true);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
Expand Down
Loading