Skip to content

Commit

Permalink
Add one more test case (#86)
Browse files Browse the repository at this point in the history
* Add one more test case

* Fix unit tests

* Remove command handler duplicate

* Remove JsonElementExtensions from code coverage and Fix formatting issues

* Fix

* Fix
  • Loading branch information
s-vitaliy authored May 23, 2024
1 parent 4127b59 commit 6197e46
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 53 deletions.
6 changes: 4 additions & 2 deletions src/Extensions/JsonElementExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System.Text.Json;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using Arcane.Operator.Models.Resources.StreamClass.V1Beta1;
using Arcane.Operator.Models.Resources.StreamDefinitions;
using Arcane.Operator.Models.StreamDefinitions;
using Arcane.Operator.Models.StreamDefinitions.Base;

namespace Arcane.Operator.Extensions;
Expand All @@ -19,6 +19,7 @@ public static class JsonElementExtensions
/// </summary>
/// <param name="jsonElement">Element to deserialize</param>
/// <returns></returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static Option<IStreamDefinition> AsOptionalStreamDefinition(this JsonElement jsonElement) =>
jsonElement.Deserialize<StreamDefinition>().AsOption<IStreamDefinition>();

Expand All @@ -27,6 +28,7 @@ public static Option<IStreamDefinition> AsOptionalStreamDefinition(this JsonElem
/// </summary>
/// <param name="jsonElement">Element to deserialize</param>
/// <returns></returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static Option<IStreamClass> AsOptionalStreamClass(this JsonElement jsonElement) =>
jsonElement.Deserialize<V1Beta1StreamClass>().AsOption<IStreamClass>();
}
13 changes: 0 additions & 13 deletions src/Services/Base/IStreamingJobCommandHandler.cs

This file was deleted.

9 changes: 8 additions & 1 deletion src/Services/CommandHandlers/AnnotationCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ public Task Handle(SetAnnotationCommand<IStreamDefinition> command)
public Task Handle(SetAnnotationCommand<V1Job> command)
{
var ((nameSpace, name), annotationKey, annotationValue) = command;
return this.kubeCluster.AnnotateJob(name, nameSpace, annotationKey, annotationValue);
return this.kubeCluster.AnnotateJob(name, nameSpace, annotationKey, annotationValue)
.TryMap(job => job.AsOption(),
exception =>
{
this.logger.LogError(exception, "Failed to annotate {streamId} with {annotationKey}:{annotationValue}",
command.affectedResource, command.annotationKey, command.annotationValue);
return Option<V1Job>.None;
});
}

public Task Handle(RemoveAnnotationCommand<IStreamDefinition> command)
Expand Down
15 changes: 1 addition & 14 deletions src/Services/CommandHandlers/StreamingJobCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
namespace Arcane.Operator.Services.CommandHandlers;

/// <inheritdoc cref="ICommandHandler{T}" />
public class StreamingJobCommandHandler : IStreamingJobCommandHandler
public class StreamingJobCommandHandler : ICommandHandler<StreamingJobCommand>
{
private readonly IStreamClassRepository streamClassRepository;
private readonly IKubeCluster kubeCluster;
Expand Down Expand Up @@ -54,19 +54,6 @@ public StreamingJobCommandHandler(
_ => throw new ArgumentOutOfRangeException(nameof(command), command, null)
};

public Task Handle(SetAnnotationCommand<V1Job> command)
{
return this.kubeCluster.AnnotateJob(command.affectedResource.Name(), command.affectedResource.Namespace(),
command.annotationKey, command.annotationValue)
.TryMap(job => job.AsOption(),
exception =>
{
this.logger.LogError(exception, "Failed to annotate {streamId} with {annotationKey}:{annotationValue}",
command.affectedResource, command.annotationKey, command.annotationValue);
return Option<V1Job>.None;
});
}

private Task StartJob(IStreamDefinition streamDefinition, bool isBackfilling, IStreamClass streamClass)
{
var template = streamDefinition.GetJobTemplate(isBackfilling);
Expand Down
8 changes: 4 additions & 4 deletions src/Services/Operator/StreamOperatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class StreamOperatorService : IStreamOperatorService, IDisposable
private readonly ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler;
private readonly ICommandHandler<SetAnnotationCommand<V1Job>> setAnnotationCommandHandler;
private readonly ICommandHandler<RemoveAnnotationCommand<IStreamDefinition>> removeAnnotationCommandHandler;
private readonly IStreamingJobCommandHandler streamingJobCommandHandler;
private readonly ICommandHandler<StreamingJobCommand> streamingJobCommandHandler;
private readonly IMaterializer materializer;
private readonly CancellationTokenSource cancellationTokenSource;
private readonly IReactiveResourceCollection<IStreamDefinition> streamDefinitionSource;
Expand All @@ -45,7 +45,7 @@ public StreamOperatorService(
ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler,
ICommandHandler<SetAnnotationCommand<V1Job>> setAnnotationCommandHandler,
ICommandHandler<RemoveAnnotationCommand<IStreamDefinition>> removeAnnotationCommandHandler,
IStreamingJobCommandHandler streamingJobCommandHandler,
ICommandHandler<StreamingJobCommand> streamingJobCommandHandler,
ILogger<StreamOperatorService> logger,
IMaterializer materializer,
IReactiveResourceCollection<IStreamDefinition> streamDefinitionSource,
Expand Down Expand Up @@ -199,8 +199,8 @@ private List<KubernetesCommand> OnModified(IStreamDefinition streamDefinition, O
{
UpdateStatusCommand sdc => this.updateStatusCommandHandler.Handle(sdc),
StreamingJobCommand sjc => this.streamingJobCommandHandler.Handle(sjc),
RequestJobRestartCommand rrc => this.streamingJobCommandHandler.Handle(rrc),
RequestJobReloadCommand rrc => this.streamingJobCommandHandler.Handle(rrc),
RequestJobRestartCommand rrc => this.setAnnotationCommandHandler.Handle(rrc),
RequestJobReloadCommand rrc => this.setAnnotationCommandHandler.Handle(rrc),
SetAnnotationCommand<V1Job> sac => this.setAnnotationCommandHandler.Handle(sac),
RemoveAnnotationCommand<IStreamDefinition> rac => this.removeAnnotationCommandHandler.Handle(rac),
_ => throw new ArgumentOutOfRangeException(nameof(response), response, null)
Expand Down
4 changes: 2 additions & 2 deletions src/Services/Operator/StreamingJobOperatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class StreamingJobOperatorService : IStreamingJobOperatorService
private readonly IMetricsReporter metricsReporter;
private readonly ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler;
private readonly ICommandHandler<SetAnnotationCommand<IStreamDefinition>> setAnnotationCommandHandler;
private readonly IStreamingJobCommandHandler streamingJobCommandHandler;
private readonly ICommandHandler<StreamingJobCommand> streamingJobCommandHandler;
private readonly IStreamingJobCollection streamingJobCollection;

public StreamingJobOperatorService(
Expand All @@ -42,7 +42,7 @@ public StreamingJobOperatorService(
IResourceCollection<IStreamDefinition> streamDefinitionCollection,
ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler,
ICommandHandler<SetAnnotationCommand<IStreamDefinition>> setAnnotationCommandHandler,
IStreamingJobCommandHandler streamingJobCommandHandler,
ICommandHandler<StreamingJobCommand> streamingJobCommandHandler,
IStreamingJobCollection streamingJobCollection)
{
this.configuration = options.Value;
Expand Down
69 changes: 60 additions & 9 deletions test/Services/StreamClassOperatorServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
using Akka.Util.Extensions;
using Arcane.Operator.Configurations;
using Arcane.Operator.Configurations.Common;
using Arcane.Operator.Models;
using Arcane.Operator.Models.Api;
using Arcane.Operator.Models.Commands;
using Arcane.Operator.Models.Resources.JobTemplates.Base;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using Arcane.Operator.Models.Resources.StreamClass.V1Beta1;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
Expand All @@ -26,6 +26,7 @@
using Arcane.Operator.Tests.Services.TestCases;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -61,12 +62,12 @@ public StreamClassOperatorServiceTests(LoggerFixture loggerFixture)
this.streamingJobTemplateRepositoryMock
.Setup(s => s.GetStreamingJobTemplate(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(StreamingJobTemplate.AsOption<IStreamingJobTemplate>());
this.cts.CancelAfter(TimeSpan.FromSeconds(15));
this.cts.CancelAfter(TimeSpan.FromSeconds(60));
this.cts.Token.Register(() => this.tcs.TrySetResult());
}

[Fact]
public async Task TestStreamAdded()
public async Task TestStreamClassAdded()
{
// Arrange
this.kubeClusterMock
Expand Down Expand Up @@ -111,12 +112,13 @@ await sp.GetRequiredService<IStreamClassOperatorService>()
await task;

// Assert
this.kubeClusterMock.Verify(
service => service.SendJob(It.IsAny<V1Job>(), It.IsAny<string>(), It.IsAny<CancellationToken>()));
this.kubeClusterMock.Verify(service => service.SendJob(It.IsAny<V1Job>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()));
}

[Fact]
public async Task TestStreamDeleted()
public async Task TestStreamClassDeleted()
{
// Arrange
this.kubeClusterMock
Expand Down Expand Up @@ -162,7 +164,54 @@ await sp.GetRequiredService<IStreamClassOperatorService>()
);
}

private ServiceProvider CreateServiceProvider()
[Fact]
public async Task TestFailedStreamClassAdded()
{
var streamClassMockEvents = new List<ResourceEvent<IStreamClass>>
{
new(WatchEventType.Added, FailedStreamClass(new Exception("Test exception"))),
new(WatchEventType.Added, StreamClass)
};

// Arrange
this.streamClassRepositoryMock.Setup(
s => s.GetEvents(It.IsAny<CustomResourceApiRequest>(), It.IsAny<int>()))
.Returns(Source.From(streamClassMockEvents));

this.streamClassRepositoryMock.Setup(s => s.Get(It.IsAny<string>(), It.IsAny<string>()))
.ReturnsAsync(StreamClass.AsOption());

this.kubeClusterMock.Setup(service => service.SendJob(
It.IsAny<V1Job>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Callback(() => this.tcs.TrySetResult());

this.streamDefinitionSourceMock
.Setup(m => m.GetEvents(It.IsAny<CustomResourceApiRequest>(), It.IsAny<int>()))
.Returns(Source.From(
new List<ResourceEvent<IStreamDefinition>>
{
new(WatchEventType.Added, StreamDefinitionTestCases.NamedStreamDefinition()),
new(WatchEventType.Added, StreamDefinitionTestCases.NamedStreamDefinition()),
new(WatchEventType.Added, StreamDefinitionTestCases.NamedStreamDefinition())
}));

var task = this.tcs.Task;

// Act
var sp = this.CreateServiceProvider(this.streamClassRepositoryMock.Object);
await sp.GetRequiredService<IStreamClassOperatorService>()
.GetStreamClassEventsGraph(CancellationToken.None)
.Run(this.materializer);
await task;

// Assert
this.kubeClusterMock.Verify(
service => service.SendJob(It.IsAny<V1Job>(), It.IsAny<string>(), It.IsAny<CancellationToken>()));
}

private ServiceProvider CreateServiceProvider(IStreamClassRepository streamClassRepository = null)
{
var optionsMock = new Mock<IOptionsSnapshot<CustomResourceConfiguration>>();
optionsMock
Expand All @@ -183,15 +232,17 @@ private ServiceProvider CreateServiceProvider()
.AddSingleton(this.streamingJobOperatorServiceMock.Object)
.AddSingleton(this.streamDefinitionSourceMock.Object)
.AddSingleton(this.streamingJobTemplateRepositoryMock.Object)
.AddSingleton<IStreamClassRepository, StreamClassRepository>()
.AddSingleton(sp => streamClassRepository ??
new StreamClassRepository(sp.GetRequiredService<IMemoryCache>(),
sp.GetRequiredService<IKubeCluster>()))
.AddMemoryCache()
.AddSingleton<IStreamOperatorService, StreamOperatorService>()
.AddSingleton<ICommandHandler<UpdateStatusCommand>, UpdateStatusCommandHandler>()
.AddSingleton<ICommandHandler<SetStreamClassStatusCommand>, UpdateStatusCommandHandler>()
.AddSingleton<ICommandHandler<SetAnnotationCommand<IStreamDefinition>>, AnnotationCommandHandler>()
.AddSingleton<ICommandHandler<RemoveAnnotationCommand<IStreamDefinition>>, AnnotationCommandHandler>()
.AddSingleton<ICommandHandler<SetAnnotationCommand<V1Job>>, AnnotationCommandHandler>()
.AddSingleton<IStreamingJobCommandHandler, StreamingJobCommandHandler>()
.AddSingleton<ICommandHandler<StreamingJobCommand>, StreamingJobCommandHandler>()
.AddSingleton<IMetricsReporter, MetricsReporter>()
.AddSingleton(Mock.Of<MetricsService>())
.AddSingleton(loggerFixture.Factory.CreateLogger<StreamOperatorService>())
Expand Down
2 changes: 1 addition & 1 deletion test/Services/StreamOperatorServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ private ServiceProvider CreateServiceProvider()
.AddSingleton<ICommandHandler<UpdateStatusCommand>, UpdateStatusCommandHandler>()
.AddSingleton<ICommandHandler<SetAnnotationCommand<V1Job>>, AnnotationCommandHandler>()
.AddSingleton<ICommandHandler<RemoveAnnotationCommand<IStreamDefinition>>, AnnotationCommandHandler>()
.AddSingleton<IStreamingJobCommandHandler, StreamingJobCommandHandler>()
.AddSingleton<ICommandHandler<StreamingJobCommand>, StreamingJobCommandHandler>()
.AddSingleton(this.streamClassRepositoryMock.Object)
.AddSingleton<IMetricsReporter, MetricsReporter>()
.AddSingleton(Mock.Of<MetricsService>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@

namespace Arcane.Operator.Tests.Services;

public class StreamingJobMaintenanceServiceTests : IClassFixture<LoggerFixture>
public class StreamingJobOperatorServiceTests : IClassFixture<LoggerFixture>
{
// Akka service and test helpers
private readonly ActorSystem actorSystem = ActorSystem.Create(nameof(StreamingJobMaintenanceServiceTests));
private readonly ActorSystem actorSystem = ActorSystem.Create(nameof(StreamingJobOperatorServiceTests));
private readonly LoggerFixture loggerFixture;
private readonly ActorMaterializer materializer;

Expand All @@ -51,7 +51,7 @@ public class StreamingJobMaintenanceServiceTests : IClassFixture<LoggerFixture>
private readonly Mock<IStreamClassRepository> streamClassRepositoryMock = new();
private readonly Mock<IStreamingJobTemplateRepository> streamingJobTemplateRepositoryMock = new();

public StreamingJobMaintenanceServiceTests(LoggerFixture loggerFixture)
public StreamingJobOperatorServiceTests(LoggerFixture loggerFixture)
{
this.loggerFixture = loggerFixture;
this.materializer = this.actorSystem.Materializer();
Expand Down Expand Up @@ -211,7 +211,7 @@ private StreamingJobOperatorService CreateService()
.AddSingleton(this.streamingJobTemplateRepositoryMock.Object)
.AddSingleton<ICommandHandler<UpdateStatusCommand>, UpdateStatusCommandHandler>()
.AddSingleton<ICommandHandler<SetAnnotationCommand<IStreamDefinition>>, AnnotationCommandHandler>()
.AddSingleton<IStreamingJobCommandHandler, StreamingJobCommandHandler>()
.AddSingleton<ICommandHandler<StreamingJobCommand>, StreamingJobCommandHandler>()
.AddSingleton<IStreamingJobCollection, StreamingJobRepository>()
.AddSingleton(this.loggerFixture.Factory.CreateLogger<StreamingJobOperatorService>())
.AddSingleton(this.loggerFixture.Factory.CreateLogger<StreamingJobRepository>())
Expand Down
56 changes: 56 additions & 0 deletions test/Services/TestCases/FailedStreamClass.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using k8s.Models;
using Snd.Sdk.Kubernetes;

namespace Arcane.Operator.Tests.Services.TestCases;

/// <summary>
/// A stream Class that throws an exception (for tests)
/// </summary>
public class FailedStreamClass : IStreamClass
{
private readonly Exception exception;

public FailedStreamClass(Exception exception)
{
this.exception = exception;
}

public string ApiVersion
{
get => throw this.exception;
set => throw this.exception;
}

public string Kind
{
get => throw this.exception;
set => throw this.exception;
}

public V1ObjectMeta Metadata
{
get => throw this.exception;
set => throw this.exception;
}
public string ToStreamClassId()
{
throw this.exception;
}

public string ApiGroupRef => throw this.exception;
public string VersionRef => throw this.exception;
public string PluralNameRef => throw this.exception;
public string KindRef => throw this.exception;
public int MaxBufferCapacity => throw this.exception;
public NamespacedCrd ToNamespacedCrd()
{
throw this.exception;
}

public bool IsSecretRef(string propertyName)
{
throw this.exception;
}
}
8 changes: 7 additions & 1 deletion test/Services/TestCases/StreamClassTestCases.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Arcane.Operator.Models.Resources.StreamClass.Base;
using System;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using Arcane.Operator.Models.Resources.StreamClass.V1Beta1;
using k8s.Models;

Expand All @@ -22,4 +23,9 @@ public static class StreamClassTestCases
NamespaceProperty = "default"
}
};

public static IStreamClass FailedStreamClass(Exception exception)
{
return new FailedStreamClass(exception);
}
}
2 changes: 0 additions & 2 deletions test/Services/TestCases/StreamDefinitionTestCases.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
using System.Collections.Generic;
using System.Text.Json;
using Arcane.Operator.Models.Resources.StreamDefinitions;
using Arcane.Operator.Models.StreamDefinitions;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.StreamingJobLifecycle;
using Avro.IO.Parsing;
using k8s.Models;

namespace Arcane.Operator.Tests.Services.TestCases;
Expand Down

0 comments on commit 6197e46

Please sign in to comment.