Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
Improve performance of the admission controller and token bucket (#1243)
Browse files Browse the repository at this point in the history
* Improve performance of the admission controller and token bucket

* Bug fixes
  • Loading branch information
tbak authored Apr 1, 2022
1 parent b536edf commit 2e5c06a
Show file tree
Hide file tree
Showing 23 changed files with 254 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import com.netflix.titus.common.util.limiter.Limiters;
import com.netflix.titus.common.util.limiter.tokenbucket.TokenBucket;
import com.netflix.titus.common.util.time.Clocks;

/**
* Supplementary functions to work with the token bucket policies.
Expand All @@ -47,7 +48,8 @@ public static TokenBucket newTokenBucket(String name, TokenBucketPolicy policy)
policy.getInitialNumberOfTokens(),
refillPolicy.getNumberOfTokensPerInterval(),
refillPolicy.getIntervalMs(),
TimeUnit.MILLISECONDS
TimeUnit.MILLISECONDS,
Clocks.system()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.protobuf.GeneratedMessageV3;
import com.netflix.titus.api.jobmanager.model.job.LogStorageInfo;
import com.netflix.titus.api.jobmanager.service.V3JobOperations;
import com.netflix.titus.api.supervisor.service.LeaderActivator;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.Evaluators;
import com.netflix.titus.common.util.guice.annotation.Activator;
Expand All @@ -32,13 +33,16 @@ public class DefaultGrpcObjectsCache implements GrpcObjectsCache {
private final V3JobOperations jobOperations;
private final GrpcObjectsCacheConfiguration configuration;
private final LogStorageInfo<com.netflix.titus.api.jobmanager.model.job.Task> logStorageInfo;
private final LeaderActivator leaderActivator;
private final TitusRuntime titusRuntime;

@Inject
public DefaultGrpcObjectsCache(V3JobOperations jobOperations,
GrpcObjectsCacheConfiguration configuration,
LogStorageInfo<com.netflix.titus.api.jobmanager.model.job.Task> logStorageInfo,
LeaderActivator leaderActivator,
TitusRuntime titusRuntime) {
this.leaderActivator = leaderActivator;
this.titusRuntime = titusRuntime;
this.configuration = configuration;
this.logStorageInfo = logStorageInfo;
Expand All @@ -58,6 +62,10 @@ public void activate() {
return job;
},
() -> {
// FIXME DefaultGrpcObjectsCache activation happens immediately in DefaultJobManagementServiceGrpc
if (!leaderActivator.isActivated()) {
return job -> true;
}
List<com.netflix.titus.api.jobmanager.model.job.Job> allJobs = jobOperations.getJobs();
Set<String> knownJobIds = new HashSet<>();
allJobs.forEach(job -> knownJobIds.add(job.getId()));
Expand All @@ -77,6 +85,10 @@ public void activate() {
return task;
},
() -> {
// FIXME DefaultGrpcObjectsCache activation happens immediately in DefaultJobManagementServiceGrpc
if (!leaderActivator.isActivated()) {
return job -> true;
}
List<com.netflix.titus.api.jobmanager.model.job.Task> allTasks = jobOperations.getTasks();
Set<String> knownTasksIds = new HashSet<>();
allTasks.forEach(task -> knownTasksIds.add(task.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.netflix.titus.api.jobmanager.model.job.LogStorageInfo;
import com.netflix.titus.api.jobmanager.model.job.Task;
import com.netflix.titus.api.jobmanager.service.V3JobOperations;
import com.netflix.titus.api.supervisor.service.LeaderActivator;
import com.netflix.titus.common.runtime.TitusRuntimes;
import com.netflix.titus.common.util.Evaluators;
import com.netflix.titus.common.util.tuple.Pair;
Expand All @@ -29,18 +30,23 @@ public class DefaultGrpcObjectsCacheTest {

private final V3JobOperations v3JobOperations = mock(V3JobOperations.class);

private final LeaderActivator leaderActivator = mock(LeaderActivator.class);

private final LogStorageInfo<com.netflix.titus.api.jobmanager.model.job.Task> logStorageInfo = EmptyLogStorageInfo.empty();

private DefaultGrpcObjectsCache grpcObjectsCache;

private final SomeJobs someJobs = new SomeJobs();


@Before
public void setUp() {
when(configuration.getCacheRefreshInitialDelayMs()).thenReturn(1L);
when(configuration.getCacheRefreshIntervalMs()).thenReturn(1L);
when(configuration.getCacheCleanupDelayMs()).thenReturn(1L);

when(leaderActivator.isActivated()).thenReturn(true);

someJobs.addJob(2);
someJobs.addJob(5);
someJobs.addJob(10);
Expand All @@ -51,7 +57,7 @@ public void setUp() {

private void setupGrpcObjectCache(boolean isEnabled) {
when(configuration.isGrpcObjectsCacheEnabled()).thenReturn(isEnabled);
grpcObjectsCache = new DefaultGrpcObjectsCache(v3JobOperations, configuration, logStorageInfo, TitusRuntimes.internal());
grpcObjectsCache = new DefaultGrpcObjectsCache(v3JobOperations, configuration, logStorageInfo, leaderActivator, TitusRuntimes.internal());
grpcObjectsCache.activate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class LoggingCodeInvariants extends CodeInvariants {

private static final CodeInvariants INSTANCE = new LoggingCodeInvariants(
Limiters.createFixedIntervalTokenBucket(
"invariants", 1000, 1000, 100, 1, TimeUnit.SECONDS
"invariants", 1000, 1000, 100, 1, TimeUnit.SECONDS, Clocks.system()
),
Clocks.system()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.base.Stopwatch;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.limiter.tokenbucket.FixedIntervalTokenBucketConfiguration;
import com.netflix.titus.common.util.limiter.tokenbucket.RefillStrategy;
Expand All @@ -30,6 +29,7 @@
import com.netflix.titus.common.util.limiter.tokenbucket.internal.FixedIntervalRefillStrategy;
import com.netflix.titus.common.util.limiter.tokenbucket.internal.FixedIntervalTokenBucketSupplier;
import com.netflix.titus.common.util.limiter.tokenbucket.internal.SpectatorTokenBucketDecorator;
import com.netflix.titus.common.util.time.Clock;

public class Limiters {

Expand All @@ -47,9 +47,9 @@ public static TokenBucket unlimited(String name) {
* Create a {@link TokenBucket} with a fixed interval {@link RefillStrategy}.
*/
public static TokenBucket createFixedIntervalTokenBucket(String name, long capacity, long initialNumberOfTokens,
long numberOfTokensPerInterval, long interval, TimeUnit unit) {
RefillStrategy refillStrategy = new FixedIntervalRefillStrategy(Stopwatch.createStarted(),
numberOfTokensPerInterval, interval, unit);
long numberOfTokensPerInterval, long interval, TimeUnit unit,
Clock clock) {
RefillStrategy refillStrategy = new FixedIntervalRefillStrategy(numberOfTokensPerInterval, interval, unit, clock);
return new DefaultTokenBucket(name, capacity, refillStrategy, initialNumberOfTokens);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,29 @@ public boolean tryTake(long numberOfTokens) {
Preconditions.checkArgument(numberOfTokens > 0, "Number of tokens must be greater than 0.");
Preconditions.checkArgument(numberOfTokens <= capacity, "Number of tokens must not be greater than the capacity.");

synchronized (mutex) {
refill(refillStrategy.refill());
// Quick path if no refill.
if (refillStrategy.getTimeUntilNextRefill(TimeUnit.MILLISECONDS) > 0) {
if (numberOfTokens > this.numberOfTokens) {
return false;
}
synchronized (mutex) {
if (numberOfTokens <= this.numberOfTokens) {
this.numberOfTokens -= numberOfTokens;
return true;
}
}
return false;
}

synchronized (mutex) {
long refill = refillStrategy.refill();
if (refill > 0) {
refillInternal(refill);
}
if (numberOfTokens <= this.numberOfTokens) {
this.numberOfTokens -= numberOfTokens;
return true;
}

return false;
}
}
Expand Down Expand Up @@ -110,10 +125,14 @@ public void take(long numberOfTokens) {
@Override
public void refill(long numberOfTokens) {
synchronized (mutex) {
this.numberOfTokens = Math.min(capacity, Math.max(0, this.numberOfTokens + numberOfTokens));
refillInternal(numberOfTokens);
}
}

private void refillInternal(long numberOfTokens) {
this.numberOfTokens = Math.min(capacity, Math.max(0, this.numberOfTokens + numberOfTokens));
}

@Override
public RefillStrategy getRefillStrategy() {
return refillStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,59 @@

import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;
import com.netflix.titus.common.util.DateTimeExt;
import com.netflix.titus.common.util.limiter.tokenbucket.RefillStrategy;
import com.netflix.titus.common.util.time.Clock;

public class FixedIntervalRefillStrategy implements RefillStrategy {

private final Object mutex = new Object();

private final Stopwatch stopwatch;
private final long numberOfTokensPerInterval;
private final long intervalInNanos;
private final String toStringValue;
private long lastRefillTime;
private long nextRefillTime;
private long lastRefillTimeNano;

/**
* Make it volatile, so we do not have to sync for reading.
*/
private volatile long nextRefillTimeNano;

private final Clock clock;

public FixedIntervalRefillStrategy(Stopwatch stopwatch, long numberOfTokensPerInterval, long interval, TimeUnit unit) {
this.stopwatch = stopwatch;
public FixedIntervalRefillStrategy(long numberOfTokensPerInterval, long interval, TimeUnit unit, Clock clock) {
this.numberOfTokensPerInterval = numberOfTokensPerInterval;
this.clock = clock;
this.intervalInNanos = unit.toNanos(interval);
this.toStringValue = "FixedIntervalRefillStrategy{refillRate=" + DateTimeExt.toRateString(interval, numberOfTokensPerInterval, unit, "refill") + '}';

this.lastRefillTime = -intervalInNanos;
this.nextRefillTime = -intervalInNanos;

if (!this.stopwatch.isRunning()) {
this.stopwatch.start();
}
long nowNano = clock.nanoTime();
this.lastRefillTimeNano = nowNano - intervalInNanos;
this.nextRefillTimeNano = nowNano - intervalInNanos;
}

@Override
public long refill() {
long nowNano = clock.nanoTime();
if (nowNano < nextRefillTimeNano) {
return 0;
}
synchronized (mutex) {
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
if (elapsed < nextRefillTime) {
if (nowNano < nextRefillTimeNano) {
return 0;
}
long numberOfIntervals = Math.max(0, (elapsed - lastRefillTime) / intervalInNanos);
lastRefillTime += numberOfIntervals * intervalInNanos;
nextRefillTime = lastRefillTime + intervalInNanos;
long numberOfIntervals = Math.max(0, (nowNano - lastRefillTimeNano) / intervalInNanos);
lastRefillTimeNano += numberOfIntervals * intervalInNanos;
nextRefillTimeNano = lastRefillTimeNano + intervalInNanos;

return numberOfIntervals * numberOfTokensPerInterval;
}
}

@Override
public long getTimeUntilNextRefill(TimeUnit unit) {
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
return unit.convert(Math.max(0, nextRefillTime - elapsed), TimeUnit.NANOSECONDS);
long nowNano = clock.nanoTime();
return unit.convert(Math.max(0, nextRefillTimeNano - nowNano), TimeUnit.NANOSECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.google.common.base.Stopwatch;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.Evaluators;
import com.netflix.titus.common.util.ExceptionExt;
import com.netflix.titus.common.util.limiter.tokenbucket.FixedIntervalTokenBucketConfiguration;
import com.netflix.titus.common.util.limiter.tokenbucket.RefillStrategy;
import com.netflix.titus.common.util.limiter.tokenbucket.TokenBucket;
import com.netflix.titus.common.util.time.Clocks;

/**
* {@link TokenBucket} supplier which recreates a token bucket if any of its configurable parameters changes.
Expand Down Expand Up @@ -102,13 +102,13 @@ private ActiveConfiguration(FixedIntervalTokenBucketConfiguration configuration)
this.numberOfTokensPerInterval = configuration.getNumberOfTokensPerInterval();

RefillStrategy baseRefillStrategy = new FixedIntervalRefillStrategy(
Stopwatch.createStarted(),
numberOfTokensPerInterval,
intervalMs, TimeUnit.MILLISECONDS
intervalMs, TimeUnit.MILLISECONDS,
titusRuntime.map(TitusRuntime::getClock).orElse(Clocks.system())
);

this.refillStrategy = titusRuntime.map(runtime ->
(RefillStrategy) new SpectatorRefillStrategyDecorator(name, baseRefillStrategy, runtime))
(RefillStrategy) new SpectatorRefillStrategyDecorator(name, baseRefillStrategy, runtime))
.orElse(baseRefillStrategy);

this.tokenBucket = new DefaultTokenBucket(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ public class AdmissionControllerRequest {

private final String endpointName;
private final String callerId;
private final int hash;

private AdmissionControllerRequest(String endpointName, String callerId) {
this.endpointName = endpointName;
this.callerId = callerId;
this.hash = Objects.hash(endpointName, callerId);
}

public String getEndpointName() {
Expand All @@ -51,7 +53,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(endpointName, callerId);
return hash;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,18 @@ public static AdaptiveAdmissionController spectator(AdaptiveAdmissionController
return new SpectatorAdaptiveAdmissionController(delegate, titusRuntime);
}

public static AdmissionController tokenBucketsFromArchaius(Config config, TitusRuntime titusRuntime) {
return tokenBucketsFromArchaius(config, noBackoff(), titusRuntime);
public static AdmissionController tokenBucketsFromArchaius(Config config, boolean includeDetailsInResponse, TitusRuntime titusRuntime) {
return tokenBucketsFromArchaius(config, noBackoff(), includeDetailsInResponse, titusRuntime);
}

public static AdaptiveAdmissionController tokenBucketsFromArchaius(Config config,
AdmissionBackoffStrategy backoffStrategy,
boolean includeDetailsInResponse,
TitusRuntime titusRuntime) {
return new ConfigurableTokenBucketAdmissionController(
new ArchaiusTokenBucketAdmissionConfigurationParser(config),
backoffStrategy,
includeDetailsInResponse,
titusRuntime
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ public class ConfigurableTokenBucketAdmissionController implements AdaptiveAdmis

public ConfigurableTokenBucketAdmissionController(Supplier<List<TokenBucketConfiguration>> configurationSupplier,
AdmissionBackoffStrategy admissionBackoffStrategy,
boolean includeDetailsInResponse,
TitusRuntime titusRuntime) {
this(configurationSupplier,
tokenBucketConfigurations -> new TokenBucketAdmissionController(tokenBucketConfigurations,
admissionBackoffStrategy, titusRuntime),
admissionBackoffStrategy, includeDetailsInResponse, titusRuntime),
SCHEDULE_DESCRIPTOR,
titusRuntime
);
Expand Down
Loading

0 comments on commit 2e5c06a

Please sign in to comment.