Skip to content

Commit

Permalink
Merge f86f7da into da9ac2e
Browse files Browse the repository at this point in the history
  • Loading branch information
davidreneuw authored Dec 17, 2024
2 parents da9ac2e + f86f7da commit e336a6d
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Runtime.CompilerServices;
using Datahub.Core.Model.Context;

[assembly: InternalsVisibleTo("Datahub.SpecflowTests")]

Expand Down Expand Up @@ -39,8 +40,9 @@ public Task<bool> VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcronym,
/// Checks if the costs for the given workspace acronym need to be updated.
/// </summary>
/// <param name="workspaceAcronym">The workspace acronym to check for</param>
/// <param name="ctx">The project db context to use, to avoid having to create a context every time</param>
/// <returns>True if it is needed, false otherwise</returns>
public bool CheckUpdateNeeded(string workspaceAcronym);
public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx);

/// <summary>
/// Queries the costs for the given subscription id within the given date range.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Datahub.Application.Services.Storage
using Datahub.Core.Model.Context;

namespace Datahub.Application.Services.Storage
{
public interface IWorkspaceStorageManagementService
{
Expand All @@ -22,7 +24,8 @@ public interface IWorkspaceStorageManagementService
/// Checks if a storage update is needed for a workspace
/// </summary>
/// <param name="workspaceAcronym">The workspace acronym to check for</param>
/// <param name="ctx">Project db context to use</param>
/// <returns>True if it is needed, false otherwise</returns>
public bool CheckUpdateNeeded(string workspaceAcronym);
public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public static string GetResourceGroupName(this Datahub_Project project)
{
var jsonContent = JsonSerializer.Deserialize<JsonObject>(newProjectResource.JsonContent);
string rgName = jsonContent["resource_group_name"]!.ToString();
if (rgName == "Missing") throw new Exception("Resource group name not found");
return rgName;
}

Expand All @@ -30,6 +31,7 @@ public static string GetResourceGroupNameFromBlob(this Datahub_Project project)
{
var jsonContent = JsonSerializer.Deserialize<JsonObject>(blobStorageResource.JsonContent);
var rgName = jsonContent["resource_group_name"]!.ToString();
if (rgName == "Missing") throw new Exception("Resource group name not found");
return rgName;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Datahub.Application.Services.Cost;
using Datahub.Core.Model.Context;

namespace Datahub.Infrastructure.Offline
{
Expand All @@ -19,7 +20,7 @@ public Task<bool> VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcronym, L
throw new NotImplementedException();
}

public bool CheckUpdateNeeded(string workspaceAcronym)
public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Datahub.Application.Services.Storage;
using Datahub.Core.Model.Context;

namespace Datahub.Infrastructure.Offline
{
Expand All @@ -14,7 +15,7 @@ public Task<double> UpdateStorageCapacity(string workspaceAcronym, List<string>?
throw new NotImplementedException();
}

public bool CheckUpdateNeeded(string workspaceAcronym)
public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ public async Task<bool> VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcro

if (diff > REFRESH_THRESHOLD)
{
logger.LogWarning("Workspace costs for {WorkspaceAcronym} do not match Azure costs (diff = ${Diff} > ${Threshold}). " +
"Refreshing costs for workspace", workspaceAcronym, diff, REFRESH_THRESHOLD);
logger.LogWarning(
"Workspace costs for {WorkspaceAcronym} do not match Azure costs (diff = ${Diff} > ${Threshold}). " +
"Refreshing costs for workspace", workspaceAcronym, diff, REFRESH_THRESHOLD);
if (executeRefresh) return await RefreshWorkspaceCostsAsync(workspaceAcronym);
return true;
}
Expand All @@ -120,10 +121,10 @@ public async Task<bool> VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcro
}

/// <inheritdoc />
public bool CheckUpdateNeeded(string workspaceAcronym)
public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx)
{
using var ctx = dbContextFactory.CreateDbContext();
var credits = ctx.Project_Credits
.AsNoTracking()
.Include(c => c.Project)
.FirstOrDefault(c => c.Project.Project_Acronym_CD == workspaceAcronym);
if (credits is null) return true;
Expand All @@ -142,6 +143,7 @@ public async Task<List<DailyServiceCost>> QuerySubscriptionCostsAsync(string sub
{
subscriptionId = $"/subscriptions/{subscriptionId}";
}

var queryResult = await QueryScopeCostsAsync(subscriptionId, startDate, endDate, granularity, rgNames);
return queryResult;
}
Expand Down Expand Up @@ -535,7 +537,9 @@ internal List<DailyServiceCost> ParseQueryResult(List<QueryResult> queryResults)
{
lstDailyCosts.Add(new DailyServiceCost
{
Amount = costColumn < 0 ? 0 : decimal.Parse(r[costColumn].ToString(), CultureInfo.InvariantCulture),
Amount = costColumn < 0
? 0
: decimal.Parse(r[costColumn].ToString(), CultureInfo.InvariantCulture),
Source = serviceColumn < 0
? String.Empty
: r[serviceColumn].ToString().Replace("\"", ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ await ctx.Project_Storage_Avgs.FirstOrDefaultAsync(p =>
}

/// <inheritdoc />
public bool CheckUpdateNeeded(string workspaceAcronym)
public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx)
{
using var ctx = dbContextFactory.CreateDbContext();
var date = DateTime.UtcNow.Date;
var project = ctx.Projects.First(p => p.Project_Acronym_CD == workspaceAcronym);
var projectAverage = ctx.Project_Storage_Avgs.FirstOrDefault(p =>
p.ProjectId == project.Project_ID && p.Date == date.Date);
var project = ctx.Projects
.AsNoTracking()
.First(p => p.Project_Acronym_CD == workspaceAcronym);
var projectAverage = ctx.Project_Storage_Avgs
.AsNoTracking()
.FirstOrDefault(p => p.ProjectId == project.Project_ID && p.Date == date.Date);
if (projectAverage is null) return true;
return false;
}
Expand Down Expand Up @@ -148,7 +150,7 @@ internal async Task<List<string>> GetStorageAccountIds(string workspaceAcronym)
logger.LogError("Resource group with id {RgId} not found", rgId);
throw new Exception($"Resource group with id {rgId} not found");
}

var storageAccountsCollection = response.Value.GetStorageAccounts();
var storageAccountsPageable = storageAccountsCollection.GetAll();
var storageAccounts = storageAccountsPageable.ToList();
Expand All @@ -163,7 +165,7 @@ internal async Task<List<string>> GetStorageAccountIds(string workspaceAcronym)

return storageIds;
}

#endregion
}
}
8 changes: 4 additions & 4 deletions Portal/test/Datahub.SpecflowTests/Hooks/ProjectUsageHook.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public void BeforeScenarioWorkspaceCosts(IObjectContainer objectContainer,
}
});
workspaceCostsManagementService
.CheckUpdateNeeded(Testing.WorkspaceAcronym)
.CheckUpdateNeeded(Testing.WorkspaceAcronym, Arg.Any<DatahubProjectDBContext>())
.Returns(true);
workspaceCostsManagementService
.CheckUpdateNeeded(Testing.WorkspaceAcronym2)
.CheckUpdateNeeded(Testing.WorkspaceAcronym2, Arg.Any<DatahubProjectDBContext>())
.Returns(false);
workspaceBudgetManagementService
.GetWorkspaceBudgetAmountAsync(Arg.Any<string>())
Expand All @@ -132,10 +132,10 @@ public void BeforeScenarioWorkspaceCosts(IObjectContainer objectContainer,
.UpdateStorageCapacity(Arg.Any<string>())
.Returns(100.0);
workspaceStorageManagementService
.CheckUpdateNeeded(Testing.WorkspaceAcronym)
.CheckUpdateNeeded(Testing.WorkspaceAcronym, Arg.Any<DatahubProjectDBContext>())
.Returns(true);
workspaceStorageManagementService
.CheckUpdateNeeded(Testing.WorkspaceAcronym2)
.CheckUpdateNeeded(Testing.WorkspaceAcronym2, Arg.Any<DatahubProjectDBContext>())
.Returns(false);
workspaceRgManagementService
.GetAllSubscriptionResourceGroupsAsync(Arg.Any<string>())
Expand Down
30 changes: 17 additions & 13 deletions ServerlessOperations/src/Datahub.Functions/ProjectUsageScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ProjectUsageScheduler(
public bool Mock { get; set; } = false;
private readonly ILogger<ProjectUsageScheduler> _logger = loggerFactory.CreateLogger<ProjectUsageScheduler>();
private readonly AzureConfig _azConfig = new(config);
private bool _forceUpdate = false;
private const int WORKSPACE_UPDATE_LIMIT = 100;

[Function("ProjectUsageScheduler")]
Expand Down Expand Up @@ -69,7 +70,7 @@ public async Task<HttpResponseData> RunHttp(

_logger.LogInformation("Request body: {Body}", body);
var schedulerRequest = ParseRequestBody(body);

_forceUpdate = schedulerRequest.Acronyms.Count != 0; // If acronyms are given, we force update for those projects
_logger.LogInformation("Manual rollover is set to: {ManualRollover}", schedulerRequest.ManualRollover);
_logger.LogInformation("Acronyms: {Acronyms}", schedulerRequest.Acronyms);
await RunScheduler(schedulerRequest.Acronyms, schedulerRequest.ManualRollover);
Expand Down Expand Up @@ -123,11 +124,12 @@ public async Task<HttpResponseData> RunHttp(
_logger.LogInformation("Sending messages to update usage and capacity for {Count} projects", projects.Count);
var costMessages = 0;
var storageMessages = 0;
var ctx = await dbContextFactory.CreateDbContextAsync();
foreach (var usageMessage in projects.Select(resource => new ProjectUsageUpdateMessage(
resource.Project_Acronym_CD, costBlobName, totalBlobName,
manualRollover)))
{
var (costUpdate, storageUpdate) = await SendMessagesIfNeeded(usageMessage);
var (costUpdate, storageUpdate) = await SendMessagesIfNeeded(usageMessage, ctx);
costMessages += costUpdate ? 1 : 0;
storageMessages += storageUpdate ? 1 : 0;
// We delay to avoid sending all the messages at the same time to avoid throttling
Expand All @@ -139,12 +141,12 @@ public async Task<HttpResponseData> RunHttp(
return (costMessages, storageMessages);
}

internal async Task<(bool, bool)> SendMessagesIfNeeded(ProjectUsageUpdateMessage usageMessage)
internal async Task<(bool, bool)> SendMessagesIfNeeded(ProjectUsageUpdateMessage usageMessage, DatahubProjectDBContext ctx)
{
try
{
var costUpdate = NeedsCostUpdate(usageMessage.ProjectAcronym);
var storageUpdate = NeedsStorageUpdate(usageMessage.ProjectAcronym);
var costUpdate = NeedsCostUpdate(usageMessage.ProjectAcronym, ctx);
var storageUpdate = NeedsStorageUpdate(usageMessage.ProjectAcronym, ctx);

if (costUpdate)
{
Expand Down Expand Up @@ -178,7 +180,9 @@ internal async Task<List<Datahub_Project>> GetProjects(List<string> acronyms, in
await using var ctx = await dbContextFactory.CreateDbContextAsync();
var projects = ctx.Projects
.Include(p => p.DatahubAzureSubscription)
.Include(p => p.Credits).ToList();
.Include(p => p.Credits)
.AsNoTracking()
.ToList();

if (acronyms.Any())
{
Expand All @@ -188,7 +192,7 @@ internal async Task<List<Datahub_Project>> GetProjects(List<string> acronyms, in
else
{
// Otherwise, we grab the last 100 projects that were updated the longest ago
projects = projects.OrderBy(LastUpdate).Where(NeedsUpdate).Take(limit).ToList();
projects = projects.Where(p => NeedsUpdate(p, ctx)).OrderBy(LastUpdate).Take(limit).ToList();
}

return projects;
Expand Down Expand Up @@ -276,19 +280,19 @@ private DateTime LastUpdate(Datahub_Project p)
return p.Credits?.LastUpdate ?? DateTime.MinValue;
}

private bool NeedsUpdate(Datahub_Project p)
private bool NeedsUpdate(Datahub_Project p, DatahubProjectDBContext ctx)
{
return NeedsCostUpdate(p.Project_Acronym_CD) || NeedsStorageUpdate(p.Project_Acronym_CD);
return _forceUpdate || NeedsCostUpdate(p.Project_Acronym_CD, ctx) || NeedsStorageUpdate(p.Project_Acronym_CD, ctx);
}

private bool NeedsCostUpdate(string workspaceAcronym)
private bool NeedsCostUpdate(string workspaceAcronym, DatahubProjectDBContext ctx)
{
return workspaceCostMgmtService.CheckUpdateNeeded(workspaceAcronym);
return _forceUpdate || workspaceCostMgmtService.CheckUpdateNeeded(workspaceAcronym, ctx);
}

private bool NeedsStorageUpdate(string workspaceAcronym)
private bool NeedsStorageUpdate(string workspaceAcronym, DatahubProjectDBContext ctx)
{
return workspaceStorageMgmtService.CheckUpdateNeeded(workspaceAcronym);
return _forceUpdate || workspaceStorageMgmtService.CheckUpdateNeeded(workspaceAcronym, ctx);
}

static ProjectCapacityUpdateMessage ConvertToCapacityUpdateMessage(ProjectUsageUpdateMessage message)
Expand Down
35 changes: 32 additions & 3 deletions ServerlessOperations/src/Datahub.Functions/ProjectUsageUpdater.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ public class ProjectUsageUpdater(
IConfiguration config)
{
private readonly ILogger<ProjectUsageUpdater> _logger = loggerFactory.CreateLogger<ProjectUsageUpdater>();
private static readonly SemaphoreSlim _costUpdateSemaphore = new(1, 10);
private static readonly SemaphoreSlim _storageUpdateSemaphore = new(1, 10);
private readonly AzureConfig _azConfig = new(config);
public bool Mock = false;
public List<DailyServiceCost> MockCosts { get; set; } = new();
public List<DailyServiceCost> MockCosts { get; set; } = new();


[Function("ProjectUsageUpdater")]
Expand All @@ -45,7 +47,20 @@ public async Task Run(
{
// deserialize message
var message = await serviceBusReceivedMessage.DeserializeAndUnwrapMessageAsync<ProjectUsageUpdateMessage>();
await UpdateUsage(message, cancellationToken);
await _costUpdateSemaphore.WaitAsync(cancellationToken);
try
{
await UpdateUsage(message, cancellationToken);
}
catch (Exception e)
{
_logger.LogError("Error updating usage: {Error}", e.Message);
throw new Exception(e.Message);
}
finally
{
_costUpdateSemaphore.Release();
}
}

[Function("ProjectCapacityUsageUpdater")]
Expand All @@ -56,7 +71,20 @@ public async Task UpdateProjectCapacity(
CancellationToken cancellationToken)
{
var message = await serviceBusReceivedMessage.DeserializeAndUnwrapMessageAsync<ProjectCapacityUpdateMessage>();
await UpdateCapacity(message, cancellationToken);
await _storageUpdateSemaphore.WaitAsync(cancellationToken);
try
{
await UpdateCapacity(message, cancellationToken);
}
catch (Exception e)
{
_logger.LogError("Error updating capacity: {Error}", e.Message);
throw new Exception(e.Message);
}
finally
{
_storageUpdateSemaphore.Release();
}
}

internal async Task<bool> UpdateUsage(ProjectUsageUpdateMessage message, CancellationToken cancellationToken)
Expand Down Expand Up @@ -157,6 +185,7 @@ internal async Task<List<DailyServiceCost>> FromBlob(string fileName)
{
return MockCosts;
}

_logger.LogInformation("Downloading from blob {FileName}", fileName);
var blobServiceClient = new BlobServiceClient(_azConfig.MediaStorageConnectionString);
var containerClient = blobServiceClient.GetBlobContainerClient("costs");
Expand Down
4 changes: 3 additions & 1 deletion ServerlessOperations/template.settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
"DatahubServiceBus": {
"ConnectionString": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=service-bus-connection-string)"
},
"MediaStorageConnectionString": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahub-media-storage-connection-string)",
"Media": {
"StorageConnectionString": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahub-media-storage-connection-string)"
},
"FUNC_SP_CLIENT_ID": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahubportal-client-id)",
"FUNC_SP_CLIENT_SECRET": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahubportal-client-secret)",
"FUNCTIONS_EXTENSION_VERSION": "~4",
Expand Down

0 comments on commit e336a6d

Please sign in to comment.