Skip to content

Commit

Permalink
Not fail operator if an unknown job found in cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
s-vitaliy committed Oct 9, 2024
1 parent 7b55d37 commit d2f6805
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
17 changes: 17 additions & 0 deletions src/Exceptions/JobListenerException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Arcane.Operator.Exceptions;

/// <summary>
/// Thrown when an error occurs in the job listener
/// </summary>
public class JobListenerException: Exception
{
/// <summary>
/// Thrown when an error occurs in the job listener
/// </summary>
/// <param name="message">Error message</param>
public JobListenerException(string message) : base(message)
{
}
}
7 changes: 4 additions & 3 deletions src/Extensions/V1JobExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using Arcane.Operator.Contracts;
using Arcane.Operator.Exceptions;
using Arcane.Operator.Models.Api;
using Arcane.Operator.Models.Resources.StreamClass.Base;
using Arcane.Operator.Models.StreamDefinitions.Base;
Expand Down Expand Up @@ -128,7 +129,7 @@ private static string GetApiGroup(this V1Job job)
return value;
}

throw new InvalidOperationException("Api group not found in job annotations.");
throw new JobListenerException("Api group not found in job annotations.");
}

private static string GetApiVersion(this V1Job job)
Expand All @@ -138,7 +139,7 @@ private static string GetApiVersion(this V1Job job)
return value;
}

throw new InvalidOperationException("Api version not found in job annotations.");
throw new JobListenerException("Api version not found in job annotations.");
}

private static string GetPluralName(this V1Job job)
Expand All @@ -148,6 +149,6 @@ private static string GetPluralName(this V1Job job)
return value;
}

throw new InvalidOperationException("Api plural name version not found in job annotations.");
throw new JobListenerException("Api plural name version not found in job annotations.");
}
}
26 changes: 26 additions & 0 deletions src/Services/Operators/StreamingJobOperatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Akka.Streams.Dsl;
using Akka.Util;
using Arcane.Operator.Configurations;
using Arcane.Operator.Exceptions;
using Arcane.Operator.Extensions;
using Arcane.Operator.Models.Api;
using Arcane.Operator.Models.Base;
Expand All @@ -23,6 +24,8 @@
using Snd.Sdk.ActorProviders;
using Snd.Sdk.Kubernetes;
using Snd.Sdk.Tasks;
using Akka.Streams.Supervision;


namespace Arcane.Operator.Services.Operators;

Expand Down Expand Up @@ -70,6 +73,7 @@ public IRunnableGraph<Task> GetJobEventsGraph(CancellationToken cancellationToke
.SelectAsync(parallelism, this.OnJobEvent)
.SelectMany(e => e)
.CollectOption()
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError))
.ToMaterialized(Sink.ForEachAsync<KubernetesCommand>(parallelism, this.HandleCommand), Keep.Right);
}

Expand Down Expand Up @@ -159,4 +163,26 @@ private Task<List<Option<KubernetesCommand>>> OnJobDelete(V1Job job)
RemoveAnnotationCommand<IStreamDefinition> command => this.removeAnnotationHandler.Handle(command),
_ => throw new ArgumentOutOfRangeException(nameof(response), response, null)
};

private Directive HandleError(Exception exception)
{
this.logger.LogError(exception, "Failed to handle stream definition event");
return exception switch
{
JobListenerException ex => this.LogAndContinue(ex),
_ => this.LogAndStop(exception),
};
}

private Directive LogAndContinue(Exception exception)
{
this.logger.LogWarning(exception, "Failed to handle stream definition event");
return Directive.Resume;
}

private Directive LogAndStop(Exception exception)
{
this.logger.LogError(exception, "Failed to handle stream definition event");
return Directive.Resume;
}
}

0 comments on commit d2f6805

Please sign in to comment.