diff --git a/.gitignore b/.gitignore index 56688635b..b19855ab7 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,8 @@ target log.xml .externalToolBuilders *.pyc +##### +# Intellij +#### +.idea/ +*.iml diff --git a/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java b/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java index a7e280b85..bb93df053 100644 --- a/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java +++ b/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java @@ -16,14 +16,10 @@ package org.kurento.jsonrpc.client; +import javax.net.ssl.SSLException; import java.io.IOException; import java.util.concurrent.TimeoutException; -import javax.net.ssl.SSLException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -55,6 +51,8 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.CharsetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JsonRpcClientNettyWebSocket extends AbstractJsonRpcClientWebSocket { @@ -154,13 +152,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { private volatile Channel channel; private volatile EventLoopGroup group; private volatile JsonRpcWebSocketClientHandler handler; + private final boolean channelTimeout; + private String host = ""; + private int port; public JsonRpcClientNettyWebSocket(String url) { - this(url, null); + this(url, null, false); } public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener) { + this(url, connectionListener, false); + log.debug("{} Creating JsonRPC NETTY Websocket client", label); + } + + /** + * Create json rpc client netty websocket connection + * + * @param url - url of websocket connection + * @param connectionListener - listener + * @param channelTimeout - should use future timeout for channel creation to prevent this thread from blocking forever. + */ + public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener, + boolean channelTimeout) { super(url, connectionListener); + this.channelTimeout = channelTimeout; log.debug("{} Creating JsonRPC NETTY Websocket client", label); } @@ -185,20 +200,18 @@ protected boolean isNativeClientConnected() { @Override protected void connectNativeClient() throws TimeoutException, Exception { - if (channel == null || !channel.isActive() || group == null || group.isShuttingDown() - || group.isShutdown()) { - + if (channel == null || !channel.isActive() || group == null || group.isShuttingDown() || group.isShutdown()) { log.info("{} Connecting native client", label); final boolean ssl = "wss".equalsIgnoreCase(this.uri.getScheme()); final SslContext sslCtx; try { sslCtx = ssl ? SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null; + .trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null; } catch (SSLException e) { log.error("{} Could not create SSL Context", label, e); throw new IllegalArgumentException( - "Could not create SSL context. See logs for more details", e); + "Could not create SSL context. See logs for more details", e); } final String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); @@ -215,7 +228,8 @@ protected void connectNativeClient() throws TimeoutException, Exception { } else { port = uri.getPort(); } - + this.port = port; + this.host = host; if (group == null || group.isShuttingDown() || group.isShutdown() || group.isTerminated()) { log.info("{} Creating new NioEventLoopGroup", label); group = new NioEventLoopGroup(); @@ -250,13 +264,26 @@ protected void initChannel(SocketChannel ch) { final int maxRetries = 5; while (channel == null || !channel.isOpen()) { try { - channel = b.connect(host, port).sync().channel(); - handler.handshakeFuture().sync(); + ChannelFuture future = b.connect(host, port); + if (channelTimeout) { + handleTimeout(future); + this.channel = future.channel(); + } else { + channel = future.sync().channel(); + } + future = handler.handshakeFuture(); + if (channelTimeout) { + handleTimeout(future); + } else { + future.sync(); + } } catch (InterruptedException e) { // This should never happen log.warn("{} ERROR connecting WS Netty client, opening channel", label, e); } catch (Exception e) { - if (e.getCause() instanceof WebSocketHandshakeException && numRetries < maxRetries) { + if (e.getCause() instanceof WebSocketHandshakeException + || e.getCause() instanceof TimeoutException + && numRetries < maxRetries) { log.warn( "{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms ", label, uri, numRetries + 1, maxRetries); @@ -281,6 +308,23 @@ public void operationComplete(ChannelFuture future) throws Exception { } + private void handleTimeout(ChannelFuture future) throws Exception { + // increase timeout plus 500 milliseconds to add buffer between actual connection timeout + final int timeoutMillis = this.connectionTimeout + 500; + future.await(timeoutMillis); + if (!future.isDone()) { + future.cancel(true); + throw new TimeoutException("Connection to " + host + ":" + port + " with future timeout of " + timeoutMillis); + } + if (future.isCancelled()) { + throw new Exception("Connection to " + host + ":" + port + " cancelled by user!"); + } + if (!future.isSuccess()) { + final Throwable cause = future.cause(); + throw new Exception("Create connection to " + host + ":" + port + " error", cause); + } + } + @Override public void closeNativeClient() { closeChannel(); @@ -298,10 +342,14 @@ private void closeChannel() { if (channel != null) { log.debug("{} Closing client", label); try { - channel.close().sync(); + ChannelFuture channelFuture = channel.close(); + if (channelTimeout) { + handleTimeout(channelFuture); + } else { + channelFuture.sync(); + } } catch (Exception e) { - log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(), - e); + log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(), e); } channel = null; } else {