Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] allow client skips the non-continued deleted ranges stats #18263

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -1260,9 +1261,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
}

protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean authoritative,
boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
GetStatsOptions getStatsOptions) {
CompletableFuture<Void> future;

if (topicName.isGlobal()) {
Expand All @@ -1274,8 +1273,7 @@ protected CompletableFuture<? extends TopicStats> internalGetStatsAsync(boolean
return future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenComposeAsync(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
.thenCompose(topic -> topic.asyncGetStats(getStatsOptions));
}

protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalStatsAsync(boolean authoritative,
Expand Down Expand Up @@ -1410,8 +1408,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}

protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
GetStatsOptions getStatsOptions) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
Expand All @@ -1431,22 +1428,20 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
TopicName partition = topicName.getPartition(i);
topicStatsFutureList.add(
pulsar().getNamespaceService()
.isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
return getTopicReferenceAsync(partition)
.thenApply(ref ->
ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
} else {
try {
return pulsar().getAdminClient().topics().getStatsAsync(
partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog);
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
.isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
return getTopicReferenceAsync(partition)
.thenApply(ref ->
ref.getStats(getStatsOptions));
} else {
try {
return pulsar().getAdminClient().topics().getStatsAsync(
partition.toString(), getStatsOptions);
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
}
}
})
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -432,9 +433,16 @@ public void getStats(
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetStatsAsync(authoritative, getPreciseBacklog, false, false)
GetStatsOptions getStatsOptions =
GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog).subscriptionBacklogSize(false)
.getEarliestTimeInBacklog(false)
.getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange).build();
internalGetStatsAsync(authoritative, getStatsOptions)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand Down Expand Up @@ -501,7 +509,11 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false, false);
GetStatsOptions getStatsOptions =
GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false)
.getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build();

internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If return the total non-continues deleted message range")
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
if (topicName.isGlobal()) {
Expand All @@ -238,7 +241,8 @@ public void getPartitionedStats(
topicStatsFutureList
.add(pulsar().getAdminClient().topics().getStatsAsync(
(topicName.getPartition(i).toString()), getPreciseBacklog,
subscriptionBacklogSize, getEarliestTimeInBacklog));
subscriptionBacklogSize, getEarliestTimeInBacklog,
getTotalNonContiguousDeletedMessagesRange));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -1212,9 +1213,17 @@ public void getStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return time of the earliest message in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If return the total non-continues deleted message range")
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetStatsAsync(authoritative, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)
GetStatsOptions getStatsOptions =
GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog)
.subscriptionBacklogSize(subscriptionBacklogSize)
.getEarliestTimeInBacklog(getEarliestTimeInBacklog)
.getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange).build();
internalGetStatsAsync(authoritative, getStatsOptions)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
Expand Down Expand Up @@ -1314,11 +1323,19 @@ public void getPartitionedStats(
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize,
@ApiParam(value = "If return the earliest time in backlog")
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog) {
@QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
@ApiParam(value = "If return the total non-continues deleted message range")
@QueryParam("getTotalNonContiguousDeletedMessagesRange") @DefaultValue("true")
boolean getTotalNonContiguousDeletedMessagesRange) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
subscriptionBacklogSize, getEarliestTimeInBacklog);
GetStatsOptions getStatsOptions =
GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog)
.subscriptionBacklogSize(subscriptionBacklogSize)
.getEarliestTimeInBacklog(getEarliestTimeInBacklog)
.getTotalNonContiguousDeletedMessagesRange(getTotalNonContiguousDeletedMessagesRange)
.build();
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getStatsOptions);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupRefTypes;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -543,7 +544,10 @@ protected static Summary.Child.Value getRgQuotaCalculationTime() {
protected void aggregateResourceGroupLocalUsages() {
final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer();
BrokerService bs = this.pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
GetStatsOptions getStatsOptions =
GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false)
.getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(false).build();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats(getStatsOptions);

for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
final String topicName = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.validator.BindAddressValidator;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
Expand Down Expand Up @@ -474,8 +475,11 @@ public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {

Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
topicMap.forEach((name, topic) -> {
GetStatsOptions getStatsOptions =
GetStatsOptions.builder().getPreciseBacklog(false).subscriptionBacklogSize(false)
.getEarliestTimeInBacklog(false).getTotalNonContiguousDeletedMessagesRange(true).build();
topicStatsMap.put(name,
topic.getStats(false, false, false));
topic.getStats(getStatsOptions));
});
return topicStatsMap;
}
Expand Down Expand Up @@ -2342,11 +2346,9 @@ public String generateUniqueProducerName() {
return producerNameGenerator.getNextId();
}

public Map<String, TopicStatsImpl> getTopicStats() {
public Map<String, TopicStatsImpl> getTopicStats(GetStatsOptions getStatsOptions) {
HashMap<String, TopicStatsImpl> stats = new HashMap<>();

forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false, false)));

forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(getStatsOptions)));
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -265,12 +266,9 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats

ConcurrentOpenHashMap<String, ? extends Replicator> getShadowReplicators();

TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
TopicStatsImpl getStats(GetStatsOptions getStatsOptions);

CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog);
CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);

CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand Down Expand Up @@ -845,20 +846,17 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
}

@Override
public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
public NonPersistentTopicStatsImpl getStats(GetStatsOptions getStatsOptions) {
try {
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getPreciseBacklog).get();
return asyncGetStats(getStatsOptions).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Fail to get stats", topic, e);
return null;
}
}

@Override
public CompletableFuture<NonPersistentTopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
public CompletableFuture<NonPersistentTopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions) {
CompletableFuture<NonPersistentTopicStatsImpl> future = new CompletableFuture<>();
NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl();

Expand Down
Loading
Loading