Skip to content

Commit

Permalink
Make warmup to respect dual read mode, and separate warmup configs fo…
Browse files Browse the repository at this point in the history
…r indis (#962)

* Make warm-up to respect dual read mode, and separate warmup configs for indis.

* put fetching service data to separate tasks, and adjust log msg

* warm up services filtered by dual read mode only

* add log msg for fetching dual read mode

* wait for xDS warmup under observer-only mode

* add prepareWarmup method

* add unit tests

* not to block the thread calling start callback

* add a comment

* update version

* update tests to ms level

* minor cleanup

* reduce the ms further

* loosen the ms wait

* loosen the ms wait for success scenario

* further loosen the ms wait for stableness
  • Loading branch information
bohhyang authored Jan 5, 2024
1 parent 0bed54f commit 146e1f2
Show file tree
Hide file tree
Showing 14 changed files with 503 additions and 126 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ When updating the changelog, remember to be very clear about what behavior has c
and what APIs have changed, if applicable.

## [Unreleased]

## [29.49.4] - 2024-01-04
- Make warm-up to respect dual read mode, and separate warmup configs for indis.

## [29.49.3] - 2024-01-03
- Fix rate limiter for dual-read mode switch

Expand Down Expand Up @@ -5602,7 +5606,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.3...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.4...master
[29.49.4]: https://github.com/linkedin/rest.li/compare/v29.49.3...v29.49.4
[29.49.3]: https://github.com/linkedin/rest.li/compare/v29.49.2...v29.49.3
[29.49.2]: https://github.com/linkedin/rest.li/compare/v29.49.1...v29.49.2
[29.49.1]: https://github.com/linkedin/rest.li/compare/v29.49.0...v29.49.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,23 @@ public class TestLoadBalancer implements LoadBalancerWithFacilities, WarmUpServi

private final AtomicInteger _requestCount = new AtomicInteger();
private final AtomicInteger _completedRequestCount = new AtomicInteger();
private int _delayMs = 0;
private final int DELAY_STANDARD_DEVIATION = 10; //ms
private int _warmUpDelayMs = 0;
private int _serviceDataDelayMs = 0;

private final int DELAY_STANDARD_DEVIATION = 5; //ms
private final ScheduledExecutorService _executorService = Executors.newSingleThreadScheduledExecutor();

public TestLoadBalancer() {}

public TestLoadBalancer(int delayMs)
public TestLoadBalancer(int warmUpDelayMs)
{
_delayMs = delayMs;
this(warmUpDelayMs, 0);
}

public TestLoadBalancer(int warmUpDelayMs, int serviceDataDelayMs)
{
_warmUpDelayMs = warmUpDelayMs;
_serviceDataDelayMs = serviceDataDelayMs;
}

@Override
Expand All @@ -67,14 +75,15 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
@Override
public void warmUpService(String serviceName, Callback<None> callback)
{
double g = Math.min(1.0, Math.max(-1.0, new Random().nextGaussian()));
int actualDelay = Math.max(0,
_warmUpDelayMs + ((int) g * DELAY_STANDARD_DEVIATION)); // +/- DELAY_STANDARD_DEVIATION ms
_requestCount.incrementAndGet();
_executorService.schedule(() ->
{
_completedRequestCount.incrementAndGet();
callback.onSuccess(None.none());
}, Math.max(0, _delayMs
// any kind of random delay works for the test
+ ((int) new Random().nextGaussian() * DELAY_STANDARD_DEVIATION)), TimeUnit.MILLISECONDS);
}, actualDelay, TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -92,6 +101,14 @@ public void shutdown(PropertyEventShutdownCallback shutdown)
@Override
public void getLoadBalancedServiceProperties(String serviceName, Callback<ServiceProperties> clientCallback)
{
if (_serviceDataDelayMs > 0)
{
try {
Thread.sleep(_serviceDataDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
clientCallback.onSuccess(new ServiceProperties(serviceName, "clustername", "/foo", Arrays.asList("rr")));
}

Expand Down
23 changes: 20 additions & 3 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ public D2Client build()
_config.retryAggregatedIntervalNum,
_config.warmUp,
_config.warmUpTimeoutSeconds,
_config.indisWarmUpTimeoutSeconds,
_config.warmUpConcurrentRequests,
_config.indisWarmUpConcurrentRequests,
_config.downstreamServicesFetcher,
_config.indisDownstreamServicesFetcher,
_config.backupRequestsEnabled,
Expand Down Expand Up @@ -526,21 +528,36 @@ public D2ClientBuilder setWarmUp(boolean warmUp){
return this;
}

public D2ClientBuilder setWarmUpTimeoutSeconds(int warmUpTimeoutSeconds){
public D2ClientBuilder setWarmUpTimeoutSeconds(int warmUpTimeoutSeconds)
{
_config.warmUpTimeoutSeconds = warmUpTimeoutSeconds;
return this;
}

public D2ClientBuilder setZookeeperReadWindowMs(int zookeeperReadWindowMs){
public D2ClientBuilder setIndisWarmUpTimeoutSeconds(int indisWarmUpTimeoutSeconds)
{
_config.indisWarmUpTimeoutSeconds = indisWarmUpTimeoutSeconds;
return this;
}

public D2ClientBuilder setZookeeperReadWindowMs(int zookeeperReadWindowMs)
{
_config.zookeeperReadWindowMs = zookeeperReadWindowMs;
return this;
}

public D2ClientBuilder setWarmUpConcurrentRequests(int warmUpConcurrentRequests){
public D2ClientBuilder setWarmUpConcurrentRequests(int warmUpConcurrentRequests)
{
_config.warmUpConcurrentRequests = warmUpConcurrentRequests;
return this;
}

public D2ClientBuilder setIndisWarmUpConcurrentRequests(int indisWarmUpConcurrentRequests)
{
_config.indisWarmUpConcurrentRequests = indisWarmUpConcurrentRequests;
return this;
}

public D2ClientBuilder setStartUpExecutorService(ScheduledExecutorService executorService)
{
_config.startUpExecutorService = executorService;
Expand Down
6 changes: 6 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ public class D2ClientConfig
int retryAggregatedIntervalNum = RetryClient.DEFAULT_AGGREGATED_INTERVAL_NUM;
public boolean warmUp = true;
public int warmUpTimeoutSeconds = WarmUpLoadBalancer.DEFAULT_SEND_REQUESTS_TIMEOUT_SECONDS;
public int indisWarmUpTimeoutSeconds = WarmUpLoadBalancer.DEFAULT_SEND_REQUESTS_TIMEOUT_SECONDS;
int zookeeperReadWindowMs = ZooKeeperStore.DEFAULT_READ_WINDOW_MS;
public int warmUpConcurrentRequests = WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS;
public int indisWarmUpConcurrentRequests = WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS;
public DownstreamServicesFetcher downstreamServicesFetcher = null;
public DownstreamServicesFetcher indisDownstreamServicesFetcher = null;
boolean backupRequestsEnabled = true;
Expand Down Expand Up @@ -168,7 +170,9 @@ public D2ClientConfig()
int retryAggregatedIntervalNum,
boolean warmUp,
int warmUpTimeoutSeconds,
int indisWarmUpTimeoutSeconds,
int warmUpConcurrentRequests,
int indisWarmUpConcurrentRequests,
DownstreamServicesFetcher downstreamServicesFetcher,
DownstreamServicesFetcher indisDownstreamServicesFetcher,
boolean backupRequestsEnabled,
Expand Down Expand Up @@ -235,7 +239,9 @@ public D2ClientConfig()
this.retryAggregatedIntervalNum = retryAggregatedIntervalNum;
this.warmUp = warmUp;
this.warmUpTimeoutSeconds = warmUpTimeoutSeconds;
this.indisWarmUpTimeoutSeconds = indisWarmUpTimeoutSeconds;
this.warmUpConcurrentRequests = warmUpConcurrentRequests;
this.indisWarmUpConcurrentRequests = indisWarmUpConcurrentRequests;
this.downstreamServicesFetcher = downstreamServicesFetcher;
this.indisDownstreamServicesFetcher = indisDownstreamServicesFetcher;
this.backupRequestsEnabled = backupRequestsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
balancer = new WarmUpLoadBalancer(balancer, lastSeenLoadBalancer, config.startUpExecutorService, config.fsBasePath,
config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds,
config.warmUpConcurrentRequests, config.dualReadStateManager);
config.warmUpConcurrentRequests, config.dualReadStateManager, false);
}

return balancer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
balancer = new WarmUpLoadBalancer(balancer, zkfsLoadBalancer, config.startUpExecutorService, config.fsBasePath,
config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds,
config.warmUpConcurrentRequests, config.dualReadStateManager);
config.warmUpConcurrentRequests, config.dualReadStateManager, false);
}
return balancer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,50 @@ public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFa
public void start(Callback<None> callback)
{
// Prefetch the global dual read mode
_dualReadStateManager.checkAndSwitchMode(null);
DualReadModeProvider.DualReadMode mode = _dualReadStateManager.getGlobalDualReadMode();

_newLb.start(new Callback<None>()
{
// if in new-lb-only mode, new lb needs to start successfully to call the callback. Otherwise, the old lb does.
// Use a separate executor service to start the new lb, so both lbs can start concurrently.
_newLbExecutor.execute(() -> _newLb.start(getStartUpCallback(true,
mode == DualReadModeProvider.DualReadMode.NEW_LB_ONLY ? callback : null)
));

_oldLb.start(getStartUpCallback(false,
mode == DualReadModeProvider.DualReadMode.NEW_LB_ONLY ? null : callback
));
}

private Callback<None> getStartUpCallback(boolean isForNewLb, Callback<None> callback)
{
return new Callback<None>() {
@Override
public void onError(Throwable e)
{
LOG.warn("Failed to start new load balancer. Fall back to read from old balancer only", e);
_isNewLbReady = false;
public void onError(Throwable e) {
LOG.warn("Failed to start {} load balancer.", isForNewLb ? "new" : "old", e);
if (isForNewLb)
{
_isNewLbReady = false;
}

if (callback != null)
{
callback.onError(e);
}
}

@Override
public void onSuccess(None result)
{
LOG.info("New load balancer successfully started");
_isNewLbReady = true;
}
});
public void onSuccess(None result) {
LOG.info("{} load balancer successfully started", isForNewLb ? "New" : "Old");
if (isForNewLb)
{
_isNewLbReady = true;
}

// Call back will succeed as long as the old balancer is successfully started. New load balancer failure
// won't block application start up.
_oldLb.start(callback);
if (callback != null)
{
callback.onSuccess(None.none());
}
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ public DualReadLoadBalancerMonitor(Clock clock)
public void reportData(String propertyName, T property, String propertyVersion, boolean fromNewLb)
{
Cache<String, CacheEntry<T>> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache;
CacheEntry<T> existingEntry = cacheToAdd.getIfPresent(propertyName);
if (existingEntry != null)
{
if (existingEntry._version.equals(propertyVersion) && existingEntry._data.equals(property))
{
_rateLimitedLogger.debug("Reported duplicate for {} LB for property: {}, version: {}, data: {}",
fromNewLb ? "New" : "Old", propertyName, propertyVersion, property);
return; // skip setting duplicate data to avoid incorrectly incrementing OutOfSync metric
}
else if (existingEntry._data.equals(property))
{
_rateLimitedLogger.warn("Reported data that only differs in version for {} LB for property: {}. "
+ "Old version: {}, New version: {}, with the same data: {}", fromNewLb ? "New" : "Old", propertyName,
existingEntry._version, propertyVersion, existingEntry._data);
// since the version is different, we don't skipping setting it to the cache
}
}

Cache<String, CacheEntry<T>> cacheToCompare = fromNewLb ? _oldLbPropertyCache : _newLbPropertyCache;

CacheEntry<T> entryToCompare = cacheToCompare.getIfPresent(propertyName);
Expand Down
Loading

0 comments on commit 146e1f2

Please sign in to comment.