Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove streaming job operator service (part 2) #82

Merged
merged 3 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ public class StreamingJobMaintenanceServiceConfiguration
/// Max buffer capacity for job events stream
/// </summary>
public int MaxBufferCapacity { get; init; }

/// <summary>
/// Namespace where the job will be created
/// </summary>
public string Namespace { get; set; }
}
17 changes: 0 additions & 17 deletions src/Configurations/StreamingJobOperatorServiceConfiguration.cs

This file was deleted.

1 change: 1 addition & 0 deletions src/Services/Base/IStreamClassRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamStatuses.StreamStatus.V1Beta1;
using Arcane.Operator.Services.Base.Repositories.CustomResources;

namespace Arcane.Operator.Services.Base;

Expand Down
20 changes: 0 additions & 20 deletions src/Services/Base/IStreamingJobOperatorService.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using k8s;
using k8s.Models;

namespace Arcane.Operator.Services.Base;
namespace Arcane.Operator.Services.Base.Repositories.CustomResources;

public interface IReactiveResourceCollection<TResourceType> where TResourceType : IKubernetesObject<V1ObjectMeta>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using k8s;
using k8s.Models;

namespace Arcane.Operator.Services.Base;
namespace Arcane.Operator.Services.Base.Repositories.CustomResources;

public interface IResourceCollection<TResourceType> where TResourceType : IKubernetesObject<V1ObjectMeta>
{
Expand All @@ -13,7 +13,6 @@ public interface IResourceCollection<TResourceType> where TResourceType : IKuber
/// </summary>
/// <param name="name"></param>
/// <param name="request">An object that contains required information for a Kubernetes API call</param>
/// <param name="maxBufferCapacity">Maximum capacity of the buffer in the source</param>
/// <returns>An Akka source that emits a Kubernetes entity updates</returns>
/// <returns>A task containing optional Resource</returns>
Task<Option<TResourceType>> Get(string name, CustomResourceApiRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Threading.Tasks;
using Akka;
using Akka.Streams.Dsl;
using Akka.Util;
using Arcane.Operator.Services.Models;
using k8s.Models;

namespace Arcane.Operator.Services.Base.Repositories.StreamingJob;

public interface IStreamingJobCollection
{
/// <summary>
/// Subscribe to a stream class updates
/// </summary>
/// <param name="nameSpace">The namespace to watch for</param>
/// <param name="maxBufferCapacity">Maximum capacity of the buffer in the source</param>
/// <returns>An Akka source that emits a Kubernetes entity updates</returns>
Source<ResourceEvent<V1Job>, NotUsed> GetEvents(string nameSpace, int maxBufferCapacity);

/// <summary>
/// Subscribe to a stream class updates
/// </summary>
/// <param name="nameSpace">An object that contains required information for a Kubernetes API call</param>
/// <param name="name"></param>
/// <returns>An Akka source that emits a Kubernetes entity updates</returns>
Task<Option<V1Job>> Get(string nameSpace, string name);
}
3 changes: 0 additions & 3 deletions src/Services/CommandHandlers/StreamingJobCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,17 @@ namespace Arcane.Operator.Services.CommandHandlers;
public class StreamingJobCommandHandler : IStreamingJobCommandHandler
{
private readonly IStreamClassRepository streamClassRepository;
private readonly IStreamingJobOperatorService streamingJobOperatorService;
private readonly IKubeCluster kubeCluster;
private readonly ILogger<StreamingJobCommandHandler> logger;
private readonly IStreamingJobTemplateRepository streamingJobTemplateRepository;

public StreamingJobCommandHandler(
IStreamClassRepository streamClassRepository,
IStreamingJobOperatorService streamingJobOperatorService,
IKubeCluster kubeCluster,
ILogger<StreamingJobCommandHandler> logger,
IStreamingJobTemplateRepository streamingJobTemplateRepository)
{
this.streamClassRepository = streamClassRepository;
this.streamingJobOperatorService = streamingJobOperatorService;
this.kubeCluster = kubeCluster;
this.logger = logger;
this.streamingJobTemplateRepository = streamingJobTemplateRepository;
Expand Down
21 changes: 9 additions & 12 deletions src/Services/Maintenance/StreamingJobMaintenanceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
using Arcane.Operator.Extensions;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.CommandHandlers;
using Arcane.Operator.Services.Base.Repositories.CustomResources;
using Arcane.Operator.Services.Base.Repositories.StreamingJob;
using Arcane.Operator.Services.Commands;
using Arcane.Operator.Services.Models;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Snd.Sdk.ActorProviders;
using Snd.Sdk.Kubernetes;
using Snd.Sdk.Kubernetes.Base;
using Snd.Sdk.Tasks;

namespace Arcane.Operator.Services.Maintenance;
Expand All @@ -26,51 +27,47 @@ public class StreamingJobMaintenanceService : IStreamingJobMaintenanceService
{
private const int parallelism = 1;
private readonly StreamingJobMaintenanceServiceConfiguration configuration;
private readonly IKubeCluster kubeCluster;
private readonly ILogger<StreamingJobMaintenanceService> logger;
private readonly IStreamingJobOperatorService operatorService;
private readonly IResourceCollection<IStreamDefinition> streamDefinitionCollection;
private readonly IMetricsReporter metricsReporter;
private readonly ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler;
private readonly ICommandHandler<SetAnnotationCommand<IStreamDefinition>> setAnnotationCommandHandler;
private readonly IStreamingJobCommandHandler streamingJobCommandHandler;
private readonly IStreamingJobCollection streamingJobCollection;

public StreamingJobMaintenanceService(
ILogger<StreamingJobMaintenanceService> logger,
IOptions<StreamingJobMaintenanceServiceConfiguration> options,
IKubeCluster kubeCluster,
IMetricsReporter metricsReporter,
IResourceCollection<IStreamDefinition> streamDefinitionCollection,
ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler,
ICommandHandler<SetAnnotationCommand<IStreamDefinition>> setAnnotationCommandHandler,
IStreamingJobCommandHandler streamingJobCommandHandler,
IStreamingJobOperatorService operatorService)
IStreamingJobCollection streamingJobCollection)
{
this.configuration = options.Value;
this.kubeCluster = kubeCluster;
this.streamDefinitionCollection = streamDefinitionCollection;
this.operatorService = operatorService;
this.logger = logger;
this.metricsReporter = metricsReporter;
this.updateStatusCommandHandler = updateStatusCommandHandler;
this.streamingJobCommandHandler = streamingJobCommandHandler;
this.setAnnotationCommandHandler = setAnnotationCommandHandler;
this.streamingJobCollection = streamingJobCollection;
}


public IRunnableGraph<Task> GetJobEventsGraph(CancellationToken cancellationToken)
{
return this.kubeCluster
.StreamJobEvents(this.operatorService.StreamJobNamespace, this.configuration.MaxBufferCapacity, OverflowStrategy.Fail)
.Via(cancellationToken.AsFlow<(WatchEventType, V1Job)>(true))
return this.streamingJobCollection.GetEvents(this.configuration.Namespace, this.configuration.MaxBufferCapacity)
.Via(cancellationToken.AsFlow<ResourceEvent<V1Job>>(true))
.Select(this.metricsReporter.ReportTrafficMetrics)
.SelectAsync(parallelism, this.OnJobEvent)
.SelectMany(e => e)
.CollectOption()
.ToMaterialized(Sink.ForEachAsync<KubernetesCommand>(parallelism, this.HandleCommand), Keep.Right);
}

private Task<List<Option<KubernetesCommand>>> OnJobEvent((WatchEventType, V1Job) valueTuple)
private Task<List<Option<KubernetesCommand>>> OnJobEvent(ResourceEvent<V1Job> valueTuple)
{
return valueTuple switch
{
Expand Down
19 changes: 12 additions & 7 deletions src/Services/Operator/StreamOperatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Base.Repositories.CustomResources;
using Arcane.Operator.Services.Base.Repositories.StreamingJob;
using Arcane.Operator.Services.Commands;
using Arcane.Operator.Services.Models;
using k8s;
Expand All @@ -27,7 +29,6 @@ public class StreamOperatorService : IStreamOperatorService, IDisposable
private const int bufferSize = 1000;

private readonly ILogger<StreamOperatorService> logger;
private readonly IStreamingJobOperatorService operatorService;
private readonly IMetricsReporter metricsReporter;
private readonly ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler;
private readonly ICommandHandler<SetAnnotationCommand<V1Job>> setAnnotationCommandHandler;
Expand All @@ -36,20 +37,20 @@ public class StreamOperatorService : IStreamOperatorService, IDisposable
private readonly IMaterializer materializer;
private readonly CancellationTokenSource cancellationTokenSource;
private readonly IReactiveResourceCollection<IStreamDefinition> streamDefinitionSource;
private Dictionary<string, UniqueKillSwitch> killSwitches = new();
private readonly IStreamingJobCollection streamingJobCollection;
private readonly Dictionary<string, UniqueKillSwitch> killSwitches = new();

public StreamOperatorService(
IStreamingJobOperatorService operatorService,
IMetricsReporter metricsReporter,
ICommandHandler<UpdateStatusCommand> updateStatusCommandHandler,
ICommandHandler<SetAnnotationCommand<V1Job>> setAnnotationCommandHandler,
ICommandHandler<RemoveAnnotationCommand<IStreamDefinition>> removeAnnotationCommandHandler,
IStreamingJobCommandHandler streamingJobCommandHandler,
ILogger<StreamOperatorService> logger,
IMaterializer materializer,
IReactiveResourceCollection<IStreamDefinition> streamDefinitionSource)
IReactiveResourceCollection<IStreamDefinition> streamDefinitionSource,
IStreamingJobCollection streamingJobCollection)
{
this.operatorService = operatorService;
this.logger = logger;
this.metricsReporter = metricsReporter;
this.updateStatusCommandHandler = updateStatusCommandHandler;
Expand All @@ -59,6 +60,7 @@ public StreamOperatorService(
this.materializer = materializer;
this.cancellationTokenSource = new CancellationTokenSource();
this.streamDefinitionSource = streamDefinitionSource;
this.streamingJobCollection = streamingJobCollection;
}

public virtual void Dispose()
Expand All @@ -69,7 +71,7 @@ public virtual void Dispose()
public void Attach(IStreamClass streamClass)
{
var request = new CustomResourceApiRequest(
this.operatorService.StreamJobNamespace,
streamClass.Namespace(),
streamClass.ApiGroupRef,
streamClass.VersionRef,
streamClass.PluralNameRef
Expand Down Expand Up @@ -108,7 +110,10 @@ private IRunnableGraph<Sink<ResourceEvent<IStreamDefinition>, NotUsed>> BuildSin
return MergeHub.Source<ResourceEvent<IStreamDefinition>>(perProducerBufferSize: bufferSize)
.Via(cancellationToken.AsFlow<ResourceEvent<IStreamDefinition>>(true))
.Select(this.metricsReporter.ReportTrafficMetrics)
.SelectAsync(parallelism, ev => this.operatorService.GetStreamingJob(ev.kubernetesObject.StreamId).Map(job => (ev, job)))
.SelectAsync(parallelism,
ev => this.streamingJobCollection.Get(ev.kubernetesObject.Namespace(),
ev.kubernetesObject.StreamId)
.Map(job => (ev, job)))
.Select(this.OnEvent)
.SelectMany(e => e)
.To(Akka.Streams.Dsl.Sink.ForEachAsync<KubernetesCommand>(parallelism, this.HandleCommand))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamStatuses.StreamStatus.V1Beta1;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Base.Repositories.CustomResources;
using Arcane.Operator.Services.Models;
using Microsoft.Extensions.Caching.Memory;
using Snd.Sdk.Kubernetes.Base;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Arcane.Operator.Models.StreamDefinitions;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Base.Repositories.CustomResources;
using Arcane.Operator.Services.Models;
using Snd.Sdk.Kubernetes.Base;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Operator.Services.Base.Repositories.StreamingJob;
using Arcane.Operator.Services.Models;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Snd.Sdk.Kubernetes.Base;
using Snd.Sdk.Tasks;

namespace Arcane.Operator.Services.Repositories.StreamingJob;

public class StreamingJobRepository : IStreamingJobCollection
{
private readonly IKubeCluster kubeCluster;
private readonly ILogger<StreamingJobRepository> logger;

public StreamingJobRepository(IKubeCluster kubeCluster, ILogger<StreamingJobRepository> logger)
{
this.kubeCluster = kubeCluster;
this.logger = logger;
}

public Source<ResourceEvent<V1Job>, NotUsed> GetEvents(string nameSpace, int maxBufferCapacity) =>
this.kubeCluster
.StreamJobEvents(nameSpace, maxBufferCapacity, OverflowStrategy.Fail)
.Select(tuple => new ResourceEvent<V1Job>(tuple.Item1, tuple.Item2));

public Task<Option<V1Job>> Get(string nameSpace, string name) =>
this.kubeCluster.GetJob(name, nameSpace)
.TryMap(job => job.AsOption(), exception =>
{
this.logger.LogWarning(exception, "The job resource {jobName} not found", name);
return Option<V1Job>.None;
});
}
43 changes: 0 additions & 43 deletions src/Services/Streams/StreamingJobOperatorService.cs

This file was deleted.

Loading
Loading