diff --git a/Portal/src/Datahub.Application/Services/Cost/IWorkspaceCostManagementService.cs b/Portal/src/Datahub.Application/Services/Cost/IWorkspaceCostManagementService.cs index 0b4667954..29bac2df9 100644 --- a/Portal/src/Datahub.Application/Services/Cost/IWorkspaceCostManagementService.cs +++ b/Portal/src/Datahub.Application/Services/Cost/IWorkspaceCostManagementService.cs @@ -1,4 +1,5 @@ using System.Runtime.CompilerServices; +using Datahub.Core.Model.Context; [assembly: InternalsVisibleTo("Datahub.SpecflowTests")] @@ -39,8 +40,9 @@ public Task VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcronym, /// Checks if the costs for the given workspace acronym need to be updated. /// /// The workspace acronym to check for + /// The project db context to use, to avoid having to create a context every time /// True if it is needed, false otherwise - public bool CheckUpdateNeeded(string workspaceAcronym); + public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx); /// /// Queries the costs for the given subscription id within the given date range. diff --git a/Portal/src/Datahub.Application/Services/Storage/IWorkspaceStorageManagementService.cs b/Portal/src/Datahub.Application/Services/Storage/IWorkspaceStorageManagementService.cs index eacba0541..75f8bbab4 100644 --- a/Portal/src/Datahub.Application/Services/Storage/IWorkspaceStorageManagementService.cs +++ b/Portal/src/Datahub.Application/Services/Storage/IWorkspaceStorageManagementService.cs @@ -1,4 +1,6 @@ -namespace Datahub.Application.Services.Storage +using Datahub.Core.Model.Context; + +namespace Datahub.Application.Services.Storage { public interface IWorkspaceStorageManagementService { @@ -22,7 +24,8 @@ public interface IWorkspaceStorageManagementService /// Checks if a storage update is needed for a workspace /// /// The workspace acronym to check for + /// Project db context to use /// True if it is needed, false otherwise - public bool CheckUpdateNeeded(string workspaceAcronym); + public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx); } } \ No newline at end of file diff --git a/Portal/src/Datahub.Core/Extensions/ProjectResourceGroupExtension.cs b/Portal/src/Datahub.Core/Extensions/ProjectResourceGroupExtension.cs index bf10ef8b3..2f9ad8af2 100644 --- a/Portal/src/Datahub.Core/Extensions/ProjectResourceGroupExtension.cs +++ b/Portal/src/Datahub.Core/Extensions/ProjectResourceGroupExtension.cs @@ -16,6 +16,7 @@ public static string GetResourceGroupName(this Datahub_Project project) { var jsonContent = JsonSerializer.Deserialize(newProjectResource.JsonContent); string rgName = jsonContent["resource_group_name"]!.ToString(); + if (rgName == "Missing") throw new Exception("Resource group name not found"); return rgName; } @@ -30,6 +31,7 @@ public static string GetResourceGroupNameFromBlob(this Datahub_Project project) { var jsonContent = JsonSerializer.Deserialize(blobStorageResource.JsonContent); var rgName = jsonContent["resource_group_name"]!.ToString(); + if (rgName == "Missing") throw new Exception("Resource group name not found"); return rgName; } diff --git a/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceCostManagementService.cs b/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceCostManagementService.cs index 8207bbdfd..4cf5e5ba7 100644 --- a/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceCostManagementService.cs +++ b/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceCostManagementService.cs @@ -1,4 +1,5 @@ using Datahub.Application.Services.Cost; +using Datahub.Core.Model.Context; namespace Datahub.Infrastructure.Offline { @@ -19,7 +20,7 @@ public Task VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcronym, L throw new NotImplementedException(); } - public bool CheckUpdateNeeded(string workspaceAcronym) + public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx) { throw new NotImplementedException(); } diff --git a/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceStorageManagementService.cs b/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceStorageManagementService.cs index ebfc29d8d..b262a887a 100644 --- a/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceStorageManagementService.cs +++ b/Portal/src/Datahub.Infrastructure.Offline/OfflineWorkspaceStorageManagementService.cs @@ -1,4 +1,5 @@ using Datahub.Application.Services.Storage; +using Datahub.Core.Model.Context; namespace Datahub.Infrastructure.Offline { @@ -14,7 +15,7 @@ public Task UpdateStorageCapacity(string workspaceAcronym, List? throw new NotImplementedException(); } - public bool CheckUpdateNeeded(string workspaceAcronym) + public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx) { throw new NotImplementedException(); } diff --git a/Portal/src/Datahub.Infrastructure/Services/Cost/WorkspaceCostManagementService.cs b/Portal/src/Datahub.Infrastructure/Services/Cost/WorkspaceCostManagementService.cs index 1f1cdd0ea..c90de2ea3 100644 --- a/Portal/src/Datahub.Infrastructure/Services/Cost/WorkspaceCostManagementService.cs +++ b/Portal/src/Datahub.Infrastructure/Services/Cost/WorkspaceCostManagementService.cs @@ -110,8 +110,9 @@ public async Task VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcro if (diff > REFRESH_THRESHOLD) { - logger.LogWarning("Workspace costs for {WorkspaceAcronym} do not match Azure costs (diff = ${Diff} > ${Threshold}). " + - "Refreshing costs for workspace", workspaceAcronym, diff, REFRESH_THRESHOLD); + logger.LogWarning( + "Workspace costs for {WorkspaceAcronym} do not match Azure costs (diff = ${Diff} > ${Threshold}). " + + "Refreshing costs for workspace", workspaceAcronym, diff, REFRESH_THRESHOLD); if (executeRefresh) return await RefreshWorkspaceCostsAsync(workspaceAcronym); return true; } @@ -120,10 +121,10 @@ public async Task VerifyAndRefreshWorkspaceCostsAsync(string workspaceAcro } /// - public bool CheckUpdateNeeded(string workspaceAcronym) + public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx) { - using var ctx = dbContextFactory.CreateDbContext(); var credits = ctx.Project_Credits + .AsNoTracking() .Include(c => c.Project) .FirstOrDefault(c => c.Project.Project_Acronym_CD == workspaceAcronym); if (credits is null) return true; @@ -142,6 +143,7 @@ public async Task> QuerySubscriptionCostsAsync(string sub { subscriptionId = $"/subscriptions/{subscriptionId}"; } + var queryResult = await QueryScopeCostsAsync(subscriptionId, startDate, endDate, granularity, rgNames); return queryResult; } @@ -535,7 +537,9 @@ internal List ParseQueryResult(List queryResults) { lstDailyCosts.Add(new DailyServiceCost { - Amount = costColumn < 0 ? 0 : decimal.Parse(r[costColumn].ToString(), CultureInfo.InvariantCulture), + Amount = costColumn < 0 + ? 0 + : decimal.Parse(r[costColumn].ToString(), CultureInfo.InvariantCulture), Source = serviceColumn < 0 ? String.Empty : r[serviceColumn].ToString().Replace("\"", ""), diff --git a/Portal/src/Datahub.Infrastructure/Services/Storage/WorkspaceStorageManagementService.cs b/Portal/src/Datahub.Infrastructure/Services/Storage/WorkspaceStorageManagementService.cs index b360360eb..3137de7a0 100644 --- a/Portal/src/Datahub.Infrastructure/Services/Storage/WorkspaceStorageManagementService.cs +++ b/Portal/src/Datahub.Infrastructure/Services/Storage/WorkspaceStorageManagementService.cs @@ -104,13 +104,15 @@ await ctx.Project_Storage_Avgs.FirstOrDefaultAsync(p => } /// - public bool CheckUpdateNeeded(string workspaceAcronym) + public bool CheckUpdateNeeded(string workspaceAcronym, DatahubProjectDBContext ctx) { - using var ctx = dbContextFactory.CreateDbContext(); var date = DateTime.UtcNow.Date; - var project = ctx.Projects.First(p => p.Project_Acronym_CD == workspaceAcronym); - var projectAverage = ctx.Project_Storage_Avgs.FirstOrDefault(p => - p.ProjectId == project.Project_ID && p.Date == date.Date); + var project = ctx.Projects + .AsNoTracking() + .First(p => p.Project_Acronym_CD == workspaceAcronym); + var projectAverage = ctx.Project_Storage_Avgs + .AsNoTracking() + .FirstOrDefault(p => p.ProjectId == project.Project_ID && p.Date == date.Date); if (projectAverage is null) return true; return false; } @@ -148,7 +150,7 @@ internal async Task> GetStorageAccountIds(string workspaceAcronym) logger.LogError("Resource group with id {RgId} not found", rgId); throw new Exception($"Resource group with id {rgId} not found"); } - + var storageAccountsCollection = response.Value.GetStorageAccounts(); var storageAccountsPageable = storageAccountsCollection.GetAll(); var storageAccounts = storageAccountsPageable.ToList(); @@ -163,7 +165,7 @@ internal async Task> GetStorageAccountIds(string workspaceAcronym) return storageIds; } - + #endregion } } \ No newline at end of file diff --git a/Portal/test/Datahub.SpecflowTests/Hooks/ProjectUsageHook.cs b/Portal/test/Datahub.SpecflowTests/Hooks/ProjectUsageHook.cs index 7c12ff96d..36ef878a5 100644 --- a/Portal/test/Datahub.SpecflowTests/Hooks/ProjectUsageHook.cs +++ b/Portal/test/Datahub.SpecflowTests/Hooks/ProjectUsageHook.cs @@ -117,10 +117,10 @@ public void BeforeScenarioWorkspaceCosts(IObjectContainer objectContainer, } }); workspaceCostsManagementService - .CheckUpdateNeeded(Testing.WorkspaceAcronym) + .CheckUpdateNeeded(Testing.WorkspaceAcronym, Arg.Any()) .Returns(true); workspaceCostsManagementService - .CheckUpdateNeeded(Testing.WorkspaceAcronym2) + .CheckUpdateNeeded(Testing.WorkspaceAcronym2, Arg.Any()) .Returns(false); workspaceBudgetManagementService .GetWorkspaceBudgetAmountAsync(Arg.Any()) @@ -132,10 +132,10 @@ public void BeforeScenarioWorkspaceCosts(IObjectContainer objectContainer, .UpdateStorageCapacity(Arg.Any()) .Returns(100.0); workspaceStorageManagementService - .CheckUpdateNeeded(Testing.WorkspaceAcronym) + .CheckUpdateNeeded(Testing.WorkspaceAcronym, Arg.Any()) .Returns(true); workspaceStorageManagementService - .CheckUpdateNeeded(Testing.WorkspaceAcronym2) + .CheckUpdateNeeded(Testing.WorkspaceAcronym2, Arg.Any()) .Returns(false); workspaceRgManagementService .GetAllSubscriptionResourceGroupsAsync(Arg.Any()) diff --git a/ServerlessOperations/src/Datahub.Functions/ProjectUsageScheduler.cs b/ServerlessOperations/src/Datahub.Functions/ProjectUsageScheduler.cs index 66ee4e55f..4b45d2b22 100644 --- a/ServerlessOperations/src/Datahub.Functions/ProjectUsageScheduler.cs +++ b/ServerlessOperations/src/Datahub.Functions/ProjectUsageScheduler.cs @@ -36,6 +36,7 @@ public class ProjectUsageScheduler( public bool Mock { get; set; } = false; private readonly ILogger _logger = loggerFactory.CreateLogger(); private readonly AzureConfig _azConfig = new(config); + private bool _forceUpdate = false; private const int WORKSPACE_UPDATE_LIMIT = 100; [Function("ProjectUsageScheduler")] @@ -69,7 +70,7 @@ public async Task RunHttp( _logger.LogInformation("Request body: {Body}", body); var schedulerRequest = ParseRequestBody(body); - + _forceUpdate = schedulerRequest.Acronyms.Count != 0; // If acronyms are given, we force update for those projects _logger.LogInformation("Manual rollover is set to: {ManualRollover}", schedulerRequest.ManualRollover); _logger.LogInformation("Acronyms: {Acronyms}", schedulerRequest.Acronyms); await RunScheduler(schedulerRequest.Acronyms, schedulerRequest.ManualRollover); @@ -123,11 +124,12 @@ public async Task RunHttp( _logger.LogInformation("Sending messages to update usage and capacity for {Count} projects", projects.Count); var costMessages = 0; var storageMessages = 0; + var ctx = await dbContextFactory.CreateDbContextAsync(); foreach (var usageMessage in projects.Select(resource => new ProjectUsageUpdateMessage( resource.Project_Acronym_CD, costBlobName, totalBlobName, manualRollover))) { - var (costUpdate, storageUpdate) = await SendMessagesIfNeeded(usageMessage); + var (costUpdate, storageUpdate) = await SendMessagesIfNeeded(usageMessage, ctx); costMessages += costUpdate ? 1 : 0; storageMessages += storageUpdate ? 1 : 0; // We delay to avoid sending all the messages at the same time to avoid throttling @@ -139,12 +141,12 @@ public async Task RunHttp( return (costMessages, storageMessages); } - internal async Task<(bool, bool)> SendMessagesIfNeeded(ProjectUsageUpdateMessage usageMessage) + internal async Task<(bool, bool)> SendMessagesIfNeeded(ProjectUsageUpdateMessage usageMessage, DatahubProjectDBContext ctx) { try { - var costUpdate = NeedsCostUpdate(usageMessage.ProjectAcronym); - var storageUpdate = NeedsStorageUpdate(usageMessage.ProjectAcronym); + var costUpdate = NeedsCostUpdate(usageMessage.ProjectAcronym, ctx); + var storageUpdate = NeedsStorageUpdate(usageMessage.ProjectAcronym, ctx); if (costUpdate) { @@ -178,7 +180,9 @@ internal async Task> GetProjects(List acronyms, in await using var ctx = await dbContextFactory.CreateDbContextAsync(); var projects = ctx.Projects .Include(p => p.DatahubAzureSubscription) - .Include(p => p.Credits).ToList(); + .Include(p => p.Credits) + .AsNoTracking() + .ToList(); if (acronyms.Any()) { @@ -188,7 +192,7 @@ internal async Task> GetProjects(List acronyms, in else { // Otherwise, we grab the last 100 projects that were updated the longest ago - projects = projects.OrderBy(LastUpdate).Where(NeedsUpdate).Take(limit).ToList(); + projects = projects.Where(p => NeedsUpdate(p, ctx)).OrderBy(LastUpdate).Take(limit).ToList(); } return projects; @@ -276,19 +280,19 @@ private DateTime LastUpdate(Datahub_Project p) return p.Credits?.LastUpdate ?? DateTime.MinValue; } - private bool NeedsUpdate(Datahub_Project p) + private bool NeedsUpdate(Datahub_Project p, DatahubProjectDBContext ctx) { - return NeedsCostUpdate(p.Project_Acronym_CD) || NeedsStorageUpdate(p.Project_Acronym_CD); + return _forceUpdate || NeedsCostUpdate(p.Project_Acronym_CD, ctx) || NeedsStorageUpdate(p.Project_Acronym_CD, ctx); } - private bool NeedsCostUpdate(string workspaceAcronym) + private bool NeedsCostUpdate(string workspaceAcronym, DatahubProjectDBContext ctx) { - return workspaceCostMgmtService.CheckUpdateNeeded(workspaceAcronym); + return _forceUpdate || workspaceCostMgmtService.CheckUpdateNeeded(workspaceAcronym, ctx); } - private bool NeedsStorageUpdate(string workspaceAcronym) + private bool NeedsStorageUpdate(string workspaceAcronym, DatahubProjectDBContext ctx) { - return workspaceStorageMgmtService.CheckUpdateNeeded(workspaceAcronym); + return _forceUpdate || workspaceStorageMgmtService.CheckUpdateNeeded(workspaceAcronym, ctx); } static ProjectCapacityUpdateMessage ConvertToCapacityUpdateMessage(ProjectUsageUpdateMessage message) diff --git a/ServerlessOperations/src/Datahub.Functions/ProjectUsageUpdater.cs b/ServerlessOperations/src/Datahub.Functions/ProjectUsageUpdater.cs index 27da1ebcd..a4899117b 100644 --- a/ServerlessOperations/src/Datahub.Functions/ProjectUsageUpdater.cs +++ b/ServerlessOperations/src/Datahub.Functions/ProjectUsageUpdater.cs @@ -31,9 +31,11 @@ public class ProjectUsageUpdater( IConfiguration config) { private readonly ILogger _logger = loggerFactory.CreateLogger(); + private static readonly SemaphoreSlim _costUpdateSemaphore = new(1, 10); + private static readonly SemaphoreSlim _storageUpdateSemaphore = new(1, 10); private readonly AzureConfig _azConfig = new(config); public bool Mock = false; - public List MockCosts { get; set; } = new(); + public List MockCosts { get; set; } = new(); [Function("ProjectUsageUpdater")] @@ -45,7 +47,20 @@ public async Task Run( { // deserialize message var message = await serviceBusReceivedMessage.DeserializeAndUnwrapMessageAsync(); - await UpdateUsage(message, cancellationToken); + await _costUpdateSemaphore.WaitAsync(cancellationToken); + try + { + await UpdateUsage(message, cancellationToken); + } + catch (Exception e) + { + _logger.LogError("Error updating usage: {Error}", e.Message); + throw new Exception(e.Message); + } + finally + { + _costUpdateSemaphore.Release(); + } } [Function("ProjectCapacityUsageUpdater")] @@ -56,7 +71,20 @@ public async Task UpdateProjectCapacity( CancellationToken cancellationToken) { var message = await serviceBusReceivedMessage.DeserializeAndUnwrapMessageAsync(); - await UpdateCapacity(message, cancellationToken); + await _storageUpdateSemaphore.WaitAsync(cancellationToken); + try + { + await UpdateCapacity(message, cancellationToken); + } + catch (Exception e) + { + _logger.LogError("Error updating capacity: {Error}", e.Message); + throw new Exception(e.Message); + } + finally + { + _storageUpdateSemaphore.Release(); + } } internal async Task UpdateUsage(ProjectUsageUpdateMessage message, CancellationToken cancellationToken) @@ -157,6 +185,7 @@ internal async Task> FromBlob(string fileName) { return MockCosts; } + _logger.LogInformation("Downloading from blob {FileName}", fileName); var blobServiceClient = new BlobServiceClient(_azConfig.MediaStorageConnectionString); var containerClient = blobServiceClient.GetBlobContainerClient("costs"); diff --git a/ServerlessOperations/template.settings.json b/ServerlessOperations/template.settings.json index 376766220..eab42ae3b 100644 --- a/ServerlessOperations/template.settings.json +++ b/ServerlessOperations/template.settings.json @@ -25,7 +25,9 @@ "DatahubServiceBus": { "ConnectionString": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=service-bus-connection-string)" }, - "MediaStorageConnectionString": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahub-media-storage-connection-string)", + "Media": { + "StorageConnectionString": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahub-media-storage-connection-string)" + }, "FUNC_SP_CLIENT_ID": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahubportal-client-id)", "FUNC_SP_CLIENT_SECRET": "@Microsoft.KeyVault(VaultName=fsdh-key-$Environment;SecretName=datahubportal-client-secret)", "FUNCTIONS_EXTENSION_VERSION": "~4",