Skip to content

Commit

Permalink
Add a blob container client factory for customisation (#7849)
Browse files Browse the repository at this point in the history
* Add a blob container client factory for customisation

Allows blob containers to be customised based on grainType and grain id. This is wrapped in an interface to allow injection of dependencies and state to be maintained.

* Update src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/IBlobContainerFactory.cs

* Use GrainId since in v4 that's better

* Build fixes & minor feedback adjustments

* Remove redundant code

---------

Co-authored-by: Alex McAuliffe <[email protected]>
Co-authored-by: Reuben Bond <[email protected]>
Co-authored-by: ReubenBond <[email protected]>
  • Loading branch information
4 people authored Apr 11, 2023
1 parent 9e85ac7 commit b24bcc1
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Orleans.Configuration;
using Orleans.Providers.Azure;
using Orleans.Runtime;
using Orleans.Serialization;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace Orleans.Storage
Expand All @@ -23,23 +22,24 @@ namespace Orleans.Storage
/// </summary>
public class AzureBlobGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLifecycle>
{
private BlobContainerClient container;
private ILogger logger;
private readonly string name;
private AzureBlobStorageOptions options;
private readonly IBlobContainerFactory blobContainerFactory;
private IGrainStorageSerializer grainStorageSerializer;
private readonly IServiceProvider services;

/// <summary> Default constructor </summary>
public AzureBlobGrainStorage(
string name,
AzureBlobStorageOptions options,
IGrainStorageSerializer grainStorageSerializer,
IBlobContainerFactory blobContainerFactory,
IServiceProvider services,
ILogger<AzureBlobGrainStorage> logger)
{
this.name = name;
this.options = options;
this.blobContainerFactory = blobContainerFactory;
this.grainStorageSerializer = options.GrainStorageSerializer;
this.services = services;
this.logger = logger;
Expand All @@ -50,6 +50,8 @@ public AzureBlobGrainStorage(
public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainState<T> grainState)
{
var blobName = GetBlobName(grainType, grainId);
var container = this.blobContainerFactory.GetBlobContainerClient(grainId);

if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace((int)AzureProviderErrorCode.AzureBlobProvider_Storage_Reading,
"Reading: GrainType={GrainType} Grainid={GrainId} ETag={ETag} from BlobName={BlobName} in Container={ContainerName}",
grainType,
Expand Down Expand Up @@ -142,6 +144,8 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainState<T> grainState)
{
var blobName = GetBlobName(grainType, grainId);
var container = this.blobContainerFactory.GetBlobContainerClient(grainId);

try
{
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace((int)AzureProviderErrorCode.AzureBlobProvider_Storage_Writing,
Expand Down Expand Up @@ -186,6 +190,8 @@ public async Task WriteStateAsync<T>(string grainType, GrainId grainId, IGrainSt
public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainState<T> grainState)
{
var blobName = GetBlobName(grainType, grainId);
var container = this.blobContainerFactory.GetBlobContainerClient(grainId);

try
{
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace((int)AzureProviderErrorCode.AzureBlobProvider_ClearingData,
Expand Down Expand Up @@ -237,6 +243,8 @@ await DoOptimisticUpdate(() => blob.DeleteIfExistsAsync(DeleteSnapshotsOption.No

private async Task WriteStateAndCreateContainerIfNotExists<T>(string grainType, GrainId grainId, IGrainState<T> grainState, BinaryData contents, string mimeType, BlobClient blob)
{
var container = this.blobContainerFactory.GetBlobContainerClient(grainId);

try
{
var conditions = string.IsNullOrEmpty(grainState.ETag)
Expand Down Expand Up @@ -306,8 +314,7 @@ private async Task Init(CancellationToken ct)
}

var client = await createClient();
container = client.GetBlobContainerClient(this.options.ContainerName);
await container.CreateIfNotExistsAsync().ConfigureAwait(false);
await this.blobContainerFactory.InitializeAsync(client);
stopWatch.Stop();
this.logger.LogInformation((int)AzureProviderErrorCode.AzureBlobProvider_InitProvider,
"Initializing provider {ProviderName} of type {ProviderType} in stage {Stage} took {ElapsedMilliseconds} Milliseconds.",
Expand Down Expand Up @@ -348,7 +355,11 @@ public static class AzureBlobGrainStorageFactory
public static IGrainStorage Create(IServiceProvider services, string name)
{
var optionsMonitor = services.GetRequiredService<IOptionsMonitor<AzureBlobStorageOptions>>();
return ActivatorUtilities.CreateInstance<AzureBlobGrainStorage>(services, name, optionsMonitor.Get(name));
var options = optionsMonitor.Get(name);

var containerFactory = options.BuildContainerFactory(services, options);

return ActivatorUtilities.CreateInstance<AzureBlobGrainStorage>(services, name, containerFactory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using Azure.Core;
using Azure.Storage;
using Azure.Storage.Blobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Orleans.Persistence.AzureStorage;
using Orleans.Runtime;
Expand Down Expand Up @@ -36,7 +38,13 @@ public class AzureBlobStorageOptions : IStorageProviderSerializerOptions
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;

/// <inheritdoc/>
public IGrainStorageSerializer GrainStorageSerializer { get; set;}
public IGrainStorageSerializer GrainStorageSerializer { get; set; }

/// <summary>
/// A function for building container factory instances
/// </summary>
public Func<IServiceProvider, AzureBlobStorageOptions, IBlobContainerFactory> BuildContainerFactory { get; set; }
= static (provider, options) => ActivatorUtilities.CreateInstance<DefaultBlobContainerFactory>(provider, options);

/// <summary>
/// Configures the <see cref="BlobServiceClient"/> using a connection string.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Orleans.Configuration;
using Orleans.Runtime;

namespace Orleans.Storage;

/// <summary>
/// A factory for building container clients for blob storage using grainType and grainId
/// </summary>
public interface IBlobContainerFactory
{
/// <summary>
/// Gets the container which should be used for the specified grain.
/// </summary>
/// <param name="grainId">The grain id</param>
/// <returns>A configured blob client</returns>
public BlobContainerClient GetBlobContainerClient(GrainId grainId);

/// <summary>
/// Initialize any required dependencies using the provided client and options.
/// </summary>
/// <param name="client">The connected blob client</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public Task InitializeAsync(BlobServiceClient client);
}

/// <summary>
/// A default blob container factory that uses the default container name.
/// </summary>
internal class DefaultBlobContainerFactory : IBlobContainerFactory
{
private readonly AzureBlobStorageOptions _options;
private BlobContainerClient _defaultContainer = null!;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultBlobContainerFactory"/> class.
/// </summary>
/// <param name="options">The blob storage options</param>
public DefaultBlobContainerFactory(AzureBlobStorageOptions options)
{
_options = options;
}

/// <inheritdoc/>
public BlobContainerClient GetBlobContainerClient(GrainId grainId)
=> _defaultContainer;

/// <inheritdoc/>
public async Task InitializeAsync(BlobServiceClient client)
{
_defaultContainer = client.GetBlobContainerClient(_options.ContainerName);
await _defaultContainer.CreateIfNotExistsAsync();
}
}

0 comments on commit b24bcc1

Please sign in to comment.