Skip to content

Commit

Permalink
fix(registry-etcd): Support Etcd authentication and optimize the inst…
Browse files Browse the repository at this point in the history
…ance discovery mechanism

- Add authentication information configuration in EtcdDiscovery
- Implement remote-based observers to monitor changes in service instances
- Use SingletonManager to store and manage instance information
- Update Docker Compose configuration to allow unauthenticated access
- Modify the sayHello method in ProviderController to return the service address
  • Loading branch information
SweetWuXiaoMei committed Oct 25, 2024
1 parent cf55f27 commit fae1f3b
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 27 deletions.
1 change: 1 addition & 0 deletions demo/demo-etcd/consumer/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ servicecomb:
registry:
etcd:
# enabled: true
# authenticationInfo: chenzhida:chenzhida
connectString: http://127.0.0.1:2379

rest:
Expand Down
4 changes: 0 additions & 4 deletions demo/demo-etcd/provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>registry-etcd</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.servicecomb</groupId>-->
<!-- <artifactId>config-zookeeper</artifactId>-->
<!-- </dependency>-->

<dependency>
<groupId>io.reactivex.rxjava3</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void setEnvironment(Environment environment) {
// a very simple service to echo the request parameter
@GetMapping("/sayHello")
public String sayHello(@RequestParam("name") String name) {
return "Hello " + name;
return "Hello " + environment.getProperty("servicecomb.rest.address");
}

@GetMapping("/getConfig")
Expand Down
6 changes: 4 additions & 2 deletions demo/demo-etcd/test-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,12 @@
<configuration>
<images>
<image>
<!-- <name>etcd:3.5.9</name>-->
<name>bitnami/etcd:latest</name>
<alias>etcd</alias>
<run>
<namingStrategy>alias</namingStrategy>
<wait>
<log>binding to port</log>
<log>etcdserver</log>
<tcp>
<ports>
<port>2379</port>
Expand All @@ -77,6 +76,9 @@
<ports>
<port>etcd.port:2379</port>
</ports>
<env>
<ALLOW_NONE_AUTHENTICATION>yes</ALLOW_NONE_AUTHENTICATION>
</env>
</run>
</image>
<image>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -40,7 +43,6 @@
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;

public class EtcdDiscovery implements Discovery<EtcdDiscoveryInstance> {

Expand All @@ -54,8 +56,6 @@ public class EtcdDiscovery implements Discovery<EtcdDiscoveryInstance> {

private InstanceChangedListener<EtcdDiscoveryInstance> instanceChangedListener;

private static volatile boolean isWatch = false;

private static final Logger LOGGER = LoggerFactory.getLogger(EtcdDiscovery.class);

@Autowired
Expand Down Expand Up @@ -85,22 +85,43 @@ public boolean enabled(String application, String serviceName) {
public List<EtcdDiscoveryInstance> findServiceInstances(String application, String serviceName) {

String prefixPath = basePath + "/" + application + "/" + serviceName;
SingletonManager.getInstance().getSingleton(prefixPath, serName -> {
SingletonManager.getInstance().computeIfAbsent(prefixPath, serName -> {
Watch watchClient = client.getWatchClient();
try {
watchClient.watch(ByteSequence.from(prefixPath, Charset.defaultCharset()), WatchOption.builder().build(),
ByteSequence prefixByteSeq = ByteSequence.from(prefixPath, Charset.defaultCharset());
watchClient.watch(
prefixByteSeq,
WatchOption.builder().withPrefix(prefixByteSeq).build(),
resp -> {
List<KeyValue> keyValueList = resp.getEvents().stream().map(WatchEvent::getKeyValue).toList();
instanceChangedListener.onInstanceChanged(name(), application, serviceName,
convertServiceInstanceList(keyValueList));
});
Map<String, EtcdDiscoveryInstance> instanceMap = SingletonManager.getInstance()
.get("instance:" + prefixPath);
resp.getEvents().forEach(event -> {
String key = event.getKeyValue().getKey().toString(Charset.defaultCharset());
switch (event.getEventType()) {
case PUT -> {
EtcdDiscoveryInstance newInstance = getEtcdDiscoveryInstance(event.getKeyValue());
instanceMap.put(key, newInstance);
}
case DELETE -> instanceMap.remove(key);
default -> LOGGER.error("unkonw event");
}
});
List<EtcdDiscoveryInstance> discoveryInstanceList = new ArrayList<>(instanceMap.values());
instanceChangedListener.onInstanceChanged(name(), application, serviceName, discoveryInstanceList);
}
);
} catch (Exception e) {
LOGGER.error("add watch failure", e);
LOGGER.error("Failed to add watch", e);
}
return watchClient;
});

List<KeyValue> endpointKv = getValuesByPrefix(prefixPath);
return convertServiceInstanceList(endpointKv);
List<EtcdDiscoveryInstance> etcdDiscoveryInstances = convertServiceInstanceList(endpointKv);
SingletonManager.getInstance().computeIfAbsent("instance:" + prefixPath, v -> etcdDiscoveryInstances.stream()
.collect(
Collectors.toConcurrentMap(instance -> prefixPath + "/" + instance.getInstanceId(), Function.identity())));
return etcdDiscoveryInstances;
}

public List<KeyValue> getValuesByPrefix(String prefix) {
Expand All @@ -117,18 +138,23 @@ private List<EtcdDiscoveryInstance> convertServiceInstanceList(List<KeyValue> ke

List<EtcdDiscoveryInstance> list = Lists.newArrayListWithExpectedSize(keyValueList.size());
for (KeyValue keyValue : keyValueList) {
String valueJson = new String(keyValue.getValue().getBytes(), Charset.defaultCharset());

EtcdInstance etcdInstance = MuteExceptionUtil.builder()
.withLog("convert json value to obj from etcd failure, {}", valueJson)
.executeFunctionWithDoubleParam(JsonUtils::readValue, valueJson.getBytes(StandardCharsets.UTF_8),
EtcdInstance.class);
EtcdDiscoveryInstance etcdDiscoveryInstance = new EtcdDiscoveryInstance(etcdInstance);
EtcdDiscoveryInstance etcdDiscoveryInstance = getEtcdDiscoveryInstance(keyValue);
list.add(etcdDiscoveryInstance);
}
return list;
}

private static EtcdDiscoveryInstance getEtcdDiscoveryInstance(KeyValue keyValue) {
String valueJson = new String(keyValue.getValue().getBytes(), Charset.defaultCharset());

EtcdInstance etcdInstance = MuteExceptionUtil.builder()
.withLog("convert json value to obj from etcd failure, {}", valueJson)
.executeFunctionWithDoubleParam(JsonUtils::readValue, valueJson.getBytes(StandardCharsets.UTF_8),
EtcdInstance.class);
EtcdDiscoveryInstance etcdDiscoveryInstance = new EtcdDiscoveryInstance(etcdInstance);
return etcdDiscoveryInstance;
}

@Override
public List<String> findServices(String application) {

Expand Down Expand Up @@ -159,13 +185,23 @@ public void init() {

@Override
public void run() {
client = Client.builder().endpoints(etcdRegistryProperties.getConnectString()).build();
if (StringUtils.isEmpty(etcdRegistryProperties.getAuthenticationInfo())) {
this.client = Client.builder().endpoints(etcdRegistryProperties.getConnectString()).build();
} else {
String[] authInfo = etcdRegistryProperties.getAuthenticationInfo().split(":");
this.client = Client.builder()
.endpoints(etcdRegistryProperties.getConnectString())
.user(ByteSequence.from(authInfo[0], Charset.defaultCharset()))
.password(ByteSequence.from(authInfo[1], Charset.defaultCharset()))
.build();
}
}

@Override
public void destroy() {
if (client != null) {
client.close();
SingletonManager.getInstance().destroy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ public static synchronized SingletonManager getInstance() {
return instance;
}

public <T> T getSingleton(String key, Function<? super String, ? extends T> mappingFunction) {
public <T> T computeIfAbsent(String key, Function<? super String, ? extends T> mappingFunction) {
return (T) singletons.computeIfAbsent(key, mappingFunction);
}

public <T> T get(String key) {
return (T) singletons.get(key);
}

public void remove(String key) {
singletons.remove(key);
}

public void destroy() {
singletons.clear();
instance = null;
Expand Down

0 comments on commit fae1f3b

Please sign in to comment.