Skip to content

Commit

Permalink
[#4532] Fixed when the servicecomb engine storage pool is faulty, the…
Browse files Browse the repository at this point in the history
… registration and configuration centers cannot perform HA switchover problem
  • Loading branch information
chengyouling committed Sep 26, 2024
1 parent 8ab4d04 commit ba1c1e2
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,28 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques
}
}

/**
 * Probes an isolated config center address and, when it answers, asks the
 * address manager to recover it.
 *
 * <p>A GET is sent to {@code <address>/configuration/items} carrying the
 * request's dimensions info, revision and environment. A response of
 * HTTP 200 or 304 is treated as "address is reachable again". Any failure is
 * logged (with its cause) and never rethrown.
 *
 * @param request the query request supplying dimensions, revision and environment
 * @param address the isolated address to probe
 */
@Override
public void checkAddressAvailable(QueryConfigurationsRequest request, String address) {
  String dimensionsInfo = buildDimensionsInfo(request, true);
  try {
    String uri = address + "/configuration/items?dimensionsInfo="
        + HttpUtils.encodeURLParam(dimensionsInfo) + "&revision=" + request.getRevision();

    Map<String, String> headers = new HashMap<>();
    headers.put("x-environment", request.getEnvironment());
    HttpRequest httpRequest = new HttpRequest(uri, headers, null, HttpRequest.GET);

    HttpResponse httpResponse = httpTransport.doRequest(httpRequest);
    int statusCode = httpResponse.getStatusCode();
    if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_NOT_MODIFIED) {
      addressManager.recoverIsolatedAddress(address);
    }
  } catch (Exception e) {
    // Pass the exception as the last argument so the failure cause is not lost.
    LOGGER.error("check config center isolation address {} available error!", address, e);
  }
}

private String buildDimensionsInfo(QueryConfigurationsRequest request, boolean withVersion) {
String result =
request.getServiceName() + DEFAULT_APP_SEPARATOR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.servicecomb.config.center.client;

import java.util.List;
import java.util.Map;

import org.apache.servicecomb.config.center.client.model.ConfigCenterConfiguration;
Expand Down Expand Up @@ -63,6 +64,7 @@ public void setQueryConfigurationsRequest(QueryConfigurationsRequest queryConfig

/**
 * Kicks off the manager's background work: first the configuration polling
 * task, then the task that re-checks isolated config center addresses.
 */
public void startConfigCenterManager() {
  PollConfigurationTask pollTask = new PollConfigurationTask(0);
  startTask(pollTask);

  CheckConfigCenterAddressTask addressCheckTask = new CheckConfigCenterAddressTask();
  startTask(addressCheckTask);
}

class PollConfigurationTask implements Task {
Expand Down Expand Up @@ -91,4 +93,19 @@ public void execute() {
}
}
}

/**
 * Periodic task that probes every currently isolated config center address
 * and then re-schedules itself after the configured refresh interval.
 */
class CheckConfigCenterAddressTask implements Task {
  @Override
  public void execute() {
    // An empty list simply means there is nothing to probe in this round.
    List<String> isolationAddresses = configCenterAddressManager.getIsolationAddresses();
    for (String address : isolationAddresses) {
      configCenterClient.checkAddressAvailable(queryConfigurationsRequest, address);
    }
    // Always re-schedule. Returning early when nothing is isolated would end the
    // chain on the first run, and addresses isolated later would never be probed.
    // NOTE(review): confirm how the task pool runs BackOffSleepTask so this
    // recurring wait does not delay other tasks sharing the pool.
    startTask(new BackOffSleepTask(configCenterConfiguration.getRefreshIntervalInMillis(),
        new CheckConfigCenterAddressTask()));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public interface ConfigCenterOperation {
* @throws OperationException If some problems happened to contact service center or non http 200 returned.
*/
QueryConfigurationsResponse queryConfigurations(QueryConfigurationsRequest request, String address);

/**
 * Checks whether an isolated config center address is available again.
 *
 * <p>The method returns nothing, so the outcome of the check is not reported
 * to the caller through a return value.
 *
 * @param request the query configurations request passed along for the check
 * @param address the isolated address to check
 */
void checkAddressAvailable(QueryConfigurationsRequest request, String address);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.eventbus.EventBus;

import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
Expand Down Expand Up @@ -113,6 +114,21 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request,
}
}

/**
 * Sends a GET to the Kie URL built for {@code request} on {@code address};
 * on HTTP 200 or 304 the address manager is asked to recover the address.
 * I/O failures are logged together with their cause and not rethrown.
 */
@Override
public void checkAddressAvailable(ConfigurationsRequest request, String address) {
  HttpRequest probe = new HttpRequest(buildUrl(request, address), null, null, HttpRequest.GET);
  try {
    int status = httpTransport.doRequest(probe).getStatusCode();
    boolean reachable = status == HttpStatus.SC_OK || status == HttpStatus.SC_NOT_MODIFIED;
    if (reachable) {
      addressManager.recoverIsolatedAddress(address);
    }
  } catch (IOException e) {
    LOGGER.error("check kie config isolation address {} available error!", address, e);
  }
}

private String buildUrl(ConfigurationsRequest request, String currentAddress) {
StringBuilder sb = new StringBuilder();
sb.append(currentAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ protected void initTaskPool(String taskName) {
/**
 * Starts one polling task per configured request, plus a single task that
 * re-checks isolated Kie addresses using the first request as the probe.
 */
public void startConfigKieManager() {
  this.configurationsRequests.forEach((t) ->
      this.startTask(new PollConfigurationTask(0, t)));
  // Without any request there is nothing to probe with; get(0) on an empty
  // list would throw IndexOutOfBoundsException.
  if (!configurationsRequests.isEmpty()) {
    startTask(new CheckKieConfigAddressTask(configurationsRequests.get(0)));
  }
}

class PollConfigurationTask implements Task {
Expand Down Expand Up @@ -159,4 +160,25 @@ public void execute() {
}
}
}

/**
 * Periodic task that probes every currently isolated Kie address with the
 * given request and then re-schedules itself after the refresh interval.
 */
class CheckKieConfigAddressTask implements Task {
  // Request handed to the client for each probe.
  ConfigurationsRequest configurationsRequest;

  public CheckKieConfigAddressTask(ConfigurationsRequest configurationsRequest) {
    this.configurationsRequest = configurationsRequest;
  }

  @Override
  public void execute() {
    // An empty list simply means there is nothing to probe in this round.
    List<String> isolationAddresses = kieAddressManager.getIsolationAddresses();
    for (String address : isolationAddresses) {
      configKieClient.checkAddressAvailable(this.configurationsRequest, address);
    }
    // Always re-schedule. Returning early when nothing is isolated would end the
    // chain on the first run, and addresses isolated later would never be probed.
    // NOTE(review): confirm how the task pool runs BackOffSleepTask so this
    // recurring wait does not delay other tasks sharing the pool.
    startTask(new BackOffSleepTask(kieConfiguration.getRefreshIntervalInMillis(),
        new CheckKieConfigAddressTask(this.configurationsRequest)));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,12 @@ public interface KieConfigOperation {
* @throws OperationException If some problems happened to contact service center or non http 200 returned.
*/
ConfigurationsResponse queryConfigurations(ConfigurationsRequest request, String address);

/**
 * Checks whether an isolated Kie address is available again.
 *
 * <p>The method returns nothing, so the outcome of the check is not reported
 * to the caller through a return value.
 *
 * @param configurationsRequest the configurations request passed along for the check
 * @param address the isolated address to check
 */
void checkAddressAvailable(ConfigurationsRequest configurationsRequest, String address);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,11 @@

package org.apache.servicecomb.http.client.common;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
Expand All @@ -42,7 +32,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class AbstractAddressManager {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAddressManager.class);
Expand All @@ -53,8 +42,6 @@ public class AbstractAddressManager {

private static final String V3_PREFIX = "/v3/";

private static final int DEFAULT_ADDRESS_CHECK_TIME = 30;

private static final int ISOLATION_THRESHOLD = 3;

private volatile List<String> addresses = new ArrayList<>();
Expand All @@ -68,9 +55,6 @@ public class AbstractAddressManager {

private String projectName;

// all address list.
private final Set<String> addressCategory = new HashSet<>();

// recording continuous times of failure of an address.
private final Map<String, Integer> addressFailureStatus = new ConcurrentHashMap<>();

Expand All @@ -90,27 +74,22 @@ public class AbstractAddressManager {

private EventBus eventBus;

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("check-available-address-%d")
.build());

/**
 * Creates an address manager for the default project.
 *
 * <p>The given addresses are added to the working address list, the default
 * address list and the address category set. The start index is random when
 * the list is non-empty and 0 otherwise.
 *
 * @param addresses initial addresses
 */
public AbstractAddressManager(List<String> addresses) {
this.projectName = DEFAULT_PROJECT;
this.addresses.addAll(addresses);
this.defaultAddress.addAll(addresses);
this.addressCategory.addAll(addresses);
this.index = !addresses.isEmpty() ? random.nextInt(addresses.size()) : 0;
startCheck();
// NOTE(review): index is assigned a second time here, overriding the value
// chosen above. This looks like the old and new lines of a change shown
// together; confirm which assignment (and whether startCheck()) should stay.
this.index = !addresses.isEmpty() ? getRandomIndex() : 0;
}

/**
 * Creates an address manager for the given project, falling back to the
 * default project when the name is empty.
 *
 * <p>The working address list is the result of {@code transformAddress};
 * the default address list receives the addresses exactly as passed in.
 * The start index is random when the list is non-empty and 0 otherwise.
 *
 * @param projectName project name, may be empty
 * @param addresses initial addresses
 */
public AbstractAddressManager(String projectName, List<String> addresses) {
this.projectName = StringUtils.isEmpty(projectName) ? DEFAULT_PROJECT : projectName;
this.addresses = this.transformAddress(addresses);
this.defaultAddress.addAll(addresses);
this.addressCategory.addAll(this.addresses);
this.index = !addresses.isEmpty() ? random.nextInt(addresses.size()) : 0;
startCheck();
// NOTE(review): index is assigned a second time here, overriding the value
// chosen above. This looks like the old and new lines of a change shown
// together; confirm which assignment (and whether startCheck()) should stay.
this.index = !addresses.isEmpty() ? getRandomIndex() : 0;
}

/**
 * Picks a random position within the current address list.
 * The bound is the list size, so the list must not be empty when this is called.
 */
private int getRandomIndex() {
  int bound = addresses.size();
  return random.nextInt(bound);
}

public void refreshEndpoint(RefreshEndpointEvent event, String key) {
Expand All @@ -120,8 +99,6 @@ public void refreshEndpoint(RefreshEndpointEvent event, String key) {

availableZone = event.getSameZone().stream().map(this::normalizeUri).collect(Collectors.toList());
availableRegion = event.getSameRegion().stream().map(this::normalizeUri).collect(Collectors.toList());
addressCategory.addAll(availableZone);
addressCategory.addAll(availableRegion);
addressAutoRefreshed = true;
}

Expand All @@ -146,10 +123,6 @@ public List<String> getAvailableRegion() {
return availableRegion;
}

/**
 * Schedules {@code checkHistory} at a fixed rate: first run immediately,
 * then every {@code DEFAULT_ADDRESS_CHECK_TIME} seconds.
 */
private void startCheck() {
  final long initialDelay = 0;
  final long period = DEFAULT_ADDRESS_CHECK_TIME;
  executorService.scheduleAtFixedRate(this::checkHistory, initialDelay, period, TimeUnit.SECONDS);
}

/**
 * Joins an address and a URL path.
 *
 * @param url the path to append
 * @param absoluteUrl when true the address is used as-is; otherwise it is
 *     first passed through {@code formatAddress}
 * @param address the base address
 * @return the combined URL
 */
public String formatUrl(String url, boolean absoluteUrl, String address) {
  if (absoluteUrl) {
    return address + url;
  }
  return formatAddress(address) + url;
}
Expand Down Expand Up @@ -188,7 +161,7 @@ private String getDefaultAddress() {
}
LOGGER.warn("all addresses are isolation, please check server status.");
// when all addresses are isolation, it will use all default address for polling.
return getCurrentAddress(new ArrayList<>(defaultAddress));
return getCurrentAddress(defaultAddress);
}

private String getAvailableZoneAddress() {
Expand Down Expand Up @@ -221,42 +194,7 @@ private List<String> getZoneOrRegionAddress() {
return results;
}

/**
 * Probes every known address once: an address that fails the connectivity
 * test has a failure recorded, one that passes is restored from isolation.
 */
@VisibleForTesting
protected void checkHistory() {
  addressCategory.forEach(candidate -> {
    boolean reachable = telnetTest(candidate);
    if (!reachable) {
      recordFailState(candidate);
      return;
    }
    // reachable again: look it up in the isolation lists and restore it
    findAndRestoreAddress(candidate);
  });
}

/**
 * Tests whether a TCP connection to the address can be opened within 3 seconds.
 *
 * <p>When the address has no explicit port, the scheme's default is used
 * (443 for https, 80 for http). An address that cannot be parsed, has no host,
 * or has neither a port nor a known scheme is reported as unreachable rather
 * than raising an exception.
 *
 * @param address the address to probe, expected to be a URI with host and port
 * @return true when the connection succeeds, false otherwise
 */
protected boolean telnetTest(String address) {
  URI uri = parseIpPortFromURI(address);
  if (uri == null) {
    return false;
  }
  String host = uri.getHost();
  if (host == null) {
    // A null host would make InetSocketAddress fall back to loopback and
    // report a false positive.
    LOGGER.warn("endpoint {} has no host, treat it as unreachable.", address);
    return false;
  }
  int port = uri.getPort();
  if (port < 0) {
    // URI.getPort() returns -1 when no port is given; InetSocketAddress would
    // reject that with an unchecked IllegalArgumentException.
    if ("https".equalsIgnoreCase(uri.getScheme())) {
      port = 443;
    } else if ("http".equalsIgnoreCase(uri.getScheme())) {
      port = 80;
    } else {
      LOGGER.warn("endpoint {} has no port, treat it as unreachable.", address);
      return false;
    }
  }
  try (Socket s = new Socket()) {
    s.connect(new InetSocketAddress(host, port), 3000);
    return true;
  } catch (IOException e) {
    LOGGER.warn("ping endpoint {} failed, It will be quarantined again.", address);
    return false;
  }
}

/**
 * Parses the address into a {@link URI}.
 *
 * @param address the address text
 * @return the parsed URI, or null when the text is not a valid URI (the
 *     failure is logged)
 */
private URI parseIpPortFromURI(String address) {
  URI parsed = null;
  try {
    parsed = new URI(address);
  } catch (URISyntaxException e) {
    LOGGER.error("parse address [{}] failed.", address, e);
  }
  return parsed;
}

protected void findAndRestoreAddress(String address) {
public void recoverIsolatedAddress(String address) {
recordSuccessState(address);
if (addressAutoRefreshed) {
if (isolationZoneAddress.remove(address)) {
Expand Down Expand Up @@ -326,4 +264,11 @@ void removeAddress(String address) {
/**
 * Sets the event bus held by this address manager.
 *
 * @param eventBus the event bus to store
 */
public void setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
}

/**
 * Collects all currently isolated addresses into a new list: default
 * addresses first, then zone addresses, then region addresses.
 *
 * @return a fresh list; changing it does not affect the manager's own lists
 */
public List<String> getIsolationAddresses() {
  List<String> merged = new ArrayList<>();
  merged.addAll(defaultIsolationAddress);
  merged.addAll(isolationZoneAddress);
  merged.addAll(isolationRegionAddress);
  return merged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public class AbstractAddressManagerTest {

private static AbstractAddressManager addressManager3;

private static int index;

@BeforeEach
public void setUp() throws NoSuchFieldException, IllegalAccessException {
addresses.add("http://127.0.0.1:30103");
Expand Down Expand Up @@ -90,12 +88,7 @@ public void recordStateTest() throws ExecutionException {
zoneAndRegion.put("sameZone", addressAZ);
zoneAndRegion.put("sameRegion", addressRG);
RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "TEST");
AbstractAddressManager addressManager = new AbstractAddressManager(addresses) {
@Override
protected boolean telnetTest(String address) {
return true;
}
};
AbstractAddressManager addressManager = new AbstractAddressManager(addresses) {};

addressManager.refreshEndpoint(event, "TEST");

Expand All @@ -122,8 +115,7 @@ protected boolean telnetTest(String address) {
Assertions.assertEquals("http://127.0.0.4:30100", addressManager.address());

// test restore isolation
addressManager.checkHistory();
addressManager.findAndRestoreAddress("http://127.0.0.3:30100");
addressManager.recoverIsolatedAddress("http://127.0.0.3:30100");
Assertions.assertEquals("http://127.0.0.3:30100", addressManager.address());
Assertions.assertEquals("http://127.0.0.3:30100", addressManager.address());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,4 +553,16 @@ public boolean updateMicroserviceProperties(String serviceId, Map<String, String
"update service instance status fails", e);
}
}

/**
 * Probes each isolated service center address through the instance
 * heartbeat path of the given service and instance.
 */
@Override
public void checkIsolationAddressAvailable(String serviceId, String instanceId) {
  // The heartbeat path is the same for every address, so build it once.
  String heartbeatPath =
      "/registry/microservices/" + serviceId + "/instances/" + instanceId + "/heartbeat";
  // No isolated address means no iteration, hence no request.
  for (String isolated : addressManager.getIsolationAddresses()) {
    httpClient.checkServiceCenterAddressAvailable(heartbeatPath, null, null, isolated);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,12 @@ boolean updateMicroserviceInstanceStatus(String serviceId, String instanceId,
* @throws OperationException If some problems happened to contact service center or non http 200 returned.
*/
boolean updateMicroserviceProperties(String microserviceId, Map<String, String> serviceProperties);

/**
 * Checks whether isolated service center addresses are available again.
 *
 * <p>The method returns nothing, so the outcome of the check is not reported
 * to the caller through a return value.
 *
 * @param serviceId id of the microservice the check is performed for
 * @param instanceId id of the microservice instance the check is performed for
 */
void checkIsolationAddressAvailable(String serviceId, String instanceId);
}
Loading

0 comments on commit ba1c1e2

Please sign in to comment.