diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e2296b2..22dd04e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,15 +13,15 @@ env: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} NSERVICEBUS_AMAZONSQS_S3BUCKET: ${{ secrets.NSERVICEBUS_AMAZONSQS_S3BUCKET }} jobs: - build: + build: name: ${{ matrix.name }} runs-on: ${{ matrix.os }} strategy: matrix: include: - - os: windows-2019 + - os: windows-2022 name: Windows - - os: ubuntu-20.04 + - os: ubuntu-22.04 name: Linux fail-fast: false steps: @@ -33,13 +33,11 @@ jobs: - name: Checkout uses: actions/checkout@v4.1.1 with: - fetch-depth: 0 + fetch-depth: 0 - name: Setup .NET SDK uses: actions/setup-dotnet@v4.0.0 with: - dotnet-version: | - 7.0.x - 6.0.x + dotnet-version: 8.0.x - name: Build run: dotnet build src --configuration Release - name: Upload packages @@ -51,4 +49,3 @@ jobs: retention-days: 7 - name: Run tests uses: Particular/run-tests-action@v1.7.0 - diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 334f60b..60a0b61 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,16 +8,16 @@ env: DOTNET_NOLOGO: true jobs: release: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v4.1.1 with: - fetch-depth: 0 + fetch-depth: 0 - name: Setup .NET SDK uses: actions/setup-dotnet@v4.0.0 with: - dotnet-version: 7.0.x + dotnet-version: 8.0.x - name: Build run: dotnet build src --configuration Release - name: Sign NuGet packages @@ -36,4 +36,4 @@ jobs: - name: Deploy uses: Particular/push-octopus-package-action@v1.2.1 with: - octopus-deploy-api-key: ${{ secrets.OCTOPUS_DEPLOY_API_KEY }} \ No newline at end of file + octopus-deploy-api-key: ${{ secrets.OCTOPUS_DEPLOY_API_KEY }} diff --git a/src/Custom.Build.props b/src/Custom.Build.props index 3b7e325..de2f7d0 100644 --- a/src/Custom.Build.props +++ b/src/Custom.Build.props @@ -1,12 +1,11 @@ - 0.9.0 netstandard2.0 - 1.0 + 2.0 minor diff --git a/src/GuardTemplates.DotSettings b/src/GuardTemplates.DotSettings deleted file mode 100644 index 4352395..0000000 --- a/src/GuardTemplates.DotSettings +++ /dev/null @@ -1,56 +0,0 @@ - - True - guard - Guards against null arguments - Guard.AgainstNull("$ARG$", $ARG$); - True - True - Guard - True - True - InCSharpStatement - 2.0 - True - complete() - 0 - True - guarde - Guards against null or empty arguments - Guard.AgainstNullAndEmpty("$ARG$", $ARG$); - True - True - Guard - True - True - InCSharpStatement - 2.0 - True - complete() - 0 - True - guardn - Guard against negative arguments - Guard.AgainstNegative("$ARG$", $ARG$); - True - True - Guard - True - True - InCSharpStatement - 2.0 - True - 0 - True - guardnz - Guards against negative and zero - Guard.AgainstNegativeAndZero("$ARG$", $ARG$); - True - True - Guard - True - True - InCSharpStatement - 2.0 - True - complete() - 0 \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/.editorconfig b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/.editorconfig index 0f61b0e..7efe105 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/.editorconfig +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/.editorconfig @@ -2,6 +2,7 @@ # Justification: Test project dotnet_diagnostic.CA2007.severity = none +dotnet_diagnostic.PS0018.severity = none # Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken dotnet_diagnostic.NSB0002.severity = suggestion diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs index 356a8f8..1fffc22 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/AwsLambdaSQSEndpointTestBase.cs @@ -15,7 +15,6 @@ using Amazon.SQS; using Amazon.SQS.Model; using Microsoft.Extensions.DependencyInjection; - using Microsoft.VisualStudio.TestPlatform.ObjectModel; using NUnit.Framework; [TestFixture] @@ -40,7 +39,7 @@ class AwsLambdaSQSEndpointTestBase [SetUp] public async Task Setup() { - queueNames = new List(); + queueNames = []; Prefix = Path.GetFileNameWithoutExtension(Path.GetRandomFileName()).ToLowerInvariant(); @@ -103,14 +102,14 @@ await s3Client.DeleteObjectsAsync(new DeleteObjectsRequest } } - protected AwsLambdaSQSEndpointConfiguration DefaultLambdaEndpointConfiguration(TTestContext testContext) + protected AwsLambdaSQSEndpointConfiguration DefaultLambdaEndpointConfiguration(TTestContext testContext, bool useXmlSerializer = false) { - var configuration = DefaultLambdaEndpointConfiguration(); + var configuration = DefaultLambdaEndpointConfiguration(useXmlSerializer); configuration.AdvancedConfiguration.RegisterComponents(c => c.AddSingleton(typeof(TTestContext), testContext)); return configuration; } - protected AwsLambdaSQSEndpointConfiguration DefaultLambdaEndpointConfiguration() + protected AwsLambdaSQSEndpointConfiguration DefaultLambdaEndpointConfiguration(bool useXmlSerializer = false) { var configuration = new AwsLambdaSQSEndpointConfiguration(QueueName, CreateSQSClient(), CreateSNSClient()); configuration.Transport.QueueNamePrefix = Prefix; @@ -118,7 +117,20 @@ protected AwsLambdaSQSEndpointConfiguration DefaultLambdaEndpointConfiguration() configuration.Transport.S3 = new S3Settings(BucketName, Prefix, CreateS3Client()); var advanced = configuration.AdvancedConfiguration; - advanced.SendFailedMessagesTo(ErrorQueueAddress); + + if (useXmlSerializer) + { + advanced.UseSerialization(); + } + else + { + advanced.UseSerialization(); + } + + var recoverability = advanced.Recoverability(); + + recoverability.Immediate(i => i.NumberOfRetries(0)); + recoverability.Delayed(d => d.NumberOfRetries(0)); return configuration; } @@ -138,6 +150,7 @@ protected void RegisterQueueNameToCleanup(string queueName) S3 = new S3Settings(BucketName, Prefix, CreateS3Client()) }; + endpointConfiguration.UseSerialization(); endpointConfiguration.UseTransport(transport); var endpointInstance = await Endpoint.Start(endpointConfiguration) @@ -156,8 +169,8 @@ protected void RegisterQueueNameToCleanup(string queueName) { MaxNumberOfMessages = count, WaitTimeSeconds = 20, - AttributeNames = new List { "SentTimestamp" }, - MessageAttributeNames = new List { "*" } + AttributeNames = ["SentTimestamp"], + MessageAttributeNames = ["*"] }; var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest); @@ -183,8 +196,8 @@ await sqsClient.SendMessageAsync(sendMessageRequest) { MaxNumberOfMessages = 10, WaitTimeSeconds = 20, - AttributeNames = new List { "SentTimestamp" }, - MessageAttributeNames = new List { "*" } + AttributeNames = ["SentTimestamp"], + MessageAttributeNames = ["*"] }; var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest); @@ -214,8 +227,8 @@ protected async Task RetrieveMessagesInErrorQueue(int maxMessageCount { MaxNumberOfMessages = maxMessageCount, WaitTimeSeconds = 20, - AttributeNames = new List { "SentTimestamp" }, - MessageAttributeNames = new List { "*" } + AttributeNames = ["SentTimestamp"], + MessageAttributeNames = ["*"] }; var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest); diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NServiceBus.AwsLambda.SQS.AcceptanceTests.csproj b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NServiceBus.AwsLambda.SQS.AcceptanceTests.csproj index c0fe736..b6f9805 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NServiceBus.AwsLambda.SQS.AcceptanceTests.csproj +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NServiceBus.AwsLambda.SQS.AcceptanceTests.csproj @@ -1,20 +1,22 @@ - + - net6.0;net7.0 - true - NServiceBus.AcceptanceTests + net8.0 + + + + + + + + - - - - diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_with_encoding.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_with_encoding.cs index e23c1d3..6f0a90d 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_with_encoding.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_with_encoding.cs @@ -24,7 +24,7 @@ public async Task Should_be_processed_when_messagetypefullname_present() var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); @@ -48,7 +48,7 @@ public async Task Should_fail_when_messagetypefullname_not_present() var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); @@ -74,7 +74,7 @@ public async Task Should_preserve_poison_message_attributes_in_error_queue() var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); var poisonMessages = await RetrieveMessagesInErrorQueue(); @@ -103,12 +103,7 @@ public async Task Should_preserve_message_attributes_in_error_queue() }, FailingMessageToSend); - var endpoint = new AwsLambdaSQSEndpoint(ctx => - { - var configuration = DefaultLambdaEndpointConfiguration(); - configuration.AdvancedConfiguration.Recoverability().Immediate(s => s.NumberOfRetries(0)); - return configuration; - }); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); var poisonMessages = await RetrieveMessagesInErrorQueue(); @@ -139,7 +134,7 @@ public async Task Should_support_loading_body_from_s3() var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs index 50bd597..d838b11 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/NativeIntegration/When_receiving_a_native_message_without_wrapper.cs @@ -30,7 +30,7 @@ public async Task Should_be_processed_when_nsbheaders_present_with_messageid() var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); @@ -57,7 +57,7 @@ public async Task Should_be_processed_when_nsbheaders_present_without_messageid( var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); @@ -88,7 +88,7 @@ public async Task Should_support_loading_body_from_s3() var context = new TestContext(); - var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context)); + var endpoint = new AwsLambdaSQSEndpoint(_ => DefaultLambdaEndpointConfiguration(context, useXmlSerializer: true)); await endpoint.Process(receivedMessages, null); @@ -119,7 +119,7 @@ public async Task Should_preserve_poison_message_attributes_in_error_queue() var endpoint = new AwsLambdaSQSEndpoint(ctx => { - var configuration = DefaultLambdaEndpointConfiguration(); + var configuration = DefaultLambdaEndpointConfiguration(useXmlSerializer: true); var transport = configuration.Transport; transport.S3 = new S3Settings(BucketName, Prefix, CreateS3Client()); @@ -162,13 +162,11 @@ public async Task Should_preserve_message_attributes_in_error_queue() var endpoint = new AwsLambdaSQSEndpoint(ctx => { - var configuration = DefaultLambdaEndpointConfiguration(); + var configuration = DefaultLambdaEndpointConfiguration(useXmlSerializer: true); var transport = configuration.Transport; transport.S3 = new S3Settings(BucketName, Prefix, CreateS3Client()); - var advanced = configuration.AdvancedConfiguration; - advanced.Recoverability().Immediate(s => s.NumberOfRetries(0)); return configuration; }); diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/ReceiveMessageResponseExtensions.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/ReceiveMessageResponseExtensions.cs index a510479..3ff1ed6 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/ReceiveMessageResponseExtensions.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/ReceiveMessageResponseExtensions.cs @@ -1,6 +1,5 @@ namespace NServiceBus.AcceptanceTests { - using System.Collections.Generic; using Amazon.Lambda.SQSEvents; using Amazon.SQS.Model; @@ -10,7 +9,7 @@ public static SQSEvent ToSQSEvent(this ReceiveMessageResponse response) { var @event = new SQSEvent { - Records = new List() + Records = [] }; foreach (var message in response.Messages) { diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_handler_sends_a_message.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_handler_sends_a_message.cs index 9d81b06..b4d821d 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_handler_sends_a_message.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_handler_sends_a_message.cs @@ -20,7 +20,7 @@ public async Task The_message_should_be_received() var destinationConfiguration = new EndpointConfiguration(destinationEndpointName); - destinationConfiguration.SendFailedMessagesTo(ErrorQueueAddress); + destinationConfiguration.UseSerialization(); destinationConfiguration.EnableInstallers(); destinationConfiguration.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); destinationConfiguration.UseTransport(new SqsTransport(CreateSQSClient(), CreateSNSClient()) diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_message_handler_always_throws.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_message_handler_always_throws.cs index 31345ad..b82320e 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_message_handler_always_throws.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_a_message_handler_always_throws.cs @@ -21,8 +21,6 @@ public async Task The_messages_should_forward_to_error_queue_by_default() var messagesInErrorQueueCount = await CountMessagesInErrorQueue(); Assert.AreEqual(receivedMessages.Records.Count, messagesInErrorQueueCount, "Error queue count mismatch"); - - Assert.AreEqual(messagesInErrorQueueCount * 6, context.HandlerInvokationCount, "Immediate/Delayed Retry count mismatch"); } public class TestContext diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_processing.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_processing.cs index fe5d665..063e64b 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_processing.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_message_fails_processing.cs @@ -1,11 +1,11 @@ namespace NServiceBus.AcceptanceTests { - using NUnit.Framework; - using System.Text.Json; - using System.Threading.Tasks; using System; using System.Linq; + using System.Text.Json; using System.Threading; + using System.Threading.Tasks; + using NUnit.Framework; class When_message_fails_processing : AwsLambdaSQSEndpointTestBase { @@ -20,7 +20,6 @@ public async Task Should_move_message_to_error_queue() Assert.DoesNotThrowAsync(() => endpoint.Process(receivedMessages, null), "message should be moved to the error queue instead"); - Assert.AreEqual(6, context.HandlerInvocationCount, "should immediately retry message before moving it to the error queue"); var errorMessages = await RetrieveMessagesInErrorQueue(); Assert.AreEqual(1, errorMessages.Records.Count); JsonDocument errorMessage = JsonSerializer.Deserialize(errorMessages.Records.First().Body); @@ -49,7 +48,6 @@ public async Task Should_rethrow_when_disabling_error_queue() StringAssert.Contains("Failed to process message", exception.Message); Assert.AreEqual("simulated exception", exception.InnerException.Message); Assert.AreEqual(0, await CountMessagesInErrorQueue()); - Assert.AreEqual(6, context.HandlerInvocationCount, "should immediately retry message before moving it to the error queue"); } public class TestContext diff --git a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_using_send_only.cs b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_using_send_only.cs index 6aa548b..10828c8 100644 --- a/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_using_send_only.cs +++ b/src/NServiceBus.AwsLambda.SQS.AcceptanceTests/When_using_send_only.cs @@ -35,7 +35,7 @@ public async Task Should_send_messages() var destinationTransport = new SqsTransport(CreateSQSClient(), CreateSNSClient()); - destinationConfiguration.SendFailedMessagesTo(ErrorQueueAddress); + destinationConfiguration.UseSerialization(); destinationConfiguration.EnableInstallers(); destinationConfiguration.RegisterComponents(c => c.AddSingleton(typeof(TestContext), context)); destinationConfiguration.UseTransport(destinationTransport); diff --git a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/.editorconfig b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/.editorconfig new file mode 100644 index 0000000..5f68a61 --- /dev/null +++ b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/.editorconfig @@ -0,0 +1,4 @@ +[*.cs] + +# Justification: Test project +dotnet_diagnostic.CA2007.severity = none diff --git a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/AnalyzerTestFixture.cs b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/AnalyzerTestFixture.cs index 181b8d2..4921de7 100644 --- a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/AnalyzerTestFixture.cs +++ b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/AnalyzerTestFixture.cs @@ -29,7 +29,7 @@ protected async Task Assert(string[] expectedDiagnosticIds, string markupCode, s var (code, markupSpans) = Parse(markupCode); var project = CreateProject(code); - await WriteCode(project); + await WriteCode(project, cancellationToken); var compilerDiagnostics = (await Task.WhenAll(project.Documents .Select(doc => doc.GetCompilerDiagnostics(cancellationToken)))) @@ -58,7 +58,7 @@ protected async Task Assert(string[] expectedDiagnosticIds, string markupCode, s NUnit.Framework.CollectionAssert.AreEqual(expectedSpansAndIds, actualSpansAndIds); } - protected static async Task WriteCode(Project project) + protected static async Task WriteCode(Project project, CancellationToken cancellationToken = default) { if (!VerboseLogging) { @@ -68,7 +68,7 @@ protected static async Task WriteCode(Project project) foreach (var document in project.Documents) { Console.WriteLine(document.Name); - var code = await document.GetCode(); + var code = await document.GetCode(cancellationToken); foreach (var (line, index) in code.Replace("\r\n", "\n").Split('\n') .Select((line, index) => (line, index))) { @@ -108,9 +108,8 @@ static AnalyzerTestFixture() MetadataReference.CreateFromFile(typeof(Enumerable).GetTypeInfo().Assembly.Location), MetadataReference.CreateFromFile(typeof(System.Linq.Expressions.Expression).GetTypeInfo().Assembly .Location), -#if NET + MetadataReference.CreateFromFile(Assembly.Load("System.Runtime").Location), -#endif MetadataReference.CreateFromFile(typeof(IAwsLambdaSQSEndpoint).GetTypeInfo().Assembly.Location), MetadataReference.CreateFromFile(typeof(EndpointConfiguration).GetTypeInfo().Assembly.Location), MetadataReference.CreateFromFile(typeof(SqsTransport).GetTypeInfo().Assembly.Location)); diff --git a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/ConfigurationAnalyzerTests.cs b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/ConfigurationAnalyzerTests.cs index 441cdaa..ab74c1c 100644 --- a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/ConfigurationAnalyzerTests.cs +++ b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/ConfigurationAnalyzerTests.cs @@ -17,9 +17,9 @@ public class ConfigurationAnalyzerTests : AnalyzerTestFixture + - net6.0;net7.0 - true - ..\NServiceBusTests.snk + net8.0 + + + + + - - + - - - - - diff --git a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/OptionsAnalyzerTests.cs b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/OptionsAnalyzerTests.cs index ac16c70..7f9aed8 100644 --- a/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/OptionsAnalyzerTests.cs +++ b/src/NServiceBus.AwsLambda.SQS.Analyzer.Tests/OptionsAnalyzerTests.cs @@ -13,7 +13,7 @@ public class OptionsAnalyzerTests : AnalyzerTestFixture public Task DiagnosticIsReportedForOptions(string optionsType, string method, string diagnosticId) { var source = - $@"using NServiceBus; + $@"using NServiceBus; class Foo {{ void Bar({optionsType} options) diff --git a/src/NServiceBus.AwsLambda.SQS.Analyzer/NServiceBus.AwsLambda.SQS.Analyzer.csproj b/src/NServiceBus.AwsLambda.SQS.Analyzer/NServiceBus.AwsLambda.SQS.Analyzer.csproj index af24890..e3a39ce 100644 --- a/src/NServiceBus.AwsLambda.SQS.Analyzer/NServiceBus.AwsLambda.SQS.Analyzer.csproj +++ b/src/NServiceBus.AwsLambda.SQS.Analyzer/NServiceBus.AwsLambda.SQS.Analyzer.csproj @@ -1,4 +1,4 @@ - + $(AnalyzerTargetFramework) @@ -7,12 +7,13 @@ ..\NServiceBus.snk false false + true - - + + @@ -20,4 +21,5 @@ $(MinVerMajor).$(MinVerMinor).$(MinVerPatch).0 - + + \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.AwsLambda.SQS.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 1adef98..072f6cb 100644 --- a/src/NServiceBus.AwsLambda.SQS.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.AwsLambda.SQS.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -4,18 +4,18 @@ namespace NServiceBus { public AwsLambdaSQSEndpoint(System.Func configurationFactory) { } public System.Threading.Tasks.Task Process(Amazon.Lambda.SQSEvents.SQSEvent @event, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } - public System.Threading.Tasks.Task Publish(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Publish(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Publish(System.Action messageConstructor, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Send(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Send(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Send(System.Action messageConstructor, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Subscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } - public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { } + public System.Threading.Tasks.Task Publish(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Publish(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Publish(System.Action messageConstructor, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Send(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Send(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Send(System.Action messageConstructor, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Subscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } + public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { } } public class AwsLambdaSQSEndpointConfiguration { @@ -31,17 +31,17 @@ namespace NServiceBus public interface IAwsLambdaSQSEndpoint { System.Threading.Tasks.Task Process(Amazon.Lambda.SQSEvents.SQSEvent @event, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.Task Publish(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Publish(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Publish(System.Action messageConstructor, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Send(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Send(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Send(System.Action messageConstructor, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Subscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext); - System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext); + System.Threading.Tasks.Task Publish(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Publish(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Publish(System.Action messageConstructor, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Send(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Send(System.Action messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Send(System.Action messageConstructor, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Subscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS.Tests/NServiceBus.AwsLambda.SQS.Tests.csproj b/src/NServiceBus.AwsLambda.SQS.Tests/NServiceBus.AwsLambda.SQS.Tests.csproj index 2ff208b..a2c1dbf 100644 --- a/src/NServiceBus.AwsLambda.SQS.Tests/NServiceBus.AwsLambda.SQS.Tests.csproj +++ b/src/NServiceBus.AwsLambda.SQS.Tests/NServiceBus.AwsLambda.SQS.Tests.csproj @@ -1,25 +1,24 @@ - + - net6.0;net7.0 - true - NServiceBus.AwsLambda.Tests + net8.0 - - - - + + + + + + + + + - - - - diff --git a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs index 7101540..a485831 100644 --- a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs +++ b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs @@ -57,17 +57,17 @@ await Task.WhenAll(processTasks) .ConfigureAwait(false); } - async Task InitializeEndpointIfNecessary(ILambdaContext executionContext, CancellationToken token = default) + async Task InitializeEndpointIfNecessary(ILambdaContext executionContext, CancellationToken cancellationToken) { if (pipeline == null) { - await semaphoreLock.WaitAsync(token) + await semaphoreLock.WaitAsync(cancellationToken) .ConfigureAwait(false); try { if (pipeline == null) { - var transportInfrastructure = await Initialize(executionContext, token).ConfigureAwait(false); + var transportInfrastructure = await Initialize(executionContext, cancellationToken).ConfigureAwait(false); pipeline = transportInfrastructure.PipelineInvoker; } @@ -79,7 +79,7 @@ await semaphoreLock.WaitAsync(token) } } - async Task Initialize(ILambdaContext executionContext, CancellationToken token) + async Task Initialize(ILambdaContext executionContext, CancellationToken cancellationToken) { var configuration = configurationFactory(executionContext); @@ -89,7 +89,7 @@ async Task Initialize(ILambdaContext executio var serverlessTransport = new ServerlessTransport(configuration.Transport); configuration.EndpointConfiguration.UseTransport(serverlessTransport); - endpoint = await Endpoint.Start(configuration.EndpointConfiguration, token).ConfigureAwait(false); + endpoint = await Endpoint.Start(configuration.EndpointConfiguration, cancellationToken).ConfigureAwait(false); var transportInfrastructure = serverlessTransport.GetTransportInfrastructure(endpoint); isSendOnly = transportInfrastructure.IsSendOnly; @@ -97,139 +97,139 @@ async Task Initialize(ILambdaContext executio if (!isSendOnly) { receiveQueueAddress = transportInfrastructure.PipelineInvoker.ReceiveAddress; - receiveQueueUrl = await GetQueueUrl(receiveQueueAddress).ConfigureAwait(false); - errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress).ConfigureAwait(false); + receiveQueueUrl = await GetQueueUrl(receiveQueueAddress, cancellationToken).ConfigureAwait(false); + errorQueueUrl = await GetQueueUrl(transportInfrastructure.ErrorQueueAddress, cancellationToken).ConfigureAwait(false); } return transportInfrastructure; } /// - public async Task Send(object message, SendOptions options, ILambdaContext lambdaContext) + public async Task Send(object message, SendOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false); + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken).ConfigureAwait(false); - await endpoint.Send(message, options).ConfigureAwait(false); + await endpoint.Send(message, options, cancellationToken).ConfigureAwait(false); } /// - public Task Send(object message, ILambdaContext lambdaContext) - => Send(message, new SendOptions(), lambdaContext); + public Task Send(object message, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) + => Send(message, new SendOptions(), lambdaContext, cancellationToken); /// - public async Task Send(Action messageConstructor, SendOptions options, ILambdaContext lambdaContext) + public async Task Send(Action messageConstructor, SendOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Send(messageConstructor, options) + await endpoint.Send(messageConstructor, options, cancellationToken) .ConfigureAwait(false); } /// - public async Task Send(Action messageConstructor, ILambdaContext lambdaContext) + public async Task Send(Action messageConstructor, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await Send(messageConstructor, new SendOptions(), lambdaContext) + await Send(messageConstructor, new SendOptions(), lambdaContext, cancellationToken) .ConfigureAwait(false); } /// - public async Task Publish(object message, PublishOptions options, ILambdaContext lambdaContext) + public async Task Publish(object message, PublishOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Publish(message, options) + await endpoint.Publish(message, options, cancellationToken) .ConfigureAwait(false); } /// - public async Task Publish(Action messageConstructor, PublishOptions options, ILambdaContext lambdaContext) + public async Task Publish(Action messageConstructor, PublishOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Publish(messageConstructor, options) + await endpoint.Publish(messageConstructor, options, cancellationToken) .ConfigureAwait(false); } /// - public async Task Publish(object message, ILambdaContext lambdaContext) + public async Task Publish(object message, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Publish(message) + await endpoint.Publish(message, cancellationToken) .ConfigureAwait(false); } /// - public async Task Publish(Action messageConstructor, ILambdaContext lambdaContext) + public async Task Publish(Action messageConstructor, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Publish(messageConstructor) + await endpoint.Publish(messageConstructor, cancellationToken) .ConfigureAwait(false); } /// - public async Task Subscribe(Type eventType, SubscribeOptions options, ILambdaContext lambdaContext) + public async Task Subscribe(Type eventType, SubscribeOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Subscribe(eventType, options) + await endpoint.Subscribe(eventType, options, cancellationToken) .ConfigureAwait(false); } /// - public async Task Subscribe(Type eventType, ILambdaContext lambdaContext) + public async Task Subscribe(Type eventType, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Subscribe(eventType) + await endpoint.Subscribe(eventType, cancellationToken) .ConfigureAwait(false); } /// - public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ILambdaContext lambdaContext) + public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Unsubscribe(eventType, options) + await endpoint.Unsubscribe(eventType, options, cancellationToken) .ConfigureAwait(false); } /// - public async Task Unsubscribe(Type eventType, ILambdaContext lambdaContext) + public async Task Unsubscribe(Type eventType, ILambdaContext lambdaContext, CancellationToken cancellationToken = default) { - await InitializeEndpointIfNecessary(lambdaContext) + await InitializeEndpointIfNecessary(lambdaContext, cancellationToken) .ConfigureAwait(false); - await endpoint.Unsubscribe(eventType) + await endpoint.Unsubscribe(eventType, cancellationToken) .ConfigureAwait(false); } - async Task GetQueueUrl(string queueName) + async Task GetQueueUrl(string queueName, CancellationToken cancellationToken) { try { - return (await sqsClient.GetQueueUrlAsync(queueName).ConfigureAwait(false)).QueueUrl; + return (await sqsClient.GetQueueUrlAsync(queueName, cancellationToken).ConfigureAwait(false)).QueueUrl; } - catch (Exception e) + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { - Logger.Error($"Failed to obtain the queue URL for queue {queueName} (derived from configured name {queueName}).", e); + Logger.Error($"Failed to obtain the queue URL for queue {queueName} (derived from configured name {queueName}).", ex); throw; } } - async Task ProcessMessage(SQSEvent.SQSMessage receivedLambdaMessage, ILambdaContext lambdaContext, CancellationToken token) + async Task ProcessMessage(SQSEvent.SQSMessage receivedLambdaMessage, ILambdaContext lambdaContext, CancellationToken cancellationToken) { var arrayPool = ArrayPool.Shared; ReadOnlyMemory messageBody = null; @@ -258,7 +258,7 @@ async Task ProcessMessage(SQSEvent.SQSMessage receivedLambdaMessage, ILambdaCont { transportMessage = new TransportMessage { - Headers = JsonSerializer.Deserialize>(headersAttribute.StringValue) ?? new Dictionary(), + Headers = JsonSerializer.Deserialize>(headersAttribute.StringValue) ?? [], Body = receivedMessage.Body }; transportMessage.Headers[Headers.MessageId] = messageId; @@ -303,9 +303,9 @@ async Task ProcessMessage(SQSEvent.SQSMessage receivedLambdaMessage, ILambdaCont } } - (messageBody, messageBodyBuffer) = await transportMessage.RetrieveBody(messageId, s3Settings, arrayPool, token).ConfigureAwait(false); + (messageBody, messageBodyBuffer) = await transportMessage.RetrieveBody(messageId, s3Settings, arrayPool, cancellationToken).ConfigureAwait(false); } - catch (Exception ex) when (!ex.IsCausedBy(token)) + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { // Can't deserialize. This is a poison message exception = ex; @@ -316,20 +316,20 @@ async Task ProcessMessage(SQSEvent.SQSMessage receivedLambdaMessage, ILambdaCont { LogPoisonMessage(messageId, exception); - await MovePoisonMessageToErrorQueue(receivedMessage, token).ConfigureAwait(false); + await MovePoisonMessageToErrorQueue(receivedMessage, cancellationToken).ConfigureAwait(false); return; } if (!IsMessageExpired(receivedMessage, transportMessage.Headers, messageId, sqsClient.Config.ClockOffset)) { // here we also want to use the native message id because the core demands it like that - await ProcessMessageWithInMemoryRetries(transportMessage.Headers, nativeMessageId, messageBody, receivedMessage, receivedLambdaMessage, lambdaContext, token).ConfigureAwait(false); + await ProcessMessageWithInMemoryRetries(transportMessage.Headers, nativeMessageId, messageBody, receivedMessage, receivedLambdaMessage, lambdaContext, cancellationToken).ConfigureAwait(false); } // Always delete the message from the queue. // If processing failed, the onError handler will have moved the message // to a retry queue. - await DeleteMessageAndBodyIfRequired(receivedMessage, transportMessage.S3BodyKey).ConfigureAwait(false); + await DeleteMessageAndBodyIfRequired(receivedMessage, transportMessage.S3BodyKey, cancellationToken).ConfigureAwait(false); } finally { @@ -367,7 +367,7 @@ static bool IsMessageExpired(Message receivedMessage, Dictionary return true; } - async Task ProcessMessageWithInMemoryRetries(Dictionary headers, string nativeMessageId, ReadOnlyMemory body, Message nativeMessage, SQSEvent.SQSMessage nativeLambdaMessage, ILambdaContext lambdaContext, CancellationToken token) + async Task ProcessMessageWithInMemoryRetries(Dictionary headers, string nativeMessageId, ReadOnlyMemory body, Message nativeMessage, SQSEvent.SQSMessage nativeLambdaMessage, ILambdaContext lambdaContext, CancellationToken cancellationToken) { var immediateProcessingAttempts = 0; var errorHandled = false; @@ -387,7 +387,7 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, try { - token.ThrowIfCancellationRequested(); + cancellationToken.ThrowIfCancellationRequested(); var messageContext = new MessageContext( nativeMessageId, @@ -397,35 +397,25 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, receiveQueueAddress, context); - await Process(messageContext, lambdaContext, token).ConfigureAwait(false); + await Process(messageContext, lambdaContext, cancellationToken).ConfigureAwait(false); return; } - catch (Exception ex) - when (!(ex is OperationCanceledException && token.IsCancellationRequested)) + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { immediateProcessingAttempts++; - ErrorHandleResult errorHandlerResult; - try - { - var errorContext = new ErrorContext( - ex, - new Dictionary(headers), - nativeMessageId, - body, - transportTransaction, - immediateProcessingAttempts, - receiveQueueAddress, - context); - - errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext).ConfigureAwait(false); - } - catch (Exception onErrorEx) - { - Logger.Warn($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", onErrorEx); - throw; - } + var errorContext = new ErrorContext( + ex, + new Dictionary(headers), + nativeMessageId, + body, + transportTransaction, + immediateProcessingAttempts, + receiveQueueAddress, + context); + + var errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext, nativeMessageId, cancellationToken).ConfigureAwait(false); errorHandled = errorHandlerResult == ErrorHandleResult.Handled; } @@ -436,20 +426,29 @@ async Task Process(MessageContext messageContext, ILambdaContext executionContex { await InitializeEndpointIfNecessary(executionContext, cancellationToken) .ConfigureAwait(false); - await pipeline.PushMessage(messageContext) + + await pipeline.PushMessage(messageContext, cancellationToken) .ConfigureAwait(false); } - async Task ProcessFailedMessage(ErrorContext errorContext, ILambdaContext executionContext) + async Task ProcessFailedMessage(ErrorContext errorContext, ILambdaContext executionContext, string nativeMessageId, CancellationToken cancellationToken) { - await InitializeEndpointIfNecessary(executionContext) - .ConfigureAwait(false); + try + { + await InitializeEndpointIfNecessary(executionContext, cancellationToken) + .ConfigureAwait(false); - return await pipeline.PushFailedMessage(errorContext) - .ConfigureAwait(false); + return await pipeline.PushFailedMessage(errorContext, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(cancellationToken)) + { + Logger.Warn($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", onErrorEx); + throw; + } } - async Task DeleteMessageAndBodyIfRequired(Message message, string messageS3BodyKey) + async Task DeleteMessageAndBodyIfRequired(Message message, string messageS3BodyKey, CancellationToken cancellationToken) { try { @@ -488,7 +487,7 @@ await sqsClient.SendMessageAsync(new SendMessageRequest // and can't be re-sent. Unfortunately all the SQS metadata // such as SentTimestamp is reset with this send. } - catch (Exception ex) + catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { Logger.Error($"Error moving poison message to error queue at url {errorQueueUrl}. Moving back to input queue.", ex); try diff --git a/src/NServiceBus.AwsLambda.SQS/AwsLambdaSQSEndpointConfiguration.cs b/src/NServiceBus.AwsLambda.SQS/AwsLambdaSQSEndpointConfiguration.cs index 72089b7..fa21c3e 100644 --- a/src/NServiceBus.AwsLambda.SQS/AwsLambdaSQSEndpointConfiguration.cs +++ b/src/NServiceBus.AwsLambda.SQS/AwsLambdaSQSEndpointConfiguration.cs @@ -5,7 +5,6 @@ using Amazon.SimpleNotificationService; using Amazon.SQS; using AwsLambda.SQS; - using NServiceBus.Logging; using Serialization; @@ -37,10 +36,7 @@ public AwsLambdaSQSEndpointConfiguration(string endpointName, IAmazonSQS sqsClie recoverabilityPolicy.SendFailedMessagesToErrorQueue = true; - // delayed delivery is disabled by default as the required FIFO queue might not exist - EndpointConfiguration.Recoverability() - .Delayed(c => c.NumberOfRetries(0)) - .CustomPolicy(recoverabilityPolicy.Invoke); + EndpointConfiguration.Recoverability().CustomPolicy(recoverabilityPolicy.Invoke); if (sqsClient is null && snsClient is null) { diff --git a/src/NServiceBus.AwsLambda.SQS/IAwsLambdaSQSEndpoint.cs b/src/NServiceBus.AwsLambda.SQS/IAwsLambdaSQSEndpoint.cs index 0b82815..13ce351 100644 --- a/src/NServiceBus.AwsLambda.SQS/IAwsLambdaSQSEndpoint.cs +++ b/src/NServiceBus.AwsLambda.SQS/IAwsLambdaSQSEndpoint.cs @@ -20,64 +20,64 @@ public interface IAwsLambdaSQSEndpoint /// /// Sends the provided message. /// - Task Send(object message, SendOptions options, ILambdaContext lambdaContext); + Task Send(object message, SendOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Sends the provided message. /// - Task Send(object message, ILambdaContext lambdaContext); + Task Send(object message, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Instantiates a message of type T and sends it. /// - Task Send(Action messageConstructor, SendOptions options, ILambdaContext lambdaContext); + Task Send(Action messageConstructor, SendOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Instantiates a message of type T and sends it. /// - Task Send(Action messageConstructor, ILambdaContext lambdaContext); + Task Send(Action messageConstructor, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Publish the message to subscribers. /// - Task Publish(object message, PublishOptions options, ILambdaContext lambdaContext); + Task Publish(object message, PublishOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Instantiates a message of type T and publishes it. /// - Task Publish(Action messageConstructor, PublishOptions options, ILambdaContext lambdaContext); + Task Publish(Action messageConstructor, PublishOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Instantiates a message of type T and publishes it. /// - Task Publish(object message, ILambdaContext lambdaContext); + Task Publish(object message, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Instantiates a message of type T and publishes it. /// - Task Publish(Action messageConstructor, ILambdaContext lambdaContext); + Task Publish(Action messageConstructor, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Subscribes to receive published messages of the specified type. /// This method is only necessary if you turned off auto-subscribe. /// - Task Subscribe(Type eventType, SubscribeOptions options, ILambdaContext lambdaContext); + Task Subscribe(Type eventType, SubscribeOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Subscribes to receive published messages of the specified type. /// This method is only necessary if you turned off auto-subscribe. /// - Task Subscribe(Type eventType, ILambdaContext lambdaContext); + Task Subscribe(Type eventType, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Unsubscribes to receive published messages of the specified type. /// - Task Unsubscribe(Type eventType, UnsubscribeOptions options, ILambdaContext lambdaContext); + Task Unsubscribe(Type eventType, UnsubscribeOptions options, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); /// /// Unsubscribes to receive published messages of the specified type. /// - Task Unsubscribe(Type eventType, ILambdaContext lambdaContext); + Task Unsubscribe(Type eventType, ILambdaContext lambdaContext, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS/NServiceBus.AwsLambda.SQS.csproj b/src/NServiceBus.AwsLambda.SQS/NServiceBus.AwsLambda.SQS.csproj index a99ab42..42eb757 100644 --- a/src/NServiceBus.AwsLambda.SQS/NServiceBus.AwsLambda.SQS.csproj +++ b/src/NServiceBus.AwsLambda.SQS/NServiceBus.AwsLambda.SQS.csproj @@ -1,20 +1,24 @@  - net6.0;net7.0 + net8.0 true ..\NServiceBus.snk - + - + + + + + + + + - - - - + diff --git a/src/NServiceBus.AwsLambda.SQS/S3EncryptionMethodExtensions.cs b/src/NServiceBus.AwsLambda.SQS/S3EncryptionMethodExtensions.cs index cbb1ef9..5f6c41b 100644 --- a/src/NServiceBus.AwsLambda.SQS/S3EncryptionMethodExtensions.cs +++ b/src/NServiceBus.AwsLambda.SQS/S3EncryptionMethodExtensions.cs @@ -1,7 +1,7 @@ #nullable enable namespace NServiceBus.AwsLambda.SQS { - using System.Reflection; + using System.Runtime.CompilerServices; using Amazon.S3.Model; static class S3EncryptionMethodExtensions @@ -13,9 +13,10 @@ public static void ModifyRequest(this S3EncryptionMethod encryptionMethod, GetOb return; } - // TODO: Optimize - var methodInfo = encryptionMethod.GetType().GetMethod("ModifyGetRequest", BindingFlags.NonPublic | BindingFlags.Instance); - methodInfo!.Invoke(encryptionMethod, new object?[] { request }); + ModifyGetRequest(encryptionMethod, request); } + + [UnsafeAccessor(UnsafeAccessorKind.Method, Name = "ModifyGetRequest")] + static extern void ModifyGetRequest(S3EncryptionMethod encryptionMethod, GetObjectRequest get); } } \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS/SQSEventExtensions.cs b/src/NServiceBus.AwsLambda.SQS/SQSEventExtensions.cs index 45e1a75..1a2923e 100644 --- a/src/NServiceBus.AwsLambda.SQS/SQSEventExtensions.cs +++ b/src/NServiceBus.AwsLambda.SQS/SQSEventExtensions.cs @@ -1,7 +1,6 @@ namespace NServiceBus.AwsLambda.SQS { using System.Collections.Generic; - using System.IO; using Amazon.SQS.Model; using static Amazon.Lambda.SQSEvents.SQSEvent; @@ -48,8 +47,8 @@ static MessageAttributeValue CopyMessageAttributeValue(MessageAttribute source) target.BinaryValue = source.BinaryValue ?? target.BinaryValue; // The SQS client returns empty lists instead of null - target.StringListValues = source.StringListValues ?? new List(0); - target.BinaryListValues = source.BinaryListValues ?? new List(0); + target.StringListValues = source.StringListValues ?? []; + target.BinaryListValues = source.BinaryListValues ?? []; return target; } diff --git a/src/NServiceBus.AwsLambda.SQS/TransportMessage.cs b/src/NServiceBus.AwsLambda.SQS/TransportMessage.cs index b38de6e..1d054dd 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportMessage.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportMessage.cs @@ -32,11 +32,7 @@ public TransportMessage(OutgoingMessage outgoingMessage, DispatchProperties prop if (outgoingMessage.Body.Length != 0) { -#if NETFRAMEWORK - Body = Convert.ToBase64String(outgoingMessage.Body.ToArray()); -#else Body = Convert.ToBase64String(outgoingMessage.Body.Span); -#endif } else { diff --git a/src/NServiceBus.AwsLambda.SQS/TransportMessageExtensions.cs b/src/NServiceBus.AwsLambda.SQS/TransportMessageExtensions.cs index 5f033f1..0e5c2b8 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportMessageExtensions.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportMessageExtensions.cs @@ -4,7 +4,6 @@ namespace NServiceBus.AwsLambda.SQS using System; using System.Buffers; using System.IO; - using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; diff --git a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/IMessageProcessor.cs b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/IMessageProcessor.cs index 6e004c1..b6412ec 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/IMessageProcessor.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/IMessageProcessor.cs @@ -1,12 +1,13 @@ namespace NServiceBus.AwsLambda.SQS.TransportWrapper { + using System.Threading; using System.Threading.Tasks; using Transport; interface IMessageProcessor { string ReceiveAddress { get; } - Task PushFailedMessage(ErrorContext errorContext); - Task PushMessage(MessageContext messageContext); + Task PushFailedMessage(ErrorContext errorContext, CancellationToken cancellationToken = default); + Task PushMessage(MessageContext messageContext, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/PipelineInvoker.cs b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/PipelineInvoker.cs index 8a9fe14..ae40790 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/PipelineInvoker.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/PipelineInvoker.cs @@ -30,11 +30,11 @@ Task IMessageReceiver.Initialize(PushRuntimeSettings limitations, OnMessage onMe Task IMessageReceiver.StopReceive(CancellationToken cancellationToken) => Task.CompletedTask; - public Task PushFailedMessage(ErrorContext errorContext) => onError(errorContext); + public Task PushFailedMessage(ErrorContext errorContext, CancellationToken cancellationToken = default) => onError(errorContext, cancellationToken); Task IMessageReceiver.ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken) => Task.CompletedTask; - public Task PushMessage(MessageContext messageContext) => onMessage.Invoke(messageContext); + public Task PushMessage(MessageContext messageContext, CancellationToken cancellationToken = default) => onMessage.Invoke(messageContext, cancellationToken); OnMessage onMessage; OnError onError; diff --git a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/SendOnlyMessageProcessor.cs b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/SendOnlyMessageProcessor.cs index 5929d29..186fc1f 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/SendOnlyMessageProcessor.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/SendOnlyMessageProcessor.cs @@ -1,6 +1,7 @@ namespace NServiceBus.AwsLambda.SQS.TransportWrapper { using System; + using System.Threading; using System.Threading.Tasks; using Transport; @@ -8,10 +9,10 @@ class SendOnlyMessageProcessor : IMessageProcessor { public string ReceiveAddress => string.Empty; - public Task PushFailedMessage(ErrorContext errorContext) => throw new InvalidOperationException( + public Task PushFailedMessage(ErrorContext errorContext, CancellationToken cancellationToken = default) => throw new InvalidOperationException( $"This endpoint cannot process messages because it is configured in send-only mode. Remove the '{nameof(EndpointConfiguration)}.{nameof(EndpointConfiguration.SendOnly)}' configuration.'" ); - public Task PushMessage(MessageContext messageContext) => throw new InvalidOperationException( + public Task PushMessage(MessageContext messageContext, CancellationToken cancellationToken = default) => throw new InvalidOperationException( $"This endpoint cannot process messages because it is configured in send-only mode. Remove the '{nameof(EndpointConfiguration)}.{nameof(EndpointConfiguration.SendOnly)}' configuration.'" ); } diff --git a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs index 2d4dc56..82b8baa 100644 --- a/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs +++ b/src/NServiceBus.AwsLambda.SQS/TransportWrapper/ServerlessTransport.cs @@ -32,15 +32,6 @@ public ServerlessTransportInfrastructure GetTransportInfrastructure(IEndpointIns // IEndpointInstance is only required to guarantee that GetTransportInfrastructure can't be called before NServiceBus called Initialize. serverlessTransportInfrastructure; -#pragma warning disable CS0672 // Member overrides obsolete member -#pragma warning disable CS0618 // Type or member is obsolete - - public override string ToTransportAddress(QueueAddress address) => BaseTransport.ToTransportAddress(address); - -#pragma warning restore CS0618 // Type or member is obsolete -#pragma warning restore CS0672 // Member overrides obsolete member - - public override IReadOnlyCollection GetSupportedTransactionModes() => supportedTransactionModes; readonly TransportTransactionMode[] supportedTransactionModes = diff --git a/src/NServiceBus.AwsLambda.Sqs.sln b/src/NServiceBus.AwsLambda.Sqs.sln index 85fe6a1..04db16a 100644 --- a/src/NServiceBus.AwsLambda.Sqs.sln +++ b/src/NServiceBus.AwsLambda.Sqs.sln @@ -11,7 +11,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.AwsLambda.SQS.A EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{D637E9DB-45CF-400D-BC9B-D2B1BA892ED5}" ProjectSection(SolutionItems) = preProject + ..\.github\workflows\ci.yml = ..\.github\workflows\ci.yml Custom.Build.props = Custom.Build.props + ..\.github\workflows\release.yml = ..\.github\workflows\release.yml EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.AwsLambda.SQS.Analyzer", "NServiceBus.AwsLambda.SQS.Analyzer\NServiceBus.AwsLambda.SQS.Analyzer.csproj", "{F38B0ACB-6689-4F92-A3AC-06FEAD6E96BB}"