Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Fixed storage node read quota usage ratio spikes #1256

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testGrpcServerReadQuota() throws Exception {
for (int i = 0; i < veniceCluster.getVeniceServers().size(); i++) {
serverMetrics.add(veniceCluster.getVeniceServers().get(i).getMetricsRepository());
}
String readQuotaRequestedString = "." + storeName + "--quota_request.Rate";
String readQuotaRequestedString = "." + storeName + "--current_quota_request.Gauge";
String readQuotaRejectedString = "." + storeName + "--quota_rejected_request.Rate";
String readQuotaAllowedUnintentionally = "." + storeName + "--quota_unintentionally_allowed_key_count.Count";
String readQuotaUsageRatio = "." + storeName + "--quota_requested_usage_ratio.Gauge";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public void testServerReadQuota() throws Exception {
for (int i = 0; i < veniceCluster.getVeniceServers().size(); i++) {
serverMetrics.add(veniceCluster.getVeniceServers().get(i).getMetricsRepository());
}
String readQuotaRequestedQPSString = "." + storeName + "--quota_request.Rate";
String readQuotaRequestedQPSString = "." + storeName + "--current_quota_request.Gauge";
String readQuotaRejectedQPSString = "." + storeName + "--quota_rejected_request.Rate";
String readQuotaRequestedKPSString = "." + storeName + "--quota_request_key_count.Rate";
String readQuotaRequestedKPSString = "." + storeName + "--current_quota_request_key_count.Gauge";
String readQuotaRejectedKPSString = "." + storeName + "--quota_rejected_key_count.Rate";
String readQuotaAllowedUnintentionally = "." + storeName + "--quota_unintentionally_allowed_key_count.Count";
String readQuotaUsageRatio = "." + storeName + "--quota_requested_usage_ratio.Gauge";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,10 @@ public QuotaEnforcementResult enforceQuota(RouterRequest request) {
* First check per store version level quota; don't throttle retried request at store version level
*/
VeniceRateLimiter veniceRateLimiter = storeVersionRateLimiters.get(request.getResourceName());
int version = Version.parseVersionFromKafkaTopicName(request.getResourceName());
if (veniceRateLimiter != null) {
if (!request.isRetryRequest() && !veniceRateLimiter.tryAcquirePermit(readCapacityUnits)) {
stats.recordRejected(request.getStoreName(), readCapacityUnits);
stats.recordRejected(request.getStoreName(), version, readCapacityUnits);
return QuotaEnforcementResult.REJECTED;
}
} else {
Expand All @@ -245,7 +246,7 @@ public QuotaEnforcementResult enforceQuota(RouterRequest request) {
return QuotaEnforcementResult.OVER_CAPACITY;
}

stats.recordAllowed(storeName, readCapacityUnits);
stats.recordAllowed(storeName, version, readCapacityUnits);
return QuotaEnforcementResult.ALLOWED;
}

Expand Down Expand Up @@ -312,6 +313,8 @@ private void updateQuota(PartitionAssignment partitionAssignment) {
"Routing data changed on quota enforcement handler with 0 replicas assigned to this node, removing quota for resource: {}",
topic);
storeVersionRateLimiters.remove(topic);
stats.getStoreStats(Version.parseStoreFromKafkaTopicName(topic))
.removeVersion(Version.parseVersionFromKafkaTopicName(topic));
return;
}
String storeName = Version.parseStoreFromKafkaTopicName(topic);
Expand All @@ -328,7 +331,10 @@ private void updateQuota(PartitionAssignment partitionAssignment) {
clock);

if (rateLimiter != v) {
stats.setNodeQuotaResponsibility(storeName, (long) Math.ceil(quotaInRcu * thisNodeQuotaResponsibility));
stats.setNodeQuotaResponsibility(
storeName,
Version.parseVersionFromKafkaTopicName(topic),
(long) Math.ceil(quotaInRcu * thisNodeQuotaResponsibility));
}
return rateLimiter;
});
Expand Down Expand Up @@ -379,6 +385,8 @@ public void onPartitionStatusChange(String topic, ReadOnlyPartitionStatus partit
@Override
public void onRoutingDataDeleted(String kafkaTopic) {
storeVersionRateLimiters.remove(kafkaTopic);
stats.getStoreStats(Version.parseStoreFromKafkaTopicName(kafkaTopic))
.removeVersion(Version.parseVersionFromKafkaTopicName(kafkaTopic));
}

@Override
Expand Down Expand Up @@ -413,7 +421,7 @@ public void handleStoreChanged(Store store) {
* During a new push, a new version is added to the version list of the Store metadata before the push actually
* starts, so this function (ReadQuotaEnforcementHandler#handleStoreChanged()) is invoked before the new
* resource assignment shows up in the external view, so calling customizedViewRepository.getPartitionAssignments() for
* a the future version will fail in most cases, because the new topic is not in the external view at all.
* a future version will fail in most cases, because the new topic is not in the external view at all.
*
*/
this.onCustomizedViewChange(customizedViewRepository.getPartitionAssignments(topic));
Expand Down Expand Up @@ -452,6 +460,7 @@ public void handleStoreChanged(Store store) {
}
}
removeTopics(toBeRemovedTopics);
stats.setCurrentVersion(store.getName(), store.getCurrentVersion());
}

private Set<String> getStoreTopics(String storeName) {
Expand All @@ -465,6 +474,8 @@ private void removeTopics(Set<String> topicsToRemove) {
for (String topic: topicsToRemove) {
customizedViewRepository.unSubscribeRoutingDataChange(topic, this);
storeVersionRateLimiters.remove(topic);
stats.getStoreStats(Version.parseStoreFromKafkaTopicName(topic))
.removeVersion(Version.parseVersionFromKafkaTopicName(topic));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,34 @@
* {@code AggServerQuotaUsageStats} is the aggregate statistics for {@code ServerQuotaUsageStats} which
* measures requests and quota rejections of each store.
*/
public class AggServerQuotaUsageStats extends AbstractVeniceAggStats<ServerQuotaUsageStats> {
public class AggServerQuotaUsageStats extends AbstractVeniceAggStats<ServerReadQuotaUsageStats> {
private static final int SINGLE_VERSION_FOR_TOTAL_STATS = 1;

public AggServerQuotaUsageStats(MetricsRepository metricsRepository) {
super(metricsRepository, (metrics, storeName) -> new ServerQuotaUsageStats(metrics, storeName));
super(metricsRepository, (metrics, storeName) -> new ServerReadQuotaUsageStats(metrics, storeName));
totalStats.setCurrentVersion(SINGLE_VERSION_FOR_TOTAL_STATS);
}

public void recordAllowed(String storeName, long rcu) {
totalStats.recordAllowed(rcu);
getStoreStats(storeName).recordAllowed(rcu);
public void recordAllowed(String storeName, int version, long rcu) {
totalStats.recordAllowed(SINGLE_VERSION_FOR_TOTAL_STATS, rcu);
getStoreStats(storeName).recordAllowed(version, rcu);
}

public void recordRejected(String storeName, long rcu) {
totalStats.recordRejected(rcu);
getStoreStats(storeName).recordRejected(rcu);
public void recordRejected(String storeName, int version, long rcu) {
totalStats.recordRejected(SINGLE_VERSION_FOR_TOTAL_STATS, rcu);
getStoreStats(storeName).recordRejected(version, rcu);
}

public void recordAllowedUnintentionally(String storeName, long rcu) {
totalStats.recordAllowedUnintentionally(rcu);
getStoreStats(storeName).recordAllowedUnintentionally(rcu);
}

public void setNodeQuotaResponsibility(String storeName, long nodeQpsResponsibility) {
getStoreStats(storeName).setNodeQuotaResponsibility(nodeQpsResponsibility);
public void setNodeQuotaResponsibility(String storeName, int version, long nodeKpsResponsibility) {
getStoreStats(storeName).setNodeQuotaResponsibility(version, nodeKpsResponsibility);
}

public void setCurrentVersion(String storeName, int version) {
getStoreStats(storeName).setCurrentVersion(version);
}
}

This file was deleted.

Loading
Loading