Skip to content

Commit

Permalink
Address the multiple onError calls in dual read and enhance unit test (
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing authored Mar 25, 2024
1 parent cdb8828 commit 11e6488
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 97 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.51.12] - 2024-03-22
- Address the multiple onError calls in dual read and enhance unit test rigorously

## [29.51.11] - 2024-03-20
- Glob collections support

Expand Down Expand Up @@ -5665,7 +5668,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.51.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.51.12...master
[29.51.12]: https://github.com/linkedin/rest.li/compare/v29.51.11...v29.51.12
[29.51.11]: https://github.com/linkedin/rest.li/compare/v29.51.10...v29.51.11
[29.51.10]: https://github.com/linkedin/rest.li/compare/v29.51.9...v29.51.10
[29.51.9]: https://github.com/linkedin/rest.li/compare/v29.51.8...v29.51.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void onError(Throwable e)
if (e instanceof TimeoutException)
{
// if timed out, should try to fetch the service properties from the cache
handleTimeoutFromGetServiceProperties(e, serviceName, finalCallback);
handleTimeoutFromGetServiceProperties(serviceName, finalCallback);
}
else
{
Expand Down Expand Up @@ -782,7 +782,7 @@ public void onError(Throwable e)
{
if (e instanceof TimeoutException)
{
handleTimeoutFromGetServiceProperties(e, serviceName, finalCallback);
handleTimeoutFromGetServiceProperties(serviceName, finalCallback);
}
else
{
Expand Down Expand Up @@ -829,7 +829,7 @@ public void getLoadBalancedServiceProperties(String serviceName, boolean waitFor
}
}

public void handleTimeoutFromGetServiceProperties(Throwable e, String serviceName,
public void handleTimeoutFromGetServiceProperties(String serviceName,
Callback<ServiceProperties> servicePropertiesCallback)
{
ServiceProperties properties = getServicePropertiesFromCache(serviceName, servicePropertiesCallback);
Expand All @@ -840,9 +840,7 @@ public void handleTimeoutFromGetServiceProperties(Throwable e, String serviceNam
}
else
{
_log.error("getServiceProperties for {} timed out, but no value in cache!", serviceName);
servicePropertiesCallback.onError(
new ServiceUnavailableException(serviceName, "PEGA_1011. " + e.getMessage(), e));
_log.warn("getServiceProperties for {} timed out, but no value in cache!", serviceName);
}
}

Expand Down Expand Up @@ -883,7 +881,7 @@ public void onError(Throwable e)
{
if (e instanceof TimeoutException)
{
handleTimeoutFromGetClusterAndUriProperties(e, clusterName, finalCallback);
handleTimeoutFromGetClusterAndUriProperties(clusterName, finalCallback);
}
else
{
Expand Down Expand Up @@ -931,7 +929,8 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName, boolean w
}
}

private void handleTimeoutFromGetClusterAndUriProperties(Throwable e, String clusterName,
@VisibleForTesting
public void handleTimeoutFromGetClusterAndUriProperties(String clusterName,
Callback<Pair<ClusterProperties, UriProperties>> clusterAndUriPropertiesCallback)
{
Pair<ClusterProperties, UriProperties> pair =
Expand All @@ -943,8 +942,7 @@ private void handleTimeoutFromGetClusterAndUriProperties(Throwable e, String clu
}
else
{
clusterAndUriPropertiesCallback.onError(
new ServiceUnavailableException(clusterName, "PEGA_1011. " + e.getMessage(), e));
_log.warn("getClusterAndUriProperties for {} timed out, but no value in cache!", clusterName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,117 +439,112 @@ public void testClusterInfoProviderUnregisterClusterListener()
}

@Test
public void testListenToServiceAndClusterTimeout() throws Exception
public void testListenToServiceAndClusterTimeout() throws ExecutionException, InterruptedException
{
MockStore<ServiceProperties> serviceRegistry = new MockStore<>();
MockStore<ClusterProperties> clusterRegistry = new MockStore<>();
MockStore<UriProperties> uriRegistry = new MockStore<>();
LoadBalancerTestState state = spy(new LoadBalancerTestState());
state.listenToService = false;
SimpleLoadBalancer loadBalancer = spy(setupLoadBalancer(state, serviceRegistry, clusterRegistry, uriRegistry));
SimpleLoadBalancerState state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));
doAnswer(invocation ->
{
Thread.sleep(10);
return null;
}).when(state).listenToService(any(), any());
SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor));
// case1: listenToService timeout, and simpleLoadBalancer not hit the cache value
Callback<ServiceProperties> callback = new Callback<ServiceProperties>()
{
@Override
public void onError(Throwable e)
{
assertTrue(e instanceof ServiceUnavailableException);
}

@Override
public void onSuccess(ServiceProperties result)
{
throw new RuntimeException("onSuccess should not be called");
}
};
FutureCallback<ServiceProperties> callback = mock(FutureCallback.class);
loadBalancer.listenToServiceAndCluster(SERVICE_NAME, callback);

callback = new Callback<ServiceProperties>()
try
{
@Override
public void onError(Throwable e)
{
throw new RuntimeException("onError should not be called");
}

@Override
public void onSuccess(ServiceProperties result)
{
assertTrue(result != null);
assertTrue(result.getServiceName().equals(SERVICE_NAME));
assertTrue(result.getClusterName().equals(CLUSTER1_NAME));
}
};
callback.get();
}
catch (Exception e)
{
Assert.assertTrue(e.getCause() instanceof ServiceUnavailableException);
}
// Make sure the onError is called with ServiceUnavailableException only once.
verify(loadBalancer).handleTimeoutFromGetServiceProperties(eq(SERVICE_NAME), eq(callback));
verify(callback).onError(any(ServiceUnavailableException.class));

// case2: listenToService timeout, and simpleLoadBalancer hit the cache value from state
LoadBalancerStateItem<ServiceProperties> serviceItem = new LoadBalancerStateItem<>(SERVICE_PROPERTIES, 1, 1);
when(state.getServiceProperties(SERVICE_NAME)).thenReturn(serviceItem);
callback = mock(FutureCallback.class);
loadBalancer.listenToServiceAndCluster(SERVICE_NAME, callback);
// Make sure the onSuccess is called with SERVICE_PROPERTIES only once.
callback.get();
verify(callback).onSuccess(eq(SERVICE_PROPERTIES));

// case3: listenToService without timeout
serviceRegistry.put(SERVICE_NAME, SERVICE_PROPERTIES);
loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry);
state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));
loadBalancer = spy(new SimpleLoadBalancer(state, 5, TimeUnit.SECONDS, _d2Executor));
callback = mock(FutureCallback.class);
loadBalancer.listenToServiceAndCluster(SERVICE_NAME, callback);
callback.get();
// Make sure there is no timeout.
verify(loadBalancer, never()).handleTimeoutFromGetServiceProperties(any(), any());
verify(callback).onSuccess(eq(SERVICE_PROPERTIES));
}

@Test
public void testGetLoadBalancedClusterAndUriProperties() throws Exception
public void testGetLoadBalancedClusterAndUriProperties() throws InterruptedException, ExecutionException
{
MockStore<ServiceProperties> serviceRegistry = new MockStore<>();
MockStore<ClusterProperties> clusterRegistry = new MockStore<>();
MockStore<UriProperties> uriRegistry = new MockStore<>();
SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry);

// case1: getLoadBalancedClusterAndUriProperties timeout, and simpleLoadBalancer not hit the cache value
Callback<Pair<ClusterProperties, UriProperties>> callback = new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
assertTrue(e instanceof ServiceUnavailableException);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
throw new RuntimeException("onSuccess should not be called");
}
};

SimpleLoadBalancerState state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));

doAnswer(invocation ->
{
Thread.sleep(10);
return null;
}).when(state).listenToCluster(any(), any());

SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor));
FutureCallback<Pair<ClusterProperties, UriProperties>> callback = mock(FutureCallback.class);
// case1: listenToCluster timeout, and simpleLoadBalancer not hit the cache value
loadBalancer.getLoadBalancedClusterAndUriProperties(CLUSTER1_NAME, callback);
try
{
callback.get();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof ServiceUnavailableException);
}
verify(loadBalancer).handleTimeoutFromGetClusterAndUriProperties(eq(CLUSTER1_NAME), eq(callback));
verify(callback).onError(any(ServiceUnavailableException.class));

// case2: getLoadBalancedClusterAndUriProperties timeout, and simpleLoadBalancer hit the cache value from state
// case2: listenToCluster timeout, and simpleLoadBalancer hit the cache value from state
LoadBalancerStateItem<ClusterProperties> clusterItem = new LoadBalancerStateItem<>(CLUSTER_PROPERTIES, 1, 1);
LoadBalancerStateItem<UriProperties> uriItem = new LoadBalancerStateItem<>(URI_PROPERTIES, 1, 1);
LoadBalancerTestState state = spy(new LoadBalancerTestState());
state.listenToCluster = false;
state.listenToService = false;
when(state.getClusterProperties(CLUSTER1_NAME)).thenReturn(clusterItem);
when(state.getUriProperties(CLUSTER1_NAME)).thenReturn(uriItem);
loadBalancer = spy(setupLoadBalancer(state, serviceRegistry, clusterRegistry, uriRegistry));
callback = new Callback<Pair<ClusterProperties, UriProperties>>()
{
@Override
public void onError(Throwable e)
{
throw new RuntimeException("onError should not be called");
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result)
{
assertEquals(result.getLeft(), CLUSTER_PROPERTIES);
assertEquals(result.getRight(), URI_PROPERTIES);
}
};

callback = mock(FutureCallback.class);
loadBalancer.getLoadBalancedClusterAndUriProperties(CLUSTER1_NAME, callback);
callback.get();
verify(callback).onSuccess(eq(Pair.of(CLUSTER_PROPERTIES, URI_PROPERTIES)));


// case3: getLoadBalancedClusterAndUriProperties without timeout
loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry);
state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));
loadBalancer = spy(new SimpleLoadBalancer(state, 5, TimeUnit.SECONDS, _d2Executor));
clusterRegistry.put(CLUSTER1_NAME, CLUSTER_PROPERTIES);
uriRegistry.put(CLUSTER1_NAME, new UriProperties(CLUSTER1_NAME, new HashMap<>()));
uriRegistry.put(CLUSTER1_NAME, URI_PROPERTIES);
callback = mock(FutureCallback.class);
loadBalancer.getLoadBalancedClusterAndUriProperties(CLUSTER1_NAME, callback);
callback.get();
verify(loadBalancer, never()).handleTimeoutFromGetClusterAndUriProperties(any(), any());
verify(callback).onSuccess(eq(Pair.of(CLUSTER_PROPERTIES, URI_PROPERTIES)));
}

@Test
Expand All @@ -559,10 +554,17 @@ public void testGetClusterCountTimeout() throws Exception
MockStore<ClusterProperties> clusterRegistry = new MockStore<>();
MockStore<UriProperties> uriRegistry = new MockStore<>();
int partitionId = 0;
LoadBalancerTestState state = spy(new LoadBalancerTestState());
state.listenToCluster = false;

SimpleLoadBalancer loadBalancer = spy(setupLoadBalancer(state, serviceRegistry, clusterRegistry, uriRegistry));
SimpleLoadBalancerState state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));

doAnswer(invocation ->
{
Thread.sleep(10);
return null;
}).when(state).listenToCluster(any(), any());
SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor));
LoadBalancerStateItem<ClusterProperties> clusterItem = new LoadBalancerStateItem<>(CLUSTER_PROPERTIES, 1, 1);
LoadBalancerStateItem<UriProperties> uriItem = new LoadBalancerStateItem<>(URI_PROPERTIES, 1, 1);
when(state.getClusterProperties(CLUSTER1_NAME)).thenReturn(clusterItem);
Expand All @@ -577,9 +579,15 @@ public void testGetDarkClusterConfigMapTimeout() throws Exception
MockStore<ServiceProperties> serviceRegistry = new MockStore<>();
MockStore<ClusterProperties> clusterRegistry = new MockStore<>();
MockStore<UriProperties> uriRegistry = new MockStore<>();
LoadBalancerTestState state = spy(new LoadBalancerTestState());
state.listenToCluster = false;
SimpleLoadBalancer loadBalancer = spy(setupLoadBalancer(state, serviceRegistry, clusterRegistry, uriRegistry));
SimpleLoadBalancerState state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));
doAnswer(invocation ->
{
Thread.sleep(10);
return null;
}).when(state).listenToCluster(any(), any());
SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor));
DarkClusterConfigMap darkClusterConfigMap = new DarkClusterConfigMap();
DarkClusterConfig darkClusterConfig = new DarkClusterConfig().setMultiplier(1.0f)
.setDispatcherOutboundTargetRate(1)
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.51.11
version=29.51.12
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ public void testSuccessfulRequests(boolean restOverStream, String protocolVersio
// standard
HashMap<String, String> properties = new HashMap<>();
properties.put(HttpClientFactory.HTTP_PROTOCOL_VERSION, protocolVersion);
properties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(10000));
properties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(20000));
clients.add(new TransportClientAdapter(clientFactory.getClient(properties), restOverStream));

// with parameter that should NOT create a new ChannelPoolManager
properties = new HashMap<>();
properties.put(HttpClientFactory.HTTP_PROTOCOL_VERSION, protocolVersion);
properties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(2000)); // property NOT of the ChannelPoolManager
properties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, String.valueOf(10000)); // property NOT of the ChannelPoolManager
clients.add(new TransportClientAdapter(clientFactory.getClient(properties), restOverStream));
},
// since the two clients have the same settings, with sharing, it should just open 1 connections
Expand Down

0 comments on commit 11e6488

Please sign in to comment.