Skip to content

Commit

Permalink
Merge pull request #340 from dvonthenen/exception-ws-failed-connect
Browse files Browse the repository at this point in the history
Throw Exception on Failed WS Connect
  • Loading branch information
davidvonthenen authored Oct 2, 2024
2 parents c934097 + 9102a8c commit 7aebe09
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 78 deletions.
16 changes: 8 additions & 8 deletions Deepgram/Clients/Listen/v1/REST/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Client(string? apiKey = null, IDeepgramClientOptions? deepgramClien
public async Task<SyncResponse> TranscribeUrl(UrlSource source, PreRecordedSchema? prerecordedSchema,
CancellationTokenSource? cancellationToken = default, Dictionary<string, string>? addons = null, Dictionary<string, string>? headers = null)
{
Log.Verbose("PreRecordedClient.TranscribeUrl", "ENTER");
Log.Verbose("ListenRESTClient.TranscribeUrl", "ENTER");
Log.Information("TranscribeUrl", $"source: {source}");
Log.Information("TranscribeUrl", $"prerecordedSchema:\n{prerecordedSchema}");

Expand All @@ -38,7 +38,7 @@ public async Task<SyncResponse> TranscribeUrl(UrlSource source, PreRecordedSchem

Log.Information("TranscribeUrl", $"{uri} Succeeded");
Log.Debug("TranscribeUrl", $"result: {result}");
Log.Verbose("PreRecordedClient.TranscribeUrl", "LEAVE");
Log.Verbose("ListenRESTClient.TranscribeUrl", "LEAVE");

return result;
}
Expand All @@ -64,7 +64,7 @@ public async Task<SyncResponse> TranscribeFile(byte[] source, PreRecordedSchema?
/// <returns><see cref="SyncResponse"/></returns>
public async Task<SyncResponse> TranscribeFile(Stream source, PreRecordedSchema? prerecordedSchema, CancellationTokenSource? cancellationToken = default, Dictionary<string, string>? addons = null, Dictionary<string, string>? headers = null)
{
Log.Verbose("PreRecordedClient.TranscribeFile", "ENTER");
Log.Verbose("ListenRESTClient.TranscribeFile", "ENTER");
Log.Information("TranscribeFile", $"source: {source}");
Log.Information("TranscribeFile", $"prerecordedSchema:\n{prerecordedSchema}");

Expand All @@ -75,7 +75,7 @@ public async Task<SyncResponse> TranscribeFile(Stream source, PreRecordedSchema?

Log.Information("TranscribeFile", $"{uri} Succeeded");
Log.Debug("TranscribeFile", $"result: {result}");
Log.Verbose("PreRecordedClient.TranscribeFile", "LEAVE");
Log.Verbose("ListenRESTClient.TranscribeFile", "LEAVE");

return result;
}
Expand Down Expand Up @@ -105,7 +105,7 @@ public async Task<AsyncResponse> TranscribeFileCallBack(byte[] source, string? c
/// <returns><see cref="AsyncResponse"/></returns>
public async Task<AsyncResponse> TranscribeFileCallBack(Stream source, string? callBack, PreRecordedSchema? prerecordedSchema, CancellationTokenSource? cancellationToken = default, Dictionary<string, string>? addons = null, Dictionary<string, string>? headers = null)
{
Log.Verbose("PreRecordedClient.TranscribeFileCallBack", "ENTER");
Log.Verbose("ListenRESTClient.TranscribeFileCallBack", "ENTER");
Log.Information("TranscribeFileCallBack", $"source: {source}");
Log.Information("TranscribeFileCallBack", $"callBack: {callBack}");
Log.Information("TranscribeFileCallBack", $"prerecordedSchema:\n{prerecordedSchema}");
Expand All @@ -119,7 +119,7 @@ public async Task<AsyncResponse> TranscribeFileCallBack(Stream source, string? c

Log.Information("TranscribeFileCallBack", $"{uri} Succeeded");
Log.Debug("TranscribeFileCallBack", $"result: {result}");
Log.Verbose("PreRecordedClient.TranscribeFileCallBack", "LEAVE");
Log.Verbose("ListenRESTClient.TranscribeFileCallBack", "LEAVE");

return result;
}
Expand All @@ -133,7 +133,7 @@ public async Task<AsyncResponse> TranscribeFileCallBack(Stream source, string? c
/// <returns><see cref="AsyncResponse"/></returns>
public async Task<AsyncResponse> TranscribeUrlCallBack(UrlSource source, string? callBack, PreRecordedSchema? prerecordedSchema, CancellationTokenSource? cancellationToken = default, Dictionary<string, string>? addons = null, Dictionary<string, string>? headers = null)
{
Log.Verbose("PreRecordedClient.TranscribeUrlCallBack", "ENTER");
Log.Verbose("ListenRESTClient.TranscribeUrlCallBack", "ENTER");
Log.Information("TranscribeUrlCallBack", $"source: {source}");
Log.Information("TranscribeUrlCallBack", $"callBack: {callBack}");
Log.Information("TranscribeUrlCallBack", $"prerecordedSchema:\n{prerecordedSchema}");
Expand All @@ -148,7 +148,7 @@ public async Task<AsyncResponse> TranscribeUrlCallBack(UrlSource source, string?

Log.Information("TranscribeUrlCallBack", $"{uri} Succeeded");
Log.Debug("TranscribeUrlCallBack", $"result: {result}");
Log.Verbose("PreRecordedClient.TranscribeUrlCallBack", "LEAVE");
Log.Verbose("ListenRESTClient.TranscribeUrlCallBack", "LEAVE");

return result;
}
Expand Down
78 changes: 44 additions & 34 deletions Deepgram/Clients/Listen/v1/WebSocket/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


using Deepgram.Models.Authenticate.v1;
using Deepgram.Models.Exceptions.v1;
using Deepgram.Models.Listen.v1.WebSocket;
using Deepgram.Clients.Interfaces.v1;

Expand Down Expand Up @@ -66,7 +67,7 @@ public Client(string? apiKey = null, IDeepgramClientOptions? options = null)
public async Task Connect(LiveSchema options, CancellationTokenSource? cancelToken = null, Dictionary<string, string>? addons = null,
Dictionary<string, string>? headers = null)
{
Log.Verbose("LiveClient.Connect", "ENTER");
Log.Verbose("ListenWSClient.Connect", "ENTER");
Log.Information("Connect", $"options:\n{JsonSerializer.Serialize(options, JsonSerializeOptions.DefaultOptions)}");
Log.Debug("Connect", $"addons: {addons}");

Expand All @@ -76,7 +77,7 @@ public async Task Connect(LiveSchema options, CancellationTokenSource? cancelTok
// client has already connected
var exStr = "Client has already been initialized";
Log.Error("Connect", exStr);
Log.Verbose("LiveClient.Connect", "LEAVE");
Log.Verbose("ListenWSClient.Connect", "LEAVE");
throw new InvalidOperationException(exStr);
}

Expand Down Expand Up @@ -126,6 +127,13 @@ public async Task Connect(LiveSchema options, CancellationTokenSource? cancelTok
Log.Debug("Connect", "Connecting to Deepgram API...");
await _clientWebSocket.ConnectAsync(_uri, cancelToken.Token).ConfigureAwait(false);

if (!IsConnected())
{
Log.Error("Connect", "Failed to connect to Deepgram API");
Log.Verbose("ListenWSClient.Connect", "LEAVE");
throw new DeepgramWebSocketException("Failed to connect to Deepgram API");
}

Log.Debug("Connect", "Starting Sender Thread...");
StartSenderBackgroundThread();

Expand Down Expand Up @@ -154,19 +162,19 @@ public async Task Connect(LiveSchema options, CancellationTokenSource? cancelTok
}

Log.Debug("Connect", "Connect Succeeded");
Log.Verbose("LiveClient.Connect", "LEAVE");
Log.Verbose("ListenWSClient.Connect", "LEAVE");
}
catch (TaskCanceledException ex)
{
Log.Debug("Connect", "Connect cancelled.");
Log.Verbose("Connect", $"Connect cancelled. Info: {ex}");
Log.Verbose("LiveClient.Connect", "LEAVE");
Log.Verbose("ListenWSClient.Connect", "LEAVE");
}
catch (Exception ex)
{
Log.Error("Connect", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("Connect", $"Excepton: {ex}");
Log.Verbose("LiveClient.Connect", "LEAVE");
Log.Verbose("ListenWSClient.Connect", "LEAVE");
throw;
}

Expand Down Expand Up @@ -404,7 +412,7 @@ internal void EnqueueSendMessage(WebSocketMessage message)

internal async Task ProcessSendQueue()
{
Log.Verbose("LiveClient.ProcessSendQueue", "ENTER");
Log.Verbose("ListenWSClient.ProcessSendQueue", "ENTER");

if (_clientWebSocket == null)
{
Expand Down Expand Up @@ -439,25 +447,25 @@ internal async Task ProcessSendQueue()
}

Log.Verbose("ProcessSendQueue", "Exit");
Log.Verbose("LiveClient.ProcessSendQueue", "LEAVE");
Log.Verbose("ListenWSClient.ProcessSendQueue", "LEAVE");
}
catch (OperationCanceledException ex)
{
Log.Debug("ProcessSendQueue", "SendThread cancelled.");
Log.Verbose("ProcessSendQueue", $"SendThread cancelled. Info: {ex}");
Log.Verbose("LiveClient.ProcessSendQueue", "LEAVE");
Log.Verbose("ListenWSClient.ProcessSendQueue", "LEAVE");
}
catch (Exception ex)
{
Log.Error("ProcessSendQueue", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("ProcessSendQueue", $"Excepton: {ex}");
Log.Verbose("LiveClient.ProcessSendQueue", "LEAVE");
Log.Verbose("ListenWSClient.ProcessSendQueue", "LEAVE");
}
}

internal async void ProcessKeepAlive()
{
Log.Verbose("LiveClient.ProcessKeepAlive", "ENTER");
Log.Verbose("ListenWSClient.ProcessKeepAlive", "ENTER");

try
{
Expand All @@ -476,26 +484,26 @@ internal async void ProcessKeepAlive()
}

Log.Verbose("ProcessKeepAlive", "Exit");
Log.Verbose("LiveClient.ProcessKeepAlive", "LEAVE");
Log.Verbose("ListenWSClient.ProcessKeepAlive", "LEAVE");
}
catch (TaskCanceledException ex)
{
Log.Debug("ProcessKeepAlive", "KeepAliveThread cancelled.");
Log.Verbose("ProcessKeepAlive", $"KeepAliveThread cancelled. Info: {ex}");
Log.Verbose("LiveClient.ProcessKeepAlive", "LEAVE");
Log.Verbose("ListenWSClient.ProcessKeepAlive", "LEAVE");
}
catch (Exception ex)
{
Log.Error("ProcessKeepAlive", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("ProcessKeepAlive", $"Excepton: {ex}");
Log.Verbose("LiveClient.ProcessKeepAlive", "LEAVE");
Log.Verbose("ListenWSClient.ProcessKeepAlive", "LEAVE");
}
}


internal async void ProcessAutoFlush()
{
Log.Verbose("LiveClient.ProcessAutoFlush", "ENTER");
Log.Verbose("ListenWSClient.ProcessAutoFlush", "ENTER");

var diffTicks = TimeSpan.FromMilliseconds((double)_deepgramClientOptions.AutoFlushReplyDelta);

Expand Down Expand Up @@ -533,25 +541,25 @@ internal async void ProcessAutoFlush()
}

Log.Verbose("ProcessAutoFlush", "Exit");
Log.Verbose("LiveClient.ProcessAutoFlush", "LEAVE");
Log.Verbose("ListenWSClient.ProcessAutoFlush", "LEAVE");
}
catch (TaskCanceledException ex)
{
Log.Debug("ProcessAutoFlush", "KeepAliveThread cancelled.");
Log.Verbose("ProcessAutoFlush", $"KeepAliveThread cancelled. Info: {ex}");
Log.Verbose("LiveClient.ProcessAutoFlush", "LEAVE");
Log.Verbose("ListenWSClient.ProcessAutoFlush", "LEAVE");
}
catch (Exception ex)
{
Log.Error("ProcessAutoFlush", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("ProcessAutoFlush", $"Excepton: {ex}");
Log.Verbose("LiveClient.ProcessAutoFlush", "LEAVE");
Log.Verbose("ListenWSClient.ProcessAutoFlush", "LEAVE");
}
}

internal async Task ProcessReceiveQueue()
{
Log.Verbose("LiveClient.ProcessReceiveQueue", "ENTER");
Log.Verbose("ListenWSClient.ProcessReceiveQueue", "ENTER");

while (_clientWebSocket?.State == WebSocketState.Open)
{
Expand Down Expand Up @@ -601,35 +609,35 @@ internal async Task ProcessReceiveQueue()
{
Log.Debug("ProcessReceiveQueue", "ReceiveThread cancelled.");
Log.Verbose("ProcessReceiveQueue", $"ReceiveThread cancelled. Info: {ex}");
Log.Verbose("LiveClient.ProcessReceiveQueue", "LEAVE");
Log.Verbose("ListenWSClient.ProcessReceiveQueue", "LEAVE");
}
catch (Exception ex)
{
Log.Error("ProcessReceiveQueue", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("ProcessReceiveQueue", $"Excepton: {ex}");
Log.Verbose("LiveClient.ProcessReceiveQueue", "LEAVE");
Log.Verbose("ListenWSClient.ProcessReceiveQueue", "LEAVE");
}
}
}

internal void ProcessDataReceived(WebSocketReceiveResult result, MemoryStream ms)
{
Log.Verbose("LiveClient.ProcessDataReceived", "ENTER");
Log.Verbose("ListenWSClient.ProcessDataReceived", "ENTER");

ms.Seek(0, SeekOrigin.Begin);

if (result.MessageType != WebSocketMessageType.Text)
{
Log.Warning("ProcessDataReceived", "Received a text message. This is not supported.");
Log.Verbose("LiveClient.ProcessDataReceived", "LEAVE");
Log.Verbose("ListenWSClient.ProcessDataReceived", "LEAVE");
return;
}

var response = Encoding.UTF8.GetString(ms.ToArray());
if (response == null)
{
Log.Warning("ProcessDataReceived", "Response is null");
Log.Verbose("LiveClient.ProcessDataReceived", "LEAVE");
Log.Verbose("ListenWSClient.ProcessDataReceived", "LEAVE");
return;
}

Expand Down Expand Up @@ -794,19 +802,19 @@ internal void ProcessDataReceived(WebSocketReceiveResult result, MemoryStream ms
}

Log.Debug("ProcessDataReceived", "Succeeded");
Log.Verbose("LiveClient.ProcessDataReceived", "LEAVE");
Log.Verbose("ListenWSClient.ProcessDataReceived", "LEAVE");
}
catch (JsonException ex)
{
Log.Error("ProcessDataReceived", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("ProcessDataReceived", $"Excepton: {ex}");
Log.Verbose("LiveClient.ProcessDataReceived", "LEAVE");
Log.Verbose("ListenWSClient.ProcessDataReceived", "LEAVE");
}
catch (Exception ex)
{
Log.Error("ProcessDataReceived", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("ProcessDataReceived", $"Excepton: {ex}");
Log.Verbose("LiveClient.ProcessDataReceived", "LEAVE");
Log.Verbose("ListenWSClient.ProcessDataReceived", "LEAVE");
}
}

Expand All @@ -816,13 +824,13 @@ internal void ProcessDataReceived(WebSocketReceiveResult result, MemoryStream ms
/// <returns>The task object representing the asynchronous operation.</returns>
public async Task Stop(CancellationTokenSource? cancelToken = null)
{
Log.Verbose("LiveClient.Stop", "ENTER");
Log.Verbose("ListenWSClient.Stop", "ENTER");

// client is already disposed
if (_clientWebSocket == null)
{
Log.Information("Stop", "Client has already been disposed");
Log.Verbose("LiveClient.Stop", "LEAVE");
Log.Verbose("ListenWSClient.Stop", "LEAVE");
return;
}

Expand Down Expand Up @@ -883,19 +891,19 @@ await _clientWebSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, stri
_clientWebSocket = null;

Log.Debug("Stop", "Succeeded");
Log.Verbose("LiveClient.Stop", "LEAVE");
Log.Verbose("ListenWSClient.Stop", "LEAVE");
}
catch (TaskCanceledException ex)
{
Log.Debug("Stop", "Stop cancelled.");
Log.Verbose("Stop", $"Stop cancelled. Info: {ex}");
Log.Verbose("LiveClient.Stop", "LEAVE");
Log.Verbose("ListenWSClient.Stop", "LEAVE");
}
catch (Exception ex)
{
Log.Error("Stop", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("Stop", $"Excepton: {ex}");
Log.Verbose("LiveClient.Stop", "LEAVE");
Log.Verbose("ListenWSClient.Stop", "LEAVE");
throw;
}
}
Expand All @@ -911,6 +919,7 @@ public WebSocketState State()
{
return WebSocketState.None;
}
Log.Debug("State", $"WebSocket State: {_clientWebSocket.State}");
return _clientWebSocket.State;
}

Expand All @@ -923,7 +932,8 @@ public bool IsConnected() {
{
return false;
}


Log.Debug("State", $"WebSocket State: {_clientWebSocket.State}");
return _clientWebSocket.State == WebSocketState.Open;
}

Expand Down Expand Up @@ -1023,13 +1033,13 @@ private void InspectMessage(object type, JsonDocument data)
{
Log.Error("InspectMessage", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("InspectMessage", $"Excepton: {ex}");
Log.Verbose("LiveClient.InspectMessage", "LEAVE");
Log.Verbose("ListenWSClient.InspectMessage", "LEAVE");
}
catch (Exception ex)
{
Log.Error("InspectMessage", $"{ex.GetType()} thrown {ex.Message}");
Log.Verbose("InspectMessage", $"Excepton: {ex}");
Log.Verbose("LiveClient.InspectMessage", "LEAVE");
Log.Verbose("ListenWSClient.InspectMessage", "LEAVE");
}
}
#endregion
Expand Down
Loading

0 comments on commit 7aebe09

Please sign in to comment.