From 75aebb304aca458365a8e6be5d1c00078863d2fa Mon Sep 17 00:00:00 2001 From: TDEVMPJ Date: Sun, 30 Apr 2023 18:30:11 -0400 Subject: [PATCH 1/6] Add taskHubName parameter to all relevant SPs. Add TaskHubMode parameter to the settings. Breaking changes in vInstance and vHistory views usage. --- .../SqlDurabilityOptions.cs | 2 +- .../SqlScaleMonitor.cs | 2 +- src/DurableTask.SqlServer/Scripts/logic.sql | 115 +++++++++++------- src/DurableTask.SqlServer/SqlDbManager.cs | 11 ++ .../SqlOrchestrationService.cs | 79 ++++++++---- .../SqlOrchestrationServiceSettings.cs | 25 ++-- src/DurableTask.SqlServer/SqlUtils.cs | 50 ++++++++ .../Integration/DatabaseManagement.cs | 16 ++- .../Utils/TestService.cs | 2 +- 9 files changed, 216 insertions(+), 86 deletions(-) diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityOptions.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityOptions.cs index 3b7fc99..53ebc72 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityOptions.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlDurabilityOptions.cs @@ -17,7 +17,7 @@ class SqlDurabilityOptions public string ConnectionStringName { get; set; } = "SQLDB_Connection"; [JsonProperty("taskHubName")] - public string TaskHubName { get; set; } = "default"; + public string TaskHubName { get; set; } = SqlOrchestrationServiceSettings.DefaultTaskHubName; [JsonProperty("taskEventLockTimeout")] public TimeSpan TaskEventLockTimeout { get; set; } = TimeSpan.FromMinutes(2); diff --git a/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs b/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs index 447dc3e..2e6b363 100644 --- a/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs +++ b/src/DurableTask.SqlServer.AzureFunctions/SqlScaleMonitor.cs @@ -26,7 +26,7 @@ class SqlScaleMonitor : IScaleMonitor public SqlScaleMonitor(SqlOrchestrationService service, string taskHubName) { this.service = service ?? throw new ArgumentNullException(nameof(service)); - this.Descriptor = new ScaleMonitorDescriptor($"DurableTask-SqlServer:{taskHubName ?? "default"}"); + this.Descriptor = new ScaleMonitorDescriptor($"DurableTask-SqlServer:{taskHubName ?? SqlOrchestrationServiceSettings.DefaultTaskHubName}"); } /// diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 65a0f55..5ebe255 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -1,7 +1,7 @@ -- Copyright (c) Microsoft Corporation. -- Licensed under the MIT License. -CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.CurrentTaskHub() +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName varchar(150)) RETURNS varchar(50) WITH EXECUTE AS CALLER AS @@ -11,31 +11,34 @@ BEGIN -- 1: Task hub names are inferred from the user credential DECLARE @taskHubMode sql_variant = (SELECT TOP 1 [Value] FROM GlobalSettings WHERE [Name] = 'TaskHubMode'); - DECLARE @taskHub varchar(150) - - IF @taskHubMode = 0 - SET @taskHub = APP_NAME() - IF @taskHubMode = 1 - SET @taskHub = USER_NAME() - - IF @taskHub IS NULL - SET @taskHub = 'default' + DECLARE @taskHub varchar(150) = @TaskHubName; + IF @taskHubMode = 1 + BEGIN + SET @taskHub = USER_NAME(); + IF @TaskHubName IS NOT NULL AND @TaskHubName <> '' + BEGIN + DECLARE @msg nvarchar(max) = FORMATMESSAGE('Task hub name ''%s'' is invalid in multi tennant mode (it can be NULL, or '''').', @TaskHubName); + -- function will not allow raising custom error, so approximate with invalid cast + RETURN cast(@msg as int); + END + END + -- if the name is too long, keep the first 16 characters and hash the rest IF LEN(@taskHub) > 50 - SET @taskHub = CONVERT(varchar(16), @taskHub) + '__' + CONVERT(varchar(32), HASHBYTES('MD5', @taskHub), 2) + SET @taskHub = CONVERT(varchar(16), @taskHub) + '__' + CONVERT(varchar(32), HASHBYTES('MD5', @taskHub), 2); - RETURN @taskHub + RETURN @taskHub; END GO -CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleMetric() +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleMetric(@TaskHubName varchar(150)) RETURNS INT WITH EXECUTE AS CALLER AS BEGIN - DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @liveInstances bigint = 0 @@ -57,12 +60,12 @@ END GO -CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleRecommendation(@MaxOrchestrationsPerWorker real, @MaxActivitiesPerWorker real) +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleRecommendation(@TaskHubName varchar(150), @MaxOrchestrationsPerWorker real, @MaxActivitiesPerWorker real) RETURNS INT WITH EXECUTE AS CALLER AS BEGIN - DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @liveInstances bigint = 0 @@ -103,20 +106,20 @@ AS I.[CompletedTime], I.[RuntimeStatus], (SELECT TOP 1 [Text] FROM Payloads P WHERE - P.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() AND + P.[TaskHub] = I.[TaskHub] AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[CustomStatusPayloadID]) AS [CustomStatusText], (SELECT TOP 1 [Text] FROM Payloads P WHERE - P.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() AND + P.[TaskHub] = I.[TaskHub] AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[InputPayloadID]) AS [InputText], (SELECT TOP 1 [Text] FROM Payloads P WHERE - P.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() AND + P.[TaskHub] = I.[TaskHub] AND P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[OutputPayloadID]) AS [OutputText] FROM Instances I - WHERE - I.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() + -- like is the simplest way to keep indexed seek with conditional where + WHERE I.[TaskHub] like ISNULL(__SchemaNamePlaceholder__.CurrentTaskHub(null), '%') GO CREATE OR ALTER VIEW __SchemaNamePlaceholder__.vHistory @@ -134,16 +137,17 @@ AS H.[RuntimeStatus], H.[VisibleTime], (SELECT TOP 1 [Text] FROM Payloads P WHERE - P.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() AND + P.[TaskHub] = H.[TaskHub] AND P.[InstanceID] = H.[InstanceID] AND P.[PayloadID] = H.[DataPayloadID]) AS [Payload] FROM History H - WHERE - H.[TaskHub] = __SchemaNamePlaceholder__.CurrentTaskHub() + -- like is the simplest way to keep indexed seek with conditional where + WHERE H.[TaskHub] like ISNULL(__SchemaNamePlaceholder__.CurrentTaskHub(null), '%') GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.CreateInstance + @TaskHubName varchar(150), @Name varchar(300), @Version varchar(100) = NULL, @InstanceID varchar(100) = NULL, @@ -153,7 +157,7 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.CreateInstance @DedupeStatuses varchar(MAX) = 'Pending,Running' AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @EventType varchar(30) = 'ExecutionStarted' DECLARE @RuntimeStatus varchar(30) = 'Pending' @@ -184,7 +188,7 @@ BEGIN -- Purge the existing instance data so that it can be overwritten DECLARE @instancesToPurge InstanceIDs INSERT INTO @instancesToPurge VALUES (@InstanceID) - EXEC __SchemaNamePlaceholder__.PurgeInstanceStateByID @instancesToPurge + EXEC __SchemaNamePlaceholder__.PurgeInstanceStateByID @TaskHubName, @instancesToPurge END COMMIT TRANSACTION @@ -253,11 +257,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.GetInstanceHistory + @TaskHubName varchar(150), @InstanceID varchar(100), @GetInputsAndOutputs bit = 0 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @ParentInstanceID varchar(100) DECLARE @Version varchar(100) @@ -296,6 +301,7 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.RaiseEvent + @TaskHubName varchar(150), @Name varchar(300), @InstanceID varchar(100) = NULL, @PayloadText varchar(MAX) = NULL, @@ -304,7 +310,7 @@ AS BEGIN BEGIN TRANSACTION - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) -- External event messages must target new instances or they must use -- the "auto start" instance ID format of @orchestrationname@identifier. @@ -364,6 +370,7 @@ END GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.TerminateInstance + @TaskHubName varchar(150), @InstanceID varchar(100), @Reason varchar(max) = NULL AS @@ -375,7 +382,7 @@ BEGIN -- order across all stored procedures that execute within a transaction. -- Table order for this sproc: Instances --> (NewEvents --> Payloads --> NewEvents) - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @existingStatus varchar(30) = ( SELECT TOP 1 existing.[RuntimeStatus] @@ -423,10 +430,11 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.PurgeInstanceStateByID + @TaskHubName varchar(150), @InstanceIDs InstanceIDs READONLY AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) BEGIN TRANSACTION @@ -446,11 +454,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.PurgeInstanceStateByTime + @TaskHubName varchar(150), @ThresholdTime datetime2, @FilterType tinyint = 0 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @instanceIDs InstanceIDs @@ -475,7 +484,7 @@ BEGIN END DECLARE @deletedInstances int - EXECUTE @deletedInstances = __SchemaNamePlaceholder__.PurgeInstanceStateByID @instanceIDs + EXECUTE @deletedInstances = __SchemaNamePlaceholder__.PurgeInstanceStateByID @TaskHubName, @instanceIDs RETURN @deletedInstances END GO @@ -506,6 +515,7 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._LockNextOrchestration + @TaskHubName varchar(150), @BatchSize int, @LockedBy varchar(100), @LockExpiration datetime2 @@ -516,7 +526,7 @@ BEGIN DECLARE @parentInstanceID varchar(100) DECLARE @version varchar(100) DECLARE @runtimeStatus varchar(30) - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) BEGIN TRANSACTION @@ -567,7 +577,7 @@ BEGIN @parentInstanceID as [ParentInstanceID], @version as [Version] FROM NewEvents N - LEFT OUTER JOIN __SchemaNamePlaceholder__.[Payloads] P ON + LEFT OUTER JOIN [Payloads] P ON P.[TaskHub] = @TaskHub AND P.[InstanceID] = N.[InstanceID] AND P.[PayloadID] = N.[PayloadID] @@ -619,6 +629,7 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration + @TaskHubName varchar(150), @InstanceID varchar(100), @ExecutionID varchar(50), @RuntimeStatus varchar(30), @@ -631,7 +642,7 @@ AS BEGIN BEGIN TRANSACTION - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @InputPayloadID uniqueidentifier DECLARE @CustomStatusPayloadID uniqueidentifier @@ -911,11 +922,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._DiscardEventsAndUnlockInstance + @TaskHubName varchar(150), @InstanceID varchar(100), @DeletedEvents MessageIDs READONLY AS BEGIN - DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) -- We return the list of deleted messages so that the caller can issue a -- warning about missing messages @@ -935,6 +947,7 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents + @TaskHubName varchar(150), @NewOrchestrationEvents OrchestrationEvents READONLY AS BEGIN @@ -945,7 +958,7 @@ BEGIN -- order across all stored procedures that execute within a transaction. -- Table order for this sproc: Payloads --> NewEvents - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) -- External event messages can create new instances -- NOTE: There is a chance this could result in deadlocks if two @@ -1016,13 +1029,14 @@ END GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.QuerySingleOrchestration + @TaskHubName varchar(150), @InstanceID varchar(100), @ExecutionID varchar(50) = NULL, @FetchInput bit = 1, @FetchOutput bit = 1 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) SELECT TOP 1 I.[InstanceID], @@ -1056,6 +1070,7 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._QueryManyOrchestrations + @TaskHubName varchar(150), @PageSize smallint = 100, @PageNumber int = 0, @FetchInput bit = 1, @@ -1067,7 +1082,7 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._QueryManyOrchestrations @ExcludeSubOrchestrations bit = 0 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) SELECT I.[InstanceID], @@ -1107,11 +1122,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._LockNextTask + @TaskHubName varchar(150), @LockedBy varchar(100), @LockExpiration datetime2 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @SequenceNumber bigint @@ -1163,11 +1179,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RenewOrchestrationLocks + @TaskHubName varchar(150), @InstanceID varchar(100), @LockExpiration datetime2 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) UPDATE Instances SET [LockExpiration] = @LockExpiration @@ -1177,11 +1194,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RenewTaskLocks + @TaskHubName varchar(150), @RenewingTasks MessageIDs READONLY, @LockExpiration datetime2 AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) UPDATE N SET [LockExpiration] = @LockExpiration @@ -1193,11 +1211,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CompleteTasks + @TaskHubName varchar(150), @CompletedTasks MessageIDs READONLY, @Results TaskEvents READONLY AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) BEGIN TRANSACTION @@ -1290,13 +1309,14 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RewindInstance + @TaskHubName varchar(150), @InstanceID varchar(100), @Reason varchar(max) = NULL AS BEGIN BEGIN TRANSACTION - EXEC __SchemaNamePlaceholder__._RewindInstanceRecursive @InstanceID, @Reason + EXEC __SchemaNamePlaceholder__._RewindInstanceRecursive @TaskHubName, @InstanceID, @Reason COMMIT TRANSACTION END @@ -1304,11 +1324,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RewindInstanceRecursive + @TaskHubName varchar(150), @InstanceID varchar(100), @Reason varchar(max) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub() + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) -- *** IMPORTANT *** -- To prevent deadlocks, it is important to maintain consistent table access @@ -1325,7 +1346,7 @@ BEGIN -- Instance IDs can be overwritten only if the orchestration is in a terminal state IF @existingStatus NOT IN ('Failed') BEGIN - DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus); + DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewind instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus); THROW 50001, @msg, 1; END @@ -1410,7 +1431,7 @@ BEGIN WHILE @@FETCH_STATUS = 0 BEGIN -- Call rewind recursively on the failing suborchestrations - EXECUTE __SchemaNamePlaceholder__._RewindInstanceRecursive @subOrchestrationInstanceID, @Reason + EXECUTE __SchemaNamePlaceholder__._RewindInstanceRecursive @TaskHubName, @subOrchestrationInstanceID, @Reason FETCH NEXT FROM subOrchestrationCursor INTO @subOrchestrationInstanceID END CLOSE subOrchestrationCursor diff --git a/src/DurableTask.SqlServer/SqlDbManager.cs b/src/DurableTask.SqlServer/SqlDbManager.cs index f3b8a88..4b236d3 100644 --- a/src/DurableTask.SqlServer/SqlDbManager.cs +++ b/src/DurableTask.SqlServer/SqlDbManager.cs @@ -117,6 +117,17 @@ public async Task CreateOrUpgradeSchemaAsync(bool recreateIfExists) await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper); } + // Update task hub naming mode + using (SqlCommand command = dbLock.CreateCommand()) + { + command.CommandText = $"{this.settings.SchemaName}.SetGlobalSetting"; + command.CommandType = CommandType.StoredProcedure; + command.Parameters.Add("@Name", SqlDbType.VarChar, 300).Value = "TaskHubMode"; + command.Parameters.Add("@Value", SqlDbType.Variant).Value = this.settings.CreateMultiTennantTaskHub ? 1 : 0; + + await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper); + } + await dbLock.CommitAsync(); } diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 74f018d..a266bd9 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -37,6 +37,7 @@ public class SqlOrchestrationService : OrchestrationServiceBase readonly SqlDbManager dbManager; readonly string lockedByValue; readonly string userId; + string? taskHubNameValue; public SqlOrchestrationService(SqlOrchestrationServiceSettings? settings) { @@ -86,14 +87,19 @@ async Task GetAndOpenConnectionAsync(CancellationToken cancelToke return connection; } - SqlCommand GetSprocCommand(SqlConnection connection, string sprocName) + SqlCommand GetTaskHubSprocCommand(SqlConnection connection, string sprocName) { - SqlCommand command = connection.CreateCommand(); - command.CommandType = CommandType.StoredProcedure; - command.CommandText = sprocName; + SqlCommand command = connection.GetSprocCommand(string.Concat(this.settings.SchemaName, ".", sprocName)); + this.AddTaskHubNameParameter(command); return command; } + SqlCommand GetFuncCommand(SqlConnection connection, string funcName, SqlDbType retType, int retSize = 0) + => connection.GetFuncCommand(string.Concat(this.settings.SchemaName, ".", funcName), retType, retSize); + + void AddTaskHubNameParameter(SqlCommand command) + => command.AddTaskHubNameParameter(this.taskHubNameValue); + public override Task CreateAsync(bool recreateInstanceStore) => this.dbManager.CreateOrUpgradeSchemaAsync(recreateInstanceStore); @@ -110,6 +116,34 @@ public override Task DeleteAsync(bool deleteInstanceStore) return this.dbManager.DeleteSchemaAsync(); } + public override async Task StartAsync() + { + await base.StartAsync(); + // We want to distinguish between task hub name inferred from user name (taskHubMode=1) + // and application specified name (taskHubMode=0). In case of application specified + // name, taskHubName is reused as parameter for each SP call. Sending and then recieving + // null can only happen when taskHubMode=0. + using SqlConnection connection = await this.GetAndOpenConnectionAsync(this.ShutdownToken); + this.taskHubNameValue = null; + var taskHubName = await CallCurrentTaskHub(null); + if (taskHubName == null) + { + this.taskHubNameValue = this.settings.TaskHubName; + // CurrentTaskHub function compresses taskHubNameValue to fit column of size 50 + if (this.taskHubNameValue.Length > 50) + { + this.taskHubNameValue = await CallCurrentTaskHub(this.taskHubNameValue); + } + } + + Task CallCurrentTaskHub(string? value) + { + using SqlCommand command = this.GetFuncCommand(connection, "CurrentTaskHub", SqlDbType.VarChar, 50); + command.AddTaskHubNameParameter(value); + return command.ExecuteFuncAsync(this.ShutdownToken); + } + } + public override async Task LockNextTaskOrchestrationWorkItemAsync( TimeSpan receiveTimeout, CancellationToken cancellationToken) @@ -119,7 +153,7 @@ public override Task DeleteAsync(bool deleteInstanceStore) do { using SqlConnection connection = await this.GetAndOpenConnectionAsync(cancellationToken); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._LockNextOrchestration"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_LockNextOrchestration"); int batchSize = this.settings.WorkItemBatchSize; DateTime lockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout); @@ -230,9 +264,7 @@ public override Task DeleteAsync(bool deleteInstanceStore) reader.Close(); // Delete the events and release the orchestration instance lock - using SqlCommand discardCommand = this.GetSprocCommand( - connection, - $"{this.settings.SchemaName}._DiscardEventsAndUnlockInstance"); + using SqlCommand discardCommand = this.GetTaskHubSprocCommand(connection, "_DiscardEventsAndUnlockInstance"); discardCommand.Parameters.Add("@InstanceID", SqlDbType.VarChar, 100).Value = instanceId; discardCommand.Parameters.AddMessageIdParameter("@DeletedEvents", messages, this.settings.SchemaName); try @@ -313,7 +345,7 @@ await SqlUtils.ExecuteNonQueryAsync( public override async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._RenewOrchestrationLocks"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_RenewOrchestrationLocks"); DateTime lockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout); @@ -340,7 +372,7 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( Stopwatch sw = Stopwatch.StartNew(); using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._CheckpointOrchestration"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_CheckpointOrchestration"); OrchestrationInstance instance = newRuntimeState.OrchestrationInstance!; IList newEvents = newRuntimeState.NewEvents; @@ -414,7 +446,7 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( while (!shutdownCancellationToken.IsCancellationRequested) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(shutdownCancellationToken); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._LockNextTask"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_LockNextTask"); DateTime lockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout); @@ -454,7 +486,7 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync( public override async Task RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._RenewTaskLocks"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_RenewTaskLocks"); DateTime lockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout); @@ -471,7 +503,7 @@ public override async Task RenewTaskActivityWorkItemLockAs public override async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._CompleteTasks"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_CompleteTasks"); command.Parameters.AddMessageIdParameter("@CompletedTasks", workItem.TaskMessage, this.settings.SchemaName); command.Parameters.AddTaskEventsParameter("@Results", responseMessage, this.settings.SchemaName); @@ -501,7 +533,7 @@ public override async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkIte public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}.CreateInstance"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "CreateInstance"); ExecutionStartedEvent startEvent = (ExecutionStartedEvent)creationMessage.Event; OrchestrationInstance instance = startEvent.OrchestrationInstance; @@ -531,7 +563,7 @@ public override async Task CreateTaskOrchestrationAsync(TaskMessage creationMess public override async Task SendTaskOrchestrationMessageAsync(TaskMessage message) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._AddOrchestrationEvents"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_AddOrchestrationEvents"); command.Parameters.AddOrchestrationEventsParameter("@NewOrchestrationEvents", message, this.settings.SchemaName); @@ -592,7 +624,7 @@ public override async Task WaitForOrchestrationAsync( CancellationToken cancellationToken) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(cancellationToken); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}.QuerySingleOrchestration"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "QuerySingleOrchestration"); command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instanceId; command.Parameters.Add("@ExecutionID", SqlDbType.VarChar, size: 50).Value = executionId; @@ -616,7 +648,7 @@ public override async Task WaitForOrchestrationAsync( public override async Task GetOrchestrationHistoryAsync(string instanceId, string? executionIdFilter) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}.GetInstanceHistory"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "GetInstanceHistory"); command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instanceId; command.Parameters.Add("@GetInputsAndOutputs", SqlDbType.Bit).Value = true; // There's no clean way for the caller to specify this currently @@ -661,7 +693,7 @@ static List ReadHistoryEvents( public override async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string? reason) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}.TerminateInstance"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "TerminateInstance"); command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instanceId; command.Parameters.Add("@Reason", SqlDbType.VarChar).Value = reason; @@ -677,7 +709,7 @@ public async Task PurgeOrchestrationHistoryAsync(IEnumerable instan } using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}.PurgeInstanceStateByID"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "PurgeInstanceStateByID"); SqlParameter instancesDeletedReturnValue = command.Parameters.Add("@InstancesDeleted", SqlDbType.Int); instancesDeletedReturnValue.Direction = ParameterDirection.ReturnValue; @@ -703,7 +735,7 @@ public override async Task PurgeOrchestrationHistoryAsync( OrchestrationStateTimeRangeFilterType timeRangeFilterType) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}.PurgeInstanceStateByTime"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "PurgeInstanceStateByTime"); command.Parameters.Add("@ThresholdTime", SqlDbType.DateTime2).Value = maxThresholdDateTimeUtc; command.Parameters.Add("@FilterType", SqlDbType.TinyInt).Value = (int)timeRangeFilterType; @@ -808,7 +840,7 @@ public async Task> GetManyOrchestrations DateTime createdTimeTo = query.CreatedTimeTo.ToSqlUtcDateTime(DateTime.MaxValue); using SqlConnection connection = await this.GetAndOpenConnectionAsync(cancellationToken); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._QueryManyOrchestrations"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_QueryManyOrchestrations"); command.Parameters.Add("@PageSize", SqlDbType.SmallInt).Value = query.PageSize; command.Parameters.Add("@PageNumber", SqlDbType.Int).Value = query.PageNumber; @@ -849,7 +881,7 @@ public async Task> GetManyOrchestrations public async Task RewindTaskOrchestrationAsync(string instanceId, string reason) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(); - using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._RewindInstance"); + using SqlCommand command = this.GetTaskHubSprocCommand(connection, "_RewindInstance"); command.Parameters.Add("@InstanceID", SqlDbType.VarChar, size: 100).Value = instanceId; command.Parameters.Add("@Reason", SqlDbType.VarChar).Value = reason; @@ -880,7 +912,8 @@ public async Task GetRecommendedReplicaCountAsync(int? currentReplicaCount { using SqlConnection connection = await this.GetAndOpenConnectionAsync(cancellationToken); using SqlCommand command = connection.CreateCommand(); - command.CommandText = $"SELECT {this.settings.SchemaName}.GetScaleRecommendation(@MaxConcurrentOrchestrations, @MaxConcurrentActivities)"; + command.CommandText = $"SELECT {this.settings.SchemaName}.GetScaleRecommendation(@TaskHubName, @MaxConcurrentOrchestrations, @MaxConcurrentActivities)"; + this.AddTaskHubNameParameter(command); command.Parameters.Add("@MaxConcurrentOrchestrations", SqlDbType.Int).Value = this.MaxConcurrentTaskOrchestrationWorkItems; command.Parameters.Add("@MaxConcurrentActivities", SqlDbType.Int).Value = this.MaxConcurrentTaskActivityWorkItems; diff --git a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs index 7833f23..d21f38a 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs @@ -14,6 +14,11 @@ namespace DurableTask.SqlServer /// public class SqlOrchestrationServiceSettings { + /// + /// Default value for property + /// + public static readonly string DefaultTaskHubName = "default"; + /// /// Initializes a new instance of the class. /// @@ -27,15 +32,10 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN throw new ArgumentNullException(nameof(connectionString)); } - this.TaskHubName = taskHubName ?? "default"; + this.TaskHubName = taskHubName ?? DefaultTaskHubName; this.SchemaName = schemaName ?? "dt"; - var builder = new SqlConnectionStringBuilder(connectionString) - { - // We use the task hub name as the application name so that - // stored procedures have easy access to this information. - ApplicationName = this.TaskHubName, - }; + var builder = new SqlConnectionStringBuilder(connectionString); if (string.IsNullOrEmpty(builder.InitialCatalog)) { @@ -71,7 +71,8 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN public string SchemaName { get; } /// - /// Gets or sets the name of the app. Used for logging purposes. + /// Gets or sets the name of the app. Used as prefix of the lock owner, + /// and need to be unique across machines that connect to the hub. /// [JsonProperty("appName")] public string AppName { get; set; } = Environment.MachineName; @@ -102,6 +103,14 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN [JsonProperty("createDatabaseIfNotExists")] public bool CreateDatabaseIfNotExists { get; set; } + /// + /// Gets or sets a flag indicating wether task hub is created for multi-tennant naming + /// (derived from sql server user name) or for application specified naming. + /// + /// + [JsonProperty("createMultiTennantTaskHub ")] + public bool CreateMultiTennantTaskHub { get; set; } = true; + /// /// Gets a SQL connection string associated with the configured task hub. /// diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index 3466a55..cabe833 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -388,6 +388,15 @@ static IEnumerable GetInstanceIdRecords(IEnumerable insta return param; } + internal static void AddTaskHubNameParameter(this SqlCommand command, string? value) + { + command.Parameters.Add(new SqlParameter("@TaskHubName", SqlDbType.VarChar, 150) + { + IsNullable = true, + Value = value ?? (object)DBNull.Value + }); + } + public static Task ExecuteReaderAsync( DbCommand command, LogHelper traceHelper, @@ -453,6 +462,47 @@ static async Task ExecuteSprocAndTraceAsync( } } + /// + /// Allocated command to execute stored procedure + /// + /// Existing database connection + /// Stored procedure name with schema + public static SqlCommand GetSprocCommand(this SqlConnection connection, string sprocName) + { + SqlCommand command = connection.CreateCommand(); + command.CommandType = CommandType.StoredProcedure; + command.CommandText = sprocName; + return command; + } + + /// + /// Returns command with out-of-the box return value parameter to be used + /// with + /// + /// Existing database connection + /// Function name with schema prefix + /// Return type of the function + /// Optional return type size + public static SqlCommand GetFuncCommand(this SqlConnection connection, string funcName, SqlDbType retType, int retSize = 0) + { + SqlCommand command = connection.CreateCommand(); + command.CommandType = CommandType.StoredProcedure; + command.CommandText = funcName; + command.Parameters.Add(new SqlParameter("@RETURN_VALUE", retType, retSize) + { + Direction = ParameterDirection.ReturnValue, + IsNullable = true + }); + return command; + } + + public static async Task ExecuteFuncAsync(this DbCommand command, CancellationToken cancellationToken) + { + await command.ExecuteNonQueryAsync(cancellationToken); + object value = command.Parameters[0].Value; + return value == DBNull.Value ? default : (T) value; + } + public static bool IsUniqueKeyViolation(SqlException exception) { return exception.Errors.Cast().Any(e => e.Class == 14 && (e.Number == 2601 || e.Number == 2627)); diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index 95473f9..0dc3d10 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -96,7 +96,8 @@ public async Task CanCreateAndDropSchema(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), - LogAssert.SprocCompleted("dt._UpdateVersion")) + LogAssert.SprocCompleted("dt._UpdateVersion"), + LogAssert.SprocCompleted("dt.SetGlobalSetting")) .EndOfLog(); ValidateDatabaseSchema(testDb); @@ -155,7 +156,8 @@ public async Task CanCreateAndDropSchemaWithCustomSchemaName(bool isDatabaseMiss LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), - LogAssert.SprocCompleted($"{schemaName}._UpdateVersion")) + LogAssert.SprocCompleted($"{schemaName}._UpdateVersion"), + LogAssert.SprocCompleted($"{schemaName}.SetGlobalSetting")) .EndOfLog(); ValidateDatabaseSchema(testDb, schemaName); @@ -216,7 +218,8 @@ public async Task CanCreateAndDropMultipleSchemas(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), - LogAssert.SprocCompleted($"{firstTestSchemaName}._UpdateVersion")) + LogAssert.SprocCompleted($"{firstTestSchemaName}._UpdateVersion"), + LogAssert.SprocCompleted($"{firstTestSchemaName}.SetGlobalSetting")) .Expect( LogAssert.CheckedDatabase()) .Expect( @@ -225,7 +228,8 @@ public async Task CanCreateAndDropMultipleSchemas(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), - LogAssert.SprocCompleted($"{secondTestSchemaName}._UpdateVersion")) + LogAssert.SprocCompleted($"{secondTestSchemaName}._UpdateVersion"), + LogAssert.SprocCompleted($"{secondTestSchemaName}.SetGlobalSetting")) .EndOfLog(); ValidateDatabaseSchema(testDb, secondTestSchemaName); @@ -307,7 +311,8 @@ public async Task CanCreateIfNotExists(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("schema-1.0.0.sql"), LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), - LogAssert.SprocCompleted("dt._UpdateVersion")) + LogAssert.SprocCompleted("dt._UpdateVersion"), + LogAssert.SprocCompleted("dt.SetGlobalSetting")) .EndOfLog(); ValidateDatabaseSchema(testDb); @@ -364,6 +369,7 @@ public void SchemaCreationIsSerializedAndIdempotent(bool isDatabaseMissing) LogAssert.ExecutedSqlScript("logic.sql"), LogAssert.ExecutedSqlScript("permissions.sql"), LogAssert.SprocCompleted("dt._UpdateVersion"), + LogAssert.SprocCompleted("dt.SetGlobalSetting"), // 2nd LogAssert.AcquiredAppLock(), LogAssert.SprocCompleted("dt._GetVersions"), diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index 61824dc..a190800 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -261,7 +261,7 @@ public IEnumerable GetAndValidateLogs() public async Task GetTaskHubNameAsync() { return (string)await SharedTestHelpers.ExecuteSqlAsync( - "SELECT dt.CurrentTaskHub()", + "SELECT dt.CurrentTaskHub(null)", this.testCredential.ConnectionString); } From e88327ff0c32f5f2ef5a32ab939cd88350055e44 Mon Sep 17 00:00:00 2001 From: TDEVMPJ Date: Mon, 22 May 2023 06:56:09 -0400 Subject: [PATCH 2/6] Fix issue with failing client queries due to not initialized taskHubName in TaskHubMode=1. --- src/DurableTask.SqlServer/Scripts/logic.sql | 8 --- .../SqlOrchestrationService.cs | 52 ++++++++++--------- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 5ebe255..74c612b 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -14,15 +14,7 @@ BEGIN DECLARE @taskHub varchar(150) = @TaskHubName; IF @taskHubMode = 1 - BEGIN SET @taskHub = USER_NAME(); - IF @TaskHubName IS NOT NULL AND @TaskHubName <> '' - BEGIN - DECLARE @msg nvarchar(max) = FORMATMESSAGE('Task hub name ''%s'' is invalid in multi tennant mode (it can be NULL, or '''').', @TaskHubName); - -- function will not allow raising custom error, so approximate with invalid cast - RETURN cast(@msg as int); - END - END -- if the name is too long, keep the first 16 characters and hash the rest IF LEN(@taskHub) > 50 diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index a266bd9..549c426 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -37,7 +37,7 @@ public class SqlOrchestrationService : OrchestrationServiceBase readonly SqlDbManager dbManager; readonly string lockedByValue; readonly string userId; - string? taskHubNameValue; + string taskHubNameValue; public SqlOrchestrationService(SqlOrchestrationServiceSettings? settings) { @@ -46,6 +46,7 @@ public SqlOrchestrationService(SqlOrchestrationServiceSettings? settings) this.dbManager = new SqlDbManager(this.settings, this.traceHelper); this.lockedByValue = $"{this.settings.AppName},{Process.GetCurrentProcess().Id}"; this.userId = new SqlConnectionStringBuilder(this.settings.TaskHubConnectionString).UserID ?? string.Empty; + this.taskHubNameValue = settings?.TaskHubName ?? SqlOrchestrationServiceSettings.DefaultTaskHubName; } public override int MaxConcurrentTaskOrchestrationWorkItems => this.settings.MaxActiveOrchestrations; @@ -97,9 +98,32 @@ SqlCommand GetTaskHubSprocCommand(SqlConnection connection, string sprocName) SqlCommand GetFuncCommand(SqlConnection connection, string funcName, SqlDbType retType, int retSize = 0) => connection.GetFuncCommand(string.Concat(this.settings.SchemaName, ".", funcName), retType, retSize); - void AddTaskHubNameParameter(SqlCommand command) + void AddTaskHubNameParameter(SqlCommand command) => command.AddTaskHubNameParameter(this.taskHubNameValue); + /// + /// Cache value of task hub name parameter in the TaskHubMode=0 + /// + /// + /// CurrentTaskHub T-SQL function compresses task hub value to fit column + /// of size 50. In TaskHubMode=0 one can avoid this cost by caching result + /// of the transformation. Caching makes more sense for worker, as it will + /// typically issue more calls. In TaskHubMode=1 parameter name is ignored. + /// + async Task CacheTaskHubNameValue() + { + if (this.taskHubNameValue.Length > 50) + { + using SqlConnection connection = await this.GetAndOpenConnectionAsync(this.ShutdownToken); + using SqlCommand command = this.GetFuncCommand(connection, "CurrentTaskHub", SqlDbType.VarChar, 50); + command.AddTaskHubNameParameter(this.taskHubNameValue); + var taskHubName = (await command.ExecuteFuncAsync(this.ShutdownToken))!; + // Logic in t-sql function will preserve first 16 characters. + if (0 == string.Compare(taskHubName, 0, this.taskHubNameValue, 0, 16, StringComparison.OrdinalIgnoreCase)) + this.taskHubNameValue = taskHubName; + } + } + public override Task CreateAsync(bool recreateInstanceStore) => this.dbManager.CreateOrUpgradeSchemaAsync(recreateInstanceStore); @@ -119,29 +143,7 @@ public override Task DeleteAsync(bool deleteInstanceStore) public override async Task StartAsync() { await base.StartAsync(); - // We want to distinguish between task hub name inferred from user name (taskHubMode=1) - // and application specified name (taskHubMode=0). In case of application specified - // name, taskHubName is reused as parameter for each SP call. Sending and then recieving - // null can only happen when taskHubMode=0. - using SqlConnection connection = await this.GetAndOpenConnectionAsync(this.ShutdownToken); - this.taskHubNameValue = null; - var taskHubName = await CallCurrentTaskHub(null); - if (taskHubName == null) - { - this.taskHubNameValue = this.settings.TaskHubName; - // CurrentTaskHub function compresses taskHubNameValue to fit column of size 50 - if (this.taskHubNameValue.Length > 50) - { - this.taskHubNameValue = await CallCurrentTaskHub(this.taskHubNameValue); - } - } - - Task CallCurrentTaskHub(string? value) - { - using SqlCommand command = this.GetFuncCommand(connection, "CurrentTaskHub", SqlDbType.VarChar, 50); - command.AddTaskHubNameParameter(value); - return command.ExecuteFuncAsync(this.ShutdownToken); - } + await this.CacheTaskHubNameValue(); } public override async Task LockNextTaskOrchestrationWorkItemAsync( From f913903ca24ef3b1796c513ae7b6f048f1943057 Mon Sep 17 00:00:00 2001 From: TDEVMPJ Date: Sat, 3 Jun 2023 17:56:28 -0400 Subject: [PATCH 3/6] Create private set of functions, move task hub name parameter to the end of parameter list and make it optional. Restore presence of taskHubName in connection string by default. Call all functions with SP approach. --- .../Scripts/drop-schema.sql | 3 + src/DurableTask.SqlServer/Scripts/logic.sql | 164 ++++++++++-------- .../Scripts/permissions.sql | 3 + .../SqlOrchestrationService.cs | 28 +-- .../SqlOrchestrationServiceSettings.cs | 26 ++- src/DurableTask.SqlServer/SqlUtils.cs | 8 +- .../Integration/DatabaseManagement.cs | 3 + .../Utils/TestService.cs | 2 +- 8 files changed, 148 insertions(+), 89 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/drop-schema.sql b/src/DurableTask.SqlServer/Scripts/drop-schema.sql index f50f629..3dd6d95 100644 --- a/src/DurableTask.SqlServer/Scripts/drop-schema.sql +++ b/src/DurableTask.SqlServer/Scripts/drop-schema.sql @@ -2,8 +2,11 @@ -- Licensed under the MIT License. -- Functions +DROP FUNCTION IF EXISTS __SchemaNamePlaceholder__._CurrentTaskHub DROP FUNCTION IF EXISTS __SchemaNamePlaceholder__.CurrentTaskHub +DROP FUNCTION IF EXISTS __SchemaNamePlaceholder__._GetScaleMetric DROP FUNCTION IF EXISTS __SchemaNamePlaceholder__.GetScaleMetric +DROP FUNCTION IF EXISTS __SchemaNamePlaceholder__._GetScaleRecommendation DROP FUNCTION IF EXISTS __SchemaNamePlaceholder__.GetScaleRecommendation -- Views diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 74c612b..0d65f92 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -1,7 +1,7 @@ -- Copyright (c) Microsoft Corporation. -- Licensed under the MIT License. -CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName varchar(150)) +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName varchar(150)) RETURNS varchar(50) WITH EXECUTE AS CALLER AS @@ -11,10 +11,14 @@ BEGIN -- 1: Task hub names are inferred from the user credential DECLARE @taskHubMode sql_variant = (SELECT TOP 1 [Value] FROM GlobalSettings WHERE [Name] = 'TaskHubMode'); - DECLARE @taskHub varchar(150) = @TaskHubName; + DECLARE @taskHub varchar(150); IF @taskHubMode = 1 SET @taskHub = USER_NAME(); + ELSE IF @TaskHubName is NULL + SET @taskHub = APP_NAME(); + ELSE + SET @taskHub = @TaskHubName; -- if the name is too long, keep the first 16 characters and hash the rest IF LEN(@taskHub) > 50 @@ -24,13 +28,21 @@ BEGIN END GO - -CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleMetric(@TaskHubName varchar(150)) +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.CurrentTaskHub() + RETURNS varchar(50) + WITH EXECUTE AS CALLER +AS +BEGIN + RETURN __SchemaNamePlaceholder__._CurrentTaskHub(null); +END +GO + +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__._GetScaleMetric(@TaskHubName varchar(150)) RETURNS INT WITH EXECUTE AS CALLER AS BEGIN - DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @liveInstances bigint = 0 @@ -51,13 +63,21 @@ BEGIN END GO +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleMetric() + RETURNS INT + WITH EXECUTE AS CALLER +AS +BEGIN + RETURN __SchemaNamePlaceholder__._GetScaleMetric(null); +END +GO -CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleRecommendation(@TaskHubName varchar(150), @MaxOrchestrationsPerWorker real, @MaxActivitiesPerWorker real) +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__._GetScaleRecommendation(@MaxOrchestrationsPerWorker real, @MaxActivitiesPerWorker real, @TaskHubName varchar(150)) RETURNS INT WITH EXECUTE AS CALLER AS BEGIN - DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @liveInstances bigint = 0 @@ -84,6 +104,14 @@ BEGIN END GO +CREATE OR ALTER FUNCTION __SchemaNamePlaceholder__.GetScaleRecommendation(@MaxOrchestrationsPerWorker real, @MaxActivitiesPerWorker real) + RETURNS INT + WITH EXECUTE AS CALLER +AS +BEGIN + RETURN __SchemaNamePlaceholder__.GetScaleRecommendation(@MaxOrchestrationsPerWorker, @MaxActivitiesPerWorker, NULL); +END +GO CREATE OR ALTER VIEW __SchemaNamePlaceholder__.vInstances AS @@ -110,8 +138,8 @@ AS P.[InstanceID] = I.[InstanceID] AND P.[PayloadID] = I.[OutputPayloadID]) AS [OutputText] FROM Instances I - -- like is the simplest way to keep indexed seek with conditional where - WHERE I.[TaskHub] like ISNULL(__SchemaNamePlaceholder__.CurrentTaskHub(null), '%') + -- like operator is the simplest way to keep indexed seek with conditional where + WHERE I.[TaskHub] LIKE ISNULL(__SchemaNamePlaceholder__._CurrentTaskHub(NULL), '%') GO CREATE OR ALTER VIEW __SchemaNamePlaceholder__.vHistory @@ -133,23 +161,23 @@ AS P.[InstanceID] = H.[InstanceID] AND P.[PayloadID] = H.[DataPayloadID]) AS [Payload] FROM History H - -- like is the simplest way to keep indexed seek with conditional where - WHERE H.[TaskHub] like ISNULL(__SchemaNamePlaceholder__.CurrentTaskHub(null), '%') + -- like operator is the simplest way to keep indexed seek with conditional where + WHERE H.[TaskHub] LIKE ISNULL(__SchemaNamePlaceholder__._CurrentTaskHub(NULL), '%') GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.CreateInstance - @TaskHubName varchar(150), @Name varchar(300), @Version varchar(100) = NULL, @InstanceID varchar(100) = NULL, @ExecutionID varchar(50) = NULL, @InputText varchar(MAX) = NULL, @StartTime datetime2 = NULL, - @DedupeStatuses varchar(MAX) = 'Pending,Running' + @DedupeStatuses varchar(MAX) = 'Pending,Running', + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @EventType varchar(30) = 'ExecutionStarted' DECLARE @RuntimeStatus varchar(30) = 'Pending' @@ -180,7 +208,7 @@ BEGIN -- Purge the existing instance data so that it can be overwritten DECLARE @instancesToPurge InstanceIDs INSERT INTO @instancesToPurge VALUES (@InstanceID) - EXEC __SchemaNamePlaceholder__.PurgeInstanceStateByID @TaskHubName, @instancesToPurge + EXEC __SchemaNamePlaceholder__.PurgeInstanceStateByID @instancesToPurge, @TaskHubName END COMMIT TRANSACTION @@ -249,12 +277,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.GetInstanceHistory - @TaskHubName varchar(150), @InstanceID varchar(100), - @GetInputsAndOutputs bit = 0 + @GetInputsAndOutputs bit = 0, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @ParentInstanceID varchar(100) DECLARE @Version varchar(100) @@ -293,16 +321,16 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.RaiseEvent - @TaskHubName varchar(150), @Name varchar(300), @InstanceID varchar(100) = NULL, @PayloadText varchar(MAX) = NULL, - @DeliveryTime datetime2 = NULL + @DeliveryTime datetime2 = NULL, + @TaskHubName varchar(150) = NULL AS BEGIN BEGIN TRANSACTION - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) -- External event messages must target new instances or they must use -- the "auto start" instance ID format of @orchestrationname@identifier. @@ -362,9 +390,9 @@ END GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.TerminateInstance - @TaskHubName varchar(150), @InstanceID varchar(100), - @Reason varchar(max) = NULL + @Reason varchar(max) = NULL, + @TaskHubName varchar(150) = NULL AS BEGIN BEGIN TRANSACTION @@ -374,7 +402,7 @@ BEGIN -- order across all stored procedures that execute within a transaction. -- Table order for this sproc: Instances --> (NewEvents --> Payloads --> NewEvents) - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @existingStatus varchar(30) = ( SELECT TOP 1 existing.[RuntimeStatus] @@ -422,11 +450,11 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.PurgeInstanceStateByID - @TaskHubName varchar(150), - @InstanceIDs InstanceIDs READONLY + @InstanceIDs InstanceIDs READONLY, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) BEGIN TRANSACTION @@ -446,12 +474,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.PurgeInstanceStateByTime - @TaskHubName varchar(150), @ThresholdTime datetime2, - @FilterType tinyint = 0 + @FilterType tinyint = 0, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @instanceIDs InstanceIDs @@ -476,7 +504,7 @@ BEGIN END DECLARE @deletedInstances int - EXECUTE @deletedInstances = __SchemaNamePlaceholder__.PurgeInstanceStateByID @TaskHubName, @instanceIDs + EXECUTE @deletedInstances = __SchemaNamePlaceholder__.PurgeInstanceStateByID @instanceIDs, @TaskHubName RETURN @deletedInstances END GO @@ -507,10 +535,10 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._LockNextOrchestration - @TaskHubName varchar(150), @BatchSize int, @LockedBy varchar(100), - @LockExpiration datetime2 + @LockExpiration datetime2, + @TaskHubName varchar(150) = NULL AS BEGIN DECLARE @now datetime2 = SYSUTCDATETIME() @@ -518,7 +546,7 @@ BEGIN DECLARE @parentInstanceID varchar(100) DECLARE @version varchar(100) DECLARE @runtimeStatus varchar(30) - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) BEGIN TRANSACTION @@ -621,7 +649,6 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration - @TaskHubName varchar(150), @InstanceID varchar(100), @ExecutionID varchar(50), @RuntimeStatus varchar(30), @@ -629,12 +656,13 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration @DeletedEvents MessageIDs READONLY, @NewHistoryEvents HistoryEvents READONLY, @NewOrchestrationEvents OrchestrationEvents READONLY, - @NewTaskEvents TaskEvents READONLY + @NewTaskEvents TaskEvents READONLY, + @TaskHubName varchar(150) = NULL AS BEGIN BEGIN TRANSACTION - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @InputPayloadID uniqueidentifier DECLARE @CustomStatusPayloadID uniqueidentifier @@ -914,12 +942,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._DiscardEventsAndUnlockInstance - @TaskHubName varchar(150), @InstanceID varchar(100), - @DeletedEvents MessageIDs READONLY + @DeletedEvents MessageIDs READONLY, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @taskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) -- We return the list of deleted messages so that the caller can issue a -- warning about missing messages @@ -939,8 +967,8 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._AddOrchestrationEvents - @TaskHubName varchar(150), - @NewOrchestrationEvents OrchestrationEvents READONLY + @NewOrchestrationEvents OrchestrationEvents READONLY, + @TaskHubName varchar(150) = NULL AS BEGIN BEGIN TRANSACTION @@ -950,7 +978,7 @@ BEGIN -- order across all stored procedures that execute within a transaction. -- Table order for this sproc: Payloads --> NewEvents - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) -- External event messages can create new instances -- NOTE: There is a chance this could result in deadlocks if two @@ -1021,14 +1049,14 @@ END GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__.QuerySingleOrchestration - @TaskHubName varchar(150), @InstanceID varchar(100), @ExecutionID varchar(50) = NULL, @FetchInput bit = 1, - @FetchOutput bit = 1 + @FetchOutput bit = 1, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) SELECT TOP 1 I.[InstanceID], @@ -1062,7 +1090,6 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._QueryManyOrchestrations - @TaskHubName varchar(150), @PageSize smallint = 100, @PageNumber int = 0, @FetchInput bit = 1, @@ -1071,10 +1098,11 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._QueryManyOrchestrations @CreatedTimeTo datetime2 = NULL, @RuntimeStatusFilter varchar(200) = NULL, @InstanceIDPrefix varchar(100) = NULL, - @ExcludeSubOrchestrations bit = 0 + @ExcludeSubOrchestrations bit = 0, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) SELECT I.[InstanceID], @@ -1114,12 +1142,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._LockNextTask - @TaskHubName varchar(150), @LockedBy varchar(100), - @LockExpiration datetime2 + @LockExpiration datetime2, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) DECLARE @now datetime2 = SYSUTCDATETIME() DECLARE @SequenceNumber bigint @@ -1171,12 +1199,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RenewOrchestrationLocks - @TaskHubName varchar(150), @InstanceID varchar(100), - @LockExpiration datetime2 + @LockExpiration datetime2, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) UPDATE Instances SET [LockExpiration] = @LockExpiration @@ -1186,12 +1214,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RenewTaskLocks - @TaskHubName varchar(150), @RenewingTasks MessageIDs READONLY, - @LockExpiration datetime2 + @LockExpiration datetime2, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) UPDATE N SET [LockExpiration] = @LockExpiration @@ -1203,12 +1231,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CompleteTasks - @TaskHubName varchar(150), @CompletedTasks MessageIDs READONLY, - @Results TaskEvents READONLY + @Results TaskEvents READONLY, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) BEGIN TRANSACTION @@ -1301,14 +1329,14 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RewindInstance - @TaskHubName varchar(150), @InstanceID varchar(100), - @Reason varchar(max) = NULL + @Reason varchar(max) = NULL, + @TaskHubName varchar(150) = NULL AS BEGIN BEGIN TRANSACTION - EXEC __SchemaNamePlaceholder__._RewindInstanceRecursive @TaskHubName, @InstanceID, @Reason + EXEC __SchemaNamePlaceholder__._RewindInstanceRecursive @InstanceID, @Reason, @TaskHubName COMMIT TRANSACTION END @@ -1316,12 +1344,12 @@ GO CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RewindInstanceRecursive - @TaskHubName varchar(150), @InstanceID varchar(100), - @Reason varchar(max) = NULL + @Reason varchar(max) = NULL, + @TaskHubName varchar(150) = NULL AS BEGIN - DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub(@TaskHubName) + DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__._CurrentTaskHub(@TaskHubName) -- *** IMPORTANT *** -- To prevent deadlocks, it is important to maintain consistent table access @@ -1423,7 +1451,7 @@ BEGIN WHILE @@FETCH_STATUS = 0 BEGIN -- Call rewind recursively on the failing suborchestrations - EXECUTE __SchemaNamePlaceholder__._RewindInstanceRecursive @TaskHubName, @subOrchestrationInstanceID, @Reason + EXECUTE __SchemaNamePlaceholder__._RewindInstanceRecursive @subOrchestrationInstanceID, @Reason, @TaskHubName FETCH NEXT FROM subOrchestrationCursor INTO @subOrchestrationInstanceID END CLOSE subOrchestrationCursor diff --git a/src/DurableTask.SqlServer/Scripts/permissions.sql b/src/DurableTask.SqlServer/Scripts/permissions.sql index 1566cd9..8dae446 100644 --- a/src/DurableTask.SqlServer/Scripts/permissions.sql +++ b/src/DurableTask.SqlServer/Scripts/permissions.sql @@ -14,8 +14,11 @@ END -- database user can access data created by another database user. -- Functions +GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._GetScaleMetric TO __SchemaNamePlaceholder___runtime GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.GetScaleMetric TO __SchemaNamePlaceholder___runtime +GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._GetScaleRecommendation TO __SchemaNamePlaceholder___runtime GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.GetScaleRecommendation TO __SchemaNamePlaceholder___runtime +GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._CurrentTaskHub TO __SchemaNamePlaceholder___runtime GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__.CurrentTaskHub TO __SchemaNamePlaceholder___runtime -- Public sprocs diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 549c426..2816e2b 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -19,6 +19,7 @@ namespace DurableTask.SqlServer using DurableTask.Core.Query; using DurableTask.SqlServer.SqlTypes; using DurableTask.SqlServer.Utils; + using Dynamitey.DynamicObjects; using Microsoft.Data.SqlClient; using Newtonsoft.Json; @@ -95,8 +96,12 @@ SqlCommand GetTaskHubSprocCommand(SqlConnection connection, string sprocName) return command; } - SqlCommand GetFuncCommand(SqlConnection connection, string funcName, SqlDbType retType, int retSize = 0) - => connection.GetFuncCommand(string.Concat(this.settings.SchemaName, ".", funcName), retType, retSize); + SqlCommand GetTaskHubFuncCommand(SqlConnection connection, string funcName, SqlDbType retType, int retSize = 0) + { + SqlCommand command = connection.GetFuncCommand(string.Concat(this.settings.SchemaName, ".", funcName), retType, retSize); + this.AddTaskHubNameParameter(command); + return command; + } void AddTaskHubNameParameter(SqlCommand command) => command.AddTaskHubNameParameter(this.taskHubNameValue); @@ -115,10 +120,10 @@ async Task CacheTaskHubNameValue() if (this.taskHubNameValue.Length > 50) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(this.ShutdownToken); - using SqlCommand command = this.GetFuncCommand(connection, "CurrentTaskHub", SqlDbType.VarChar, 50); - command.AddTaskHubNameParameter(this.taskHubNameValue); - var taskHubName = (await command.ExecuteFuncAsync(this.ShutdownToken))!; + using SqlCommand command = this.GetTaskHubFuncCommand(connection, "_CurrentTaskHub", SqlDbType.VarChar, 50); + var taskHubName = await command.ExecuteFuncAsync(this.ShutdownToken); // Logic in t-sql function will preserve first 16 characters. + // Also, value is never null from executing db function if (0 == string.Compare(taskHubName, 0, this.taskHubNameValue, 0, 16, StringComparison.OrdinalIgnoreCase)) this.taskHubNameValue = taskHubName; } @@ -896,7 +901,7 @@ public async Task RewindTaskOrchestrationAsync(string instanceId, string reason) /// /// /// - /// This method calls the dt.GetScaleRecommendation SQL function and gets back a number that represents + /// This method calls the dt._GetScaleRecommendation SQL function and gets back a number that represents /// the recommended number of worker replicas that should be allocated to handle the current event backlog. /// The SQL function takes the values of and /// as inputs. @@ -913,13 +918,10 @@ public async Task RewindTaskOrchestrationAsync(string instanceId, string reason) public async Task GetRecommendedReplicaCountAsync(int? currentReplicaCount = null, CancellationToken cancellationToken = default) { using SqlConnection connection = await this.GetAndOpenConnectionAsync(cancellationToken); - using SqlCommand command = connection.CreateCommand(); - command.CommandText = $"SELECT {this.settings.SchemaName}.GetScaleRecommendation(@TaskHubName, @MaxConcurrentOrchestrations, @MaxConcurrentActivities)"; - this.AddTaskHubNameParameter(command); - command.Parameters.Add("@MaxConcurrentOrchestrations", SqlDbType.Int).Value = this.MaxConcurrentTaskOrchestrationWorkItems; - command.Parameters.Add("@MaxConcurrentActivities", SqlDbType.Int).Value = this.MaxConcurrentTaskActivityWorkItems; - - int recommendedReplicaCount = (int)await command.ExecuteScalarAsync(cancellationToken); + using SqlCommand command = this.GetTaskHubFuncCommand(connection, "_GetScaleRecommendation", SqlDbType.Int); + command.Parameters.Add("@MaxOrchestrationsPerWorker", SqlDbType.Int).Value = this.MaxConcurrentTaskOrchestrationWorkItems; + command.Parameters.Add("@MaxActivitiesPerWorker", SqlDbType.Int).Value = this.MaxConcurrentTaskActivityWorkItems; + int recommendedReplicaCount = await command.ExecuteFuncAsync(cancellationToken); if (currentReplicaCount != null && currentReplicaCount != recommendedReplicaCount) { this.traceHelper.ReplicaCountChangeRecommended(currentReplicaCount.Value, recommendedReplicaCount); diff --git a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs index d21f38a..b2561dd 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs @@ -19,13 +19,20 @@ public class SqlOrchestrationServiceSettings /// public static readonly string DefaultTaskHubName = "default"; + /// + /// Default value for property + /// + public static readonly string DefaultSchemaName = "dt"; + /// /// Initializes a new instance of the class. /// /// The connection string for connecting to the database. /// Optional. The name of the task hub. If not specified, a default name will be used. /// Optional. The name of the schema. If not specified, the default 'dt' value will be used. - public SqlOrchestrationServiceSettings(string connectionString, string? taskHubName = null, string? schemaName = null) + /// Optional. When true task hub name is added to connection string. + /// Default value is true to match how connection pool behaved before introdcution of this parameter. + public SqlOrchestrationServiceSettings(string connectionString, string? taskHubName = null, string? schemaName = null, bool connectionStringWithTaskHubName = true) { if (string.IsNullOrEmpty(connectionString)) { @@ -33,7 +40,7 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN } this.TaskHubName = taskHubName ?? DefaultTaskHubName; - this.SchemaName = schemaName ?? "dt"; + this.SchemaName = schemaName ?? DefaultSchemaName; var builder = new SqlConnectionStringBuilder(connectionString); @@ -42,6 +49,14 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN throw new ArgumentException("Database or Initial Catalog must be specified in the connection string.", nameof(connectionString)); } + if (connectionStringWithTaskHubName) + { + // We use the task hub name as the application name so that + // stored procedures have easy access to this information. + builder.ApplicationName = this.TaskHubName; + this.ConnectionStringHasTaskHubName = true; + } + this.DatabaseName = builder.InitialCatalog; this.TaskHubConnectionString = builder.ToString(); } @@ -70,6 +85,13 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN [JsonProperty("schemaName")] public string SchemaName { get; } + /// + /// True when connection string + /// property is set to the value of the . + /// + [JsonProperty("connectionStringHasTaskHubName ")] + public bool ConnectionStringHasTaskHubName { get; } + /// /// Gets or sets the name of the app. Used as prefix of the lock owner, /// and need to be unique across machines that connect to the hub. diff --git a/src/DurableTask.SqlServer/SqlUtils.cs b/src/DurableTask.SqlServer/SqlUtils.cs index cabe833..29c29a6 100644 --- a/src/DurableTask.SqlServer/SqlUtils.cs +++ b/src/DurableTask.SqlServer/SqlUtils.cs @@ -485,9 +485,7 @@ public static SqlCommand GetSprocCommand(this SqlConnection connection, string s /// Optional return type size public static SqlCommand GetFuncCommand(this SqlConnection connection, string funcName, SqlDbType retType, int retSize = 0) { - SqlCommand command = connection.CreateCommand(); - command.CommandType = CommandType.StoredProcedure; - command.CommandText = funcName; + SqlCommand command = connection.GetSprocCommand(funcName); command.Parameters.Add(new SqlParameter("@RETURN_VALUE", retType, retSize) { Direction = ParameterDirection.ReturnValue, @@ -496,11 +494,11 @@ public static SqlCommand GetFuncCommand(this SqlConnection connection, string fu return command; } - public static async Task ExecuteFuncAsync(this DbCommand command, CancellationToken cancellationToken) + public static async Task ExecuteFuncAsync(this DbCommand command, CancellationToken cancellationToken) { await command.ExecuteNonQueryAsync(cancellationToken); object value = command.Parameters[0].Value; - return value == DBNull.Value ? default : (T) value; + return value == DBNull.Value ? default! : (T) value; } public static bool IsUniqueKeyViolation(SqlException exception) diff --git a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs index 0dc3d10..c12e47d 100644 --- a/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs +++ b/test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs @@ -454,8 +454,11 @@ static void ValidateDatabaseSchema(TestDatabase database, string schemaName = "d var expectedFunctionNames = new HashSet(StringComparer.Ordinal) { + $"{schemaName}._CurrentTaskHub", $"{schemaName}.CurrentTaskHub", + $"{schemaName}._GetScaleMetric", $"{schemaName}.GetScaleMetric", + $"{schemaName}._GetScaleRecommendation", $"{schemaName}.GetScaleRecommendation", }; diff --git a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs index a190800..61824dc 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/TestService.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/TestService.cs @@ -261,7 +261,7 @@ public IEnumerable GetAndValidateLogs() public async Task GetTaskHubNameAsync() { return (string)await SharedTestHelpers.ExecuteSqlAsync( - "SELECT dt.CurrentTaskHub(null)", + "SELECT dt.CurrentTaskHub()", this.testCredential.ConnectionString); } From 040261c28254c64c544020e91907c7988395940c Mon Sep 17 00:00:00 2001 From: TDEVMPJ Date: Sat, 3 Jun 2023 19:53:32 -0400 Subject: [PATCH 4/6] Fix views to work with new and old hub name handling. --- src/DurableTask.SqlServer/Scripts/logic.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index 0d65f92..c6f4953 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -139,7 +139,7 @@ AS P.[PayloadID] = I.[OutputPayloadID]) AS [OutputText] FROM Instances I -- like operator is the simplest way to keep indexed seek with conditional where - WHERE I.[TaskHub] LIKE ISNULL(__SchemaNamePlaceholder__._CurrentTaskHub(NULL), '%') + WHERE I.[TaskHub] LIKE __SchemaNamePlaceholder__._CurrentTaskHub('%') GO CREATE OR ALTER VIEW __SchemaNamePlaceholder__.vHistory @@ -162,7 +162,7 @@ AS P.[PayloadID] = H.[DataPayloadID]) AS [Payload] FROM History H -- like operator is the simplest way to keep indexed seek with conditional where - WHERE H.[TaskHub] LIKE ISNULL(__SchemaNamePlaceholder__._CurrentTaskHub(NULL), '%') + WHERE H.[TaskHub] LIKE __SchemaNamePlaceholder__._CurrentTaskHub('%') GO From a85a5e8409496d3fae2e48c4b6c05cf89e80aaa7 Mon Sep 17 00:00:00 2001 From: TDEVMPJ Date: Sun, 4 Jun 2023 08:23:56 -0400 Subject: [PATCH 5/6] Cleanup usings and parameter name. --- src/DurableTask.SqlServer/SqlOrchestrationService.cs | 1 - .../SqlOrchestrationServiceSettings.cs | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/DurableTask.SqlServer/SqlOrchestrationService.cs b/src/DurableTask.SqlServer/SqlOrchestrationService.cs index 2816e2b..6517f7f 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationService.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationService.cs @@ -19,7 +19,6 @@ namespace DurableTask.SqlServer using DurableTask.Core.Query; using DurableTask.SqlServer.SqlTypes; using DurableTask.SqlServer.Utils; - using Dynamitey.DynamicObjects; using Microsoft.Data.SqlClient; using Newtonsoft.Json; diff --git a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs index b2561dd..ccbb231 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs @@ -30,9 +30,9 @@ public class SqlOrchestrationServiceSettings /// The connection string for connecting to the database. /// Optional. The name of the task hub. If not specified, a default name will be used. /// Optional. The name of the schema. If not specified, the default 'dt' value will be used. - /// Optional. When true task hub name is added to connection string. - /// Default value is true to match how connection pool behaved before introdcution of this parameter. - public SqlOrchestrationServiceSettings(string connectionString, string? taskHubName = null, string? schemaName = null, bool connectionStringWithTaskHubName = true) + /// Optional. When true, task hub name is added to connection string. + /// Default value is true to match behavior of connection pool before introdcution of this parameter. + public SqlOrchestrationServiceSettings(string connectionString, string? taskHubName = null, string? schemaName = null, bool addTaskHubToConnectionString = true) { if (string.IsNullOrEmpty(connectionString)) { @@ -49,7 +49,7 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN throw new ArgumentException("Database or Initial Catalog must be specified in the connection string.", nameof(connectionString)); } - if (connectionStringWithTaskHubName) + if (addTaskHubToConnectionString) { // We use the task hub name as the application name so that // stored procedures have easy access to this information. From 046251b9aadd503a1b40ee0ad44f810f4d1a7717 Mon Sep 17 00:00:00 2001 From: TDEVMPJ Date: Sun, 31 Dec 2023 09:35:07 -0500 Subject: [PATCH 6/6] Allow ENV set connection string to be used with tests. Resolve spelling issues raised by @cgillum in PR review. --- src/DurableTask.SqlServer/SqlDbManager.cs | 2 +- .../SqlOrchestrationServiceSettings.cs | 8 ++++---- .../Utils/SharedTestHelpers.cs | 14 +++++++++++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/DurableTask.SqlServer/SqlDbManager.cs b/src/DurableTask.SqlServer/SqlDbManager.cs index 4b236d3..1bd877f 100644 --- a/src/DurableTask.SqlServer/SqlDbManager.cs +++ b/src/DurableTask.SqlServer/SqlDbManager.cs @@ -123,7 +123,7 @@ public async Task CreateOrUpgradeSchemaAsync(bool recreateIfExists) command.CommandText = $"{this.settings.SchemaName}.SetGlobalSetting"; command.CommandType = CommandType.StoredProcedure; command.Parameters.Add("@Name", SqlDbType.VarChar, 300).Value = "TaskHubMode"; - command.Parameters.Add("@Value", SqlDbType.Variant).Value = this.settings.CreateMultiTennantTaskHub ? 1 : 0; + command.Parameters.Add("@Value", SqlDbType.Variant).Value = this.settings.CreateMultitenantTaskHub ? 1 : 0; await SqlUtils.ExecuteNonQueryAsync(command, this.traceHelper); } diff --git a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs index 3078c89..a7fd1c9 100644 --- a/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs +++ b/src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs @@ -31,7 +31,7 @@ public class SqlOrchestrationServiceSettings /// Optional. The name of the task hub. If not specified, a default name will be used. /// Optional. The name of the schema. If not specified, the default 'dt' value will be used. /// Optional. When true, task hub name is added to connection string. - /// Default value is true to match behavior of connection pool before introdcution of this parameter. + /// Default value is true to match behavior of connection pool before introduction of this parameter. public SqlOrchestrationServiceSettings(string connectionString, string? taskHubName = null, string? schemaName = null, bool addTaskHubToConnectionString = true) { if (string.IsNullOrEmpty(connectionString)) @@ -174,12 +174,12 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN public bool CreateDatabaseIfNotExists { get; set; } /// - /// Gets or sets a flag indicating wether task hub is created for multi-tennant naming + /// Gets or sets a flag indicating wether task hub is created for multitenant naming /// (derived from sql server user name) or for application specified naming. /// /// - [JsonProperty("createMultiTennantTaskHub ")] - public bool CreateMultiTennantTaskHub { get; set; } = true; + [JsonProperty("createMultitenantTaskHub ")] + public bool CreateMultitenantTaskHub { get; set; } = true; /// /// Gets a SQL connection string associated with the configured task hub. diff --git a/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs b/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs index 16d4f29..e6c6a5d 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs @@ -30,9 +30,17 @@ public static string GetTestName(ITestOutputHelper output) public static string GetDefaultConnectionString(string database = "DurableDB") { - // The default for local development on a Windows OS - string defaultConnectionString = $"Server=localhost;Database={database};Trusted_Connection=True;Encrypt=False;"; - var builder = new SqlConnectionStringBuilder(defaultConnectionString); + // Externally configured connection string for local environemnt (workaround defaults below). + // Fixme: CoreScenarios.MultiInstanceQueries require Initial Catalog=DurableDB to be set. + string environmentConnectionString = Environment.GetEnvironmentVariable("SQLDB_Connection"); + + var connectionString = !string.IsNullOrWhiteSpace(environmentConnectionString) + ? environmentConnectionString + // The default for local development on a Windows OS + : $"Server=localhost;Trusted_Connection=True;Encrypt=False;"; + + var builder = new SqlConnectionStringBuilder(connectionString); + builder.InitialCatalog = database; // The use of SA_PASSWORD is intended for use with the mssql docker container string saPassword = Environment.GetEnvironmentVariable("SA_PASSWORD");