Skip to content

Commit

Permalink
Beast Client (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbentzon authored Apr 10, 2024
1 parent bd2455c commit 84671d9
Show file tree
Hide file tree
Showing 21 changed files with 513 additions and 50 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ jobs:
if: ${{ github.ref != 'refs/heads/main' }}

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Setup .NET
uses: actions/setup-dotnet@v3.0.3
uses: actions/setup-dotnet@v4.0.0
with:
dotnet-version: 6.0.x
- name: Restore dependencies
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/publish-public.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ jobs:
id-token: write

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup .NET
uses: actions/setup-dotnet@v3.0.3
uses: actions/setup-dotnet@v4.0.0
with:
dotnet-version: 6.0.x
- name: Set Package Version
Expand All @@ -28,7 +28,7 @@ jobs:
sed -i "s/<Version>0.0.0/<Version>$version/" src/SnD.ApiClient/SnD.ApiClient.csproj
sed -i "s/<Version>0.0.0/<Version>$version/" src/SnD.ApiClient.Azure/SnD.ApiClient.Azure.csproj
- name: Import Secrets
uses: hashicorp/vault-action@v2.4.1
uses: hashicorp/vault-action@v3.0.0
with:
url: https://hashicorp-vault.production.sneaksanddata.com/
role: github
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup .NET
uses: actions/setup-dotnet@v3.0.3
uses: actions/setup-dotnet@v4.0.0
with:
dotnet-version: 6.0.x
- name: Set Package Version
Expand Down
7 changes: 5 additions & 2 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ name: Release a new version

on: workflow_dispatch

permissions:
contents: write

jobs:
create_release:
name: Create Release
runs-on: ubuntu-latest
if: ${{ github.ref == 'refs/heads/main' }}

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Create Release
uses: SneaksAndData/github-actions/semver_release@v0.0.17
uses: SneaksAndData/github-actions/semver_release@v0.1.6
with:
major_v: 0
minor_v: 1
4 changes: 2 additions & 2 deletions src/SnD.ApiClient.Azure/SnD.ApiClient.Azure.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.8.0"/>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0"/>
<PackageReference Include="Azure.Identity" Version="1.10.4" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
16 changes: 16 additions & 0 deletions src/SnD.ApiClient/Base/Models/ConcurrencyStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace SnD.ApiClient.Base.Models;

/// <summary>
/// Strategy applied when a new job is submitted while another job carrying the same
/// client tag may still be running.
/// </summary>
public enum ConcurrencyStrategy
{
    /// <summary>
    /// Ignore similar jobs: the new job is submitted without looking for running jobs with the same tag.
    /// </summary>
    IGNORE = 0,

    /// <summary>
    /// Skip the new job if a similar job is already running.
    /// </summary>
    SKIP = 1,

    /// <summary>
    /// Wait for the similar job to complete before starting the new job.
    /// </summary>
    AWAIT = 2,

    /// <summary>
    /// Cancel any similar job and start the new job.
    /// </summary>
    REPLACE = 3
}
38 changes: 38 additions & 0 deletions src/SnD.ApiClient/Beast/Base/IBeastClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using SnD.ApiClient.Base.Models;
using SnD.ApiClient.Beast.Models;
using SnD.ApiClient.Exceptions;

namespace SnD.ApiClient.Beast.Base;

/// <summary>
/// Client contract for submitting jobs to a Beast instance and tracking their state.
/// </summary>
public interface IBeastClient
{
    /// <summary>
    /// Submit a job to the Beast instance.
    /// </summary>
    /// <param name="jobParams">Job definition: inputs, outputs, extra arguments and an optional client tag.</param>
    /// <param name="submissionConfigurationName">Name of the submission configuration the job is submitted under.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <param name="concurrencyStrategy">
    /// How to treat unfinished runs that carry the same client tag.
    /// Defaults to <see cref="ConcurrencyStrategy.IGNORE"/> when omitted or null.
    /// </param>
    /// <returns>State of the submitted request.</returns>
    /// <exception cref="ConcurrencyError">If there is already a run with the same tag and ConcurrencyStrategy is set to <see cref="ConcurrencyStrategy.SKIP"/></exception>
    Task<RequestState> SubmitJobAsync(JobRequest jobParams, string submissionConfigurationName,
        CancellationToken cancellationToken = default, ConcurrencyStrategy? concurrencyStrategy = null);

    /// <summary>
    /// Awaits a run until it completes with any result or runs out of time set via cancellationToken.
    /// </summary>
    /// <param name="requestId">Identifier of the request to wait for.</param>
    /// <param name="pollInterval">Delay between two consecutive state checks.</param>
    /// <param name="cancellationToken">
    /// Token that bounds the waiting time. No default is offered on purpose: the waiting time is
    /// limited only by this token.
    /// </param>
    /// <returns>The last observed state of the request.</returns>
    Task<RequestState> AwaitRunAsync(string requestId, TimeSpan pollInterval, CancellationToken cancellationToken);

    /// <summary>
    /// Get the state of a Beast job.
    /// </summary>
    /// <param name="requestId">Identifier of the request to look up.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>Current state of the request.</returns>
    Task<RequestState> GetJobStateAsync(string requestId, CancellationToken cancellationToken = default);
}
145 changes: 145 additions & 0 deletions src/SnD.ApiClient/Beast/BeastClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SnD.ApiClient.Base;
using SnD.ApiClient.Base.Models;
using SnD.ApiClient.Beast.Base;
using SnD.ApiClient.Beast.Models;
using SnD.ApiClient.Boxer.Base;
using SnD.ApiClient.Config;
using SnD.ApiClient.Exceptions;

namespace SnD.ApiClient.Beast;

/// <summary>
/// HTTP client for the Beast job API. Submits jobs, reads request state, waits for runs to finish
/// and applies a client-tag based <see cref="ConcurrencyStrategy"/> before a submission.
/// </summary>
public class BeastClient : SndApiClient, IBeastClient
{
    /// <summary>
    /// Poll interval used while waiting for earlier runs under <see cref="ConcurrencyStrategy.AWAIT"/>.
    /// </summary>
    private static readonly TimeSpan ConcurrencyAwaitPollInterval = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Matches any character that is not allowed inside a client tag.
    /// </summary>
    private static readonly Regex InvalidClientTagCharacters = new(@"[^\w\d\-\._~]");

    /// <summary>
    /// Life cycle stages after which a request is treated as finished and no longer polled.
    /// </summary>
    private static readonly HashSet<BeastRequestLifeCycleStage> CompletedStages = new()
    {
        BeastRequestLifeCycleStage.COMPLETED,
        BeastRequestLifeCycleStage.FAILED,
        BeastRequestLifeCycleStage.STALE,
        BeastRequestLifeCycleStage.SCHEDULING_FAILED,
        BeastRequestLifeCycleStage.SUBMISSION_FAILED
    };

    // NOTE(review): endpoints are resolved with new Uri(baseUri, relative). Standard URI resolution
    // replaces the last path segment of a base that has no trailing slash - confirm BaseUri is
    // configured accordingly.
    private readonly Uri baseUri;

    /// <summary>
    /// Creates a client that talks to the Beast instance configured in <paramref name="beastClientOptions"/>.
    /// </summary>
    /// <param name="beastClientOptions">Options carrying the base URI of the Beast instance.</param>
    /// <param name="httpClient">HTTP client handed to the base API client.</param>
    /// <param name="boxerConnector">Token exchange provider handed to the base API client.</param>
    /// <param name="logger">Logger handed to the base API client.</param>
    /// <exception cref="ArgumentNullException">If the options or their BaseUri are missing.</exception>
    public BeastClient(IOptions<BeastClientOptions> beastClientOptions, HttpClient httpClient,
        IJwtTokenExchangeProvider boxerConnector, ILogger<BeastClient> logger) : base(httpClient, boxerConnector,
        logger)
    {
        if (beastClientOptions is null)
        {
            throw new ArgumentNullException(nameof(beastClientOptions));
        }

        baseUri = new Uri(beastClientOptions.Value.BaseUri
                          ?? throw new ArgumentNullException(nameof(BeastClientOptions.BaseUri)));
    }

    /// <inheritdoc />
    public async Task<RequestState> SubmitJobAsync(JobRequest jobParams, string submissionConfigurationName,
        CancellationToken cancellationToken = default, ConcurrencyStrategy? concurrencyStrategy = null)
    {
        if (jobParams is null)
        {
            throw new ArgumentNullException(nameof(jobParams));
        }

        if (string.IsNullOrWhiteSpace(submissionConfigurationName))
        {
            throw new ArgumentException("Submission configuration name must not be empty",
                nameof(submissionConfigurationName));
        }

        cancellationToken.ThrowIfCancellationRequested();

        // Validate the tag BEFORE it is used anywhere: the concurrency check below places it
        // into a request URL, so an invalid tag must never get that far.
        if (InvalidClientTagCharacters.IsMatch(jobParams.ClientTag ?? ""))
        {
            throw new ArgumentException("ClientTag can only contain alphanumeric characters, hyphens, periods, underscores, and tildes");
        }

        var strategy = concurrencyStrategy ?? ConcurrencyStrategy.IGNORE;
        if (strategy != ConcurrencyStrategy.IGNORE)
        {
            await ApplyConcurrencyStrategyAsync(strategy, jobParams.ClientTag, cancellationToken);
        }

        using var request = new HttpRequestMessage(HttpMethod.Post,
            BuildUri($"job/submit/{submissionConfigurationName}"))
        {
            Content = new StringContent(JsonSerializer.Serialize(jobParams), Encoding.UTF8, "application/json")
        };
        return await SendAndReadAsync<RequestState>(request, cancellationToken);
    }

    /// <inheritdoc />
    public async Task<RequestState> AwaitRunAsync(string requestId, TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        EnsureRequestId(requestId);

        // The token is the only bound on the waiting time, so a token that can never fire is rejected.
        if (cancellationToken == CancellationToken.None)
        {
            throw new ArgumentException("Cancellation token None is not allowed.");
        }

        cancellationToken.ThrowIfCancellationRequested();

        RequestState latestState;
        do
        {
            await Task.Delay(pollInterval, cancellationToken);
            latestState = await GetRequestState(requestId, cancellationToken);
            if (CompletedStages.Contains(latestState.LifeCycleStage))
            {
                return latestState;
            }
        } while (!cancellationToken.IsCancellationRequested);

        // Cancellation was requested right after a poll: hand back the last observed state.
        return latestState;
    }

    /// <inheritdoc />
    public Task<RequestState> GetJobStateAsync(string requestId, CancellationToken cancellationToken = default)
    {
        return GetRequestState(requestId, cancellationToken);
    }

    /// <summary>
    /// Enforces a non-IGNORE concurrency strategy against unfinished runs that share the client tag.
    /// </summary>
    /// <exception cref="ArgumentException">If no client tag was supplied.</exception>
    /// <exception cref="ConcurrencyError">If the strategy is SKIP and an unfinished run exists.</exception>
    /// <exception cref="NotImplementedException">If the strategy is REPLACE and an unfinished run exists.</exception>
    private async Task ApplyConcurrencyStrategyAsync(ConcurrencyStrategy strategy, string clientTag,
        CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(clientTag))
        {
            throw new ArgumentException("You must supply a client tag when using a concurrency strategy");
        }

        var existingJobIds = await GetJobIdsByTagAsync(clientTag, cancellationToken);
        var jobStates = await Task.WhenAll(existingJobIds.Select(id => GetJobStateAsync(id, cancellationToken)));
        var incompleteJobs = jobStates
            .Where(state => !CompletedStages.Contains(state.LifeCycleStage))
            .ToArray();

        if (incompleteJobs.Length == 0)
        {
            return;
        }

        switch (strategy)
        {
            case ConcurrencyStrategy.SKIP:
                throw new ConcurrencyError(strategy, incompleteJobs[0].Id, clientTag);
            case ConcurrencyStrategy.AWAIT:
                await Task.WhenAll(incompleteJobs
                    .Select(job => AwaitRunAsync(job.Id, ConcurrencyAwaitPollInterval, cancellationToken)));
                break;
            case ConcurrencyStrategy.REPLACE:
                throw new NotImplementedException("ConcurrencyStrategy.REPLACE not implemented for BEAST");
        }
    }

    /// <summary>
    /// Reads the ids of all requests registered under <paramref name="clientTag"/>.
    /// </summary>
    private async Task<string[]> GetJobIdsByTagAsync(string clientTag, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        using var request = new HttpRequestMessage(HttpMethod.Get, BuildUri($"job/requests/tags/{clientTag}"));
        var jobIds = await SendAndReadAsync<string[]>(request, cancellationToken);

        // A JSON "null" payload deserializes to null; treat it as "no jobs" instead of failing later.
        return jobIds ?? Array.Empty<string>();
    }

    /// <summary>
    /// Reads the current state of the request identified by <paramref name="requestId"/>.
    /// </summary>
    private async Task<RequestState> GetRequestState(string requestId, CancellationToken cancellationToken = default)
    {
        EnsureRequestId(requestId);
        cancellationToken.ThrowIfCancellationRequested();
        using var request = new HttpRequestMessage(HttpMethod.Get, BuildUri($"job/requests/{requestId}"));
        return await SendAndReadAsync<RequestState>(request, cancellationToken);
    }

    /// <summary>
    /// Sends an authenticated request, fails on a non-success status code and deserializes the JSON body.
    /// </summary>
    private async Task<T> SendAndReadAsync<T>(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // The response is disposed once its body has been read into a string.
        using var response = await SendAuthenticatedRequestAsync(request, cancellationToken);
        response.EnsureSuccessStatusCode();
        var payload = await response.Content.ReadAsStringAsync(cancellationToken);
        return JsonSerializer.Deserialize<T>(payload, JsonSerializerOptions);
    }

    /// <summary>
    /// Resolves a relative endpoint path against the configured base URI.
    /// </summary>
    private Uri BuildUri(string relativePath)
    {
        return new Uri(baseUri, new Uri(relativePath, UriKind.Relative));
    }

    /// <summary>
    /// Rejects a missing request id, which would otherwise produce a request to a different endpoint path.
    /// </summary>
    private static void EnsureRequestId(string requestId)
    {
        if (string.IsNullOrWhiteSpace(requestId))
        {
            throw new ArgumentException("Request id must not be empty", nameof(requestId));
        }
    }
}
17 changes: 17 additions & 0 deletions src/SnD.ApiClient/Beast/Models/BeastRequestLifeCycleStage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace SnD.ApiClient.Beast.Models;

/// <summary>
/// Life cycle stages a Beast request can be reported in.
/// BeastClient treats COMPLETED, FAILED, STALE, SCHEDULING_FAILED and SUBMISSION_FAILED as
/// finished and stops polling once one of them is observed.
/// </summary>
public enum BeastRequestLifeCycleStage
{
    NEW = 0,
    QUEUED = 1,
    ALLOCATING = 2,
    ALLOCATED = 3,
    SUBMITTING = 4,
    RUNNING = 5,
    SCHEDULING_FAILED = 6,
    SUBMISSION_FAILED = 7,
    FAILED = 8,
    COMPLETED = 9,
    STALE = 10,
    RETRY = 11
}
19 changes: 19 additions & 0 deletions src/SnD.ApiClient/Beast/Models/JobDataSocket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace SnD.ApiClient.Beast.Models;

/// <summary>
/// Describes one data location a job reads from or writes to.
/// </summary>
public sealed class JobDataSocket
{
    /// <summary>
    /// Alias under which the data socket is referred to.
    /// </summary>
    public string Alias { get; set; }

    /// <summary>
    /// Fully qualified path to the actual data, e.g. abfss://... or s3://...
    /// </summary>
    public string DataPath { get; set; }

    /// <summary>
    /// Format of the data, e.g. csv, json or delta.
    /// </summary>
    public string DataFormat { get; set; }
}
32 changes: 32 additions & 0 deletions src/SnD.ApiClient/Beast/Models/JobRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace SnD.ApiClient.Beast.Models;

/// <summary>
/// Parameters of a job submitted to Beast.
/// </summary>
public record JobRequest
{
    /// <summary>
    /// Input sockets: where the job reads its data from and in which format.
    /// </summary>
    public JobDataSocket[] Inputs { get; set; }

    /// <summary>
    /// Output sockets: where the job writes its data to and in which format.
    /// </summary>
    public JobDataSocket[] Outputs { get; set; }

    /// <summary>
    /// Additional arguments and their values, as defined by the developer of the job.
    /// </summary>
    public Dictionary<string, string> ExtraArgs { get; set; }

    /// <summary>
    /// Expected number of tasks running in parallel in each Spark stage.
    /// </summary>
    public int? ExpectedParallelism { get; set; }

    /// <summary>
    /// Tag applied on submission, so a client can identify a request without knowing the id assigned by Beast.
    /// </summary>
    public string ClientTag { get; set; }
}
19 changes: 19 additions & 0 deletions src/SnD.ApiClient/Beast/Models/RequestBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace SnD.ApiClient.Beast.Models;

/// <summary>
/// Properties shared by Beast request models: identifier, client tag and modification time.
/// </summary>
public abstract record RequestBase
{
    /// <summary>
    /// Unique identifier of the request, assigned after successful buffering.
    /// </summary>
    public string Id { get; set; }

    /// <summary>
    /// Client tag of the request.
    /// </summary>
    public string ClientTag { get; set; }

    /// <summary>
    /// Timestamp of the last modification of the request.
    /// </summary>
    public DateTimeOffset? LastModified { get; set; }
}
Loading

0 comments on commit 84671d9

Please sign in to comment.