Skip to content

Commit

Permalink
feat: Add Shutdown extension method for ChannelBase
Browse files Browse the repository at this point in the history
(Happy to rename this to something else.)
  • Loading branch information
jskeet committed Apr 26, 2022
1 parent c987d86 commit 5adf741
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 0 deletions.
109 changes: 109 additions & 0 deletions Google.Api.Gax.Grpc.Tests/ChannelBaseExtensionsTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2022 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using Grpc.Core;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Google.Api.Gax.Grpc.Tests;

public class ChannelBaseExtensionsTest
{
[Fact]
public void Shutdown_NonDisposable()
{
var logger = new MemoryLogger("shutdown");
var channel = new CustomChannel(Task.CompletedTask);
channel.Shutdown(logger);
Assert.True(channel.ShutdownCalled);
Assert.Empty(logger.ListLogEntries());
}

[Fact]
public void Shutdown_Disposable()
{
var logger = new MemoryLogger("shutdown");
var channel = new CustomDisposableChannel(Task.CompletedTask);
channel.Shutdown(logger);
Assert.True(channel.ShutdownCalled);
Assert.True(channel.DisposeCalled);
Assert.Empty(logger.ListLogEntries());
}

[Fact]
public void Shutdown_FaultedTask()
{
var logger = new MemoryLogger("shutdown");
var exception = new Exception("Bang");
var channel = new CustomDisposableChannel(Task.FromException(exception));
channel.Shutdown(logger);
Assert.True(channel.ShutdownCalled);
Assert.True(channel.DisposeCalled);
var entry = Assert.Single(logger.ListLogEntries());
Assert.Contains("failed", entry.Message, StringComparison.OrdinalIgnoreCase);
var aggregate = Assert.IsType<AggregateException>(entry.Exception);
Assert.Same(exception, aggregate.InnerExceptions[0]);
}

[Fact]
public void Shutdown_FaultedTask_NoLogger()
{
var exception = new Exception("Bang");
var channel = new CustomDisposableChannel(Task.FromException(exception));
channel.Shutdown();
Assert.True(channel.ShutdownCalled);
Assert.True(channel.DisposeCalled);
}

[Fact]
public void Shutdown_CanceledTask()
{
var logger = new MemoryLogger("shutdown");
var source = new CancellationTokenSource();
source.Cancel();
var channel = new CustomDisposableChannel(Task.FromCanceled(source.Token));
channel.Shutdown(logger);
Assert.True(channel.ShutdownCalled);
Assert.True(channel.DisposeCalled);
var entry = Assert.Single(logger.ListLogEntries());
Assert.Contains("canceled", entry.Message, StringComparison.OrdinalIgnoreCase);
Assert.Null(entry.Exception);
}

private class CustomChannel : ChannelBase
{
internal bool ShutdownCalled { get; private set; }
internal Task ShutdownTask { get; }

internal CustomChannel(Task shutdownTask) : base("target")
{
ShutdownTask = shutdownTask;
}

public override CallInvoker CreateCallInvoker() =>
throw new NotImplementedException();

protected override Task ShutdownAsyncCore()
{
ShutdownCalled = true;
return ShutdownTask;
}
}

private class CustomDisposableChannel : CustomChannel, IDisposable
{
internal bool DisposeCalled { get; private set; }

internal CustomDisposableChannel(Task shutdownTask) : base(shutdownTask)
{
}

public void Dispose() => DisposeCalled = true;
}
}
46 changes: 46 additions & 0 deletions Google.Api.Gax.Grpc/ChannelBaseExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/

using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace Google.Api.Gax.Grpc;

/// <summary>
/// Extension methods for <see cref="ChannelBase"/>.
/// </summary>
public static class ChannelBaseExtensions
{
/// <summary>
/// Shuts down a channel semi-synchronously. This method initially calls <see cref="IDisposable.Dispose"/>
/// if the channel implements <see cref="IDisposable"/> (e.g. in the case of <see cref="GrpcChannel"/>)
/// and then calls <see cref="ChannelBase.ShutdownAsync"/>. This method does not wait for the task
/// to complete, but observes any exceptions (whether the task is faulted or canceled), optionally logging
/// them to <paramref name="logger"/>.
/// </summary>
/// <param name="channel">The channel to shut down.</param>
/// <param name="logger">An optional logger to record any errors during asynchronous shutdown.</param>
public static void Shutdown(this ChannelBase channel, ILogger logger = null)
{
GaxPreconditions.CheckNotNull(channel, nameof(channel));
if (channel is IDisposable disposable)
{
disposable.Dispose();
}
channel.ShutdownAsync().ContinueWith(
task =>
{
// Always observe the exception, whether we've got a logger or not.
var exception = task.Exception;
logger?.LogWarning(exception, task.IsCanceled ? "Channel shutdown canceled" : "Channel shutdown failed");
},
TaskContinuationOptions.NotOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
}
}

0 comments on commit 5adf741

Please sign in to comment.