From a31e804160c5fa0aeb5e59c7178dc7f85609a614 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Mon, 31 Oct 2022 13:42:37 +0800 Subject: [PATCH 1/3] feat:allow client skips the non-continued deleted ranges stats --- .../admin/impl/PersistentTopicsBase.java | 19 +++++++++------- .../broker/admin/v1/PersistentTopics.java | 10 ++++++--- .../broker/admin/v2/NonPersistentTopics.java | 8 +++++-- .../broker/admin/v2/PersistentTopics.java | 15 +++++++++---- .../resourcegroup/ResourceGroupService.java | 2 +- .../pulsar/broker/service/BrokerService.java | 7 +++--- .../apache/pulsar/broker/service/Topic.java | 5 +++-- .../nonpersistent/NonPersistentTopic.java | 9 +++++--- .../persistent/PersistentSubscription.java | 11 ++++++---- .../service/persistent/PersistentTopic.java | 15 ++++++++----- .../prometheus/NamespaceStatsAggregator.java | 3 ++- .../pulsar/broker/admin/AdminApiTest.java | 6 ++--- .../broker/admin/PersistentTopicsTest.java | 12 ++++++---- .../RGUsageMTAggrWaitForAllMsgsTest.java | 2 +- .../ResourceGroupUsageAggregationTest.java | 2 +- .../broker/service/BrokerServiceTest.java | 18 +++++++-------- .../broker/service/PersistentTopicTest.java | 6 ++--- .../nonpersistent/NonPersistentTopicTest.java | 6 ++--- .../persistent/PersistentTopicTest.java | 6 ++--- .../broker/transaction/TransactionTest.java | 2 +- .../api/DispatcherBlockConsumerTest.java | 4 ++-- .../client/api/NonPersistentTopicTest.java | 10 ++++----- .../client/impl/MessageChunkingTest.java | 2 +- .../pulsar/client/admin/GetStatsOptions.java | 5 +++++ .../apache/pulsar/client/admin/Topics.java | 22 ++++++++++++------- .../client/admin/internal/TopicsImpl.java | 11 +++++++--- 26 files changed, 135 insertions(+), 83 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index afe54f5cc57cd..a18281972f66d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1262,7 +1262,8 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR protected CompletableFuture internalGetStatsAsync(boolean authoritative, boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { CompletableFuture future; if (topicName.isGlobal()) { @@ -1275,7 +1276,7 @@ protected CompletableFuture internalGetStatsAsync(boolean .thenComposeAsync(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog)); + getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange)); } protected CompletableFuture internalGetInternalStatsAsync(boolean authoritative, @@ -1411,7 +1412,8 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { CompletableFuture future; if (topicName.isGlobal()) { future = validateGlobalNamespaceOwnershipAsync(namespaceName); @@ -1435,14 +1437,15 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean .thenCompose(owned -> { if (owned) { return getTopicReferenceAsync(partition) - .thenApply(ref -> - ref.getStats(getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog)); + .thenApply(ref -> + ref.getStats(getPreciseBacklog, subscriptionBacklogSize, + getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange)); } else { try { return pulsar().getAdminClient().topics().getStatsAsync( - partition.toString(), getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog); + partition.toString(), getPreciseBacklog, subscriptionBacklogSize, + getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange); } catch (PulsarServerException e) { return FutureUtil.failedFuture(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 943b191209fa8..c5bccbb96cf18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -432,9 +432,12 @@ public void getStats( @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) { + @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog, + @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") + boolean getTotalNonContiguousDeletedMessagesRange) { validateTopicName(property, cluster, namespace, encodedTopic); - internalGetStatsAsync(authoritative, getPreciseBacklog, false, false) + internalGetStatsAsync(authoritative, getPreciseBacklog, false, + false, getTotalNonContiguousDeletedMessagesRange) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -501,7 +504,8 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false, false); + internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, + false, false, true); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index de1ee46fd8ed5..55e22a5cd7e95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -212,7 +212,10 @@ public void getPartitionedStats( + "not to use when there's heavy traffic.") @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize, @ApiParam(value = "If return the earliest time in backlog") - @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) { + @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, + @ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange") + @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") + boolean getTotalNonContiguousDeletedMessagesRange) { try { validatePartitionedTopicName(tenant, namespace, encodedTopic); if (topicName.isGlobal()) { @@ -238,7 +241,8 @@ public void getPartitionedStats( topicStatsFutureList .add(pulsar().getAdminClient().topics().getStatsAsync( (topicName.getPartition(i).toString()), getPreciseBacklog, - subscriptionBacklogSize, getEarliestTimeInBacklog)); + subscriptionBacklogSize, getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange)); } catch (PulsarServerException e) { asyncResponse.resume(new RestException(e)); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 903423737785a..2f09c7f9cc0bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1212,9 +1212,13 @@ public void getStats( + "not to use when there's heavy traffic.") @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize, @ApiParam(value = "If return time of the earliest message in backlog") - @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) { + @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, + @ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange") + @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") + boolean getTotalNonContiguousDeletedMessagesRange) { validateTopicName(tenant, namespace, encodedTopic); - internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog) + internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -1314,11 +1318,14 @@ public void getPartitionedStats( + "not to use when there's heavy traffic.") @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize, @ApiParam(value = "If return the earliest time in backlog") - @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) { + @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, + @ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange") + @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") + boolean getTotalNonContiguousDeletedMessagesRange) { try { validatePartitionedTopicName(tenant, namespace, encodedTopic); internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog, - subscriptionBacklogSize, getEarliestTimeInBacklog); + subscriptionBacklogSize, getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index d3f8eb7613a40..bb7d3a1de5e90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -543,7 +543,7 @@ protected static Summary.Child.Value getRgQuotaCalculationTime() { protected void aggregateResourceGroupLocalUsages() { final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer(); BrokerService bs = this.pulsar.getBrokerService(); - Map topicStatsMap = bs.getTopicStats(); + Map topicStatsMap = bs.getTopicStats(false); for (Map.Entry entry : topicStatsMap.entrySet()) { final String topicName = entry.getKey(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 65e6a6a71add5..49d54925b358f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -475,7 +475,7 @@ public Map getTopicStats(NamespaceBundle bundle) { Map topicStatsMap = new HashMap<>(); topicMap.forEach((name, topic) -> { topicStatsMap.put(name, - topic.getStats(false, false, false)); + topic.getStats(false, false, false, true)); }); return topicStatsMap; } @@ -2342,10 +2342,11 @@ public String generateUniqueProducerName() { return producerNameGenerator.getNextId(); } - public Map getTopicStats() { + public Map getTopicStats(boolean getTotalNonContiguousDeletedMessagesRange) { HashMap stats = new HashMap<>(); - forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false, false))); + forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, + false, false, getTotalNonContiguousDeletedMessagesRange))); return stats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 3949df92ceca5..a42dbdeac1a45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -266,11 +266,12 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats ConcurrentOpenHashMap getShadowReplicators(); TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog); + boolean getEarliestTimeInBacklog, boolean getTotalNonContiguousDeletedMessagesRange); CompletableFuture asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog); + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange); CompletableFuture getInternalStats(boolean includeLedgerMetadata); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index cf46103cc357b..722f7e1ea9b41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -846,9 +846,11 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats @Override public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { try { - return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog).get(); + return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog, + getTotalNonContiguousDeletedMessagesRange).get(); } catch (InterruptedException | ExecutionException e) { log.error("[{}] Fail to get stats", topic, e); return null; @@ -858,7 +860,8 @@ public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean s @Override public CompletableFuture asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { CompletableFuture future = new CompletableFuture<>(); NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f88ce03532414..e214202ec2960 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1066,7 +1066,8 @@ public long estimateBacklogSize() { } public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { SubscriptionStatsImpl subStats = new SubscriptionStatsImpl(); subStats.lastExpireTimestamp = lastExpireTimestamp; subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp; @@ -1164,9 +1165,11 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri }); } } - subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange(); - subStats.nonContiguousDeletedMessagesRangesSerializedSize = - cursor.getNonContiguousDeletedMessagesRangeSerializedSize(); + if (getTotalNonContiguousDeletedMessagesRange){ + subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange(); + subStats.nonContiguousDeletedMessagesRangesSerializedSize = + cursor.getNonContiguousDeletedMessagesRangeSerializedSize(); + } return subStats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 94cc0964c3708..0f558156cf34b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1107,7 +1107,8 @@ void removeSubscription(String subscriptionName) { PersistentSubscription sub = subscriptions.remove(subscriptionName); if (sub != null) { // preserve accumulative stats form removed subscription - SubscriptionStatsImpl stats = sub.getStats(false, false, false); + SubscriptionStatsImpl stats = sub.getStats(false, false, + false, true); bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter); msgOutFromRemovedSubscriptions.add(stats.msgOutCounter); } @@ -2025,9 +2026,11 @@ public double getLastUpdatedAvgPublishRateInByte() { @Override public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { try { - return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog).get(); + return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange).get(); } catch (InterruptedException | ExecutionException e) { log.error("[{}] Fail to get stats", topic, e); return null; @@ -2036,7 +2039,8 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa @Override public CompletableFuture asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { CompletableFuture statsFuture = new CompletableFuture<>(); TopicStatsImpl stats = new TopicStatsImpl(); @@ -2070,7 +2074,8 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog subscriptions.forEach((name, subscription) -> { SubscriptionStatsImpl subStats = - subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog); + subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange); stats.msgRateOut += subStats.msgRateOut; stats.msgThroughputOut += subStats.msgThroughputOut; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 4ba414623ab78..e2c30f93db65d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -179,7 +179,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate(); stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate(); } - TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false); + TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, + false, true); stats.msgInCounter = tStatus.msgInCounter; stats.bytesInCounter = tStatus.bytesInCounter; stats.msgOutCounter = tStatus.msgOutCounter; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 5a845ac9e7326..db6dd9ea3b82d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1218,7 +1218,7 @@ public void testGetStats() throws Exception { // create consumer and subscription @Cleanup Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subName).subscribe(); - TopicStats topicStats = admin.topics().getStats(topic, false, false, true); + TopicStats topicStats = admin.topics().getStats(topic, false, false, true, true); assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0); assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0); @@ -1227,7 +1227,7 @@ public void testGetStats() throws Exception { publishMessagesOnPersistentTopic(topic, 10); Thread.sleep(1000); - topicStats = admin.topics().getStats(topic, false, false, true); + topicStats = admin.topics().getStats(topic, false, false, true, true); assertTrue(topicStats.getEarliestMsgPublishTimeInBacklogs() > 0); assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() > 0); @@ -1237,7 +1237,7 @@ public void testGetStats() throws Exception { } Thread.sleep(1000); - topicStats = admin.topics().getStats(topic, false, false, true); + topicStats = admin.topics().getStats(topic, false, false, true, true); assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0); assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 2cbce5612073c..3fe63808616e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -288,7 +288,8 @@ public void testCreateSubscriptions() throws Exception { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); response = mock(AsyncResponse.class); - persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, + true); ArgumentCaptor statCaptor = ArgumentCaptor.forClass(TopicStats.class); verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); TopicStats topicStats = statCaptor.getValue(); @@ -306,7 +307,8 @@ public void testCreateSubscriptions() throws Exception { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); response = mock(AsyncResponse.class); - persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, + true); statCaptor = ArgumentCaptor.forClass(TopicStats.class); verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); topicStats = statCaptor.getValue(); @@ -325,7 +327,8 @@ public void testCreateSubscriptions() throws Exception { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); response = mock(AsyncResponse.class); - persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, + true); statCaptor = ArgumentCaptor.forClass(TopicStats.class); verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); topicStats = statCaptor.getValue(); @@ -344,7 +347,8 @@ public void testCreateSubscriptions() throws Exception { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); response = mock(AsyncResponse.class); - persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, + true); statCaptor = ArgumentCaptor.forClass(TopicStats.class); verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); topicStats = statCaptor.getValue(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 9bf7e3c5325d9..ec1a8bfc94f5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -547,7 +547,7 @@ private void verifyRGProdConsStats(String[] topicStrings, boolean checkConsume) throws Exception { BrokerService bs = pulsar.getBrokerService(); - Map topicStatsMap = bs.getTopicStats(); + Map topicStatsMap = bs.getTopicStats(false); log.debug("verifyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java index 72fd91ea46d83..082f2a5f07fbe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java @@ -210,7 +210,7 @@ private void verifyStats(String topicString, String rgName, throws InterruptedException, PulsarAdminException { BrokerService bs = pulsar.getBrokerService(); Awaitility.await().untilAsserted(() -> { - TopicStatsImpl topicStats = bs.getTopicStats().get(topicString); + TopicStatsImpl topicStats = bs.getTopicStats(false).get(topicString); Assert.assertNotNull(topicStats); if (checkProduce) { Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 24e38438c5329..d6c32500d4afb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -231,7 +231,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -252,7 +252,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // publisher stats @@ -290,7 +290,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); assertEquals(stats.getOffloadedStorageSize(), 0); @@ -419,13 +419,13 @@ public void testStatsOfStorageSizeWithSubscription() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); - assertEquals(topicRef.getStats(false, false, false).storageSize, 0); + assertEquals(topicRef.getStats(false, false, false, true).storageSize, 0); for (int i = 0; i < 10; i++) { producer.send(new byte[10]); } - assertTrue(topicRef.getStats(false, false, false).storageSize > 0); + assertTrue(topicRef.getStats(false, false, false, true).storageSize > 0); } @Test @@ -447,7 +447,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -465,7 +465,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // publisher stats @@ -500,7 +500,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgRateRedeliver() > 0.0); assertEquals(subStats.getMsgRateRedeliver(), subStats.getConsumers().get(0).getMsgRateRedeliver()); @@ -514,7 +514,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); assertEquals(subStats.getMsgBacklog(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 45ebc685bbb27..59317f0a10d0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -2280,7 +2280,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception { sub1.addConsumer(consumer1); consumer1.close(); - SubscriptionStatsImpl stats1 = sub1.getStats(false, false, false); + SubscriptionStatsImpl stats1 = sub1.getStats(false, false, false, true); assertEquals(stats1.keySharedMode, "AUTO_SPLIT"); assertFalse(stats1.allowOutOfOrderDelivery); @@ -2291,7 +2291,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception { sub2.addConsumer(consumer2); consumer2.close(); - SubscriptionStatsImpl stats2 = sub2.getStats(false, false, false); + SubscriptionStatsImpl stats2 = sub2.getStats(false, false, false, true); assertEquals(stats2.keySharedMode, "AUTO_SPLIT"); assertTrue(stats2.allowOutOfOrderDelivery); @@ -2303,7 +2303,7 @@ public void testKeySharedMetadataExposedToStats() throws Exception { sub3.addConsumer(consumer3); consumer3.close(); - SubscriptionStatsImpl stats3 = sub3.getStats(false, false, false); + SubscriptionStatsImpl stats3 = sub3.getStats(false, false, false, true); assertEquals(stats3.keySharedMode, "STICKY"); assertFalse(stats3.allowOutOfOrderDelivery); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 71caa1edb527c..18b4003f36142 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -63,7 +63,7 @@ public void testAccumulativeStats() throws Exception { NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); // stats are at zero before any activity - TopicStats stats = topic.getStats(false, false, false); + TopicStats stats = topic.getStats(false, false, false, true); assertEquals(stats.getBytesInCounter(), 0); assertEquals(stats.getMsgInCounter(), 0); assertEquals(stats.getBytesOutCounter(), 0); @@ -77,7 +77,7 @@ public void testAccumulativeStats() throws Exception { assertNotNull(msg); // send/receive result in non-zero stats - TopicStats statsBeforeUnsubscribe = topic.getStats(false, false, false); + TopicStats statsBeforeUnsubscribe = topic.getStats(false, false, false, true); assertTrue(statsBeforeUnsubscribe.getBytesInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getMsgInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getBytesOutCounter() > 0); @@ -90,7 +90,7 @@ public void testAccumulativeStats() throws Exception { assertEquals(topic.getProducers().size(), 0); // consumer unsubscribe/producer removal does not result in stats loss - TopicStats statsAfterUnsubscribe = topic.getStats(false, false, false); + TopicStats statsAfterUnsubscribe = topic.getStats(false, false, false, true); assertEquals(statsAfterUnsubscribe.getBytesInCounter(), statsBeforeUnsubscribe.getBytesInCounter()); assertEquals(statsAfterUnsubscribe.getMsgInCounter(), statsBeforeUnsubscribe.getMsgInCounter()); assertEquals(statsAfterUnsubscribe.getBytesOutCounter(), statsBeforeUnsubscribe.getBytesOutCounter()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index aa05624a5b0c9..83bd8445cdac7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -209,7 +209,7 @@ public void testAccumulativeStats() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); // stats are at zero before any activity - TopicStats stats = topic.getStats(false, false, false); + TopicStats stats = topic.getStats(false, false, false, true); assertEquals(stats.getBytesInCounter(), 0); assertEquals(stats.getMsgInCounter(), 0); assertEquals(stats.getBytesOutCounter(), 0); @@ -223,7 +223,7 @@ public void testAccumulativeStats() throws Exception { assertNotNull(msg); // send/receive result in non-zero stats - TopicStats statsBeforeUnsubscribe = topic.getStats(false, false, false); + TopicStats statsBeforeUnsubscribe = topic.getStats(false, false, false, true); assertTrue(statsBeforeUnsubscribe.getBytesInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getMsgInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getBytesOutCounter() > 0); @@ -236,7 +236,7 @@ public void testAccumulativeStats() throws Exception { assertEquals(topic.getProducers().size(), 0); // consumer unsubscribe/producer removal does not result in stats loss - TopicStats statsAfterUnsubscribe = topic.getStats(false, false, false); + TopicStats statsAfterUnsubscribe = topic.getStats(false, false, false, true); assertEquals(statsAfterUnsubscribe.getBytesInCounter(), statsBeforeUnsubscribe.getBytesInCounter()); assertEquals(statsAfterUnsubscribe.getMsgInCounter(), statsBeforeUnsubscribe.getMsgInCounter()); assertEquals(statsAfterUnsubscribe.getBytesOutCounter(), statsBeforeUnsubscribe.getBytesOutCounter()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 64dca965267f9..6e864a8e77d3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -203,7 +203,7 @@ public void testTopicTransactionMetrics() throws Exception { Optional optional = pulsarService.getBrokerService().getTopic(topic, false).get(); assertTrue(optional.isPresent()); PersistentTopic persistentTopic = (PersistentTopic) optional.get(); - TopicStatsImpl stats = persistentTopic.getStats(false, false, false); + TopicStatsImpl stats = persistentTopic.getStats(false, false, false, true); assertEquals(stats.committedTxnCount, 1); assertEquals(stats.abortedTxnCount, 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index ec24c066f3605..cb12973233e7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -545,7 +545,7 @@ public void testBlockDispatcherStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -563,7 +563,7 @@ public void testBlockDispatcherStats() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgBacklog() > 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index d4af84b846960..57c38e4ab8ebd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -495,7 +495,7 @@ public void testTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(pulsar); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -513,7 +513,7 @@ public void testTopicStats() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(pulsar); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgRateOut() > 0); @@ -580,7 +580,7 @@ public void testReplicator() throws Exception { assertNotNull(replicatorR3); rolloverPerIntervalStats(replicationPulasr); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -651,7 +651,7 @@ public void testReplicator() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(replicationPulasr); - stats = topicRef.getStats(false, false, false); + stats = topicRef.getStats(false, false, false, true); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgRateOut() > 0); @@ -849,7 +849,7 @@ public void testMsgDropStat() throws Exception { NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); pulsar.getBrokerService().updateRates(); - NonPersistentTopicStats stats = topic.getStats(false, false, false); + NonPersistentTopicStats stats = topic.getStats(false, false, false, true); NonPersistentPublisherStats npStats = stats.getPublishers().get(0); NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1"); NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index b1eb6fb384657..a4e6cbb05f233 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -159,7 +159,7 @@ public void testLargeMessage(boolean ackReceiptEnabled, boolean clientSizeMaxMes pulsar.getBrokerService().updateRates(); - PublisherStats producerStats = topic.getStats(false, false, false).getPublishers().get(0); + PublisherStats producerStats = topic.getStats(false, false, false, true).getPublishers().get(0); assertTrue(producerStats.getChunkedMessageRate() > 0); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java index 14e99ac014ba8..02ad06a52ee86 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GetStatsOptions.java @@ -38,4 +38,9 @@ public class GetStatsOptions { * Whether to get the earliest time in backlog. */ private final boolean getEarliestTimeInBacklog; + + /** + * Whether to get total NonContiguousDeletedMessagesRange. + */ + private final boolean getTotalNonContiguousDeletedMessagesRange; } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 16b9afca5f2cd..8fa0cf84fafb6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1137,25 +1137,30 @@ default CompletableFuture deleteAsync(String topic, boolean force) { default TopicStats getStats(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) throws PulsarAdminException { + boolean getEarliestTimeInBacklog, boolean getTotalNonContiguousDeletedMessagesRange) + throws PulsarAdminException { GetStatsOptions getStatsOptions = - new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog); + new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange); return getStats(topic, getStatsOptions); } default TopicStats getStats(String topic, boolean getPreciseBacklog, - boolean subscriptionBacklogSize) throws PulsarAdminException { - GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, false); + boolean subscriptionBacklogSize) throws PulsarAdminException { + GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, subscriptionBacklogSize, + false, true); return getStats(topic, getStatsOptions); } default TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException { - GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, false, false); + GetStatsOptions getStatsOptions = new GetStatsOptions(getPreciseBacklog, false, + false, true); return getStats(topic, getStatsOptions); } default TopicStats getStats(String topic) throws PulsarAdminException { - return getStats(topic, new GetStatsOptions(false, false, false)); + return getStats(topic, new GetStatsOptions(false, false, + false, true)); } /** @@ -1174,10 +1179,11 @@ default TopicStats getStats(String topic) throws PulsarAdminException { * */ CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog, - boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog); + boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange); default CompletableFuture getStatsAsync(String topic) { - return getStatsAsync(topic, false, false, false); + return getStatsAsync(topic, false, false, false, true); } /** diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 3b50737ffc027..9782384b86f55 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -593,18 +593,23 @@ public TopicStats getStats(String topic, GetStatsOptions getStatsOptions) throws boolean getPreciseBacklog = getStatsOptions.isGetPreciseBacklog(); boolean subscriptionBacklogSize = getStatsOptions.isSubscriptionBacklogSize(); boolean getEarliestTimeInBacklog = getStatsOptions.isGetEarliestTimeInBacklog(); - return sync(() -> getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)); + boolean getTotalNonContiguousDeletedMessagesRange = + getStatsOptions.isGetTotalNonContiguousDeletedMessagesRange(); + return sync(() -> getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, + getTotalNonContiguousDeletedMessagesRange)); } @Override public CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog) { + boolean getEarliestTimeInBacklog, + boolean getTotalNonContiguousDeletedMessagesRange) { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "stats") .queryParam("getPreciseBacklog", getPreciseBacklog) .queryParam("subscriptionBacklogSize", subscriptionBacklogSize) - .queryParam("getEarliestTimeInBacklog", getEarliestTimeInBacklog); + .queryParam("getEarliestTimeInBacklog", getEarliestTimeInBacklog) + .queryParam("getTotalNonContiguousDeletedMessagesRange", getTotalNonContiguousDeletedMessagesRange); final CompletableFuture future = new CompletableFuture<>(); InvocationCallback persistentCB = new InvocationCallback() { From 8a2e85496dd94c410d463f969bb748e53d2d6214 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Wed, 2 Nov 2022 06:55:32 +0800 Subject: [PATCH 2/3] chore: fix apiParam --- .../apache/pulsar/broker/admin/v2/NonPersistentTopics.java | 2 +- .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 55e22a5cd7e95..39e16fba3afe6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -213,7 +213,7 @@ public void getPartitionedStats( @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize, @ApiParam(value = "If return the earliest time in backlog") @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, - @ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange") + @ApiParam(value = "If return the total non-continues deleted message range") @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") boolean getTotalNonContiguousDeletedMessagesRange) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 2f09c7f9cc0bc..2b0ed3fcd6378 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1213,7 +1213,7 @@ public void getStats( @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize, @ApiParam(value = "If return time of the earliest message in backlog") @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, - @ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange") + @ApiParam(value = "If return the total non-continues deleted message range") @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") boolean getTotalNonContiguousDeletedMessagesRange) { validateTopicName(tenant, namespace, encodedTopic); @@ -1319,7 +1319,7 @@ public void getPartitionedStats( @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize, @ApiParam(value = "If return the earliest time in backlog") @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog, - @ApiParam(value = "If return If return TotalNonContiguousDeletedMessagesRange") + @ApiParam(value = "If return the total non-continues deleted message range") @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") boolean getTotalNonContiguousDeletedMessagesRange) { try { From 8460be242b2d8920a53bd929d767ff3f333d1381 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Wed, 2 Nov 2022 08:10:13 +0800 Subject: [PATCH 3/3] feat: reform getStats --- .../admin/impl/PersistentTopicsBase.java | 42 ++++++++----------- .../broker/admin/v1/PersistentTopics.java | 16 +++++-- .../broker/admin/v2/PersistentTopics.java | 18 ++++++-- .../resourcegroup/ResourceGroupService.java | 6 ++- .../pulsar/broker/service/BrokerService.java | 13 +++--- .../apache/pulsar/broker/service/Topic.java | 9 ++-- .../nonpersistent/NonPersistentTopic.java | 13 ++---- .../persistent/PersistentSubscription.java | 7 ++++ .../service/persistent/PersistentTopic.java | 23 +++++----- .../prometheus/NamespaceStatsAggregator.java | 8 +++- .../RGUsageMTAggrWaitForAllMsgsTest.java | 6 ++- .../ResourceGroupUsageAggregationTest.java | 6 ++- .../broker/service/BrokerServiceTest.java | 28 +++++++++---- .../nonpersistent/NonPersistentTopicTest.java | 10 +++-- .../persistent/PersistentTopicTest.java | 10 +++-- .../broker/transaction/TransactionTest.java | 6 ++- .../api/DispatcherBlockConsumerTest.java | 8 +++- .../client/api/NonPersistentTopicTest.java | 20 ++++++--- .../client/impl/MessageChunkingTest.java | 7 +++- .../apache/pulsar/client/admin/Topics.java | 8 +++- .../client/admin/internal/TopicsImpl.java | 16 +++---- .../pulsar/admin/cli/PulsarAdminToolTest.java | 11 ++++- .../apache/pulsar/admin/cli/CmdTopics.java | 14 ++++++- 23 files changed, 197 insertions(+), 108 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a18281972f66d..0ae48d4835fe0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -78,6 +78,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -1260,10 +1261,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR } protected CompletableFuture internalGetStatsAsync(boolean authoritative, - boolean getPreciseBacklog, - boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange) { + GetStatsOptions getStatsOptions) { CompletableFuture future; if (topicName.isGlobal()) { @@ -1275,8 +1273,7 @@ protected CompletableFuture internalGetStatsAsync(boolean return future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenComposeAsync(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) - .thenCompose(topic -> topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange)); + .thenCompose(topic -> topic.asyncGetStats(getStatsOptions)); } protected CompletableFuture internalGetInternalStatsAsync(boolean authoritative, @@ -1411,9 +1408,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { } protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition, - boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange) { + GetStatsOptions getStatsOptions) { CompletableFuture future; if (topicName.isGlobal()) { future = validateGlobalNamespaceOwnershipAsync(namespaceName); @@ -1433,23 +1428,20 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean TopicName partition = topicName.getPartition(i); topicStatsFutureList.add( pulsar().getNamespaceService() - .isServiceUnitOwnedAsync(partition) - .thenCompose(owned -> { - if (owned) { - return getTopicReferenceAsync(partition) - .thenApply(ref -> - ref.getStats(getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog, - getTotalNonContiguousDeletedMessagesRange)); - } else { - try { - return pulsar().getAdminClient().topics().getStatsAsync( - partition.toString(), getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange); - } catch (PulsarServerException e) { - return FutureUtil.failedFuture(e); + .isServiceUnitOwnedAsync(partition) + .thenCompose(owned -> { + if (owned) { + return getTopicReferenceAsync(partition) + .thenApply(ref -> + ref.getStats(getStatsOptions)); + } else { + try { + return pulsar().getAdminClient().topics().getStatsAsync( + partition.toString(), getStatsOptions); + } catch (PulsarServerException e) { + return FutureUtil.failedFuture(e); + } } - } }) ); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index c5bccbb96cf18..cf91ead2639c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -44,6 +44,7 @@ import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.policies.data.AuthAction; @@ -433,11 +434,15 @@ public void getStats( @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog, + @ApiParam(value = "If return time of the earliest message in backlog") @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") boolean getTotalNonContiguousDeletedMessagesRange) { validateTopicName(property, cluster, namespace, encodedTopic); - internalGetStatsAsync(authoritative, getPreciseBacklog, false, - false, getTotalNonContiguousDeletedMessagesRange) + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false) + .getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange).build(); + internalGetStatsAsync(authoritative, getStatsOptions) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -504,8 +509,11 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, - false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + + internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 2b0ed3fcd6378..c311d81f7d0ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1217,8 +1218,12 @@ public void getStats( @QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true") boolean getTotalNonContiguousDeletedMessagesRange) { validateTopicName(tenant, namespace, encodedTopic); - internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, - getTotalNonContiguousDeletedMessagesRange) + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog) + .subscriptionBacklogSize(subscriptionBacklogSize) + .getEarliestTimeInBacklog(getEarliestTimeInBacklog) + .getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange).build(); + internalGetStatsAsync(authoritative, getStatsOptions) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. @@ -1324,8 +1329,13 @@ public void getPartitionedStats( boolean getTotalNonContiguousDeletedMessagesRange) { try { validatePartitionedTopicName(tenant, namespace, encodedTopic); - internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog, - subscriptionBacklogSize, getEarliestTimeInBacklog, getTotalNonContiguousDeletedMessagesRange); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog) + .subscriptionBacklogSize(subscriptionBacklogSize) + .getEarliestTimeInBacklog(getEarliestTimeInBacklog) + .getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange) + .build(); + internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index bb7d3a1de5e90..8f511d0f7027f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupRefTypes; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -543,7 +544,10 @@ protected static Summary.Child.Value getRgQuotaCalculationTime() { protected void aggregateResourceGroupLocalUsages() { final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer(); BrokerService bs = this.pulsar.getBrokerService(); - Map topicStatsMap = bs.getTopicStats(false); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(false).build(); + Map topicStatsMap = bs.getTopicStats(getStatsOptions); for (Map.Entry entry : topicStatsMap.entrySet()) { final String topicName = entry.getKey(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 49d54925b358f..cdb533f2c73a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -119,6 +119,7 @@ import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.broker.validator.BindAddressValidator; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.ClientBuilder; @@ -474,8 +475,11 @@ public Map getTopicStats(NamespaceBundle bundle) { Map topicStatsMap = new HashMap<>(); topicMap.forEach((name, topic) -> { + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); topicStatsMap.put(name, - topic.getStats(false, false, false, true)); + topic.getStats(getStatsOptions)); }); return topicStatsMap; } @@ -2342,12 +2346,9 @@ public String generateUniqueProducerName() { return producerNameGenerator.getNextId(); } - public Map getTopicStats(boolean getTotalNonContiguousDeletedMessagesRange) { + public Map getTopicStats(GetStatsOptions getStatsOptions) { HashMap stats = new HashMap<>(); - - forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, - false, false, getTotalNonContiguousDeletedMessagesRange))); - + forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(getStatsOptions))); return stats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index a42dbdeac1a45..42070ec81da5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -29,6 +29,7 @@ import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -265,13 +266,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats ConcurrentOpenHashMap getShadowReplicators(); - TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, boolean getTotalNonContiguousDeletedMessagesRange); + TopicStatsImpl getStats(GetStatsOptions getStatsOptions); - CompletableFuture asyncGetStats(boolean getPreciseBacklog, - boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange); + CompletableFuture asyncGetStats(GetStatsOptions getStatsOptions); CompletableFuture getInternalStats(boolean includeLedgerMetadata); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 722f7e1ea9b41..0d879bdec7616 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -65,6 +65,7 @@ import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -845,12 +846,9 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats } @Override - public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange) { + public NonPersistentTopicStatsImpl getStats(GetStatsOptions getStatsOptions) { try { - return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog, - getTotalNonContiguousDeletedMessagesRange).get(); + return asyncGetStats(getStatsOptions).get(); } catch (InterruptedException | ExecutionException e) { log.error("[{}] Fail to get stats", topic, e); return null; @@ -858,10 +856,7 @@ public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean s } @Override - public CompletableFuture asyncGetStats(boolean getPreciseBacklog, - boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange) { + public CompletableFuture asyncGetStats(GetStatsOptions getStatsOptions) { CompletableFuture future = new CompletableFuture<>(); NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e214202ec2960..8150f00f13780 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -74,6 +74,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; @@ -1065,6 +1066,12 @@ public long estimateBacklogSize() { return cursor.getEstimatedSizeSinceMarkDeletePosition(); } + public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) { + return getStats(getStatsOptions.isGetPreciseBacklog(), getStatsOptions.isSubscriptionBacklogSize(), + getStatsOptions.isGetEarliestTimeInBacklog(), + getStatsOptions.isGetTotalNonContiguousDeletedMessagesRange()); + } + @Deprecated public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog, boolean getTotalNonContiguousDeletedMessagesRange) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0f558156cf34b..d5049827336d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -113,6 +113,7 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; @@ -1107,8 +1108,10 @@ void removeSubscription(String subscriptionName) { PersistentSubscription sub = subscriptions.remove(subscriptionName); if (sub != null) { // preserve accumulative stats form removed subscription - SubscriptionStatsImpl stats = sub.getStats(false, false, - false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + SubscriptionStatsImpl stats = sub.getStats(getStatsOptions); bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter); msgOutFromRemovedSubscriptions.add(stats.msgOutCounter); } @@ -2025,12 +2028,9 @@ public double getLastUpdatedAvgPublishRateInByte() { } @Override - public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange) { + public TopicStatsImpl getStats(GetStatsOptions getStatsOptions) { try { - return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, - getTotalNonContiguousDeletedMessagesRange).get(); + return asyncGetStats(getStatsOptions).get(); } catch (InterruptedException | ExecutionException e) { log.error("[{}] Fail to get stats", topic, e); return null; @@ -2038,9 +2038,7 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa } @Override - public CompletableFuture asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize, - boolean getEarliestTimeInBacklog, - boolean getTotalNonContiguousDeletedMessagesRange) { + public CompletableFuture asyncGetStats(GetStatsOptions getStatsOptions) { CompletableFuture statsFuture = new CompletableFuture<>(); TopicStatsImpl stats = new TopicStatsImpl(); @@ -2074,8 +2072,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog subscriptions.forEach((name, subscription) -> { SubscriptionStatsImpl subStats = - subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, - getTotalNonContiguousDeletedMessagesRange); + subscription.getStats(getStatsOptions); stats.msgRateOut += subStats.msgRateOut; stats.msgThroughputOut += subStats.msgThroughputOut; @@ -2127,7 +2124,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog return compactionRecord; }); - if (getEarliestTimeInBacklog && stats.backlogSize != 0) { + if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize != 0) { ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) -> { if (e != null) { log.error("[{}] Failed to get earliest message publish time in backlog", topic, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index e2c30f93db65d..c440d7ddaaa9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl; @@ -179,8 +180,11 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate(); stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate(); } - TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, - false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog) + .subscriptionBacklogSize(subscriptionBacklogSize) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + TopicStatsImpl tStatus = topic.getStats(getStatsOptions); stats.msgInCounter = tStatus.msgInCounter; stats.bytesInCounter = tStatus.bytesInCounter; stats.msgOutCounter = tStatus.msgOutCounter; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index ec1a8bfc94f5d..ab55199f611e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -547,7 +548,10 @@ private void verifyRGProdConsStats(String[] topicStrings, boolean checkConsume) throws Exception { BrokerService bs = pulsar.getBrokerService(); - Map topicStatsMap = bs.getTopicStats(false); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(false).build(); + Map topicStatsMap = bs.getTopicStats(getStatsOptions); log.debug("verifyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java index 082f2a5f07fbe..4eb3f91667be1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java @@ -27,6 +27,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -210,7 +211,10 @@ private void verifyStats(String topicString, String rgName, throws InterruptedException, PulsarAdminException { BrokerService bs = pulsar.getBrokerService(); Awaitility.await().untilAsserted(() -> { - TopicStatsImpl topicStats = bs.getTopicStats(false).get(topicString); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(false).build(); + TopicStatsImpl topicStats = bs.getTopicStats(getStatsOptions).get(topicString); Assert.assertNotNull(topicStats); if (checkProduce) { Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index d6c32500d4afb..72bc85c69e91b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -77,6 +77,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ClientBuilder; @@ -231,7 +232,10 @@ public void testBrokerServicePersistentTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -252,7 +256,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // publisher stats @@ -290,7 +294,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); assertEquals(stats.getOffloadedStorageSize(), 0); @@ -419,13 +423,16 @@ public void testStatsOfStorageSizeWithSubscription() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); - assertEquals(topicRef.getStats(false, false, false, true).storageSize, 0); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + assertEquals(topicRef.getStats(getStatsOptions).storageSize, 0); for (int i = 0; i < 10; i++) { producer.send(new byte[10]); } - assertTrue(topicRef.getStats(false, false, false, true).storageSize > 0); + assertTrue(topicRef.getStats(getStatsOptions).storageSize > 0); } @Test @@ -447,7 +454,10 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -465,7 +475,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // publisher stats @@ -500,7 +510,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgRateRedeliver() > 0.0); assertEquals(subStats.getMsgRateRedeliver(), subStats.getConsumers().get(0).getMsgRateRedeliver()); @@ -514,7 +524,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); assertEquals(subStats.getMsgBacklog(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 18b4003f36142..e93278db1d96f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.nonpersistent; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -63,7 +64,10 @@ public void testAccumulativeStats() throws Exception { NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); // stats are at zero before any activity - TopicStats stats = topic.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + TopicStats stats = topic.getStats(getStatsOptions); assertEquals(stats.getBytesInCounter(), 0); assertEquals(stats.getMsgInCounter(), 0); assertEquals(stats.getBytesOutCounter(), 0); @@ -77,7 +81,7 @@ public void testAccumulativeStats() throws Exception { assertNotNull(msg); // send/receive result in non-zero stats - TopicStats statsBeforeUnsubscribe = topic.getStats(false, false, false, true); + TopicStats statsBeforeUnsubscribe = topic.getStats(getStatsOptions); assertTrue(statsBeforeUnsubscribe.getBytesInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getMsgInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getBytesOutCounter() > 0); @@ -90,7 +94,7 @@ public void testAccumulativeStats() throws Exception { assertEquals(topic.getProducers().size(), 0); // consumer unsubscribe/producer removal does not result in stats loss - TopicStats statsAfterUnsubscribe = topic.getStats(false, false, false, true); + TopicStats statsAfterUnsubscribe = topic.getStats(getStatsOptions); assertEquals(statsAfterUnsubscribe.getBytesInCounter(), statsBeforeUnsubscribe.getBytesInCounter()); assertEquals(statsAfterUnsubscribe.getMsgInCounter(), statsBeforeUnsubscribe.getMsgInCounter()); assertEquals(statsAfterUnsubscribe.getBytesOutCounter(), statsBeforeUnsubscribe.getBytesOutCounter()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 83bd8445cdac7..e541be4013b59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -209,7 +210,10 @@ public void testAccumulativeStats() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); // stats are at zero before any activity - TopicStats stats = topic.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + TopicStats stats = topic.getStats(getStatsOptions); assertEquals(stats.getBytesInCounter(), 0); assertEquals(stats.getMsgInCounter(), 0); assertEquals(stats.getBytesOutCounter(), 0); @@ -223,7 +227,7 @@ public void testAccumulativeStats() throws Exception { assertNotNull(msg); // send/receive result in non-zero stats - TopicStats statsBeforeUnsubscribe = topic.getStats(false, false, false, true); + TopicStats statsBeforeUnsubscribe = topic.getStats(getStatsOptions); assertTrue(statsBeforeUnsubscribe.getBytesInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getMsgInCounter() > 0); assertTrue(statsBeforeUnsubscribe.getBytesOutCounter() > 0); @@ -236,7 +240,7 @@ public void testAccumulativeStats() throws Exception { assertEquals(topic.getProducers().size(), 0); // consumer unsubscribe/producer removal does not result in stats loss - TopicStats statsAfterUnsubscribe = topic.getStats(false, false, false, true); + TopicStats statsAfterUnsubscribe = topic.getStats(getStatsOptions); assertEquals(statsAfterUnsubscribe.getBytesInCounter(), statsBeforeUnsubscribe.getBytesInCounter()); assertEquals(statsAfterUnsubscribe.getMsgInCounter(), statsBeforeUnsubscribe.getMsgInCounter()); assertEquals(statsAfterUnsubscribe.getBytesOutCounter(), statsBeforeUnsubscribe.getBytesOutCounter()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 6e864a8e77d3e..a916a38388486 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -103,6 +103,7 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -203,7 +204,10 @@ public void testTopicTransactionMetrics() throws Exception { Optional optional = pulsarService.getBrokerService().getTopic(topic, false).get(); assertTrue(optional.isPresent()); PersistentTopic persistentTopic = (PersistentTopic) optional.get(); - TopicStatsImpl stats = persistentTopic.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + TopicStatsImpl stats = persistentTopic.getStats(getStatsOptions); assertEquals(stats.committedTxnCount, 1); assertEquals(stats.abortedTxnCount, 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index cb12973233e7a..ff60f675a69e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -53,6 +53,7 @@ import org.apache.pulsar.broker.service.CanPausedNamespaceService; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.SubscriptionStats; @@ -545,7 +546,10 @@ public void testBlockDispatcherStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -563,7 +567,7 @@ public void testBlockDispatcherStats() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgBacklog() > 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 57c38e4ab8ebd..698143d447176 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; @@ -495,7 +496,10 @@ public void testTopicStats() throws Exception { assertNotNull(topicRef); rolloverPerIntervalStats(pulsar); - stats = topicRef.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -513,7 +517,7 @@ public void testTopicStats() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(pulsar); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgRateOut() > 0); @@ -580,7 +584,10 @@ public void testReplicator() throws Exception { assertNotNull(replicatorR3); rolloverPerIntervalStats(replicationPulasr); - stats = topicRef.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); // subscription stats @@ -651,7 +658,7 @@ public void testReplicator() throws Exception { Thread.sleep(timeWaitToSync); rolloverPerIntervalStats(replicationPulasr); - stats = topicRef.getStats(false, false, false, true); + stats = topicRef.getStats(getStatsOptions); subStats = stats.getSubscriptions().values().iterator().next(); assertTrue(subStats.getMsgRateOut() > 0); @@ -849,7 +856,10 @@ public void testMsgDropStat() throws Exception { NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); pulsar.getBrokerService().updateRates(); - NonPersistentTopicStats stats = topic.getStats(false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + NonPersistentTopicStats stats = topic.getStats(getStatsOptions); NonPersistentPublisherStats npStats = stats.getPublishers().get(0); NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1"); NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index a4e6cbb05f233..459bc2ef8cbb7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; @@ -158,8 +159,10 @@ public void testLargeMessage(boolean ackReceiptEnabled, boolean clientSizeMaxMes } pulsar.getBrokerService().updateRates(); - - PublisherStats producerStats = topic.getStats(false, false, false, true).getPublishers().get(0); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + PublisherStats producerStats = topic.getStats(getStatsOptions).getPublishers().get(0); assertTrue(producerStats.getChunkedMessageRate() > 0); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 8fa0cf84fafb6..a5406ddc64d84 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1182,8 +1182,14 @@ CompletableFuture getStatsAsync(String topic, boolean getPreciseBack boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog, boolean getTotalNonContiguousDeletedMessagesRange); + CompletableFuture getStatsAsync(String topic, GetStatsOptions getStatsOptions); + default CompletableFuture getStatsAsync(String topic) { - return getStatsAsync(topic, false, false, false, true); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + + return getStatsAsync(topic, getStatsOptions); } /** diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9782384b86f55..00774551a7b3e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -590,15 +590,10 @@ public CompletableFuture> getSubscriptionsAsync(String topic) { @Override public TopicStats getStats(String topic, GetStatsOptions getStatsOptions) throws PulsarAdminException { - boolean getPreciseBacklog = getStatsOptions.isGetPreciseBacklog(); - boolean subscriptionBacklogSize = getStatsOptions.isSubscriptionBacklogSize(); - boolean getEarliestTimeInBacklog = getStatsOptions.isGetEarliestTimeInBacklog(); - boolean getTotalNonContiguousDeletedMessagesRange = - getStatsOptions.isGetTotalNonContiguousDeletedMessagesRange(); - return sync(() -> getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog, - getTotalNonContiguousDeletedMessagesRange)); + return sync(() -> getStatsAsync(topic, getStatsOptions)); } + @Deprecated @Override public CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog, boolean subscriptionBacklogSize, @@ -646,6 +641,13 @@ public void failed(Throwable throwable) { return future; } + @Override + public CompletableFuture getStatsAsync(String topic, GetStatsOptions getStatsOptions) { + return getStatsAsync(topic, getStatsOptions.isGetPreciseBacklog(), getStatsOptions.isSubscriptionBacklogSize(), + getStatsOptions.isGetEarliestTimeInBacklog(), + getStatsOptions.isGetTotalNonContiguousDeletedMessagesRange()); + } + @Override public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException { return getInternalStats(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 51ef3d1524df3..a694f73f9aa44 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.Brokers; import org.apache.pulsar.client.admin.Clusters; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -1473,7 +1474,10 @@ public void topics() throws Exception { verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false); cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1")); - verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false, false, false); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", getStatsOptions); cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false); @@ -2084,7 +2088,10 @@ public void nonPersistentTopics() throws Exception { CmdTopics topics = new CmdTopics(() -> admin); topics.run(split("stats non-persistent://myprop/ns1/ds1")); - verify(mockTopics).getStats("non-persistent://myprop/ns1/ds1", false, false, false); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false) + .getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build(); + verify(mockTopics).getStats("non-persistent://myprop/ns1/ds1", getStatsOptions); topics.run(split("stats-internal non-persistent://myprop/ns1/ds1")); verify(mockTopics).getInternalStats("non-persistent://myprop/ns1/ds1", false); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 3b6f571333855..2a9ddcd2848ae 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -47,6 +47,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; +import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; @@ -793,10 +794,21 @@ private class GetStats extends CliCommand { "--get-earliest-time-in-backlog" }, description = "Set true to get earliest time in backlog") private boolean getEarliestTimeInBacklog = false; + @Parameter(names = {"-ncm", + "--get-total-non-contiguous-deleted-messages-range"}, description = "If return the total " + + "non-continues deleted message range") + private boolean getTotalNonContiguousDeletedMessagesRange = true; + @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); - print(getTopics().getStats(topic, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)); + GetStatsOptions getStatsOptions = + GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog) + .subscriptionBacklogSize(subscriptionBacklogSize) + .getEarliestTimeInBacklog(getEarliestTimeInBacklog) + .getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange) + .build(); + print(getTopics().getStats(topic, getStatsOptions)); } }