From fae1f3b92710ffb791ab9deb893299c73bdc83a5 Mon Sep 17 00:00:00 2001 From: SweetWuXiaoMei Date: Fri, 25 Oct 2024 15:50:29 +0800 Subject: [PATCH] fix(registry-etcd): Support Etcd authentication and optimize the instance discovery mechanism - Add authentication information configuration in EtcdDiscovery - Implement remote-based observers to monitor changes in service instances - Use SingletonManager to store and manage instance information - Update Docker Compose configuration to allow unauthenticated access - Modify the sayHello method in ProviderController to return the service address --- .../src/main/resources/application.yml | 1 + demo/demo-etcd/provider/pom.xml | 4 - .../samples/ProviderController.java | 2 +- demo/demo-etcd/test-client/pom.xml | 6 +- .../registry/etcd/EtcdDiscovery.java | 74 ++++++++++++++----- .../registry/etcd/SingletonManager.java | 10 ++- 6 files changed, 70 insertions(+), 27 deletions(-) diff --git a/demo/demo-etcd/consumer/src/main/resources/application.yml b/demo/demo-etcd/consumer/src/main/resources/application.yml index 1c686e5544..2bb97c757c 100644 --- a/demo/demo-etcd/consumer/src/main/resources/application.yml +++ b/demo/demo-etcd/consumer/src/main/resources/application.yml @@ -25,6 +25,7 @@ servicecomb: registry: etcd: # enabled: true +# authenticationInfo: chenzhida:chenzhida connectString: http://127.0.0.1:2379 rest: diff --git a/demo/demo-etcd/provider/pom.xml b/demo/demo-etcd/provider/pom.xml index 9a4dbbb000..c1413de54c 100644 --- a/demo/demo-etcd/provider/pom.xml +++ b/demo/demo-etcd/provider/pom.xml @@ -48,10 +48,6 @@ org.apache.servicecomb registry-etcd - - - - io.reactivex.rxjava3 diff --git a/demo/demo-etcd/provider/src/main/java/org/apache/servicecomb/samples/ProviderController.java b/demo/demo-etcd/provider/src/main/java/org/apache/servicecomb/samples/ProviderController.java index e4828bb9ba..5a6a7e3e7a 100644 --- a/demo/demo-etcd/provider/src/main/java/org/apache/servicecomb/samples/ProviderController.java +++ b/demo/demo-etcd/provider/src/main/java/org/apache/servicecomb/samples/ProviderController.java @@ -38,7 +38,7 @@ public void setEnvironment(Environment environment) { // a very simple service to echo the request parameter @GetMapping("/sayHello") public String sayHello(@RequestParam("name") String name) { - return "Hello " + name; + return "Hello " + environment.getProperty("servicecomb.rest.address"); } @GetMapping("/getConfig") diff --git a/demo/demo-etcd/test-client/pom.xml b/demo/demo-etcd/test-client/pom.xml index 461df7e4ac..0c050adcb8 100644 --- a/demo/demo-etcd/test-client/pom.xml +++ b/demo/demo-etcd/test-client/pom.xml @@ -60,13 +60,12 @@ - bitnami/etcd:latest etcd alias - binding to port + etcdserver 2379 @@ -77,6 +76,9 @@ etcd.port:2379 + + yes + diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java index 0537ebf615..e0bf72237c 100644 --- a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java +++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/EtcdDiscovery.java @@ -18,8 +18,11 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -40,7 +43,6 @@ import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.WatchOption; -import io.etcd.jetcd.watch.WatchEvent; public class EtcdDiscovery implements Discovery { @@ -54,8 +56,6 @@ public class EtcdDiscovery implements Discovery { private InstanceChangedListener instanceChangedListener; - private static volatile boolean isWatch = false; - private static final Logger LOGGER = LoggerFactory.getLogger(EtcdDiscovery.class); @Autowired @@ -85,22 +85,43 @@ public boolean enabled(String application, String serviceName) { public List findServiceInstances(String application, String serviceName) { String prefixPath = basePath + "/" + application + "/" + serviceName; - SingletonManager.getInstance().getSingleton(prefixPath, serName -> { + SingletonManager.getInstance().computeIfAbsent(prefixPath, serName -> { Watch watchClient = client.getWatchClient(); try { - watchClient.watch(ByteSequence.from(prefixPath, Charset.defaultCharset()), WatchOption.builder().build(), + ByteSequence prefixByteSeq = ByteSequence.from(prefixPath, Charset.defaultCharset()); + watchClient.watch( + prefixByteSeq, + WatchOption.builder().withPrefix(prefixByteSeq).build(), resp -> { - List keyValueList = resp.getEvents().stream().map(WatchEvent::getKeyValue).toList(); - instanceChangedListener.onInstanceChanged(name(), application, serviceName, - convertServiceInstanceList(keyValueList)); - }); + Map instanceMap = SingletonManager.getInstance() + .get("instance:" + prefixPath); + resp.getEvents().forEach(event -> { + String key = event.getKeyValue().getKey().toString(Charset.defaultCharset()); + switch (event.getEventType()) { + case PUT -> { + EtcdDiscoveryInstance newInstance = getEtcdDiscoveryInstance(event.getKeyValue()); + instanceMap.put(key, newInstance); + } + case DELETE -> instanceMap.remove(key); + default -> LOGGER.error("unkonw event"); + } + }); + List discoveryInstanceList = new ArrayList<>(instanceMap.values()); + instanceChangedListener.onInstanceChanged(name(), application, serviceName, discoveryInstanceList); + } + ); } catch (Exception e) { - LOGGER.error("add watch failure", e); + LOGGER.error("Failed to add watch", e); } return watchClient; }); + List endpointKv = getValuesByPrefix(prefixPath); - return convertServiceInstanceList(endpointKv); + List etcdDiscoveryInstances = convertServiceInstanceList(endpointKv); + SingletonManager.getInstance().computeIfAbsent("instance:" + prefixPath, v -> etcdDiscoveryInstances.stream() + .collect( + Collectors.toConcurrentMap(instance -> prefixPath + "/" + instance.getInstanceId(), Function.identity()))); + return etcdDiscoveryInstances; } public List getValuesByPrefix(String prefix) { @@ -117,18 +138,23 @@ private List convertServiceInstanceList(List ke List list = Lists.newArrayListWithExpectedSize(keyValueList.size()); for (KeyValue keyValue : keyValueList) { - String valueJson = new String(keyValue.getValue().getBytes(), Charset.defaultCharset()); - - EtcdInstance etcdInstance = MuteExceptionUtil.builder() - .withLog("convert json value to obj from etcd failure, {}", valueJson) - .executeFunctionWithDoubleParam(JsonUtils::readValue, valueJson.getBytes(StandardCharsets.UTF_8), - EtcdInstance.class); - EtcdDiscoveryInstance etcdDiscoveryInstance = new EtcdDiscoveryInstance(etcdInstance); + EtcdDiscoveryInstance etcdDiscoveryInstance = getEtcdDiscoveryInstance(keyValue); list.add(etcdDiscoveryInstance); } return list; } + private static EtcdDiscoveryInstance getEtcdDiscoveryInstance(KeyValue keyValue) { + String valueJson = new String(keyValue.getValue().getBytes(), Charset.defaultCharset()); + + EtcdInstance etcdInstance = MuteExceptionUtil.builder() + .withLog("convert json value to obj from etcd failure, {}", valueJson) + .executeFunctionWithDoubleParam(JsonUtils::readValue, valueJson.getBytes(StandardCharsets.UTF_8), + EtcdInstance.class); + EtcdDiscoveryInstance etcdDiscoveryInstance = new EtcdDiscoveryInstance(etcdInstance); + return etcdDiscoveryInstance; + } + @Override public List findServices(String application) { @@ -159,13 +185,23 @@ public void init() { @Override public void run() { - client = Client.builder().endpoints(etcdRegistryProperties.getConnectString()).build(); + if (StringUtils.isEmpty(etcdRegistryProperties.getAuthenticationInfo())) { + this.client = Client.builder().endpoints(etcdRegistryProperties.getConnectString()).build(); + } else { + String[] authInfo = etcdRegistryProperties.getAuthenticationInfo().split(":"); + this.client = Client.builder() + .endpoints(etcdRegistryProperties.getConnectString()) + .user(ByteSequence.from(authInfo[0], Charset.defaultCharset())) + .password(ByteSequence.from(authInfo[1], Charset.defaultCharset())) + .build(); + } } @Override public void destroy() { if (client != null) { client.close(); + SingletonManager.getInstance().destroy(); } } } diff --git a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/SingletonManager.java b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/SingletonManager.java index 7153ff4db1..f1d2a8b1dd 100644 --- a/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/SingletonManager.java +++ b/service-registry/registry-etcd/src/main/java/org/apache/servicecomb/registry/etcd/SingletonManager.java @@ -36,10 +36,18 @@ public static synchronized SingletonManager getInstance() { return instance; } - public T getSingleton(String key, Function mappingFunction) { + public T computeIfAbsent(String key, Function mappingFunction) { return (T) singletons.computeIfAbsent(key, mappingFunction); } + public T get(String key) { + return (T) singletons.get(key); + } + + public void remove(String key) { + singletons.remove(key); + } + public void destroy() { singletons.clear(); instance = null;