1616
1717package org .kurento .jsonrpc .client ;
1818
19+ import javax .net .ssl .SSLException ;
1920import java .io .IOException ;
2021import java .util .concurrent .TimeoutException ;
2122
22- import javax .net .ssl .SSLException ;
23-
24- import org .slf4j .Logger ;
25- import org .slf4j .LoggerFactory ;
26-
2723import io .netty .bootstrap .Bootstrap ;
2824import io .netty .channel .Channel ;
2925import io .netty .channel .ChannelFuture ;
5551import io .netty .handler .timeout .IdleStateEvent ;
5652import io .netty .handler .timeout .IdleStateHandler ;
5753import io .netty .util .CharsetUtil ;
54+ import org .slf4j .Logger ;
55+ import org .slf4j .LoggerFactory ;
5856
5957public class JsonRpcClientNettyWebSocket extends AbstractJsonRpcClientWebSocket {
6058
@@ -154,13 +152,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
154152 private volatile Channel channel ;
155153 private volatile EventLoopGroup group ;
156154 private volatile JsonRpcWebSocketClientHandler handler ;
155+ private final boolean channelTimeout ;
156+ private String host = "" ;
157+ private int port ;
157158
158159 public JsonRpcClientNettyWebSocket (String url ) {
159- this (url , null );
160+ this (url , null , false );
160161 }
161162
162163 public JsonRpcClientNettyWebSocket (String url , JsonRpcWSConnectionListener connectionListener ) {
164+ this (url , connectionListener , false );
165+ log .debug ("{} Creating JsonRPC NETTY Websocket client" , label );
166+ }
167+
168+ /**
169+ * Create json rpc client netty websocket connection
170+ *
171+ * @param url - url of websocket connection
172+ * @param connectionListener - listener
173+ * @param channelTimeout - should use future timeout for channel creation to prevent this thread from blocking forever.
174+ */
175+ public JsonRpcClientNettyWebSocket (String url , JsonRpcWSConnectionListener connectionListener ,
176+ boolean channelTimeout ) {
163177 super (url , connectionListener );
178+ this .channelTimeout = channelTimeout ;
164179 log .debug ("{} Creating JsonRPC NETTY Websocket client" , label );
165180 }
166181
@@ -185,20 +200,18 @@ protected boolean isNativeClientConnected() {
185200 @ Override
186201 protected void connectNativeClient () throws TimeoutException , Exception {
187202
188- if (channel == null || !channel .isActive () || group == null || group .isShuttingDown ()
189- || group .isShutdown ()) {
190-
203+ if (channel == null || !channel .isActive () || group == null || group .isShuttingDown () || group .isShutdown ()) {
191204 log .info ("{} Connecting native client" , label );
192205
193206 final boolean ssl = "wss" .equalsIgnoreCase (this .uri .getScheme ());
194207 final SslContext sslCtx ;
195208 try {
196209 sslCtx = ssl ? SslContextBuilder .forClient ()
197- .trustManager (InsecureTrustManagerFactory .INSTANCE ).build () : null ;
210+ .trustManager (InsecureTrustManagerFactory .INSTANCE ).build () : null ;
198211 } catch (SSLException e ) {
199212 log .error ("{} Could not create SSL Context" , label , e );
200213 throw new IllegalArgumentException (
201- "Could not create SSL context. See logs for more details" , e );
214+ "Could not create SSL context. See logs for more details" , e );
202215 }
203216
204217 final String scheme = uri .getScheme () == null ? "ws" : uri .getScheme ();
@@ -215,7 +228,8 @@ protected void connectNativeClient() throws TimeoutException, Exception {
215228 } else {
216229 port = uri .getPort ();
217230 }
218-
231+ this .port = port ;
232+ this .host = host ;
219233 if (group == null || group .isShuttingDown () || group .isShutdown () || group .isTerminated ()) {
220234 log .info ("{} Creating new NioEventLoopGroup" , label );
221235 group = new NioEventLoopGroup ();
@@ -228,38 +242,52 @@ protected void connectNativeClient() throws TimeoutException, Exception {
228242
229243 Bootstrap b = new Bootstrap ();
230244 b .group (group ).channel (NioSocketChannel .class )
231- .handler (new ChannelInitializer <SocketChannel >() {
232- @ Override
233- protected void initChannel (SocketChannel ch ) {
234- log .info ("{} Initiating new Netty channel. Will create new handler too!" , label );
235- handler = new JsonRpcWebSocketClientHandler (
236- WebSocketClientHandshakerFactory .newHandshaker (uri , WebSocketVersion .V13 , null ,
237- true , new DefaultHttpHeaders (), maxPacketSize ));
238-
239- ChannelPipeline p = ch .pipeline ();
240- p .addLast ("idleStateHandler" , new IdleStateHandler (0 , 0 , idleTimeout / 1000 ));
241- if (sslCtx != null ) {
242- p .addLast (sslCtx .newHandler (ch .alloc (), host , port ));
243- }
244- p .addLast (new HttpClientCodec (), new HttpObjectAggregator (8192 ),
245- WebSocketClientCompressionHandler .INSTANCE , handler );
246- }
247- }).option (ChannelOption .CONNECT_TIMEOUT_MILLIS , this .connectionTimeout );
245+ .handler (new ChannelInitializer <SocketChannel >() {
246+ @ Override
247+ protected void initChannel (SocketChannel ch ) {
248+ log .info ("{} Initiating new Netty channel. Will create new handler too!" , label );
249+ handler = new JsonRpcWebSocketClientHandler (
250+ WebSocketClientHandshakerFactory .newHandshaker (uri , WebSocketVersion .V13 , null ,
251+ true , new DefaultHttpHeaders (), maxPacketSize ));
252+
253+ ChannelPipeline p = ch .pipeline ();
254+ p .addLast ("idleStateHandler" , new IdleStateHandler (0 , 0 , idleTimeout / 1000 ));
255+ if (sslCtx != null ) {
256+ p .addLast (sslCtx .newHandler (ch .alloc (), host , port ));
257+ }
258+ p .addLast (new HttpClientCodec (), new HttpObjectAggregator (8192 ),
259+ WebSocketClientCompressionHandler .INSTANCE , handler );
260+ }
261+ }).option (ChannelOption .CONNECT_TIMEOUT_MILLIS , this .connectionTimeout );
248262
249263 int numRetries = 0 ;
250264 final int maxRetries = 5 ;
251265 while (channel == null || !channel .isOpen ()) {
252266 try {
253- channel = b .connect (host , port ).sync ().channel ();
254- handler .handshakeFuture ().sync ();
267+ ChannelFuture future = b .connect (host , port );
268+ if (channelTimeout ) {
269+ handleTimeout (future );
270+ this .channel = future .channel ();
271+ } else {
272+ channel = future .sync ()
273+ .channel ();
274+ }
275+ future = handler .handshakeFuture ();
276+ if (channelTimeout ) {
277+ handleTimeout (future );
278+ } else {
279+ future .sync ();
280+ }
255281 } catch (InterruptedException e ) {
256282 // This should never happen
257283 log .warn ("{} ERROR connecting WS Netty client, opening channel" , label , e );
258284 } catch (Exception e ) {
259- if (e .getCause () instanceof WebSocketHandshakeException && numRetries < maxRetries ) {
285+ if (e .getCause () instanceof WebSocketHandshakeException
286+ || e .getCause () instanceof TimeoutException
287+ && numRetries < maxRetries ) {
260288 log .warn (
261- "{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms " ,
262- label , uri , numRetries + 1 , maxRetries );
289+ "{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms " ,
290+ label , uri , numRetries + 1 , maxRetries );
263291 Thread .sleep (200 );
264292 numRetries ++;
265293 } else {
@@ -281,6 +309,23 @@ public void operationComplete(ChannelFuture future) throws Exception {
281309
282310 }
283311
312+ private void handleTimeout (ChannelFuture future ) throws Exception {
313+ // increase timeout plus 500 milliseconds to add buffer between actual connection timeout
314+ final int timeoutMillis = this .connectionTimeout + 500 ;
315+ future .await (timeoutMillis );
316+ if (!future .isDone ()) {
317+ future .cancel (true );
318+ throw new TimeoutException ("Connection to " + host + ":" + port + " with future timeout of " + timeoutMillis );
319+ }
320+ if (future .isCancelled ()) {
321+ throw new Exception ("Connection to " + host + ":" + port + " cancelled by user!" );
322+ }
323+ if (!future .isSuccess ()) {
324+ final Throwable cause = future .cause ();
325+ throw new Exception ("Create connection to " + host + ":" + port + " error" , cause );
326+ }
327+ }
328+
284329 @ Override
285330 public void closeNativeClient () {
286331 closeChannel ();
@@ -298,10 +343,14 @@ private void closeChannel() {
298343 if (channel != null ) {
299344 log .debug ("{} Closing client" , label );
300345 try {
301- channel .close ().sync ();
346+ ChannelFuture channelFuture = channel .close ();
347+ if (channelTimeout ) {
348+ handleTimeout (channelFuture );
349+ } else {
350+ channelFuture .sync ();
351+ }
302352 } catch (Exception e ) {
303- log .debug ("{} Could not properly close websocket client. Reason: {}" , label , e .getMessage (),
304- e );
353+ log .debug ("{} Could not properly close websocket client. Reason: {}" , label , e .getMessage (), e );
305354 }
306355 channel = null ;
307356 } else {
0 commit comments