Skip to content
This repository has been archived by the owner on Sep 17, 2023. It is now read-only.

Catching exceptions in Changes.ObserveContinuous() #176

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions source/projects/MyCouch/Contexts/Changes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ public virtual IObservable<string> ObserveContinuous(GetChangesRequest request,

Task.Factory.StartNew(async () =>
{
using (var httpResponse = await SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ForAwait())
try
{
var response = await ContinuousChangesResponseFactory.CreateAsync(httpResponse).ForAwait();
if (response.IsSuccess)
using (var httpResponse = await SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ForAwait())
{
var response = await ContinuousChangesResponseFactory.CreateAsync(httpResponse).ForAwait();
if (!response.IsSuccess)
throw new MyCouchResponseException(response);
using (var content = await httpResponse.Content.ReadAsStreamAsync().ForAwait())
{
using (var reader = new StreamReader(content, MyCouchRuntime.DefaultEncoding))
Expand All @@ -117,6 +119,10 @@ public virtual IObservable<string> ObserveContinuous(GetChangesRequest request,
}
}
}
catch (Exception e)
{
ob.Error(e);
}
}, cancellationToken).ForAwait();

return ob;
Expand Down
9 changes: 9 additions & 0 deletions source/projects/MyCouch/MyObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public virtual void Complete()
observer.OnCompleted();
}

public virtual void Error(Exception error)
{
var obs = _observers.ToArray();
_observers.Clear();

foreach (var observer in obs)
observer.OnError(error);
}

private class Unsubscriber : IDisposable
{
private readonly IList<IObserver<T>> _observers;
Expand Down