diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Constants.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Constants.java
index d6b98a60f..d975b13fb 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Constants.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Constants.java
@@ -172,13 +172,14 @@ public final class Constants {
public static final String LOCALHOST = "localhost";
public static final String HTTP_OBJECT_AGGREGATOR = "HTTP_OBJECT_AGGREGATOR";
- public static final String WEBSOCKET_PROTOCOL = "ws";
- public static final String WEBSOCKET_PROTOCOL_SECURED = "wss";
+ public static final String WS_SCHEME = "ws";
+ public static final String WSS_SCHEME = "wss";
public static final String WEBSOCKET_UPGRADE = "websocket";
public static final String WEBSOCKET_FRAME_HANDLER = "WEBSOCKET_FRAME_HANDLER";
- public static final String WEBSOCKET_FRAME_BLOCKING_HANDLER = "WEBSOCKET_FRAME_BLOCKING_HANDLER";
+ public static final String MESSAGE_QUEUE_HANDLER = "MESSAGE_QUEUE_HANDLER";
public static final int WEBSOCKET_STATUS_CODE_NORMAL_CLOSURE = 1000;
public static final int WEBSOCKET_STATUS_CODE_GOING_AWAY = 1001;
+ public static final int WEBSOCKET_STATUS_CODE_PROTOCOL_ERROR = 1002;
public static final int WEBSOCKET_STATUS_CODE_ABNORMAL_CLOSURE = 1006;
public static final int WEBSOCKET_STATUS_CODE_UNEXPECTED_CONDITION = 1011;
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Util.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Util.java
index 9f5afbd90..7fecc32c7 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Util.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/common/Util.java
@@ -20,6 +20,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
@@ -35,15 +36,22 @@
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.HttpConversionUtil;
+import io.netty.handler.ssl.ReferenceCountedOpenSslContext;
+import io.netty.handler.ssl.ReferenceCountedOpenSslEngine;
+import io.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.common.ssl.SSLConfig;
+import org.wso2.transport.http.netty.common.ssl.SSLHandlerFactory;
import org.wso2.transport.http.netty.config.ChunkConfig;
import org.wso2.transport.http.netty.config.Parameter;
+import org.wso2.transport.http.netty.config.SslConfiguration;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.message.DefaultListener;
import org.wso2.transport.http.netty.message.HTTPCarbonMessage;
import org.wso2.transport.http.netty.message.Listener;
+import org.wso2.transport.http.netty.sender.CertificateValidationHandler;
+import org.wso2.transport.http.netty.sender.OCSPStaplingHandler;
import java.io.File;
import java.io.IOException;
@@ -54,6 +62,8 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
import static org.wso2.transport.http.netty.common.Constants.COLON;
import static org.wso2.transport.http.netty.common.Constants.HTTP_HOST;
@@ -341,7 +351,7 @@ public static SSLConfig getSSLConfigForSender(String certPass, String keyStorePa
certPass = keyStorePass;
}
if (trustStoreFilePath == null || trustStorePass == null) {
- throw new IllegalArgumentException("TrusStoreFile or trustStorePassword not defined for HTTPS scheme");
+ throw new IllegalArgumentException("TrustStoreFile or trustStorePassword not defined for HTTPS/WSS scheme");
}
SSLConfig sslConfig = new SSLConfig(null, null).setCertPass(null);
@@ -374,6 +384,64 @@ public static SSLConfig getSSLConfigForSender(String certPass, String keyStorePa
return sslConfig;
}
+ /**
+ * Configure outbound HTTP pipeline for SSL configuration.
+ *
+ * @param socketChannel Socket channel of outbound connection
+ * @param sslConfiguration {@link SslConfiguration}
+ * @param host host of the connection
+ * @param port port of the connection
+ * @throws SSLException if any error occurs in the SSL connection
+ */
+ public static void configureHttpPipelineForSSL(SocketChannel socketChannel, String host, int port,
+ SslConfiguration sslConfiguration) throws SSLException {
+ log.debug("adding ssl handler");
+ SSLConfig sslConfig = sslConfiguration.generateSSLConfig();
+ ChannelPipeline pipeline = socketChannel.pipeline();
+ if (sslConfiguration.isOcspStaplingEnabled()) {
+ SSLHandlerFactory sslHandlerFactory = new SSLHandlerFactory(sslConfig);
+ ReferenceCountedOpenSslContext referenceCountedOpenSslContext = sslHandlerFactory
+ .buildClientReferenceCountedOpenSslContext();
+
+ if (referenceCountedOpenSslContext != null) {
+ SslHandler sslHandler = referenceCountedOpenSslContext.newHandler(socketChannel.alloc());
+ ReferenceCountedOpenSslEngine engine = (ReferenceCountedOpenSslEngine) sslHandler.engine();
+ socketChannel.pipeline().addLast(sslHandler);
+ socketChannel.pipeline().addLast(new OCSPStaplingHandler(engine));
+ }
+ } else {
+ SSLEngine sslEngine = instantiateAndConfigSSL(sslConfig, host, port,
+ sslConfiguration.hostNameVerificationEnabled());
+ pipeline.addLast(Constants.SSL_HANDLER, new SslHandler(sslEngine));
+ if (sslConfiguration.validateCertEnabled()) {
+ pipeline.addLast(Constants.HTTP_CERT_VALIDATION_HANDLER, new CertificateValidationHandler(
+ sslEngine, sslConfiguration.getCacheValidityPeriod(), sslConfiguration.getCacheSize()));
+ }
+ }
+ }
+
+ /**
+ * Set configurations to create ssl engine.
+ *
+ * @param sslConfig ssl related configurations
+ * @return ssl engine
+ */
+ public static SSLEngine instantiateAndConfigSSL(SSLConfig sslConfig, String host, int port,
+ boolean hostNameVerificationEnabled) {
+ // set the pipeline factory, which creates the pipeline for each newly created channels
+ SSLEngine sslEngine = null;
+ if (sslConfig != null) {
+ SSLHandlerFactory sslHandlerFactory = new SSLHandlerFactory(sslConfig);
+ sslEngine = sslHandlerFactory.buildClientSSLEngine(host, port);
+ sslEngine.setUseClientMode(true);
+ sslHandlerFactory.setSNIServerNames(sslEngine, host);
+ if (hostNameVerificationEnabled) {
+ sslHandlerFactory.setHostNameVerfication(sslEngine);
+ }
+ }
+ return sslEngine;
+ }
+
/**
* Get integer type property value from a property map.
*
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SenderConfiguration.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SenderConfiguration.java
index a4766a75d..97aaf1bd8 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SenderConfiguration.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SenderConfiguration.java
@@ -19,25 +19,12 @@
package org.wso2.transport.http.netty.config;
import org.wso2.transport.http.netty.common.ProxyServerConfiguration;
-import org.wso2.transport.http.netty.common.Util;
-import org.wso2.transport.http.netty.common.ssl.SSLConfig;
import org.wso2.transport.http.netty.sender.channel.pool.PoolConfiguration;
-import java.util.ArrayList;
-import java.util.List;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlElementWrapper;
-
-
/**
* JAXB representation of the Netty transport sender configuration.
*/
-@SuppressWarnings("unused")
-@XmlAccessorType(XmlAccessType.FIELD)
-public class SenderConfiguration {
+public class SenderConfiguration extends SslConfiguration {
private static final String DEFAULT_KEY = "netty";
@@ -48,57 +35,17 @@ public static SenderConfiguration getDefault() {
return defaultConfig;
}
- @XmlAttribute(required = true)
private String id = DEFAULT_KEY;
-
- @XmlAttribute
- private String scheme = "http";
-
- @XmlAttribute
- private String keyStoreFile;
-
- @XmlAttribute
- private String keyStorePassword;
-
- @XmlAttribute
- private String trustStoreFile;
-
- @XmlAttribute
- private String trustStorePass;
-
- @XmlAttribute
- private String certPass;
-
- @XmlAttribute
private int socketIdleTimeout = 60000;
-
- @XmlAttribute
private boolean httpTraceLogEnabled;
-
private ChunkConfig chunkingConfig = ChunkConfig.AUTO;
-
- @XmlAttribute
- private String sslProtocol;
-
- @XmlElementWrapper(name = "parameters")
- @XmlElement(name = "parameter")
- private List parameters = new ArrayList<>();
-
private KeepAliveConfig keepAliveConfig = KeepAliveConfig.AUTO;
-
- @XmlAttribute
private boolean forceHttp2 = false;
-
- private String tlsStoreType;
private String httpVersion = "1.1";
private ProxyServerConfiguration proxyServerConfiguration;
private PoolConfiguration poolConfiguration;
- private boolean validateCertEnabled;
- private int cacheSize = 50;
- private int cacheValidityPeriod = 15;
- private boolean hostNameVerificationEnabled = true;
+
private ForwardedExtensionConfig forwardedExtensionConfig;
- private boolean ocspStaplingEnabled = false;
public SenderConfiguration() {
this.poolConfiguration = new PoolConfiguration();
@@ -109,30 +56,6 @@ public SenderConfiguration(String id) {
this.poolConfiguration = new PoolConfiguration();
}
- public void setSSLProtocol(String sslProtocol) {
- this.sslProtocol = sslProtocol;
- }
-
- public String getSSLProtocol() {
- return sslProtocol;
- }
-
- public String getCertPass() {
- return certPass;
- }
-
- public String getTLSStoreType() {
- return tlsStoreType;
- }
-
- public void setTLSStoreType(String storeType) {
- this.tlsStoreType = storeType;
- }
-
- public void setCertPass(String certPass) {
- this.certPass = certPass;
- }
-
public String getId() {
return id;
}
@@ -141,62 +64,6 @@ public void setId(String id) {
this.id = id;
}
- public String getKeyStoreFile() {
- return keyStoreFile;
- }
-
- public void setKeyStoreFile(String keyStoreFile) {
- this.keyStoreFile = keyStoreFile;
- }
-
- public String getKeyStorePassword() {
- return keyStorePassword;
- }
-
- public void setKeyStorePassword(String keyStorePassword) {
- this.keyStorePassword = keyStorePassword;
- }
-
- public String getScheme() {
- return scheme;
- }
-
- public void setScheme(String scheme) {
- this.scheme = scheme;
- }
-
- public List getParameters() {
- return parameters;
- }
-
- public void setParameters(List parameters) {
- this.parameters = parameters;
- }
-
- public String getTrustStoreFile() {
- return trustStoreFile;
- }
-
- public void setTrustStoreFile(String trustStoreFile) {
- this.trustStoreFile = trustStoreFile;
- }
-
- public String getTrustStorePass() {
- return trustStorePass;
- }
-
- public void setTrustStorePass(String trustStorePass) {
- this.trustStorePass = trustStorePass;
- }
-
- public SSLConfig getSSLConfig() {
- if (scheme == null || !scheme.equalsIgnoreCase("https")) {
- return null;
- }
- return Util.getSSLConfigForSender(certPass, keyStorePassword, keyStoreFile, trustStoreFile, trustStorePass,
- parameters, sslProtocol, tlsStoreType);
- }
-
public int getSocketIdleTimeout(int defaultValue) {
if (socketIdleTimeout == 0) {
return defaultValue;
@@ -258,46 +125,6 @@ public void setForceHttp2(boolean forceHttp2) {
this.forceHttp2 = forceHttp2;
}
- public void setValidateCertEnabled(boolean validateCertEnabled) {
- this.validateCertEnabled = validateCertEnabled;
- }
-
- public void setCacheSize(int cacheSize) {
- this.cacheSize = cacheSize;
- }
-
- public void setCacheValidityPeriod(int cacheValidityPeriod) {
- this.cacheValidityPeriod = cacheValidityPeriod;
- }
-
- public boolean validateCertEnabled() {
- return validateCertEnabled;
- }
-
- public int getCacheSize() {
- return cacheSize;
- }
-
- public void setHostNameVerificationEnabled(boolean hostNameVerificationEnabled) {
- this.hostNameVerificationEnabled = hostNameVerificationEnabled;
- }
-
- public boolean hostNameVerificationEnabled() {
- return hostNameVerificationEnabled;
- }
-
- public int getCacheValidityPeriod() {
- return cacheValidityPeriod;
- }
-
- public void setOcspStaplingEnabled(boolean ocspStaplingEnabled) {
- this.ocspStaplingEnabled = ocspStaplingEnabled;
- }
-
- public boolean isOcspStaplingEnabled() {
- return ocspStaplingEnabled;
- }
-
public PoolConfiguration getPoolConfiguration() {
return poolConfiguration;
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SslConfiguration.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SslConfiguration.java
new file mode 100644
index 000000000..4ed7f3bee
--- /dev/null
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/config/SslConfiguration.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.transport.http.netty.config;
+
+import org.wso2.transport.http.netty.common.Util;
+import org.wso2.transport.http.netty.common.ssl.SSLConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SSL configuration for HTTP connection.
+ */
+public class SslConfiguration {
+
+ private String scheme = "http";
+ private String keyStoreFile;
+ private String keyStorePassword;
+ private String trustStoreFile;
+ private String trustStorePass;
+ private String certPass;
+ private String sslProtocol;
+ private List parameters = new ArrayList<>();
+ private String tlsStoreType;
+ private boolean hostNameVerificationEnabled = true;
+ private boolean validateCertEnabled;
+ private int cacheValidityPeriod = 15;
+ private int cacheSize = 50;
+ private boolean ocspStaplingEnabled = false;
+
+ public String getCertPass() {
+ return certPass;
+ }
+
+ public void setCertPass(String certPass) {
+ this.certPass = certPass;
+ }
+
+ public String getKeyStoreFile() {
+ return keyStoreFile;
+ }
+
+ public void setKeyStoreFile(String keyStoreFile) {
+ this.keyStoreFile = keyStoreFile;
+ }
+
+ public String getKeyStorePassword() {
+ return keyStorePassword;
+ }
+
+ public void setKeyStorePassword(String keyStorePassword) {
+ this.keyStorePassword = keyStorePassword;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public void setScheme(String scheme) {
+ this.scheme = scheme;
+ }
+
+ public String getTrustStoreFile() {
+ return trustStoreFile;
+ }
+
+ public void setTrustStoreFile(String trustStoreFile) {
+ this.trustStoreFile = trustStoreFile;
+ }
+
+ public String getTrustStorePass() {
+ return trustStorePass;
+ }
+
+ public void setTrustStorePass(String trustStorePass) {
+ this.trustStorePass = trustStorePass;
+ }
+
+ public void setSSLProtocol(String sslProtocol) {
+ this.sslProtocol = sslProtocol;
+ }
+
+ public String getSSLProtocol() {
+ return sslProtocol;
+ }
+
+ public List getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(List parameters) {
+ this.parameters = parameters;
+ }
+
+ public String getTLSStoreType() {
+ return tlsStoreType;
+ }
+
+ public void setTLSStoreType(String storeType) {
+ this.tlsStoreType = storeType;
+ }
+
+ public void setValidateCertEnabled(boolean validateCertEnabled) {
+ this.validateCertEnabled = validateCertEnabled;
+ }
+
+ public boolean validateCertEnabled() {
+ return validateCertEnabled;
+ }
+
+ public void setHostNameVerificationEnabled(boolean hostNameVerificationEnabled) {
+ this.hostNameVerificationEnabled = hostNameVerificationEnabled;
+ }
+
+ public boolean hostNameVerificationEnabled() {
+ return hostNameVerificationEnabled;
+ }
+
+ public void setCacheValidityPeriod(int cacheValidityPeriod) {
+ this.cacheValidityPeriod = cacheValidityPeriod;
+ }
+
+ public int getCacheValidityPeriod() {
+ return cacheValidityPeriod;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setOcspStaplingEnabled(boolean ocspStaplingEnabled) {
+ this.ocspStaplingEnabled = ocspStaplingEnabled;
+ }
+
+ public boolean isOcspStaplingEnabled() {
+ return ocspStaplingEnabled;
+ }
+
+ public SSLConfig generateSSLConfig() {
+ if (scheme == null || !scheme.equalsIgnoreCase("https")) {
+ return null;
+ }
+ return Util.getSSLConfigForSender(certPass, keyStorePassword, keyStoreFile, trustStoreFile, trustStorePass,
+ parameters, sslProtocol, tlsStoreType);
+ }
+}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/HttpWsConnectorFactory.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/HttpWsConnectorFactory.java
index 27380c1f3..8ee03e78f 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/HttpWsConnectorFactory.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/HttpWsConnectorFactory.java
@@ -22,7 +22,7 @@
import org.wso2.transport.http.netty.config.ListenerConfiguration;
import org.wso2.transport.http.netty.config.SenderConfiguration;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector;
-import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig;
+import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig;
import org.wso2.transport.http.netty.listener.ServerBootstrapConfiguration;
import java.util.Map;
@@ -57,7 +57,7 @@ HttpClientConnector createHttpClientConnector(Map transportPrope
* @param clientConnectorConfig Properties to create a client connector.
* @return WebSocketClientConnector.
*/
- WebSocketClientConnector createWsClientConnector(WsClientConnectorConfig clientConnectorConfig);
+ WebSocketClientConnector createWsClientConnector(WebSocketClientConnectorConfig clientConnectorConfig);
/**
* Shutdown all the server channels and the accepted channels. It also shutdown all the eventloop groups.
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/ServerConnectorFuture.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/ServerConnectorFuture.java
index 72197925b..f80d943f2 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/ServerConnectorFuture.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/ServerConnectorFuture.java
@@ -27,7 +27,7 @@
public interface ServerConnectorFuture extends HttpConnectorFuture, WebSocketConnectorFuture {
/**
- * Set life cycle event listener for the HTTP/WS connector
+ * Set life cycle event listener for the HTTP/WS_SCHEME connector
*
* @param portBindingEventListener The PortBindingEventListener implementation
*/
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WsClientConnectorConfig.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnectorConfig.java
similarity index 82%
rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WsClientConnectorConfig.java
rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnectorConfig.java
index a353f5dd2..503279191 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WsClientConnectorConfig.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketClientConnectorConfig.java
@@ -19,35 +19,32 @@
package org.wso2.transport.http.netty.contract.websocket;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.HttpHeaders;
+import org.wso2.transport.http.netty.common.Constants;
+import org.wso2.transport.http.netty.config.SslConfiguration;
+
+import java.net.URI;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Sender configuration for WebSocket client connector.
+ * Configuration for WebSocket client connector.
*/
-public class WsClientConnectorConfig {
+public class WebSocketClientConnectorConfig extends SslConfiguration {
private final String remoteAddress;
private List subProtocols;
private int idleTimeoutInSeconds;
private boolean autoRead;
- private final Map headers = new HashMap<>();
-
- public WsClientConnectorConfig(String remoteAddress) {
- this.remoteAddress = remoteAddress;
- this.idleTimeoutInSeconds = -1;
- this.autoRead = true;
-
- }
+ private final HttpHeaders headers;
- public WsClientConnectorConfig(String remoteAddress, List subProtocols,
- int idleTimeoutInSeconds, boolean autoRead) {
+ public WebSocketClientConnectorConfig(String remoteAddress) {
this.remoteAddress = remoteAddress;
- this.subProtocols = subProtocols;
- this.idleTimeoutInSeconds = idleTimeoutInSeconds;
- this.autoRead = autoRead;
+ this.headers = new DefaultHttpHeaders();
+ this.setScheme(Constants.WSS_SCHEME.equals(URI.create(remoteAddress).getScheme())
+ ? Constants.HTTPS_SCHEME : Constants.HTTP_SCHEME);
}
/**
@@ -126,7 +123,7 @@ public String getRemoteAddress() {
* @param headers Headers map.
*/
public void addHeaders(Map headers) {
- this.headers.putAll(headers);
+ headers.forEach(this.headers::add);
}
/**
@@ -136,7 +133,7 @@ public void addHeaders(Map headers) {
* @param value Value of the header.
*/
public void addHeader(String key, String value) {
- this.headers.put(key, value);
+ this.headers.add(key, value);
}
/**
@@ -144,7 +141,7 @@ public void addHeader(String key, String value) {
*
* @return all the headers as a map.
*/
- public Map getHeaders() {
+ public HttpHeaders getHeaders() {
return headers;
}
@@ -155,7 +152,7 @@ public Map getHeaders() {
* @return true of the header is present.
*/
public boolean containsHeader(String key) {
- return headers.containsKey(key);
+ return headers.contains(key);
}
/**
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketControlMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketControlMessage.java
index ca86152c8..cc6099980 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketControlMessage.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contract/websocket/WebSocketControlMessage.java
@@ -19,12 +19,10 @@
package org.wso2.transport.http.netty.contract.websocket;
-import java.nio.ByteBuffer;
-
/**
- * This message contains the details of WebSocket bong message.
+ * This message contains the details of WebSocket control message.
*/
-public interface WebSocketControlMessage extends WebSocketMessage {
+public interface WebSocketControlMessage extends WebSocketBinaryMessage {
/**
* Get the control signal.
@@ -32,18 +30,4 @@ public interface WebSocketControlMessage extends WebSocketMessage {
* @return the control signal as a {@link WebSocketControlSignal}.
*/
WebSocketControlSignal getControlSignal();
-
- /**
- * Get the payload of the control signal.
- *
- * @return the payload of the control signal.
- */
- ByteBuffer getPayload();
-
- /**
- * Get the binary data as a byte array.
- *
- * @return the binary data as a byte array.
- */
- byte[] getByteArray();
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpClientConnector.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpClientConnector.java
index 9f3bc1502..96c0e4199 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpClientConnector.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpClientConnector.java
@@ -292,7 +292,7 @@ private void initTargetChannelProperties(SenderConfiguration senderConfiguration
this.httpVersion = senderConfiguration.getHttpVersion();
this.chunkConfig = senderConfiguration.getChunkingConfig();
this.socketIdleTimeout = senderConfiguration.getSocketIdleTimeout(Constants.ENDPOINT_TIMEOUT);
- this.sslConfig = senderConfiguration.getSSLConfig();
+ this.sslConfig = senderConfiguration.generateSSLConfig();
this.keepAliveConfig = senderConfiguration.getKeepAliveConfig();
this.forwardedExtensionConfig = senderConfiguration.getForwardedExtensionConfig();
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpWsConnectorFactory.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpWsConnectorFactory.java
index e58076bcd..8f3495cab 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpWsConnectorFactory.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/DefaultHttpWsConnectorFactory.java
@@ -31,7 +31,7 @@
import org.wso2.transport.http.netty.contract.HttpWsConnectorFactory;
import org.wso2.transport.http.netty.contract.ServerConnector;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector;
-import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig;
+import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig;
import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketClientConnector;
import org.wso2.transport.http.netty.listener.ServerBootstrapConfiguration;
import org.wso2.transport.http.netty.listener.ServerConnectorBootstrap;
@@ -96,7 +96,7 @@ public HttpClientConnector createHttpClientConnector(
}
@Override
- public WebSocketClientConnector createWsClientConnector(WsClientConnectorConfig clientConnectorConfig) {
+ public WebSocketClientConnector createWsClientConnector(WebSocketClientConnectorConfig clientConnectorConfig) {
return new DefaultWebSocketClientConnector(clientConnectorConfig, clientGroup);
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java
index cb38c2a81..5886395b2 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultClientHandshakeFuture.java
@@ -42,8 +42,7 @@ public void setClientHandshakeListener(ClientHandshakeListener clientHandshakeLi
this.clientHandshakeListener = clientHandshakeListener;
if (throwable != null) {
clientHandshakeListener.onError(throwable, response);
- }
- if (webSocketConnection != null && response != null) {
+ } else if (webSocketConnection != null && response != null) {
clientHandshakeListener.onSuccess(webSocketConnection, response);
}
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java
index 47145c329..594d947ee 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketClientConnector.java
@@ -22,37 +22,23 @@
import io.netty.channel.EventLoopGroup;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector;
-import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig;
+import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig;
import org.wso2.transport.http.netty.sender.websocket.WebSocketClient;
-import java.util.Map;
-
/**
* Implementation of WebSocket client connector.
*/
public class DefaultWebSocketClientConnector implements WebSocketClientConnector {
- private final String remoteUrl;
- private final String subProtocols;
- private final int idleTimeout;
- private final Map customHeaders;
- private final EventLoopGroup wsClientEventLoopGroup;
- private final boolean autoRead;
+ private final WebSocketClient webSocketClient;
- public DefaultWebSocketClientConnector(WsClientConnectorConfig clientConnectorConfig,
+ public DefaultWebSocketClientConnector(WebSocketClientConnectorConfig clientConnectorConfig,
EventLoopGroup wsClientEventLoopGroup) {
- this.remoteUrl = clientConnectorConfig.getRemoteAddress();
- this.subProtocols = clientConnectorConfig.getSubProtocolsAsCSV();
- this.customHeaders = clientConnectorConfig.getHeaders();
- this.idleTimeout = clientConnectorConfig.getIdleTimeoutInMillis();
- this.wsClientEventLoopGroup = wsClientEventLoopGroup;
- this.autoRead = clientConnectorConfig.isAutoRead();
+ this.webSocketClient = new WebSocketClient(wsClientEventLoopGroup, clientConnectorConfig);
}
@Override
public ClientHandshakeFuture connect() {
- WebSocketClient webSocketClient = new WebSocketClient(remoteUrl, subProtocols, idleTimeout,
- wsClientEventLoopGroup, customHeaders, autoRead);
return webSocketClient.handshake();
}
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java
index 19ec58a84..dba160aac 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketConnection.java
@@ -15,7 +15,7 @@
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection;
import org.wso2.transport.http.netty.contract.websocket.WebSocketFrameType;
import org.wso2.transport.http.netty.internal.websocket.DefaultWebSocketSession;
-import org.wso2.transport.http.netty.listener.WebSocketFramesBlockingHandler;
+import org.wso2.transport.http.netty.listener.MessageQueueHandler;
import java.nio.ByteBuffer;
import javax.websocket.Session;
@@ -28,17 +28,16 @@ public class DefaultWebSocketConnection implements WebSocketConnection {
private final WebSocketInboundFrameHandler frameHandler;
private final ChannelHandlerContext ctx;
private final DefaultWebSocketSession session;
- private WebSocketFramesBlockingHandler blockingHandler;
+ private MessageQueueHandler messageQueueHandler;
private WebSocketFrameType continuationFrameType;
private boolean closeFrameSent;
private int closeInitiatedStatusCode;
public DefaultWebSocketConnection(ChannelHandlerContext ctx, WebSocketInboundFrameHandler frameHandler,
- WebSocketFramesBlockingHandler blockingHandler,
- DefaultWebSocketSession session) {
+ MessageQueueHandler messageQueueHandler, DefaultWebSocketSession session) {
this.ctx = ctx;
this.frameHandler = frameHandler;
- this.blockingHandler = blockingHandler;
+ this.messageQueueHandler = messageQueueHandler;
this.session = session;
}
@@ -54,20 +53,20 @@ public Session getSession() {
@Override
public void readNextFrame() {
- blockingHandler.readNextFrame();
+ messageQueueHandler.readNextFrame();
}
@Override
public void startReadingFrames() {
- ctx.pipeline().remove(Constants.WEBSOCKET_FRAME_BLOCKING_HANDLER);
+ ctx.pipeline().remove(Constants.MESSAGE_QUEUE_HANDLER);
ctx.channel().config().setAutoRead(true);
}
@Override
public void stopReadingFrames() {
ctx.channel().config().setAutoRead(false);
- ctx.pipeline().addBefore(Constants.WEBSOCKET_FRAME_HANDLER, Constants.WEBSOCKET_FRAME_BLOCKING_HANDLER,
- blockingHandler = new WebSocketFramesBlockingHandler());
+ ctx.pipeline().addBefore(Constants.WEBSOCKET_FRAME_HANDLER, Constants.MESSAGE_QUEUE_HANDLER,
+ messageQueueHandler = new MessageQueueHandler());
}
@Override
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java
index dd70f4b1a..7b4d3c143 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/DefaultWebSocketMessage.java
@@ -36,7 +36,7 @@ public class DefaultWebSocketMessage implements WebSocketMessage {
protected boolean secureConnection;
protected boolean isServerMessage;
protected WebSocketConnection webSocketConnection;
- protected String sessionlID;
+ protected String sessionID;
public void setProperty(String key, Object value) {
properties.put(key, value);
@@ -54,8 +54,8 @@ public Map getProperties() {
return properties;
}
- public void setSessionlID(String sessionlID) {
- this.sessionlID = sessionlID;
+ public void setSessionID(String sessionID) {
+ this.sessionID = sessionID;
}
public void setTarget(String target) {
@@ -76,7 +76,7 @@ public String getListenerInterface() {
return listenerInterface;
}
- public void setIsConnectionSecured(boolean isConnectionSecured) {
+ public void setIsSecureConnection(boolean isConnectionSecured) {
this.secureConnection = isConnectionSecured;
}
@@ -105,6 +105,6 @@ public WebSocketConnection getWebSocketConnection() {
@Override
public String getSessionID() {
- return sessionlID;
+ return sessionID;
}
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java
index a275f84ac..de3a073e9 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/WebSocketInboundFrameHandler.java
@@ -47,7 +47,7 @@
import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketControlMessage;
import org.wso2.transport.http.netty.exception.UnknownWebSocketFrameTypeException;
import org.wso2.transport.http.netty.internal.websocket.WebSocketUtil;
-import org.wso2.transport.http.netty.listener.WebSocketFramesBlockingHandler;
+import org.wso2.transport.http.netty.listener.MessageQueueHandler;
import java.net.InetSocketAddress;
@@ -63,17 +63,17 @@ public class WebSocketInboundFrameHandler extends ChannelInboundHandlerAdapter {
private final boolean securedConnection;
private final String target;
private final String interfaceId;
- private final WebSocketFramesBlockingHandler blockingHandler;
private DefaultWebSocketConnection webSocketConnection;
private ChannelHandlerContext ctx;
private ChannelPromise closePromise;
private WebSocketFrameType continuationFrameType;
+ private final MessageQueueHandler blockingHandler;
private boolean caughtException;
private boolean closeFrameReceived;
private boolean closeInitialized;
public WebSocketInboundFrameHandler(WebSocketConnectorFuture connectorFuture,
- WebSocketFramesBlockingHandler blockingHandler, boolean isServer,
+ MessageQueueHandler blockingHandler, boolean isServer,
boolean securedConnection, String target, String interfaceId) {
this.connectorFuture = connectorFuture;
this.blockingHandler = blockingHandler;
@@ -147,7 +147,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws WebSocketConnector
return;
}
- if (closePromise != null && !closePromise.isDone()) {
+ if (closePromise != null && !closeFrameReceived) {
String errMsg = "Connection is closed by remote endpoint without echoing a close frame";
ctx.close().addListener(closeFuture -> closePromise.setFailure(new IllegalStateException(errMsg)));
}
@@ -175,6 +175,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
notifyBinaryMessage(binaryFrame, binaryFrame.content(), binaryFrame.isFinalFragment());
} else if (msg instanceof CloseWebSocketFrame) {
+ closeFrameReceived = true;
notifyCloseMessage((CloseWebSocketFrame) msg);
} else if (msg instanceof PingWebSocketFrame) {
notifyPingMessage((PingWebSocketFrame) msg);
@@ -228,7 +229,6 @@ private void notifyCloseMessage(CloseWebSocketFrame closeWebSocketFrame) throws
if (closePromise == null) {
DefaultWebSocketMessage webSocketCloseMessage = new DefaultWebSocketCloseMessage(statusCode, reasonText);
setupCommonProperties(webSocketCloseMessage);
- closeFrameReceived = true;
connectorFuture.notifyWebSocketListener((WebSocketCloseMessage) webSocketCloseMessage);
} else {
if (webSocketConnection.getCloseInitiatedStatusCode() != closeWebSocketFrame.statusCode()) {
@@ -267,9 +267,9 @@ private void notifyIdleTimeout() throws WebSocketConnectorException {
private void setupCommonProperties(DefaultWebSocketMessage webSocketMessage) {
webSocketMessage.setTarget(target);
webSocketMessage.setListenerInterface(interfaceId);
- webSocketMessage.setIsConnectionSecured(securedConnection);
+ webSocketMessage.setIsSecureConnection(securedConnection);
webSocketMessage.setWebSocketConnection(webSocketConnection);
- webSocketMessage.setSessionlID(webSocketConnection.getId());
+ webSocketMessage.setSessionID(webSocketConnection.getId());
webSocketMessage.setIsServerMessage(isServer);
webSocketMessage.setProperty(Constants.LISTENER_PORT,
((InetSocketAddress) ctx.channel().localAddress()).getPort());
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java
index 0b38b6343..41180a71e 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketControlMessage.java
@@ -21,45 +21,23 @@
import org.wso2.transport.http.netty.contract.websocket.WebSocketControlMessage;
import org.wso2.transport.http.netty.contract.websocket.WebSocketControlSignal;
-import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage;
import java.nio.ByteBuffer;
/**
* Implementation of WebSocket control message.
*/
-public class DefaultWebSocketControlMessage extends DefaultWebSocketMessage implements WebSocketControlMessage {
+public class DefaultWebSocketControlMessage extends DefaultWebSocketBinaryMessage implements WebSocketControlMessage {
private final WebSocketControlSignal controlSignal;
- private final ByteBuffer buffer;
public DefaultWebSocketControlMessage(WebSocketControlSignal controlSignal, ByteBuffer buffer) {
+ super(buffer, true);
this.controlSignal = controlSignal;
- this.buffer = buffer;
}
@Override
public WebSocketControlSignal getControlSignal() {
return controlSignal;
}
-
- @Override
- public byte[] getByteArray() {
- byte[] bytes;
- if (buffer.hasArray()) {
- bytes = buffer.array();
- } else {
- int remaining = buffer.remaining();
- bytes = new byte[remaining];
- for (int i = 0; i < remaining; i++) {
- bytes[i] = buffer.get();
- }
- }
- return bytes;
- }
-
- @Override
- public ByteBuffer getPayload() {
- return buffer;
- }
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java
index eb872307a..533c028c3 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/contractimpl/websocket/message/DefaultWebSocketInitMessage.java
@@ -30,7 +30,6 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleStateHandler;
@@ -43,7 +42,7 @@
import org.wso2.transport.http.netty.contractimpl.websocket.DefaultWebSocketMessage;
import org.wso2.transport.http.netty.contractimpl.websocket.WebSocketInboundFrameHandler;
import org.wso2.transport.http.netty.internal.websocket.WebSocketUtil;
-import org.wso2.transport.http.netty.listener.WebSocketFramesBlockingHandler;
+import org.wso2.transport.http.netty.listener.MessageQueueHandler;
import org.wso2.transport.http.netty.message.HttpCarbonRequest;
import java.nio.charset.StandardCharsets;
@@ -67,7 +66,7 @@ public DefaultWebSocketInitMessage(ChannelHandlerContext ctx, ServerConnectorFut
this.connectorFuture = connectorFuture;
this.secureConnection = ctx.channel().pipeline().get(Constants.SSL_HANDLER) != null;
this.httpRequest = httpRequest;
- this.sessionlID = WebSocketUtil.getSessionID(ctx);
+ this.sessionID = WebSocketUtil.getSessionID(ctx);
}
@Override
@@ -118,31 +117,28 @@ public ServerHandshakeFuture handshake(String[] subProtocols, boolean allowExten
@Override
public ChannelFuture cancelHandshake(int statusCode, String closeReason) {
- if (!cancelled && !handshakeStarted) {
- try {
- int responseStatusCode = statusCode >= 400 && statusCode < 500 ? statusCode : 400;
- ChannelFuture responseFuture;
- if (closeReason != null) {
- ByteBuf content = Unpooled.wrappedBuffer(closeReason.getBytes(StandardCharsets.UTF_8));
- responseFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- HttpResponseStatus
- .valueOf(responseStatusCode),
- content));
- } else {
- responseFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- HttpResponseStatus.valueOf(
- responseStatusCode)));
- }
- return responseFuture;
- } finally {
- cancelled = true;
- }
- } else {
- if (cancelled) {
- throw new IllegalStateException("Cannot cancel the handshake: handshake already cancelled");
+ if (cancelled) {
+ throw new IllegalStateException("Cannot cancel the handshake: handshake already cancelled");
+ }
+
+ if (handshakeStarted) {
+ throw new IllegalStateException("Cannot cancel the handshake: handshake already started");
+ }
+
+ try {
+ int responseStatusCode = statusCode >= 400 && statusCode < 500 ? statusCode : 400;
+ ChannelFuture responseFuture;
+ if (closeReason != null) {
+ ByteBuf content = Unpooled.wrappedBuffer(closeReason.getBytes(StandardCharsets.UTF_8));
+ responseFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(responseStatusCode), content));
} else {
- throw new IllegalStateException("Cannot cancel the handshake: handshake already started");
+ responseFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(responseStatusCode)));
}
+ return responseFuture;
+ } finally {
+ cancelled = true;
}
}
@@ -159,60 +155,52 @@ public boolean isHandshakeStarted() {
private ServerHandshakeFuture handleHandshake(WebSocketServerHandshaker handshaker, int idleTimeout,
HttpHeaders headers) {
DefaultServerHandshakeFuture handshakeFuture = new DefaultServerHandshakeFuture();
-
if (cancelled) {
Throwable e = new IllegalAccessException("Handshake is already cancelled!");
handshakeFuture.notifyError(e);
return handshakeFuture;
}
-
- try {
- ChannelFuture channelFuture = handshaker.handshake(ctx.channel(), httpRequest, headers,
- ctx.channel().newPromise());
- channelFuture.addListener(future -> {
+ ChannelFuture channelFuture =
+ handshaker.handshake(ctx.channel(), httpRequest, headers, ctx.channel().newPromise());
+ channelFuture.addListener(future -> {
+ if (future.isSuccess() && future.cause() == null) {
String selectedSubProtocol = handshaker.selectedSubprotocol();
- WebSocketFramesBlockingHandler blockingHandler = new WebSocketFramesBlockingHandler();
+ MessageQueueHandler messageQueueHandler = new MessageQueueHandler();
WebSocketInboundFrameHandler frameHandler = new WebSocketInboundFrameHandler(connectorFuture,
- blockingHandler, true, secureConnection, target, listenerInterface);
-
- //Replace HTTP handlers with new Handlers for WebSocket in the pipeline
- ChannelPipeline pipeline = ctx.pipeline();
- if (idleTimeout > 0) {
- pipeline.replace(Constants.IDLE_STATE_HANDLER, Constants.IDLE_STATE_HANDLER,
- new IdleStateHandler(idleTimeout, idleTimeout, idleTimeout,
- TimeUnit.MILLISECONDS));
- } else {
- pipeline.remove(Constants.IDLE_STATE_HANDLER);
- }
- pipeline.addLast(Constants.WEBSOCKET_FRAME_BLOCKING_HANDLER, blockingHandler);
- pipeline.addLast(Constants.WEBSOCKET_FRAME_HANDLER, frameHandler);
- pipeline.remove(Constants.HTTP_SOURCE_HANDLER);
- pipeline.fireChannelActive();
- // Make sure to get WebSocket connection after fireChannelActive
+ messageQueueHandler, true, secureConnection, target, listenerInterface);
+ configureFrameHandlingPipeline(idleTimeout, messageQueueHandler, frameHandler);
DefaultWebSocketConnection webSocketConnection = frameHandler.getWebSocketConnection();
webSocketConnection.getDefaultWebSocketSession().setNegotiatedSubProtocol(selectedSubProtocol);
handshakeFuture.notifySuccess(frameHandler.getWebSocketConnection());
- });
- handshakeStarted = true;
- return handshakeFuture;
- } catch (Exception e) {
- /*
- Code 1002 : indicates that an endpoint is terminating the connection
- due to a protocol error.
- */
- handshaker.close(ctx.channel(),
- new CloseWebSocketFrame(1002,
- "Terminating the connection due to a protocol error."));
- handshakeFuture.notifyError(e);
- return handshakeFuture;
+ } else {
+ handshakeFuture.notifyError(future.cause());
+ }
+ });
+ handshakeStarted = true;
+ return handshakeFuture;
+ }
+
+ private void configureFrameHandlingPipeline(int idleTimeout, MessageQueueHandler messageQueueHandler,
+ WebSocketInboundFrameHandler frameHandler) {
+ ChannelPipeline pipeline = ctx.pipeline();
+ if (idleTimeout > 0) {
+ pipeline.replace(Constants.IDLE_STATE_HANDLER, Constants.IDLE_STATE_HANDLER,
+ new IdleStateHandler(idleTimeout, idleTimeout, idleTimeout,
+ TimeUnit.MILLISECONDS));
+ } else {
+ pipeline.remove(Constants.IDLE_STATE_HANDLER);
}
+ pipeline.addLast(Constants.MESSAGE_QUEUE_HANDLER, messageQueueHandler);
+ pipeline.addLast(Constants.WEBSOCKET_FRAME_HANDLER, frameHandler);
+ pipeline.remove(Constants.HTTP_SOURCE_HANDLER);
+ pipeline.fireChannelActive();
}
/* Get the URL of the given connection */
private String getWebSocketURL(HttpRequest req) {
- String protocol = Constants.WEBSOCKET_PROTOCOL;
+ String protocol = Constants.WS_SCHEME;
if (secureConnection) {
- protocol = Constants.WEBSOCKET_PROTOCOL_SECURED;
+ protocol = Constants.WSS_SCHEME;
}
return protocol + "://" + req.headers().get("Host") + req.uri();
}
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java
index b9e0c3b9b..123538a9f 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/internal/websocket/WebSocketUtil.java
@@ -30,7 +30,7 @@
import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketBinaryMessage;
import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketControlMessage;
import org.wso2.transport.http.netty.contractimpl.websocket.message.DefaultWebSocketTextMessage;
-import org.wso2.transport.http.netty.listener.WebSocketFramesBlockingHandler;
+import org.wso2.transport.http.netty.listener.MessageQueueHandler;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@@ -46,7 +46,7 @@ public static String getSessionID(ChannelHandlerContext ctx) {
public static DefaultWebSocketConnection getWebSocketConnection(ChannelHandlerContext ctx,
WebSocketInboundFrameHandler frameHandler,
- WebSocketFramesBlockingHandler blockingHandler,
+ MessageQueueHandler blockingHandler,
boolean isSecured,
String uri) throws URISyntaxException {
DefaultWebSocketSession session = new DefaultWebSocketSession(ctx, isSecured, uri, getSessionID(ctx));
diff --git a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketFramesBlockingHandler.java b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/MessageQueueHandler.java
similarity index 81%
rename from components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketFramesBlockingHandler.java
rename to components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/MessageQueueHandler.java
index 05ef3b2de..6eb549291 100644
--- a/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/WebSocketFramesBlockingHandler.java
+++ b/components/org.wso2.transport.http.netty/src/main/java/org/wso2/transport/http/netty/listener/MessageQueueHandler.java
@@ -28,14 +28,14 @@
* This Handler is responsible for issuing frame by frame when the WebSocket connection is asked to read next frame
* when autoRead is set to false.
*/
-public class WebSocketFramesBlockingHandler extends ChannelInboundHandlerAdapter {
+public class MessageQueueHandler extends ChannelInboundHandlerAdapter {
- private final Queue