Skip to content

Commit 7d9d003

Browse files
Merge pull request #269 from dvonthenen/fix-ws-self-hosted
Fix Using `ws://` With Self Hosted
2 parents 85b9256 + 268d1d2 commit 7d9d003

File tree

4 files changed

+87
-49
lines changed

4 files changed

+87
-49
lines changed

pkg/api/version/version.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
3737

3838
// check if the host has a protocol
3939
r := regexp.MustCompile(`^(https?)://(.+)$`)
40-
if apiType == APITypeLive {
40+
if apiType == APITypeLive || apiType == APITypeSpeakStream {
4141
r = regexp.MustCompile(`^(wss?)://(.+)$`)
4242
}
4343

@@ -98,6 +98,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
9898
// construct the full path and substitute the version and all query parameters
9999
fullpath := fmt.Sprintf("%%s/%s", path)
100100
completeFullpath := fmt.Sprintf(fullpath, append([]interface{}{version}, args...)...)
101+
klog.V(3).Infof("completeFullpath: %s\n", completeFullpath)
101102

102103
// construct the URL
103104
var u url.URL
@@ -106,6 +107,7 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options
106107
} else {
107108
u = url.URL{Scheme: protocol, Host: host, Path: completeFullpath}
108109
}
110+
klog.V(3).Infof("URI final: %s\n", u.String())
109111

110112
return u.String(), nil
111113
}

pkg/client/common/v1/websocket.go

Lines changed: 82 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ func (c *WSClient) internalConnect() *websocket.Conn {
8181
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false)
8282
}
8383

84-
//nolint:funlen // this is a complex function. keep as is
84+
//nolint:funlen,gocyclo // this is a complex function. keep as is
8585
func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
86-
klog.V(7).Infof("live.internalConnectWithCancel() ENTER\n")
86+
klog.V(7).Infof("common.internalConnectWithCancel() ENTER\n")
8787

8888
// set the context
8989
c.ctx = ctx
@@ -94,13 +94,16 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
9494
if lock {
9595
klog.V(3).Infof("Locking connection mutex\n")
9696
c.muConn.Lock()
97-
defer c.muConn.Unlock()
9897
}
9998

10099
// we explicitly stopped and should not attempt to reconnect
101100
if !c.retry {
102101
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
103-
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
102+
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
103+
if lock {
104+
klog.V(3).Infof("Unlocking connection mutex\n")
105+
c.muConn.Unlock()
106+
}
104107
return nil
105108
}
106109

@@ -109,32 +112,36 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
109112
select {
110113
case <-c.ctx.Done():
111114
klog.V(1).Infof("Connection is not valid\n")
112-
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
115+
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
116+
if lock {
117+
klog.V(3).Infof("Unlocking connection mutex\n")
118+
c.muConn.Unlock()
119+
}
113120
return nil
114121
default:
115122
klog.V(7).Infof("Connection is good. Return object.")
116-
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
123+
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
124+
if lock {
125+
klog.V(3).Infof("Unlocking connection mutex\n")
126+
c.muConn.Unlock()
127+
}
117128
return c.wsconn
118129
}
119130
} else {
120131
select {
121132
case <-c.ctx.Done():
122133
klog.V(1).Infof("Context is not valid. Has been canceled.\n")
123-
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
134+
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
135+
if lock {
136+
klog.V(3).Infof("Unlocking connection mutex\n")
137+
c.muConn.Unlock()
138+
}
124139
return nil
125140
default:
126141
klog.V(3).Infof("Context is still valid. Retry...\n")
127142
}
128143
}
129144

130-
dialer := websocket.Dialer{
131-
HandshakeTimeout: 45 * time.Second,
132-
/* #nosec G402 */
133-
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
134-
RedirectService: c.cOptions.RedirectService,
135-
SkipServerAuth: c.cOptions.SkipServerAuth,
136-
}
137-
138145
// set websocket headers
139146
myHeader := http.Header{}
140147

@@ -175,10 +182,30 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
175182
if err != nil {
176183
klog.V(1).Infof("GetURL failed. Err: %v\n", err)
177184
klog.V(7).Infof("internalConnectWithCancel() LEAVE\n")
185+
if lock {
186+
klog.V(3).Infof("Unlocking connection mutex\n")
187+
c.muConn.Unlock()
188+
}
178189
return nil // no point in retrying because this is going to fail on every retry
179190
}
180191
klog.V(5).Infof("Connecting to %s\n", url)
181192

193+
// if host starts with "ws://", then disable TLS
194+
var dialer websocket.Dialer
195+
if url[:5] == "ws://" {
196+
dialer = websocket.Dialer{
197+
HandshakeTimeout: 15 * time.Second,
198+
RedirectService: c.cOptions.RedirectService,
199+
}
200+
} else {
201+
dialer = websocket.Dialer{
202+
HandshakeTimeout: 15 * time.Second,
203+
/* #nosec G402 */
204+
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth},
205+
RedirectService: c.cOptions.RedirectService,
206+
SkipServerAuth: c.cOptions.SkipServerAuth,
207+
}
208+
}
182209
// perform the websocket connection
183210
ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
184211
if res != nil {
@@ -197,6 +224,10 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
197224

198225
// kick off threads to listen for messages and ping/keepalive
199226
go c.listen()
227+
if lock {
228+
klog.V(3).Infof("Unlocking connection mutex\n")
229+
c.muConn.Unlock()
230+
}
200231

201232
// start WS specific items
202233
(*c.processMessages).Start()
@@ -210,21 +241,26 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont
210241
}
211242

212243
klog.V(3).Infof("WebSocket Connection Successful!")
213-
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
244+
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
214245

215246
return c.wsconn
216247
}
217248

218249
// if we get here, we failed to connect
219250
klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host)
220-
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
251+
klog.V(7).Infof("common.internalConnectWithCancel() LEAVE\n")
252+
253+
if lock {
254+
klog.V(3).Infof("Unlocking connection mutex\n")
255+
c.muConn.Unlock()
256+
}
221257

222258
return nil
223259
}
224260

225261
//nolint:funlen // this is a complex function. keep as is
226262
func (c *WSClient) listen() {
227-
klog.V(6).Infof("live.listen() ENTER\n")
263+
klog.V(6).Infof("common.listen() ENTER\n")
228264

229265
defer func() {
230266
if r := recover(); r != nil {
@@ -240,7 +276,7 @@ func (c *WSClient) listen() {
240276
// fatal close
241277
c.closeWs(true, false)
242278

243-
klog.V(6).Infof("live.listen() LEAVE\n")
279+
klog.V(6).Infof("common.listen() LEAVE\n")
244280
return
245281
}
246282
}()
@@ -256,7 +292,7 @@ func (c *WSClient) listen() {
256292
c.muConn.Unlock()
257293

258294
klog.V(3).Infof("listen: Connection is not valid\n")
259-
klog.V(6).Infof("live.listen() LEAVE\n")
295+
klog.V(6).Infof("common.listen() LEAVE\n")
260296
return
261297
}
262298

@@ -275,15 +311,15 @@ func (c *WSClient) listen() {
275311
// graceful close
276312
c.closeWs(false, false)
277313

278-
klog.V(6).Infof("live.listen() LEAVE\n")
314+
klog.V(6).Infof("common.listen() LEAVE\n")
279315
return
280316
case strings.Contains(errStr, UseOfClosedSocket):
281317
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)
282318

283319
// fatal close
284320
c.closeWs(false, false)
285321

286-
klog.V(6).Infof("live.listen() LEAVE\n")
322+
klog.V(6).Infof("common.listen() LEAVE\n")
287323
return
288324
case strings.Contains(errStr, FatalReadSocketErr):
289325
klog.V(1).Infof("Fatal socket error: %v\n", err)
@@ -297,7 +333,7 @@ func (c *WSClient) listen() {
297333
// fatal close
298334
c.closeWs(true, false)
299335

300-
klog.V(6).Infof("live.listen() LEAVE\n")
336+
klog.V(6).Infof("common.listen() LEAVE\n")
301337
return
302338
case strings.Contains(errStr, "Deepgram"):
303339
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)
@@ -311,7 +347,7 @@ func (c *WSClient) listen() {
311347
// close the connection
312348
c.closeWs(false, false)
313349

314-
klog.V(6).Infof("live.listen() LEAVE\n")
350+
klog.V(6).Infof("common.listen() LEAVE\n")
315351
return
316352
case (err == io.EOF || err == io.ErrUnexpectedEOF):
317353
klog.V(3).Infof("stream object EOF\n")
@@ -325,7 +361,7 @@ func (c *WSClient) listen() {
325361
// close the connection
326362
c.closeWs(true, false)
327363

328-
klog.V(6).Infof("live.listen() LEAVE\n")
364+
klog.V(6).Infof("common.listen() LEAVE\n")
329365
return
330366
default:
331367
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)
@@ -339,7 +375,7 @@ func (c *WSClient) listen() {
339375
// close the connection
340376
c.closeWs(true, false)
341377

342-
klog.V(6).Infof("live.listen() LEAVE\n")
378+
klog.V(6).Infof("common.listen() LEAVE\n")
343379
return
344380
}
345381
}
@@ -359,7 +395,7 @@ func (c *WSClient) listen() {
359395

360396
// WriteBinary writes binary data to the websocket server
361397
func (c *WSClient) WriteBinary(byData []byte) error {
362-
klog.V(7).Infof("live.WriteBinary() ENTER\n")
398+
klog.V(7).Infof("common.WriteBinary() ENTER\n")
363399

364400
// doing a write, need to lock
365401
c.muConn.Lock()
@@ -370,7 +406,7 @@ func (c *WSClient) WriteBinary(byData []byte) error {
370406
if ws == nil {
371407
err := ErrInvalidConnection
372408
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
373-
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
409+
klog.V(7).Infof("common.WriteBinary() LEAVE\n")
374410

375411
return err
376412
}
@@ -380,28 +416,28 @@ func (c *WSClient) WriteBinary(byData []byte) error {
380416
byData,
381417
); err != nil {
382418
klog.V(1).Infof("WriteBinary WriteMessage failed. Err: %v\n", err)
383-
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
419+
klog.V(7).Infof("common.WriteBinary() LEAVE\n")
384420
return err
385421
}
386422

387423
klog.V(7).Infof("WriteBinary Successful\n")
388424
klog.V(7).Infof("payload: %x\n", byData)
389-
klog.V(7).Infof("live.WriteBinary() LEAVE\n")
425+
klog.V(7).Infof("common.WriteBinary() LEAVE\n")
390426

391427
return nil
392428
}
393429

394430
/*
395431
WriteJSON writes a JSON control payload to the websocket server. These are control messages for
396-
managing the live transcription session on the Deepgram server.
432+
managing the websocket connection.
397433
*/
398434
func (c *WSClient) WriteJSON(payload interface{}) error {
399-
klog.V(6).Infof("live.WriteJSON() ENTER\n")
435+
klog.V(6).Infof("common.WriteJSON() ENTER\n")
400436

401437
byData, err := json.Marshal(payload)
402438
if err != nil {
403439
klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err)
404-
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
440+
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
405441
return err
406442
}
407443

@@ -414,7 +450,7 @@ func (c *WSClient) WriteJSON(payload interface{}) error {
414450
if ws == nil {
415451
err := ErrInvalidConnection
416452
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
417-
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
453+
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
418454

419455
return err
420456
}
@@ -424,20 +460,20 @@ func (c *WSClient) WriteJSON(payload interface{}) error {
424460
byData,
425461
); err != nil {
426462
klog.V(1).Infof("WriteJSON WriteMessage failed. Err: %v\n", err)
427-
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
463+
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
428464
return err
429465
}
430466

431-
klog.V(4).Infof("live.WriteJSON() Succeeded\n")
467+
klog.V(4).Infof("common.WriteJSON() Succeeded\n")
432468
klog.V(6).Infof("payload: %s\n", string(byData))
433-
klog.V(6).Infof("live.WriteJSON() LEAVE\n")
469+
klog.V(6).Infof("common.WriteJSON() LEAVE\n")
434470

435471
return nil
436472
}
437473

438474
// closeStream sends an application level message to Deepgram
439475
func (c *WSClient) closeStream(lock bool) error {
440-
klog.V(7).Infof("live.closeStream() ENTER\n")
476+
klog.V(7).Infof("common.closeStream() ENTER\n")
441477

442478
// doing a write, need to lock
443479
if lock {
@@ -456,20 +492,20 @@ func (c *WSClient) closeStream(lock bool) error {
456492

457493
if err != nil {
458494
klog.V(1).Infof("WriteMessage failed. Err: %v\n", err)
459-
klog.V(7).Infof("live.closeStream() LEAVE\n")
495+
klog.V(7).Infof("common.closeStream() LEAVE\n")
460496

461497
return err
462498
}
463499

464500
klog.V(4).Infof("closeStream Succeeded\n")
465-
klog.V(7).Infof("live.closeStream() LEAVE\n")
501+
klog.V(7).Infof("common.closeStream() LEAVE\n")
466502

467503
return err
468504
}
469505

470506
// normalClosure sends a normal closure message to the server
471507
func (c *WSClient) normalClosure(lock bool) error {
472-
klog.V(7).Infof("live.normalClosure() ENTER\n")
508+
klog.V(7).Infof("common.normalClosure() ENTER\n")
473509

474510
// doing a write, need to lock
475511
if lock {
@@ -481,7 +517,7 @@ func (c *WSClient) normalClosure(lock bool) error {
481517
if ws == nil {
482518
err := ErrInvalidConnection
483519
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
484-
klog.V(7).Infof("live.normalClosure() LEAVE\n")
520+
klog.V(7).Infof("common.normalClosure() LEAVE\n")
485521

486522
return err
487523
}
@@ -496,7 +532,7 @@ func (c *WSClient) normalClosure(lock bool) error {
496532
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err)
497533
}
498534

499-
klog.V(7).Infof("live.normalClosure() LEAVE\n")
535+
klog.V(7).Infof("common.normalClosure() LEAVE\n")
500536

501537
return err
502538
}
@@ -514,7 +550,7 @@ func (c *WSClient) Stop() {
514550

515551
// closeWs closes the websocket connection
516552
func (c *WSClient) closeWs(fatal bool, perm bool) {
517-
klog.V(6).Infof("live.closeWs() closing channels...\n")
553+
klog.V(6).Infof("common.closeWs() closing channels...\n")
518554

519555
// doing a write, need to lock
520556
c.muConn.Lock()
@@ -555,8 +591,8 @@ func (c *WSClient) closeWs(fatal bool, perm bool) {
555591
c.wsconn = nil
556592
}
557593

558-
klog.V(4).Infof("live.closeWs() Succeeded\n")
559-
klog.V(6).Infof("live.closeWs() LEAVE\n")
594+
klog.V(4).Infof("common.closeWs() Succeeded\n")
595+
klog.V(6).Infof("common.closeWs() LEAVE\n")
560596
}
561597

562598
// sendError sends an error message to the callback handler

0 commit comments

Comments
 (0)