Skip to content

Commit

Permalink
refactor(registry-etcd): Refactor the EtcdDiscovery class and update …
Browse files Browse the repository at this point in the history
…related dependencies

- Refactored the EtcdDiscovery class and removed unnecessary variables and methods
- Optimized the acquisition and monitoring logic of service instances
- Updated protobuf version to 3.25.3
- Removed unnecessary protobuf-java dependency in demo project
  • Loading branch information
SweetWuXiaoMei committed Oct 28, 2024
1 parent 4915a8c commit cc6c800
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 54 deletions.
6 changes: 0 additions & 6 deletions demo/demo-etcd/consumer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-etcd</artifactId>
Expand Down
6 changes: 0 additions & 6 deletions demo/demo-etcd/provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-etcd</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions dependencies/default/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
<netty.version>4.1.113.Final</netty.version>
<okhttp3.version>4.12.0</okhttp3.version>
<prometheus.version>0.16.0</prometheus.version>
<protobuf.version>3.23.4</protobuf.version>
<protobuf.version>3.25.3</protobuf.version>
<protostuff.version>1.8.0</protostuff.version>
<protostuff-parser.version>2.2.27</protostuff-parser.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
Expand Down Expand Up @@ -190,7 +190,7 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>test</scope>
<!-- <scope>test</scope>-->
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -47,8 +45,6 @@

public class EtcdDiscovery implements Discovery<EtcdDiscoveryInstance> {

private final String basePrefix = "instance:";

private Environment environment;

private String basePath;
Expand All @@ -61,10 +57,9 @@ public class EtcdDiscovery implements Discovery<EtcdDiscoveryInstance> {

private static final Logger LOGGER = LoggerFactory.getLogger(EtcdDiscovery.class);

private Map<String, Map<String, EtcdDiscoveryInstance>> etcdInstanceMap = new ConcurrentHashMapEx<>();

private Map<String, Watch> watchMap = new ConcurrentHashMapEx<>();


@Autowired
@SuppressWarnings("unused")
public void setEnvironment(Environment environment) {
Expand Down Expand Up @@ -96,42 +91,33 @@ public List<EtcdDiscoveryInstance> findServiceInstances(String application, Stri
Watch watchClient = client.getWatchClient();
try {
ByteSequence prefixByteSeq = ByteSequence.from(prefixPath, Charset.defaultCharset());
watchClient.watch(
prefixByteSeq,
WatchOption.builder().withPrefix(prefixByteSeq).build(),
resp -> {
Map<String, EtcdDiscoveryInstance> instanceMap = etcdInstanceMap
.get(basePrefix + prefixPath);
resp.getEvents().forEach(event -> {
String key = event.getKeyValue().getKey().toString(Charset.defaultCharset());
switch (event.getEventType()) {
case PUT -> {
EtcdDiscoveryInstance newInstance = getEtcdDiscoveryInstance(event.getKeyValue());
instanceMap.put(key, newInstance);
}
case DELETE -> instanceMap.remove(key);
default -> LOGGER.error("unkonw event");
}
});
List<EtcdDiscoveryInstance> discoveryInstanceList = new ArrayList<>(instanceMap.values());
instanceChangedListener.onInstanceChanged(name(), application, serviceName, discoveryInstanceList);
}
);
watchClient.watch(prefixByteSeq, WatchOption.builder().withPrefix(prefixByteSeq).build(),
resp -> watchNode(application, serviceName, prefixPath));
} catch (Exception e) {
LOGGER.error("Failed to add watch", e);
}
return watchClient;
});

List<KeyValue> endpointKv = getValuesByPrefix(prefixPath);
List<EtcdDiscoveryInstance> etcdDiscoveryInstances = convertServiceInstanceList(endpointKv);
etcdInstanceMap.computeIfAbsent(basePrefix + prefixPath, v -> etcdDiscoveryInstances.stream()
.collect(
Collectors.toConcurrentMap(instance -> prefixPath + "/" + instance.getInstanceId(), Function.identity())));
return etcdDiscoveryInstances;
return convertServiceInstanceList(endpointKv);
}

private void watchNode(String application, String serviceName, String prefixPath) {

CompletableFuture<GetResponse> getFuture = client.getKVClient()
.get(ByteSequence.from(prefixPath, StandardCharsets.UTF_8),
GetOption.builder().withPrefix(ByteSequence.from(prefixPath, StandardCharsets.UTF_8)).build());
getFuture.thenAcceptAsync(response -> {
List<EtcdDiscoveryInstance> discoveryInstanceList = convertServiceInstanceList(response.getKvs());
instanceChangedListener.onInstanceChanged(name(), application, serviceName, discoveryInstanceList);
}).exceptionally(e -> {
LOGGER.error("watchNode error", e);
return null;
});
}

public List<KeyValue> getValuesByPrefix(String prefix) {
private List<KeyValue> getValuesByPrefix(String prefix) {

CompletableFuture<GetResponse> getFuture = client.getKVClient()
.get(ByteSequence.from(prefix, StandardCharsets.UTF_8),
Expand Down Expand Up @@ -167,8 +153,7 @@ public List<String> findServices(String application) {

String prefixPath = basePath + "/" + application;
List<KeyValue> endpointKv = getValuesByPrefix(prefixPath);
return endpointKv.stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
.collect(Collectors.toList());
return endpointKv.stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -196,19 +181,16 @@ public void run() {
this.client = Client.builder().endpoints(etcdRegistryProperties.getConnectString()).build();
} else {
String[] authInfo = etcdRegistryProperties.getAuthenticationInfo().split(":");
this.client = Client.builder()
.endpoints(etcdRegistryProperties.getConnectString())
this.client = Client.builder().endpoints(etcdRegistryProperties.getConnectString())
.user(ByteSequence.from(authInfo[0], Charset.defaultCharset()))
.password(ByteSequence.from(authInfo[1], Charset.defaultCharset()))
.build();
.password(ByteSequence.from(authInfo[1], Charset.defaultCharset())).build();
}
}

@Override
public void destroy() {
if (client != null) {
client.close();
etcdInstanceMap = null;
watchMap = null;
}
}
Expand Down

0 comments on commit cc6c800

Please sign in to comment.