Skip to content

Commit

Permalink
Add null guard and timeout process for INDIS response (#975)
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing authored Mar 16, 2024
1 parent 53aab6f commit e3e56f0
Show file tree
Hide file tree
Showing 15 changed files with 1,091 additions and 122 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ When updating the changelog, remember to be very clear about what behavior has c
and what APIs have changed, if applicable.

## [Unreleased]
## [29.51.8] - 2024-03-13
- Null data guard for D2 cache and fix timeout issue for INDIS response

## [29.51.7] - 2024-03-13
- clarify dual read error messages
Expand Down Expand Up @@ -5653,7 +5655,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.51.7...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.51.8...master
[29.51.8]: https://github.com/linkedin/rest.li/compare/v29.51.7...v29.51.8
[29.51.7]: https://github.com/linkedin/rest.li/compare/v29.51.6...v29.51.7
[29.51.6]: https://github.com/linkedin/rest.li/compare/v29.51.5...v29.51.6
[29.51.5]: https://github.com/linkedin/rest.li/compare/v29.51.4...v29.51.5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,15 @@ public void tryStopListening(String propertyName, LoadBalancerState.LoadBalancer
public void onAdd(final String propertyName, final T propertyValue)
{
trace(_log, _name, ".onAdd: ", propertyName, ": ", propertyValue);

handlePut(propertyName, propertyValue);
if (propertyValue != null)
{
// null value guard to avoid overwriting the property with null
handlePut(propertyName, propertyValue);
}
else
{
_log.info("Got the null value for property : {}", propertyName);
}

// if bad properties are received, then onInitialize()::handlePut might throw an exception and
// the queue might not be closed. If the queue is not closed, then even if the underlying
Expand Down
244 changes: 189 additions & 55 deletions d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.simple;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.callback.FutureCallback;
Expand Down Expand Up @@ -75,12 +76,7 @@
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -471,7 +467,8 @@ public HashFunction<Request> getRequestHashFunction(String serviceName) throws S
}
}

private void listenToServiceAndCluster(String serviceName, Callback<ServiceProperties> callback)
@VisibleForTesting
void listenToServiceAndCluster(String serviceName, Callback<ServiceProperties> callback)
{

boolean waitForUpdatedValue = _timeout > 0;
Expand All @@ -487,7 +484,15 @@ private void listenToServiceAndCluster(String serviceName, Callback<ServicePrope
@Override
public void onError(Throwable e)
{
finalCallback.onError(new ServiceUnavailableException(serviceName, "PEGA_1004. " +e.getMessage(), e));
if (e instanceof TimeoutException)
{
// if timed out, should try to fetch the service properties from the cache
handleTimeoutFromGetServiceProperties(e, serviceName, finalCallback);
}
else
{
finalCallback.onError(new ServiceUnavailableException(serviceName, "PEGA_1004. " + e.getMessage(), e));
}
}

@Override
Expand Down Expand Up @@ -531,6 +536,11 @@ private ServiceProperties listenToServiceAndCluster(String serviceName)
}
catch (TimeoutException e)
{
ServiceProperties serviceProperties = getServicePropertiesFromCache(serviceName, servicePropertiesFutureCallback);
if (serviceProperties != null)
{
return serviceProperties;
}
throw new ServiceUnavailableException(serviceName, "PEGA_1005. Timeout occurred while fetching property. Timeout:" + _timeout, e);
}
catch (Exception e)
Expand Down Expand Up @@ -770,8 +780,15 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback<Servic
@Override
public void onError(Throwable e)
{
_serviceNotFoundStats.inc();
finalCallback.onError(new ServiceUnavailableException(serviceName, "PEGA_1011. " + e.getMessage(), e));
if (e instanceof TimeoutException)
{
handleTimeoutFromGetServiceProperties(e, serviceName, finalCallback);
}
else
{
_serviceNotFoundStats.inc();
finalCallback.onError(new ServiceUnavailableException(serviceName, "PEGA_1011. " + e.getMessage(), e));
}
}

@Override
Expand All @@ -793,20 +810,11 @@ public void getLoadBalancedServiceProperties(String serviceName, boolean waitFor
{
Runnable callback = () ->
{
LoadBalancerStateItem<ServiceProperties> serviceItem =
_state.getServiceProperties(serviceName);

if (serviceItem == null || serviceItem.getProperty() == null)
ServiceProperties serviceProperties = getServicePropertiesFromCache(serviceName, servicePropertiesCallback);
if (serviceProperties != null)
{
warn(_log, "unable to find service: ", serviceName);

die(servicePropertiesCallback, serviceName, "PEGA_1012. no service properties in lb state");
return;
servicePropertiesCallback.onSuccess(serviceProperties);
}

debug(_log, "got service: ", serviceItem);

servicePropertiesCallback.onSuccess(serviceItem.getProperty());
};

if (waitForUpdatedValue)
Expand All @@ -815,13 +823,48 @@ public void getLoadBalancedServiceProperties(String serviceName, boolean waitFor
}
else
{
_log.info("No timeout for service {}", serviceName);
_log.debug("No timeout for service {}", serviceName);
_state.listenToService(serviceName, new NullStateListenerCallback());
callback.run();
}
}

public void handleTimeoutFromGetServiceProperties(Throwable e, String serviceName,
Callback<ServiceProperties> servicePropertiesCallback)
{
ServiceProperties properties = getServicePropertiesFromCache(serviceName, servicePropertiesCallback);
if (properties != null)
{
_log.info("getServiceProperties for {} timed out, used cached value instead.", serviceName);
servicePropertiesCallback.onSuccess(properties);
}
else
{
_log.error("getServiceProperties for {} timed out, but no value in cache!", serviceName);
servicePropertiesCallback.onError(
new ServiceUnavailableException(serviceName, "PEGA_1011. " + e.getMessage(), e));
}
}

public ServiceProperties getServicePropertiesFromCache(String serviceName,
Callback<ServiceProperties> servicePropertiesCallback)
{
LoadBalancerStateItem<ServiceProperties> serviceItem = _state.getServiceProperties(serviceName);

if (serviceItem == null || serviceItem.getProperty() == null)
{
warn(_log, "unable to find service: ", serviceName);
_serviceNotFoundStats.inc();
die(servicePropertiesCallback, serviceName, "PEGA_1012. no service properties in lb state");
return null;
}

debug(_log, "got service: ", serviceItem);
return serviceItem.getProperty();
}

@Override
@VisibleForTesting
public void getLoadBalancedClusterAndUriProperties(String clusterName,
Callback<Pair<ClusterProperties, UriProperties>> callback)
{
Expand All @@ -832,20 +875,29 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName,
Callback<Pair<ClusterProperties, UriProperties>> finalCallback = callback;
try
{
callback = new TimeoutCallback<>(_executor, _timeout, _unit, new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
finalCallback.onError(new ServiceUnavailableException(clusterName, "PEGA_1011. " + e.getMessage(), e));
}
callback =
new TimeoutCallback<>(_executor, _timeout, _unit, new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
if (e instanceof TimeoutException)
{
handleTimeoutFromGetClusterAndUriProperties(e, clusterName, finalCallback);
}
else
{
finalCallback.onError(
new ServiceUnavailableException(clusterName, "PEGA_1011. " + e.getMessage(), e));
}
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
finalCallback.onSuccess(result);
}
}, "Timeout while fetching cluster");
@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
finalCallback.onSuccess(result);
}
}, "Timeout while fetching cluster");
}
catch (RejectedExecutionException e)
{
Expand All @@ -860,22 +912,12 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName, boolean w
{
Runnable callback = () ->
{
LoadBalancerStateItem<ClusterProperties> clusterItem =
_state.getClusterProperties(clusterName);

LoadBalancerStateItem<UriProperties> uriItem =
_state.getUriProperties(clusterName);

if (clusterItem == null || clusterItem.getProperty() == null || uriItem == null || uriItem.getProperty() == null)
Pair<ClusterProperties, UriProperties> pair = getClusterAndUriPropertiesFromCache(clusterName, pairCallback);
if (pair != null)
{
warn(_log, "unable to find cluster: ", clusterName);

_clusterNotFoundStats.inc();
die(pairCallback, clusterName, "PEGA_1012. no cluster properties in lb state");
return;
_log.info("getClusterAndUriProperties for {} timed out, used cached value instead.", clusterName);
pairCallback.onSuccess(pair);
}

pairCallback.onSuccess(Pair.of(clusterItem.getProperty(), uriItem.getProperty()));
};

if (waitForUpdatedValue)
Expand All @@ -890,6 +932,44 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName, boolean w
}
}

private void handleTimeoutFromGetClusterAndUriProperties(Throwable e, String clusterName,
Callback<Pair<ClusterProperties, UriProperties>> clusterAndUriPropertiesCallback)
{
Pair<ClusterProperties, UriProperties> pair =
getClusterAndUriPropertiesFromCache(clusterName, clusterAndUriPropertiesCallback);
if (pair != null)
{
clusterAndUriPropertiesCallback.onSuccess(pair);
}
else
{
clusterAndUriPropertiesCallback.onError(
new ServiceUnavailableException(clusterName, "PEGA_1011. " + e.getMessage(), e));
}
}

/**
* Gets the cluster and uri properties from the cache
* If the properties are not found, call the callback with an error.
*/
private Pair<ClusterProperties, UriProperties> getClusterAndUriPropertiesFromCache(String clusterName,
Callback<Pair<ClusterProperties, UriProperties>> clusterPropertiesCallback)
{
LoadBalancerStateItem<ClusterProperties> clusterItem = _state.getClusterProperties(clusterName);

LoadBalancerStateItem<UriProperties> uriItem = _state.getUriProperties(clusterName);

if (clusterItem == null || clusterItem.getProperty() == null || uriItem == null || uriItem.getProperty() == null)
{
warn(_log, "unable to find cluster: ", clusterName);

_clusterNotFoundStats.inc();
die(clusterPropertiesCallback, clusterName, "PEGA_1012. no cluster properties in lb state");
return null;
}
return Pair.of(clusterItem.getProperty(), uriItem.getProperty());
}

// supports partitioning
private TrackerClientSubsetItem getPotentialClients(String serviceName,
ServiceProperties serviceProperties,
Expand Down Expand Up @@ -1161,14 +1241,42 @@ public int getClusterCount(String clusterName, String scheme, int partitionId) t
{
return clusterCountFutureCallback.get(_timeout, _unit);
}
catch (ExecutionException | TimeoutException | IllegalStateException | InterruptedException e )
catch (ExecutionException | TimeoutException | IllegalStateException | InterruptedException e)
{
die("ClusterInfo", "PEGA_1017, unable to retrieve cluster count for cluster: " + clusterName +
", scheme: " + scheme + ", partition: " + partitionId + ", exception: " + e);
if (e instanceof TimeoutException || e.getCause() instanceof TimeoutException)
{
int clusterCount = getClusterCountFromCache(clusterName, scheme, partitionId);
if (clusterCount >= 0)
{
return clusterCount;
}
}
die("ClusterInfo",
"PEGA_1017, unable to retrieve cluster count for cluster: " + clusterName + ", scheme: " + scheme
+ ", partition: " + partitionId + ", exception: " + e);
return -1;
}
}

/**
* Get cluster count from cache
* @return -1 if the cluster count is not found in cache
*/
@VisibleForTesting
int getClusterCountFromCache(String clusterName, String scheme, int partitionId)
{
if (_state.getUriProperties(clusterName) != null && _state.getUriProperties(clusterName).getProperty() != null)
{
Set<URI> uris =
_state.getUriProperties(clusterName).getProperty().getUriBySchemeAndPartition(scheme, partitionId);
if (uris != null)
{
return uris.size();
}
}
return -1;
}

@Override
public DarkClusterConfigMap getDarkClusterConfigMap(String clusterName) throws ServiceUnavailableException
{
Expand All @@ -1179,9 +1287,19 @@ public DarkClusterConfigMap getDarkClusterConfigMap(String clusterName) throws S
{
return darkClusterConfigMapFutureCallback.get(_timeout, _unit);
}
catch (ExecutionException | TimeoutException | IllegalStateException | InterruptedException e )
catch (ExecutionException | TimeoutException | IllegalStateException | InterruptedException e)
{
die("ClusterInfo", "PEGA_1018, unable to retrieve dark cluster info for cluster: " + clusterName + ", exception: " + e);
if (e instanceof TimeoutException || e.getCause() instanceof TimeoutException)
{
DarkClusterConfigMap darkClusterConfigMap = getDarkClusterConfigMapFromCache(clusterName);
if (darkClusterConfigMap != null)
{
_log.info("Got dark cluster config map for {} timed out, used cached value instead.", clusterName);
return darkClusterConfigMap;
}
}
die("ClusterInfo",
"PEGA_1018, unable to retrieve dark cluster info for cluster: " + clusterName + ", exception" + ": " + e);
return new DarkClusterConfigMap();
}
}
Expand All @@ -1206,6 +1324,22 @@ public void getDarkClusterConfigMap(String clusterName, Callback<DarkClusterConf
}
}

/**
* Get dark cluster config map from cache
* @return empty DarkClusterConfigMap if the dark cluster config map is not found in cache
*/
@VisibleForTesting
DarkClusterConfigMap getDarkClusterConfigMapFromCache(String clusterName)
{
if (_state.getClusterProperties(clusterName) != null
&& _state.getClusterProperties(clusterName).getProperty() != null)
{
ClusterProperties clusterProperties = _state.getClusterProperties(clusterName).getProperty();
return clusterProperties != null ? clusterProperties.accessDarkClusters() : new DarkClusterConfigMap();
}
return new DarkClusterConfigMap();
}

@Override
public FailoutConfig getFailoutConfig(String clusterName)
{
Expand Down
Loading

0 comments on commit e3e56f0

Please sign in to comment.