Skip to content

Commit

Permalink
fix triple client connection management issue, apache#14717, apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj committed Sep 24, 2024
1 parent eb6158d commit 38e372b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static RemoteMetadataService referMetadataService(ServiceInstance instanc
}

remoteMetadataService =
new RemoteMetadataService(consumerModel, proxyFactory.getProxy(invoker), internalModel);
new RemoteMetadataService(consumerModel, proxyFactory.getProxy(invoker), invoker, internalModel);
} else {
Invoker<MetadataService> invoker = protocol.refer(MetadataService.class, url);

Expand All @@ -198,7 +198,7 @@ public static RemoteMetadataService referMetadataService(ServiceInstance instanc
}

remoteMetadataService =
new RemoteMetadataService(consumerModel, proxyFactory.getProxy(invoker), internalModel);
new RemoteMetadataService(consumerModel, proxyFactory.getProxy(invoker), invoker, internalModel);
}

Object metadataServiceProxy = remoteMetadataService.getInternalProxy();
Expand Down Expand Up @@ -320,28 +320,36 @@ public static class RemoteMetadataService {

private MetadataServiceV2 proxyV2;

private Invoker<?> invoker;

private final ModuleModel internalModel;

public RemoteMetadataService(ConsumerModel consumerModel, MetadataService proxy, ModuleModel internalModel) {
public RemoteMetadataService(
ConsumerModel consumerModel, MetadataService proxy, Invoker<?> invoker, ModuleModel internalModel) {
this.consumerModel = consumerModel;
this.proxy = proxy;
this.invoker = invoker;
this.internalModel = internalModel;
}

public RemoteMetadataService(
ConsumerModel consumerModel, MetadataServiceV2 proxyV2, ModuleModel internalModel) {
ConsumerModel consumerModel, MetadataServiceV2 proxyV2, Invoker<?> invoker, ModuleModel internalModel) {
this.consumerModel = consumerModel;
this.proxyV2 = proxyV2;
this.invoker = invoker;
this.internalModel = internalModel;
}

public void destroy() {
// proxy is a reflection proxy that implements Destroyable, call of $destroy() will delegate to
// invoker.destroy()
if (proxy instanceof Destroyable) {
((Destroyable) proxy).$destroy();
}

if (proxyV2 instanceof Destroyable) {
((Destroyable) proxyV2).$destroy();
// proxyV2 is a stub proxy, so it's not an instance of Destroyable
if (proxyV2 != null) {
this.invoker.destroy();
}

internalModel.getServiceRepository().unregisterConsumer(consumerModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.dubbo.remoting.api.connection;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.model.FrameworkModel;
Expand All @@ -26,6 +28,9 @@
import java.util.function.Consumer;

public class SingleProtocolConnectionManager implements ConnectionManager {
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(SingleProtocolConnectionManager.class);

public static final String NAME = "single";

private final ConcurrentMap<String, AbstractConnectionClient> connections = new ConcurrentHashMap<>(16);
Expand All @@ -42,21 +47,34 @@ public AbstractConnectionClient connect(URL url, ChannelHandler handler) {
throw new IllegalArgumentException("url == null");
}
return connections.compute(url.getAddress(), (address, conn) -> {
String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4");
if (conn == null) {
String transport = url.getParameter(Constants.TRANSPORTER_KEY, "netty4");
ConnectionManager manager = frameworkModel
.getExtensionLoader(ConnectionManager.class)
.getExtension(transport);
final AbstractConnectionClient connectionClient = manager.connect(url, handler);
connectionClient.addCloseListener(() -> connections.remove(address, connectionClient));
return connectionClient;
return createAbstractConnectionClient(url, handler, address, transport);
} else {
conn.retain();
boolean shouldReuse = conn.retain();
if (!shouldReuse) {
logger.info("Trying to create a new connection for {}.", address);
return createAbstractConnectionClient(url, handler, address, transport);
}
return conn;
}
});
}

private AbstractConnectionClient createAbstractConnectionClient(
URL url, ChannelHandler handler, String address, String transport) {
ConnectionManager manager =
frameworkModel.getExtensionLoader(ConnectionManager.class).getExtension(transport);
final AbstractConnectionClient connectionClient = manager.connect(url, handler);
connectionClient.addCloseListener(() -> {
logger.info(
"Remove closed connection (with reference count==0) for address {}, a new one will be created for upcoming RPC requests routing to this address.",
address);
connections.remove(address, connectionClient);
});
return connectionClient;
}

@Override
public void forEachConnection(Consumer<AbstractConnectionClient> connectionConsumer) {
connections.values().forEach(connectionConsumer);
Expand Down

0 comments on commit 38e372b

Please sign in to comment.