Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove streaming job operator service (part 1) #81

Merged
merged 6 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Services/Base/IStreamingJobCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Arcane.Operator.Services.Commands;
using k8s.Models;

namespace Arcane.Operator.Services.Base;

/// <summary>
/// Command handler for streaming job commands
/// </summary>
public interface IStreamingJobCommandHandler : ICommandHandler<StreamingJobCommand>,
ICommandHandler<SetAnnotationCommand<V1Job>>
{

}
35 changes: 0 additions & 35 deletions src/Services/Base/IStreamingJobOperatorService.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System.Threading.Tasks;
using Akka.Util;
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using k8s.Models;

namespace Arcane.Operator.Services.Base;
Expand All @@ -14,42 +11,10 @@ public interface IStreamingJobOperatorService
/// </summary>
public string StreamJobNamespace { get; }

/// <summary>
/// Starts a new stream using an existing stream definition in Kubernetes database.
/// </summary>
/// <param name="streamDefinition">Stream definition</param>
/// <param name="isBackfilling">Whether to perform a full reload for this stream.</param>
/// <param name="streamClass"></param>
/// <returns>StreamInfo if stream was created or None if an error occured</returns>
Task<Option<StreamOperatorResponse>> StartRegisteredStream(IStreamDefinition streamDefinition, bool isBackfilling,
IStreamClass streamClass);

/// <summary>
/// Retrieves a streaming job with name equal to streamId from the cluster. If not found, returns None.
/// </summary>
/// <param name="streamId">Stream identifier that should be started.</param>
/// <returns></returns>
Task<Option<V1Job>> GetStreamingJob(string streamId);

/// <summary>
/// Marks streaming job for restart
/// </summary>
/// <param name="streamId">Stream identifier that should be terminated.</param>
/// <returns></returns>
Task<Option<StreamOperatorResponse>> RequestStreamingJobRestart(string streamId);

/// <summary>
/// Marks streaming job for stop
/// </summary>
/// <param name="streamId">Stream identifier that should be terminated.</param>
/// <returns></returns>
Task<Option<StreamOperatorResponse>> RequestStreamingJobReload(string streamId);

/// <summary>
/// Delete the streaming job
/// </summary>
/// <param name="kind">Stream definition kind</param>
/// <param name="streamId">Stream identifier that should be terminated.</param>
/// <returns></returns>
Task<Option<StreamOperatorResponse>> DeleteJob(string kind, string streamId);
}
14 changes: 0 additions & 14 deletions src/Services/CommandHandlers/IStreamingJobCommandHandler.cs

This file was deleted.

82 changes: 75 additions & 7 deletions src/Services/CommandHandlers/StreamingJobCommandHandler.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Operator.Extensions;
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.Commands;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Snd.Sdk.Kubernetes;
using Snd.Sdk.Kubernetes.Base;
using Snd.Sdk.Tasks;

namespace Arcane.Operator.Services.CommandHandlers;
Expand All @@ -13,13 +22,22 @@ public class StreamingJobCommandHandler : IStreamingJobCommandHandler
{
private readonly IStreamClassRepository streamClassRepository;
private readonly IStreamingJobOperatorService streamingJobOperatorService;
private readonly IKubeCluster kubeCluster;
private readonly ILogger<StreamingJobCommandHandler> logger;
private readonly IStreamingJobTemplateRepository streamingJobTemplateRepository;

public StreamingJobCommandHandler(
IStreamClassRepository streamClassRepository,
IStreamingJobOperatorService streamingJobOperatorService)
IStreamingJobOperatorService streamingJobOperatorService,
IKubeCluster kubeCluster,
ILogger<StreamingJobCommandHandler> logger,
IStreamingJobTemplateRepository streamingJobTemplateRepository)
{
this.streamClassRepository = streamClassRepository;
this.streamingJobOperatorService = streamingJobOperatorService;
this.kubeCluster = kubeCluster;
this.logger = logger;
this.streamingJobTemplateRepository = streamingJobTemplateRepository;
}

/// <inheritdoc cref="ICommandHandler{T}.Handle" />
Expand All @@ -29,20 +47,70 @@ public StreamingJobCommandHandler(
.Get(startJob.streamDefinition.Namespace(), startJob.streamDefinition.Kind)
.Map(maybeSc => maybeSc switch
{
{ HasValue: true, Value: var sc } => this.streamingJobOperatorService.StartRegisteredStream(startJob.streamDefinition, startJob.IsBackfilling, sc),
{ HasValue: true, Value: var sc } => this.StartJob(startJob.streamDefinition, startJob.IsBackfilling, sc),
{ HasValue: false } => throw new InvalidOperationException($"Stream class not found for {startJob.streamDefinition.Kind}"),
}),
StopJob stopJob => this.streamingJobOperatorService.DeleteJob(stopJob.streamKind, stopJob.streamId),
StopJob stopJob => this.kubeCluster.DeleteJob(stopJob.name, stopJob.nameSpace),
_ => throw new ArgumentOutOfRangeException(nameof(command), command, null)
};

public Task Handle(RequestJobRestartCommand command)
public Task Handle(SetAnnotationCommand<V1Job> command)
{
return this.streamingJobOperatorService.RequestStreamingJobRestart(command.affectedResource.GetStreamId());
return this.kubeCluster.AnnotateJob(command.affectedResource.Name(), command.affectedResource.Namespace(),
command.annotationKey, command.annotationValue)
.TryMap(job => job.AsOption(),
exception =>
{
this.logger.LogError(exception, "Failed to annotate {streamId} with {annotationKey}:{annotationValue}",
command.affectedResource, command.annotationKey, command.annotationValue);
return Option<V1Job>.None;
});
}

public Task Handle(RequestJobReloadCommand command)
private Task StartJob(IStreamDefinition streamDefinition, bool isBackfilling, IStreamClass streamClass)
{
return this.streamingJobOperatorService.RequestStreamingJobReload(command.affectedResource.GetStreamId());
var template = streamDefinition.GetJobTemplate(isBackfilling);
return this.streamingJobTemplateRepository
.GetStreamingJobTemplate(template.Kind, streamDefinition.Namespace(), template.Name)
.Map(jobTemplate =>
{
if (!jobTemplate.HasValue)
{
return Task.FromResult(StreamOperatorResponse.OperationFailed(streamDefinition.Metadata.Namespace(),
streamDefinition.Kind,
streamDefinition.StreamId,
$"Failed to find job template with kind {template.Kind} and name {template.Name}")
.AsOption());
}

var job = jobTemplate
.Value
.GetJob()
.WithStreamingJobLabels(streamDefinition.StreamId, isBackfilling, streamDefinition.Kind)
.WithStreamingJobAnnotations(streamDefinition.GetConfigurationChecksum())
.WithMetadataAnnotations(streamClass)
.WithCustomEnvironment(streamDefinition.ToV1EnvFromSources(streamClass))
.WithCustomEnvironment(streamDefinition.ToEnvironment(isBackfilling, streamClass))
.WithOwnerReference(streamDefinition)
.WithName(streamDefinition.StreamId);
this.logger.LogInformation("Starting a new stream job with an id {streamId}",
streamDefinition.StreamId);
return this.kubeCluster
.SendJob(job, streamDefinition.Metadata.Namespace(), CancellationToken.None)
.TryMap(
_ => isBackfilling
? StreamOperatorResponse.Reloading(streamDefinition.Metadata.Namespace(),
streamDefinition.Kind,
streamDefinition.StreamId)
: StreamOperatorResponse.Running(streamDefinition.Metadata.Namespace(),
streamDefinition.Kind,
streamDefinition.StreamId),
exception =>
{
this.logger.LogError(exception, "Failed to send job");
return Option<StreamOperatorResponse>.None;
});
})
.Flatten();
}
}
6 changes: 3 additions & 3 deletions src/Services/Commands/StreamingJobCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public record StartJob(IStreamDefinition streamDefinition, bool IsBackfilling) :
/// <summary>
/// Stop a streaming job
/// </summary>
/// <param name="streamKind">Stream kind</param>
/// <param name="streamId">Id of the stream</param>
public record StopJob(string streamKind, string streamId) : StreamingJobCommand;
/// <param name="name">Job name to stop</param>
/// <param name="nameSpace">Job namespace to stop</param>
public record StopJob(string name, string nameSpace) : StreamingJobCommand;

/// <summary>
/// Request a streaming job to restart
Expand Down
12 changes: 5 additions & 7 deletions src/Services/Operator/StreamOperatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.Services.CommandHandlers;
using Arcane.Operator.Services.Commands;
using Arcane.Operator.Services.Models;
using k8s;
Expand Down Expand Up @@ -139,8 +138,7 @@ private KubernetesCommand OnAdded(IStreamDefinition streamDefinition, Option<V1J
{
{ HasValue: true, Value: var job } when job.IsReloading() => new Reloading(streamDefinition),
{ HasValue: true, Value: var job } when !job.IsReloading() => new Running(streamDefinition),
{ HasValue: true, Value: var job } when streamDefinition.Suspended => new StopJob(job.GetStreamId(),
job.GetStreamKind()),
{ HasValue: true, Value: var job } when streamDefinition.Suspended => new StopJob(job.Name(), job.Namespace()),
{ HasValue: false } when streamDefinition.Suspended => new Suspended(streamDefinition),
{ HasValue: false } when !streamDefinition.Suspended => new StartJob(streamDefinition, true),
_ => throw new ArgumentOutOfRangeException(nameof(maybeJob), maybeJob, null)
Expand All @@ -163,10 +161,10 @@ private List<KubernetesCommand> OnModified(IStreamDefinition streamDefinition, O
},
{ HasValue: false } => new StartJob(streamDefinition, false).AsList(),

{ HasValue: true, Value: var job } when streamDefinition.CrashLoopDetected => new StopJob(job.GetStreamId(),
job.GetStreamKind()).AsList(),
{ HasValue: true, Value: var job } when streamDefinition.Suspended => new StopJob(job.GetStreamId(),
job.GetStreamKind()).AsList(),
{ HasValue: true, Value: var job } when streamDefinition.CrashLoopDetected => new StopJob(job.Name(),
job.Namespace()).AsList(),
{ HasValue: true, Value: var job } when streamDefinition.Suspended => new StopJob(job.Name(),
job.Namespace()).AsList(),
{ HasValue: true, Value: var job } when !job.ConfigurationMatches(streamDefinition) => new
List<KubernetesCommand>
{
Expand Down
93 changes: 1 addition & 92 deletions src/Services/Streams/StreamingJobOperatorService.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Operator.Configurations;
using Arcane.Operator.Extensions;
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Services.Base;
using Arcane.Operator.StreamingJobLifecycle;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Snd.Sdk.Kubernetes;
using Snd.Sdk.Kubernetes.Base;
using Snd.Sdk.Tasks;

Expand All @@ -23,18 +16,15 @@ public class StreamingJobOperatorService : IStreamingJobOperatorService
private readonly StreamingJobOperatorServiceConfiguration configuration;
private readonly IKubeCluster kubernetesService;
private readonly ILogger<StreamingJobOperatorService> logger;
private readonly IStreamingJobTemplateRepository streamingJobTemplateRepository;

public StreamingJobOperatorService(
ILogger<StreamingJobOperatorService> logger,
IOptions<StreamingJobOperatorServiceConfiguration> configuration,
IKubeCluster kubernetesService,
IStreamingJobTemplateRepository streamingJobTemplateRepository)
IKubeCluster kubernetesService)
{
this.logger = logger;
this.configuration = configuration.Value;
this.kubernetesService = kubernetesService;
this.streamingJobTemplateRepository = streamingJobTemplateRepository;
}


Expand All @@ -50,85 +40,4 @@ public Task<Option<V1Job>> GetStreamingJob(string streamId)
});
}

public Task<Option<StreamOperatorResponse>> StartRegisteredStream(IStreamDefinition streamDefinition, bool isBackfilling,
IStreamClass streamClass)
{
var template = streamDefinition.GetJobTemplate(isBackfilling);
return this.streamingJobTemplateRepository
.GetStreamingJobTemplate(template.Kind, streamDefinition.Namespace(), template.Name)
.Map(jobTemplate =>
{
if (!jobTemplate.HasValue)
{
return Task.FromResult(StreamOperatorResponse.OperationFailed(streamDefinition.Metadata.Namespace(),
streamDefinition.Kind,
streamDefinition.StreamId,
$"Failed to find job template with kind {template.Kind} and name {template.Name}")
.AsOption());
}

var job = jobTemplate
.Value
.GetJob()
.WithStreamingJobLabels(streamDefinition.StreamId, isBackfilling, streamDefinition.Kind)
.WithStreamingJobAnnotations(streamDefinition.GetConfigurationChecksum())
.WithMetadataAnnotations(streamClass)
.WithCustomEnvironment(streamDefinition.ToV1EnvFromSources(streamClass))
.WithCustomEnvironment(streamDefinition.ToEnvironment(isBackfilling, streamClass))
.WithOwnerReference(streamDefinition)
.WithName(streamDefinition.StreamId);
this.logger.LogInformation("Starting a new stream job with an id {streamId}",
streamDefinition.StreamId);
return this.kubernetesService
.SendJob(job, streamDefinition.Metadata.Namespace(), CancellationToken.None)
.TryMap(
_ => isBackfilling
? StreamOperatorResponse.Reloading(streamDefinition.Metadata.Namespace(),
streamDefinition.Kind,
streamDefinition.StreamId)
: StreamOperatorResponse.Running(streamDefinition.Metadata.Namespace(),
streamDefinition.Kind,
streamDefinition.StreamId),
exception =>
{
this.logger.LogError(exception, "Failed to send job");
return Option<StreamOperatorResponse>.None;
});
})
.Flatten();
}

public Task<Option<StreamOperatorResponse>> RequestStreamingJobRestart(string streamId)
{
return this.SetStreamingJobAnnotation(streamId, Annotations.RESTARTING_STATE_ANNOTATION_VALUE)
.Map(maybeSi
=> maybeSi.Select(job
=> StreamOperatorResponse.Restarting(this.StreamJobNamespace, job.GetStreamKind(), streamId)));
}

public Task<Option<StreamOperatorResponse>> RequestStreamingJobReload(string streamId)
{
return this.SetStreamingJobAnnotation(streamId, Annotations.RELOADING_STATE_ANNOTATION_VALUE)
.Map(maybeSi
=> maybeSi.Select(job
=> StreamOperatorResponse.Terminating(this.StreamJobNamespace, job.GetStreamKind(), streamId)));
}

public Task<Option<StreamOperatorResponse>> DeleteJob(string kind, string streamId)
{
return this.kubernetesService.DeleteJob(streamId, this.StreamJobNamespace)
.Map(_ => StreamOperatorResponse.Suspended(this.StreamJobNamespace, kind, streamId).AsOption());
}

private Task<Option<V1Job>> SetStreamingJobAnnotation(string streamId, string annotationValue)
{
return this.kubernetesService.AnnotateJob(streamId, this.configuration.Namespace,
Annotations.STATE_ANNOTATION_KEY, annotationValue)
.TryMap(job => job.AsOption(),
exception =>
{
this.logger.LogError(exception, "Failed request {streamId} termination", streamId);
return Option<V1Job>.None;
});
}
}
10 changes: 10 additions & 0 deletions test/Extensions/V1JobExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Linq;
using k8s.Models;

namespace Arcane.Operator.Tests.Extensions;

public static class V1JobExtensions
{
public static bool IsBackfilling(this V1Job job) =>
job.Spec.Template.Spec.Containers[0].Env.Any(i => i.Name == "STREAMCONTEXT__BACKFILL" && i.Value == "true");
}
Loading
Loading