From 5589407d37f896fa7034bd7256162b50ef18bccb Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 14 Mar 2023 16:28:03 -0700 Subject: [PATCH 01/19] Utilize AsyncPageable for query return types This change requires we copy some internal code from the Azure.Core library, but the public facing classes are the Azure.Core defined classes. --- .../IoTHubCertificateValidationE2ETest.cs | 3 +- .../iothub/service/QueryClientE2ETests.cs | 134 ++++++----- ...rovisioningCertificateValidationE2ETest.cs | 2 +- ...sioningServiceIndividualEnrollmentTests.cs | 2 +- .../getting started/JobsSample/JobsSample.cs | 7 +- .../CleanupDevicesSample.cs | 13 +- .../RegistryManagerSample.cs | 18 +- .../service/src/Jobs/ScheduledJobsClient.cs | 3 +- .../service/src/Query/Models/QueriedPage.cs | 4 +- iothub/service/src/Query/PageableHelpers.cs | 62 +++++ iothub/service/src/Query/QueryClient.cs | 178 ++++++++------ iothub/service/src/Query/QueryResponse.cs | 221 +++++------------- iothub/service/tests/QueryClientTests.cs | 14 +- iothub/service/tests/QueryResponseTests.cs | 48 ---- 14 files changed, 334 insertions(+), 375 deletions(-) create mode 100644 iothub/service/src/Query/PageableHelpers.cs delete mode 100644 iothub/service/tests/QueryResponseTests.cs diff --git a/e2e/test/iothub/service/IoTHubCertificateValidationE2ETest.cs b/e2e/test/iothub/service/IoTHubCertificateValidationE2ETest.cs index a07a7b801e..c74953beef 100644 --- a/e2e/test/iothub/service/IoTHubCertificateValidationE2ETest.cs +++ b/e2e/test/iothub/service/IoTHubCertificateValidationE2ETest.cs @@ -5,6 +5,7 @@ using System.Net; using System.Security.Authentication; using System.Threading.Tasks; +using Azure; using FluentAssertions; using Microsoft.Azure.Devices.Client; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -23,7 +24,7 @@ public async Task ServiceClient_QueryDevicesInvalidServiceCertificateHttp_Fails( using var sc = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionStringInvalidServiceCertificate); // act - Func act = async () => await sc.Query.CreateAsync("select * from devices").ConfigureAwait(false); + Func act = async () => await sc.Query.CreateAsync("select * from devices").GetAsyncEnumerator().MoveNextAsync().ConfigureAwait(false); // assert var error = await act.Should().ThrowAsync(); diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index c94f34011a..6cc167f640 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -7,6 +7,7 @@ using System.Net; using System.Threading; using System.Threading.Tasks; +using Azure; using FluentAssertions; using Microsoft.Azure.Devices.E2ETests.Helpers; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -44,19 +45,20 @@ public async Task TwinQuery_Works() await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 2).ConfigureAwait(false); - QueryResponse queryResponse = await serviceClient.Query.CreateAsync(queryText).ConfigureAwait(false); + AsyncPageable queryResponse = serviceClient.Query + .CreateAsync(queryText); + IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); // assert - - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); - ClientTwin firstQueriedTwin = queryResponse.Current; + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); + ClientTwin firstQueriedTwin = enumerator.Current; firstQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id); - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue(); - ClientTwin secondQueriedTwin = queryResponse.Current; + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue(); + ClientTwin secondQueriedTwin = enumerator.Current; secondQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id); secondQueriedTwin.DeviceId.Should().NotBe(firstQueriedTwin.DeviceId); - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeFalse(); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeFalse(); } [TestMethod] @@ -80,37 +82,42 @@ public async Task TwinQuery_CustomPaginationWorks() await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false); - QueryResponse queryResponse = await serviceClient.Query.CreateAsync(queryText, firstPageOptions).ConfigureAwait(false); + AsyncPageable queryResponse = serviceClient.Query. + CreateAsync(queryText, firstPageOptions); + IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); // assert + Page currentPage = enumerator.Current; + currentPage.Values.Count.Should().Be(1); + currentPage.Values[0].DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id, testDevice3.Id); - queryResponse.CurrentPage.Count().Should().Be(1); - ClientTwin firstQueriedTwin = queryResponse.CurrentPage.First(); - firstQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id, testDevice3.Id); + + // restart the query, but with a page size of 3 this time + queryResponse = serviceClient.Query. + CreateAsync(queryText); + enumerator = queryResponse.AsPages(null, 3).GetAsyncEnumerator(); // consume the first page of results so the next MoveNextAsync gets a new page - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); - var secondPageOptions = new QueryOptions - { - PageSize = 2 - }; + currentPage = enumerator.Current; + currentPage.Values.Count.Should().Be(3); + IEnumerator pageContentsEnumerator = currentPage.Values.GetEnumerator(); + pageContentsEnumerator.MoveNext().Should().BeTrue(); - (await queryResponse.MoveNextAsync(secondPageOptions).ConfigureAwait(false)).Should().BeTrue(); - queryResponse.CurrentPage.Count().Should().Be(2); - IEnumerator secondPageEnumerator = queryResponse.CurrentPage.GetEnumerator(); - secondPageEnumerator.MoveNext().Should().BeTrue(); - ClientTwin secondQueriedTwin = secondPageEnumerator.Current; + ClientTwin firstQueriedTwin = pageContentsEnumerator.Current; + firstQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id, testDevice3.Id); + pageContentsEnumerator.MoveNext().Should().BeTrue(); + + ClientTwin secondQueriedTwin = pageContentsEnumerator.Current; secondQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id, testDevice3.Id); - secondQueriedTwin.DeviceId.Should().NotBe(firstQueriedTwin.DeviceId); + pageContentsEnumerator.MoveNext().Should().BeTrue(); - secondPageEnumerator.MoveNext().Should().BeTrue(); - ClientTwin thirdQueriedTwin = secondPageEnumerator.Current; + ClientTwin thirdQueriedTwin = pageContentsEnumerator.Current; thirdQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id, testDevice3.Id); - thirdQueriedTwin.DeviceId.Should().NotBe(firstQueriedTwin.DeviceId); - thirdQueriedTwin.DeviceId.Should().NotBe(secondQueriedTwin.DeviceId); - secondPageEnumerator.MoveNext().Should().BeFalse(); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeFalse("After 3 query results in one page, there should not be a second page"); } [TestMethod] @@ -137,15 +144,13 @@ public async Task TwinQuery_IterateByItemAcrossPages() await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false); - QueryResponse twinQuery = await serviceClient.Query - .CreateAsync(queryText, queryOptions) - .ConfigureAwait(false); + AsyncPageable twinQuery = serviceClient.Query. + CreateAsync(queryText, queryOptions); // assert - List returnedTwinDeviceIds = new(); - while (await twinQuery.MoveNextAsync().ConfigureAwait(false)) + var returnedTwinDeviceIds = new List(); + await foreach (ClientTwin queriedTwin in twinQuery) { - ClientTwin queriedTwin = twinQuery.Current; returnedTwinDeviceIds.Add(queriedTwin.DeviceId); } @@ -178,15 +183,13 @@ public async Task TwinQuery_IterateByItemWorksWithinPage() await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false); - QueryResponse twinQuery = await serviceClient.Query - .CreateAsync(queryText, queryOptions) - .ConfigureAwait(false); + AsyncPageable twinQuery = serviceClient.Query + .CreateAsync(queryText, queryOptions); // assert - List returnedTwinDeviceIds = new(); - while (await twinQuery.MoveNextAsync().ConfigureAwait(false)) + var returnedTwinDeviceIds = new List(); + await foreach (ClientTwin queriedTwin in twinQuery) { - ClientTwin queriedTwin = twinQuery.Current; returnedTwinDeviceIds.Add(queriedTwin.DeviceId); } @@ -208,9 +211,10 @@ public async Task JobQuery_QueryWorks() await WaitForJobToBeQueryableAsync(serviceClient.Query, query, 1).ConfigureAwait(false); - QueryResponse queryResponse = await serviceClient.Query.CreateAsync(query).ConfigureAwait(false); - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); - ScheduledJob queriedJob = queryResponse.Current; + AsyncPageable queryResponse = serviceClient.Query.CreateAsync(query); + IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); + ScheduledJob queriedJob = enumerator.Current; // Each IoT hub has a low limit for the number of parallel jobs allowed. Because of that, // tests in this suite are written to work even if the queried job isn't the one they created. @@ -233,12 +237,13 @@ public async Task JobQuery_QueryByTypeWorks() var serviceClient = TestDevice.ServiceClient; await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_idPrefix).ConfigureAwait(false); - await QueryClientE2ETests.ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false); + await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false); await WaitForJobToBeQueryableAsync(serviceClient.Query, 1, null, null).ConfigureAwait(false); - QueryResponse queryResponse = await serviceClient.Query.CreateJobsQueryAsync().ConfigureAwait(false); - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); - ScheduledJob queriedJob = queryResponse.Current; + AsyncPageable queryResponse = serviceClient.Query.CreateJobsQueryAsync(); + IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); + ScheduledJob queriedJob = enumerator.Current; // Each IoT hub has a low limit for the number of parallel jobs allowed. Because of that, // tests in this suite are written to work even if the queried job isn't the one they created. @@ -259,9 +264,10 @@ public async Task RawQuery_QueryWorks() string query = "SELECT COUNT() as TotalNumberOfDevices FROM devices"; - QueryResponse queryResponse = await serviceClient.Query.CreateAsync(query).ConfigureAwait(false); - (await queryResponse.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); - RawQuerySerializationClass queriedJob = queryResponse.Current; + AsyncPageable queryResponse = serviceClient.Query.CreateAsync(query); + IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); + (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); + RawQuerySerializationClass queriedJob = enumerator.Current; queriedJob.TotalNumberOfDevices.Should().BeGreaterOrEqualTo(0); } @@ -271,11 +277,15 @@ private async Task WaitForDevicesToBeQueryableAsync(QueryClient queryClient, str // so keep executing the query until both devices are returned in the results or until a timeout. using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout); CancellationToken cancellationToken = cancellationTokenSource.Token; - QueryResponse queryResponse = await queryClient.CreateAsync(query).ConfigureAwait(false); - while (queryResponse.CurrentPage.Count() < expectedCount) + AsyncPageable queryResponse = queryClient.CreateAsync(query); + IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync(); + while (enumerator.Current.Values.Count < expectedCount) { await Task.Delay(100).ConfigureAwait(false); - queryResponse = await queryClient.CreateAsync(query).ConfigureAwait(false); + queryResponse = queryClient.CreateAsync(query); + enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); // timed out waiting for the devices to become queryable } } @@ -285,12 +295,16 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, string // There is some latency between the creation of the test devices and when they are queryable, // so keep executing the query until both devices are returned in the results or until a timeout. using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout); - QueryResponse queryResponse = await queryClient.CreateAsync(query).ConfigureAwait(false); - while (queryResponse.CurrentPage.Count() < expectedCount) + AsyncPageable queryResponse = queryClient.CreateAsync(query); + IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); + while (enumerator.Current.Values.Count < expectedCount) { cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = await queryClient.CreateAsync(query).ConfigureAwait(false); + queryResponse = queryClient.CreateAsync(query); + enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); } } @@ -305,12 +319,16 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int exp JobType = jobType, JobStatus = status, }; - QueryResponse queryResponse = await queryClient.CreateJobsQueryAsync(options).ConfigureAwait(false); - while (queryResponse.CurrentPage.Count() < expectedCount) + AsyncPageable queryResponse = queryClient.CreateJobsQueryAsync(options); + IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); + while (enumerator.Current.Values.Count < expectedCount) { cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = await queryClient.CreateJobsQueryAsync(options).ConfigureAwait(false); + queryResponse = queryClient.CreateJobsQueryAsync(options); + enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); } } diff --git a/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs b/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs index 8cbe003904..fc7c1e840e 100644 --- a/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs +++ b/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs @@ -37,7 +37,7 @@ public async Task ProvisioningServiceClient_QueryInvalidServiceCertificateHttp_F { // arrange using var provisioningServiceClient = new ProvisioningServiceClient(TestConfiguration.Provisioning.ConnectionStringInvalidServiceCertificate); - Query q = provisioningServiceClient.EnrollmentGroups.CreateQuery("SELECT * FROM enrollmentGroups"); + Devices.Provisioning.Service.Query q = provisioningServiceClient.EnrollmentGroups.CreateQuery("SELECT * FROM enrollmentGroups"); // act Func act = async () => await q.NextAsync(); diff --git a/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs b/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs index 2ccac3e2cf..b741e48fce 100644 --- a/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs +++ b/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs @@ -226,7 +226,7 @@ private static async Task ProvisioningServiceClient_IndividualEnrollments_Query_ using var provisioningServiceClient = new ProvisioningServiceClient(TestConfiguration.Provisioning.ConnectionString, options); string queryString = "SELECT * FROM enrollments"; - Query query = provisioningServiceClient.IndividualEnrollments.CreateQuery(queryString); + Devices.Provisioning.Service.Query query = provisioningServiceClient.IndividualEnrollments.CreateQuery(queryString); while (query.HasNext()) { QueryResult queryResult = await query.NextAsync().ConfigureAwait(false); diff --git a/iothub/service/samples/getting started/JobsSample/JobsSample.cs b/iothub/service/samples/getting started/JobsSample/JobsSample.cs index 17fa04ff0f..663015e7fd 100644 --- a/iothub/service/samples/getting started/JobsSample/JobsSample.cs +++ b/iothub/service/samples/getting started/JobsSample/JobsSample.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Text.Json; using System.Threading.Tasks; +using Azure; namespace Microsoft.Azure.Devices.Samples.JobsSample { @@ -67,11 +68,9 @@ public async Task RunSampleAsync() } // *************************************** Get all Jobs *************************************** - QueryResponse queryResults = await _jobClient.ScheduledJobs.CreateQueryAsync(); + AsyncPageable queryResults = _jobClient.ScheduledJobs.CreateQueryAsync(); - IEnumerable getJobs = queryResults.CurrentPage; - - foreach (ScheduledJob job in getJobs) + await foreach (ScheduledJob job in queryResults) { Console.WriteLine(JsonSerializer.Serialize(job, new JsonSerializerOptions { WriteIndented = true })); } diff --git a/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs b/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs index 85c0b44764..be433164d2 100644 --- a/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs +++ b/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Azure; using Azure.Storage.Blobs; using Azure.Storage.Sas; using Newtonsoft.Json; @@ -173,14 +174,14 @@ private async Task PrintDeviceCountAsync() try { string countSqlQuery = "select count() AS numberOfDevices from devices"; - QueryResponse> countQuery = await _hubClient.Query.CreateAsync>(countSqlQuery); - if (!await countQuery.MoveNextAsync()) + AsyncPageable> countQuery = _hubClient.Query.CreateAsync>(countSqlQuery); + if (!await countQuery.GetAsyncEnumerator().MoveNextAsync()) { Console.WriteLine($"Failed to run device count query."); return 0; } - if (!countQuery.Current.TryGetValue("numberOfDevices", out deviceCount)) + if (!countQuery.GetAsyncEnumerator().Current.TryGetValue("numberOfDevices", out deviceCount)) { Console.WriteLine($"Failed to get device count from query result."); return 0; @@ -225,10 +226,10 @@ private async Task> GetDeviceIdsToDeleteAsync( string queryText = queryTextSb.ToString(); Console.WriteLine($"Using query: {queryText}"); var options = new QueryOptions { PageSize = 1000 }; - QueryResponse devicesQuery = await _hubClient.Query.CreateAsync(queryText, options); - while (await devicesQuery.MoveNextAsync()) + AsyncPageable devicesQuery = _hubClient.Query.CreateAsync(queryText, options); + await foreach (DeviceQueryResult queryResult in devicesQuery) { - devicesToDelete.Add(new ExportImportDevice(new Device(devicesQuery.Current.DeviceId), ImportMode.Delete)); + devicesToDelete.Add(new ExportImportDevice(new Device(queryResult.DeviceId), ImportMode.Delete)); } return devicesToDelete; diff --git a/iothub/service/samples/how to guides/RegistryManagerSample/RegistryManagerSample.cs b/iothub/service/samples/how to guides/RegistryManagerSample/RegistryManagerSample.cs index 43ddfeff3e..fdcfdf5d75 100644 --- a/iothub/service/samples/how to guides/RegistryManagerSample/RegistryManagerSample.cs +++ b/iothub/service/samples/how to guides/RegistryManagerSample/RegistryManagerSample.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Azure; namespace Microsoft.Azure.Devices.Samples { @@ -158,20 +159,19 @@ private async Task EnumerateTwinsAsync() string queryText = $"SELECT * FROM devices WHERE STARTSWITH(id, '{_parameters.DevicePrefix}')"; Console.WriteLine($"Using query text of: {queryText}"); - QueryResponse query = await _client.Query.CreateAsync(queryText); + AsyncPageable query = _client.Query.CreateAsync(queryText); - while (await query.MoveNextAsync()) + await foreach (ClientTwin queriedTwin in query) { - ClientTwin twin = query.Current; - Console.WriteLine($"{twin.DeviceId}"); - Console.WriteLine($"\tIs edge: {twin.Capabilities.IsIotEdge}"); - if (!string.IsNullOrWhiteSpace(twin.DeviceScope)) + Console.WriteLine($"{queriedTwin.DeviceId}"); + Console.WriteLine($"\tIs edge: {queriedTwin.Capabilities.IsIotEdge}"); + if (!string.IsNullOrWhiteSpace(queriedTwin.DeviceScope)) { - Console.WriteLine($"\tDevice scope: {twin.DeviceScope}"); + Console.WriteLine($"\tDevice scope: {queriedTwin.DeviceScope}"); } - if (twin.ParentScopes?.Any() ?? false) + if (queriedTwin.ParentScopes?.Any() ?? false) { - Console.WriteLine($"\tParent scope: {twin.ParentScopes[0]}"); + Console.WriteLine($"\tParent scope: {queriedTwin.ParentScopes[0]}"); } } } diff --git a/iothub/service/src/Jobs/ScheduledJobsClient.cs b/iothub/service/src/Jobs/ScheduledJobsClient.cs index 710181fb5e..1ebf3f2a50 100644 --- a/iothub/service/src/Jobs/ScheduledJobsClient.cs +++ b/iothub/service/src/Jobs/ScheduledJobsClient.cs @@ -7,6 +7,7 @@ using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using Azure; namespace Microsoft.Azure.Devices { @@ -121,7 +122,7 @@ await _internalRetryHandler /// For a complete list of possible error cases, see . /// /// If the provided has requested cancellation. - public virtual Task> CreateQueryAsync(JobQueryOptions options = null, CancellationToken cancellationToken = default) + public virtual AsyncPageable CreateQueryAsync(JobQueryOptions options = null, CancellationToken cancellationToken = default) { return _queryClient.CreateJobsQueryAsync(options, cancellationToken); } diff --git a/iothub/service/src/Query/Models/QueriedPage.cs b/iothub/service/src/Query/Models/QueriedPage.cs index 7883e25ef8..f5120f57e2 100644 --- a/iothub/service/src/Query/Models/QueriedPage.cs +++ b/iothub/service/src/Query/Models/QueriedPage.cs @@ -18,12 +18,12 @@ internal sealed class QueriedPage // in an async function. internal QueriedPage(HttpResponseMessage response, string payload) { - Items = JsonConvert.DeserializeObject>(payload); + Items = JsonConvert.DeserializeObject>(payload); ContinuationToken = response.Headers.SafeGetValue(ContinuationTokenHeader); } [JsonProperty("items")] - internal IEnumerable Items { get; set; } + internal IReadOnlyList Items { get; set; } [JsonProperty("continuationToken")] internal string ContinuationToken { get; set; } diff --git a/iothub/service/src/Query/PageableHelpers.cs b/iothub/service/src/Query/PageableHelpers.cs new file mode 100644 index 0000000000..a093a27321 --- /dev/null +++ b/iothub/service/src/Query/PageableHelpers.cs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Azure; + +namespace Microsoft.Azure.Devices.Query +{ + /// + /// Copy of a subset of the helper functions defined in the Azure.Core class by the same name: + /// https://github.com/Azure/autorest.csharp/blob/main/src/assets/Generator.Shared/PageableHelpers.cs + /// + internal class PageableHelpers + { +#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. + public static AsyncPageable CreateAsyncEnumerable(Func>> firstPageFunc, Func>>? nextPageFunc, int? pageSize = default) where T : notnull + { + AsyncPageFunc first = (continuationToken, pageSizeHint) => firstPageFunc(pageSizeHint); + AsyncPageFunc? next = nextPageFunc != null ? new AsyncPageFunc(nextPageFunc) : null; + return new FuncAsyncPageable(first, next, pageSize); + } + + internal delegate Task> AsyncPageFunc(string? continuationToken = default, int? pageSizeHint = default); + internal delegate Page PageFunc(string? continuationToken = default, int? pageSizeHint = default); + + internal class FuncAsyncPageable : AsyncPageable where T : notnull + { + private readonly AsyncPageFunc _firstPageFunc; + private readonly AsyncPageFunc? _nextPageFunc; + private readonly int? _defaultPageSize; + + public FuncAsyncPageable(AsyncPageFunc firstPageFunc, AsyncPageFunc? nextPageFunc, int? defaultPageSize = default) + { + _firstPageFunc = firstPageFunc; + _nextPageFunc = nextPageFunc; + _defaultPageSize = defaultPageSize; + } + + public override async IAsyncEnumerable> AsPages(string? continuationToken = default, int? pageSizeHint = default) + { + AsyncPageFunc? pageFunc = string.IsNullOrEmpty(continuationToken) ? _firstPageFunc : _nextPageFunc; + + if (pageFunc == null) + { + yield break; + } + + int? pageSize = pageSizeHint ?? _defaultPageSize; + do + { + Page pageResponse = await pageFunc(continuationToken, pageSize).ConfigureAwait(false); + yield return pageResponse; + continuationToken = pageResponse.ContinuationToken; + pageFunc = _nextPageFunc; + } while (!string.IsNullOrEmpty(continuationToken) && pageFunc != null); + } + } + } +#pragma warning restore CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. +} diff --git a/iothub/service/src/Query/QueryClient.cs b/iothub/service/src/Query/QueryClient.cs index bd2ba0a019..b2d7ce4473 100644 --- a/iothub/service/src/Query/QueryClient.cs +++ b/iothub/service/src/Query/QueryClient.cs @@ -7,6 +7,10 @@ using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; +using Azure; +using Azure.Core; +using Microsoft.Azure.Devices.Query; +using static Microsoft.Azure.Devices.Amqp.AmqpClientHelper; namespace Microsoft.Azure.Devices { @@ -76,24 +80,22 @@ internal QueryClient( /// /// Iterate twins: /// - /// QueryResponse<Twin> queriedTwins = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); - /// while (await queriedTwins.MoveNextAsync()) + /// AsyncPageable<Twin> twinQuery = iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); + /// await foreach (Twin queriedTwin in twinQuery) /// { - /// Twin queriedTwin = queriedTwins.Current; /// Console.WriteLine(queriedTwin); /// } /// /// Or scheduled jobs: /// - /// QueryResponse<ScheduledJob> queriedJobs = await iotHubServiceClient.Query.CreateAsync<ScheduledJob>("SELECT * FROM devices.jobs"); - /// while (await queriedJobs.MoveNextAsync()) + /// AsyncPageable<ScheduledJob> jobsQuery = await iotHubServiceClient.Query.CreateAsync<ScheduledJob>("SELECT * FROM devices.jobs"); + /// await foreach (ScheduledJob queriedJob in jobsQuery) /// { - /// ScheduledJob queriedJob = queriedJobs.Current; /// Console.WriteLine(queriedJob); /// } /// /// - public virtual async Task> CreateAsync(string query, QueryOptions options = default, CancellationToken cancellationToken = default) + public virtual AsyncPageable CreateAsync(string query, QueryOptions options = default, CancellationToken cancellationToken = default) { if (Logging.IsEnabled) Logging.Enter(this, "Creating query.", nameof(CreateAsync)); @@ -104,45 +106,29 @@ public virtual async Task> CreateAsync(string query, QueryOp try { - using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( - HttpMethod.Post, - s_queryUri, - _credentialProvider, - new QuerySpecification { Sql = query }); - request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json") { CharSet = "utf-8" }; - if (!string.IsNullOrWhiteSpace(options?.ContinuationToken)) + async Task> nextPageFunc(string continuationToken, int? pageSizeHint) { - request.Headers.Add(ContinuationTokenHeader, options?.ContinuationToken); - } + using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( + HttpMethod.Post, + s_queryUri, + _credentialProvider, + new QuerySpecification { Sql = query }); - if (options?.PageSize != null) - { - request.Headers.Add(PageSizeHeader, options.PageSize.ToString()); + return await BuildAndSendRequest(request, options, continuationToken, pageSizeHint, cancellationToken); } - HttpResponseMessage response = null; - - await _internalRetryHandler - .RunWithRetryAsync( - async () => - { - response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); - }, - cancellationToken) - .ConfigureAwait(false); - - await HttpMessageHelper.ValidateHttpResponseStatusAsync(HttpStatusCode.OK, response).ConfigureAwait(false); - string responsePayload = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - var page = new QueriedPage(response, responsePayload); - return new QueryResponse(this, query, page.Items, page.ContinuationToken, options?.PageSize); - } - catch (HttpRequestException ex) - { - if (Fx.ContainsAuthenticationException(ex)) + async Task> firstPageFunc(int? pageSizeHint) { - throw new IotHubServiceException(ex.Message, HttpStatusCode.Unauthorized, IotHubServiceErrorCode.IotHubUnauthorizedAccess, null, ex); + using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( + HttpMethod.Post, + s_queryUri, + _credentialProvider, + new QuerySpecification { Sql = query }); + + return await BuildAndSendRequest(request, options, null, pageSizeHint, cancellationToken); } - throw new IotHubServiceException(ex.Message, HttpStatusCode.RequestTimeout, IotHubServiceErrorCode.Unknown, null, ex); + + return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc, options?.PageSize); } catch (Exception ex) when (Logging.IsEnabled) { @@ -170,14 +156,14 @@ await _internalRetryHandler /// If the provided cancellation token has requested cancellation. /// /// - /// QueryResponse<ScheduledJob> queriedJobs = await iotHubServiceClient.Query.CreateJobsQueryAsync(); - /// while (await queriedJobs.MoveNextAsync()) + /// AsyncPageable<ScheduledJob> jobsQuery = await iotHubServiceClient.Query.CreateJobsQueryAsync(); + /// await foreach (ScheduledJob scheduledJob in jobsQuery) /// { - /// Console.WriteLine(queriedJobs.Current.JobId); + /// Console.WriteLine(scheduledJob.JobId); /// } /// /// - public virtual async Task> CreateJobsQueryAsync(JobQueryOptions options = default, CancellationToken cancellationToken = default) + public virtual AsyncPageable CreateJobsQueryAsync(JobQueryOptions options = default, CancellationToken cancellationToken = default) { if (Logging.IsEnabled) Logging.Enter(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus}, pageSize: {options?.PageSize}", nameof(CreateAsync)); @@ -186,38 +172,31 @@ public virtual async Task> CreateJobsQueryAsync(JobQ try { - using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( - HttpMethod.Get, - s_jobsQueryFormat, - _credentialProvider, - null, - BuildQueryJobQueryString(options)); - - if (!string.IsNullOrWhiteSpace(options?.ContinuationToken)) + async Task> nextPageFunc(string continuationToken, int? pageSizeHint) { - request.Headers.Add(ContinuationTokenHeader, options?.ContinuationToken); - } + using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( + HttpMethod.Get, + s_jobsQueryFormat, + _credentialProvider, + null, + BuildQueryJobQueryString(options)); - if (options?.PageSize != null) - { - request.Headers.Add(PageSizeHeader, options.PageSize.ToString()); + return await BuildAndSendRequest(request, options, continuationToken, pageSizeHint, cancellationToken); } - HttpResponseMessage response = null; + async Task> firstPageFunc(int? pageSizeHint) + { + using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( + HttpMethod.Get, + s_jobsQueryFormat, + _credentialProvider, + null, + BuildQueryJobQueryString(options)); - await _internalRetryHandler - .RunWithRetryAsync( - async () => - { - response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); - }, - cancellationToken) - .ConfigureAwait(false); + return await BuildAndSendRequest(request, options, null, pageSizeHint, cancellationToken); + } - await HttpMessageHelper.ValidateHttpResponseStatusAsync(HttpStatusCode.OK, response).ConfigureAwait(false); - string responsePayload = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - var page = new QueriedPage(response, responsePayload); - return new QueryResponse(this, options?.JobType, options?.JobStatus, page.Items, page.ContinuationToken, options?.PageSize); + return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc, options?.PageSize); } catch (HttpRequestException ex) { @@ -239,6 +218,65 @@ await _internalRetryHandler } } + private async Task> BuildAndSendRequest(HttpRequestMessage request, QueryOptions options, string continuationToken, int? pageSizeHint, CancellationToken cancellationToken) + { + if (!string.IsNullOrWhiteSpace(options?.ContinuationToken)) + { + request.Headers.Add(ContinuationTokenHeader, options?.ContinuationToken); + } + else if (!string.IsNullOrWhiteSpace(continuationToken)) + { + request.Headers.Add(ContinuationTokenHeader, continuationToken); + } + + if (options?.PageSize != null) + { + request.Headers.Add(PageSizeHeader, options.PageSize.ToString()); + } + else if (pageSizeHint != null) + { + request.Headers.Add(PageSizeHeader, pageSizeHint.ToString()); + } + + if (request.Content != null) + { + request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json") + { + CharSet = "utf-8" + }; + } + + HttpResponseMessage response = null; + + try + { + await _internalRetryHandler + .RunWithRetryAsync( + async () => + { + response = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + }, + cancellationToken) + .ConfigureAwait(false); + } + catch (HttpRequestException ex) + { + if (Fx.ContainsAuthenticationException(ex)) + { + throw new IotHubServiceException(ex.Message, HttpStatusCode.Unauthorized, IotHubServiceErrorCode.IotHubUnauthorizedAccess, null, ex); + } + throw new IotHubServiceException(ex.Message, HttpStatusCode.RequestTimeout, IotHubServiceErrorCode.Unknown, null, ex); + } + + await HttpMessageHelper.ValidateHttpResponseStatusAsync(HttpStatusCode.OK, response).ConfigureAwait(false); + string responsePayload = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + QueriedPage page = new QueriedPage(response, responsePayload); +#pragma warning disable CA2000 // Dispose objects before losing scope + // The disposable QueryResponse object is the user's responsibility, not the SDK's + return Page.FromValues(page.Items, page.ContinuationToken, new QueryResponse(response)); +#pragma warning restore CA2000 // Dispose objects before losing scope + } + private static string BuildQueryJobQueryString(JobQueryOptions options) { string queryString = ""; diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index 20379bcd9a..d5147955c0 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -3,194 +3,79 @@ using System; using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; +using System.IO; +using System.Linq; +using System.Net.Http; +using Azure; +using Azure.Core; -namespace Microsoft.Azure.Devices +namespace Microsoft.Azure.Devices.Query { - /// - /// An iterable set of queried items. - /// - /// - /// The type of the queried items. For instance, when using a query such as "SELECT * FROM devices", - /// this type should be type . When using a query such as "SELECT * FROM devices.jobs", - /// this type should be type . - /// - public class QueryResponse + internal class QueryResponse : Response { - private readonly QueryClient _client; - private readonly string _originalQuery; - private readonly JobType? _jobType; - private readonly JobStatus? _jobStatus; - private readonly int? _defaultPageSize; - private IEnumerator _items; + private HttpResponseMessage _httpResponse; + private List _httpHeaders; - internal QueryResponse( - QueryClient client, - string query, - IEnumerable queryResults, - string continuationToken, - int? defaultPageSize) - { - _client = client; - _originalQuery = query; - CurrentPage = queryResults; - _items = queryResults.GetEnumerator(); - ContinuationToken = continuationToken; - Current = _items.Current; - _defaultPageSize = defaultPageSize; - } + internal QueryResponse(HttpResponseMessage httpResponse) + { + _httpResponse = httpResponse; - internal QueryResponse( - QueryClient client, - JobType? jobType, - JobStatus? jobStatus, - IEnumerable queryResults, - string continuationToken, - int? defaultPageSize) - { - _client = client; - _jobType = jobType; - _jobStatus = jobStatus; - CurrentPage = queryResults; - _items = queryResults.GetEnumerator(); - ContinuationToken = continuationToken; - Current = _items.Current; - _defaultPageSize = defaultPageSize; + _httpHeaders = new List(); + foreach (var header in _httpResponse.Headers) + { + _httpHeaders.Add(new HttpHeader(header.Key, header.Value.First())); + } } - /// - /// Gets the continuation token to use for continuing the enumeration. - /// - /// - /// This library will handle this value for you automatically when fetching the next - /// pages of results. This value is exposed only for more unusual cases where users - /// choose to continue a previously interrupted query from a different machine, for example. - /// - public string ContinuationToken { get; internal set; } - - /// - /// The current page of queried items. - /// - /// - /// While you can iterate over the queried page of items using this, there is no logic - /// built into it that allows you to fetch the next page of results automatically. Because - /// of that, most users are better off following the sample code that iterates item by item - /// rather than page by page. - /// - /// - /// - /// QueryResponse<Twin> queriedTwins = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); - /// while (await queriedTwins.MoveNextAsync()) - /// { - /// Twin queriedTwin = queriedTwins.Current; - /// Console.WriteLine(queriedTwin); - /// } - /// - /// - public IEnumerable CurrentPage { get; internal set; } + public override int Status => (int)_httpResponse.StatusCode; //TODO check this - /// - /// Get the current item in the current page of the query results. Can be called multiple times without advancing the query. - /// - /// - /// Like with a more typical implementation of IEnumerator, this value is null until the first - /// call is made. - /// - /// - /// - /// QueryResponse<Twin> queriedTwins = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); - /// while (await queriedTwins.MoveNextAsync()) // no item is skipped by calling this first - /// { - /// Twin queriedTwin = queriedTwins.Current; - /// Console.WriteLine(queriedTwin); - /// } - /// - /// - public T Current { get; private set; } + public override string ReasonPhrase => _httpResponse.ReasonPhrase; - /// - /// Advances to the next element of the query results. - /// - /// True if there was a next item in the query results. False if there were no more items. - /// - /// If this method made a request to IoT hub to get the next page of items but IoT hub responded to - /// the request with a non-successful status code. For example, if the provided request was throttled, - /// with is thrown. For a complete list of possible error cases, - /// see . - /// - /// If the provided cancellation token has requested cancellation. - /// - /// - /// QueryResponse<Twin> queriedTwins = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); - /// while (await queriedTwins.MoveNextAsync()) - /// { - /// Twin queriedTwin = queriedTwins.Current; - /// Console.WriteLine(queriedTwin); - /// } - /// - /// - /// - /// Like with a more typical implementation of IEnumerator, this function should be called once before checking - /// . - /// - /// This function is async because it may make a service request to fetch the next page of results if the current page - /// of results has been advanced through already. Note that this function will return True even if it is at the end - /// of a particular page of items as long as there is at least one more page to be fetched. - /// - public async Task MoveNextAsync(QueryOptions queryOptions = default, CancellationToken cancellationToken = default) + public override Stream ContentStream { - cancellationToken.ThrowIfCancellationRequested(); + get => _httpResponse.Content.ReadAsStreamAsync().Result; + set => throw new NotImplementedException(); //TODO who needs this? + } + public override string ClientRequestId + { + get => throw new NotImplementedException(); + set => throw new NotImplementedException(); + } - if (_items.MoveNext()) - { - // Current page of results still had an item to return to the user. - Current = _items.Current; - Debug.Assert(_items.Current != null); - return true; - } + public override void Dispose() + { + _httpResponse?.Dispose(); + } - if (ContinuationToken == null && queryOptions?.ContinuationToken == null) - { - // The query has no more pages of results to return and the last page has been - // completely exhausted. - return false; - } + protected override bool ContainsHeader(string name) + { + return _httpResponse.Headers.Contains(name); + } - // User's can pass in a continuation token themselves, but the default behavior - // is to use the continuation token saved by this class when it last retrieved a page. - var queryOptionsClone = new JobQueryOptions - { - ContinuationToken = queryOptions?.ContinuationToken ?? ContinuationToken, - PageSize = queryOptions?.PageSize ?? _defaultPageSize, - JobType = _jobType, - JobStatus = _jobStatus, - }; + protected override IEnumerable EnumerateHeaders() + { + return _httpHeaders; + } - if (!string.IsNullOrEmpty(_originalQuery)) + protected override bool TryGetHeader(string name, out string value) + { + IEnumerable outVariableHeaders = new List(); + bool found = _httpResponse.Headers.TryGetValues(name, out outVariableHeaders); + if (found) { - QueryResponse response = await _client - .CreateAsync(_originalQuery, queryOptionsClone, cancellationToken) - .ConfigureAwait(false); - CurrentPage = response.CurrentPage; - _items = CurrentPage.GetEnumerator(); - _items.MoveNext(); - Current = _items.Current; - ContinuationToken = response.ContinuationToken; + value = outVariableHeaders.First(); } else { - // Job type and job status may still be null here, but that's okay - QueryResponse response = await _client - .CreateJobsQueryAsync(queryOptionsClone, cancellationToken) - .ConfigureAwait(false); - CurrentPage = (IEnumerable)response.CurrentPage; - Current = CurrentPage.GetEnumerator().Current; - ContinuationToken = response.ContinuationToken; + value = ""; } - return true; + return found; + } + + protected override bool TryGetHeaderValues(string name, out IEnumerable values) + { + return _httpResponse.Headers.TryGetValues(name, out values); } } } diff --git a/iothub/service/tests/QueryClientTests.cs b/iothub/service/tests/QueryClientTests.cs index 477fc63ace..8b33ecc87a 100644 --- a/iothub/service/tests/QueryClientTests.cs +++ b/iothub/service/tests/QueryClientTests.cs @@ -15,6 +15,7 @@ using FluentAssertions; using System.Collections; using Newtonsoft.Json.Linq; +using Azure; namespace Microsoft.Azure.Devices.Tests { @@ -67,10 +68,11 @@ public async Task QueryClient_CreateAsync() s_retryHandler); // act - QueryResponse response = await queryClient.CreateAsync(query); + AsyncPageable response = queryClient.CreateAsync(query); + await response.GetAsyncEnumerator().MoveNextAsync().ConfigureAwait(false); // assert - response.CurrentPage.First().DeviceId.Should().Be("foo"); + response.GetAsyncEnumerator().Current.DeviceId.Should().Be("foo"); } [TestMethod] @@ -80,7 +82,7 @@ public async Task QueryClient_CreateAsync_NullParamterThrows() using var serviceClient = new IotHubServiceClient(s_connectionString); // act - Func act = async () => await serviceClient.Query.CreateAsync(null); + Func act = async () => await serviceClient.Query.CreateAsync(null).GetAsyncEnumerator().MoveNextAsync(); // assert await act.Should().ThrowAsync(); @@ -122,7 +124,7 @@ public async Task QueryClient_CreateAsync_IotHubNotFound_ThrowsIotHubServiceExce // act // query returns HttpStatusCode.NotFound - Func act = async () => await queryClient.CreateAsync("SELECT * FROM devices"); + Func act = async () => await queryClient.CreateAsync("SELECT * FROM devices").GetAsyncEnumerator().MoveNextAsync(); // assert var error = await act.Should().ThrowAsync(); @@ -161,7 +163,7 @@ public async Task QueryClient_CreateJobsQueryAsync() s_retryHandler); // act - Func act = async () => await queryClient.CreateJobsQueryAsync(); + Func act = async () => await queryClient.CreateJobsQueryAsync().GetAsyncEnumerator().MoveNextAsync(); // assert await act.Should().NotThrowAsync(); @@ -202,7 +204,7 @@ public async Task QueryClient_CreateJobsQuery_IotHubNotFound_ThrowsIotHubService s_retryHandler); // act - Func act = async () => await queryClient.CreateJobsQueryAsync(); + Func act = async () => await queryClient.CreateJobsQueryAsync().GetAsyncEnumerator().MoveNextAsync(); // assert var error = await act.Should().ThrowAsync(); diff --git a/iothub/service/tests/QueryResponseTests.cs b/iothub/service/tests/QueryResponseTests.cs deleted file mode 100644 index 89c1706a69..0000000000 --- a/iothub/service/tests/QueryResponseTests.cs +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using FluentAssertions; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Moq; - -namespace Microsoft.Azure.Devices.Tests -{ - [TestClass] - [TestCategory("Unit")] - public class QueryResponseTests - { - [TestMethod] - public async Task QueryResponse_MoveNextAsync() - { - // arrange - string query = "select * from devices where deviceId = 'foo'"; - var twin1 = new ClientTwin("foo"); - var twin2 = new ClientTwin("foo"); - var queryClient = new Mock(); - - // act - var response = new QueryResponse( - queryClient.Object, - query, - new List { twin1, twin2 }, - "", - 5); - - var expectedResponses = new List { twin1, twin2 }; - - // assert - for (int i = 0; i < expectedResponses.Count; i++) - { - await response.MoveNextAsync(); - ClientTwin queriedTwin = response.Current; - queriedTwin.Should().NotBeNull(); - queriedTwin.DeviceId.Should().Be(expectedResponses[i].DeviceId); - } - } - } -} From 69248ddc0852e5cd2f67164a58e131837567d32c Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 14 Mar 2023 16:41:20 -0700 Subject: [PATCH 02/19] fixup --- ...rovisioningCertificateValidationE2ETest.cs | 2 +- ...sioningServiceIndividualEnrollmentTests.cs | 2 +- .../CleanupDevicesSample.cs | 5 +++-- iothub/service/src/Query/PageableHelpers.cs | 20 +++++++++---------- iothub/service/src/Query/QueryClient.cs | 3 --- iothub/service/src/Query/QueryResponse.cs | 2 +- 6 files changed, 15 insertions(+), 19 deletions(-) diff --git a/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs b/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs index fc7c1e840e..8cbe003904 100644 --- a/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs +++ b/e2e/test/provisioning/ProvisioningCertificateValidationE2ETest.cs @@ -37,7 +37,7 @@ public async Task ProvisioningServiceClient_QueryInvalidServiceCertificateHttp_F { // arrange using var provisioningServiceClient = new ProvisioningServiceClient(TestConfiguration.Provisioning.ConnectionStringInvalidServiceCertificate); - Devices.Provisioning.Service.Query q = provisioningServiceClient.EnrollmentGroups.CreateQuery("SELECT * FROM enrollmentGroups"); + Query q = provisioningServiceClient.EnrollmentGroups.CreateQuery("SELECT * FROM enrollmentGroups"); // act Func act = async () => await q.NextAsync(); diff --git a/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs b/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs index b741e48fce..2ccac3e2cf 100644 --- a/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs +++ b/e2e/test/provisioning/ProvisioningServiceIndividualEnrollmentTests.cs @@ -226,7 +226,7 @@ private static async Task ProvisioningServiceClient_IndividualEnrollments_Query_ using var provisioningServiceClient = new ProvisioningServiceClient(TestConfiguration.Provisioning.ConnectionString, options); string queryString = "SELECT * FROM enrollments"; - Devices.Provisioning.Service.Query query = provisioningServiceClient.IndividualEnrollments.CreateQuery(queryString); + Query query = provisioningServiceClient.IndividualEnrollments.CreateQuery(queryString); while (query.HasNext()) { QueryResult queryResult = await query.NextAsync().ConfigureAwait(false); diff --git a/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs b/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs index be433164d2..cc47e8b047 100644 --- a/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs +++ b/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs @@ -175,13 +175,14 @@ private async Task PrintDeviceCountAsync() { string countSqlQuery = "select count() AS numberOfDevices from devices"; AsyncPageable> countQuery = _hubClient.Query.CreateAsync>(countSqlQuery); - if (!await countQuery.GetAsyncEnumerator().MoveNextAsync()) + IAsyncEnumerator> enumerator = countQuery.GetAsyncEnumerator(); + if (!await enumerator.MoveNextAsync()) { Console.WriteLine($"Failed to run device count query."); return 0; } - if (!countQuery.GetAsyncEnumerator().Current.TryGetValue("numberOfDevices", out deviceCount)) + if (!enumerator.Current.TryGetValue("numberOfDevices", out deviceCount)) { Console.WriteLine($"Failed to get device count from query result."); return 0; diff --git a/iothub/service/src/Query/PageableHelpers.cs b/iothub/service/src/Query/PageableHelpers.cs index a093a27321..19d7e983a5 100644 --- a/iothub/service/src/Query/PageableHelpers.cs +++ b/iothub/service/src/Query/PageableHelpers.cs @@ -6,7 +6,7 @@ using System.Threading.Tasks; using Azure; -namespace Microsoft.Azure.Devices.Query +namespace Microsoft.Azure.Devices { /// /// Copy of a subset of the helper functions defined in the Azure.Core class by the same name: @@ -14,33 +14,32 @@ namespace Microsoft.Azure.Devices.Query /// internal class PageableHelpers { -#pragma warning disable CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. - public static AsyncPageable CreateAsyncEnumerable(Func>> firstPageFunc, Func>>? nextPageFunc, int? pageSize = default) where T : notnull + public static AsyncPageable CreateAsyncEnumerable(Func>> firstPageFunc, Func>> nextPageFunc, int? pageSize = default) where T : notnull { AsyncPageFunc first = (continuationToken, pageSizeHint) => firstPageFunc(pageSizeHint); - AsyncPageFunc? next = nextPageFunc != null ? new AsyncPageFunc(nextPageFunc) : null; + AsyncPageFunc next = nextPageFunc != null ? new AsyncPageFunc(nextPageFunc) : null; return new FuncAsyncPageable(first, next, pageSize); } - internal delegate Task> AsyncPageFunc(string? continuationToken = default, int? pageSizeHint = default); - internal delegate Page PageFunc(string? continuationToken = default, int? pageSizeHint = default); + internal delegate Task> AsyncPageFunc(string continuationToken = default, int? pageSizeHint = default); + internal delegate Page PageFunc(string continuationToken = default, int? pageSizeHint = default); internal class FuncAsyncPageable : AsyncPageable where T : notnull { private readonly AsyncPageFunc _firstPageFunc; - private readonly AsyncPageFunc? _nextPageFunc; + private readonly AsyncPageFunc _nextPageFunc; private readonly int? _defaultPageSize; - public FuncAsyncPageable(AsyncPageFunc firstPageFunc, AsyncPageFunc? nextPageFunc, int? defaultPageSize = default) + public FuncAsyncPageable(AsyncPageFunc firstPageFunc, AsyncPageFunc nextPageFunc, int? defaultPageSize = default) { _firstPageFunc = firstPageFunc; _nextPageFunc = nextPageFunc; _defaultPageSize = defaultPageSize; } - public override async IAsyncEnumerable> AsPages(string? continuationToken = default, int? pageSizeHint = default) + public override async IAsyncEnumerable> AsPages(string continuationToken = default, int? pageSizeHint = default) { - AsyncPageFunc? pageFunc = string.IsNullOrEmpty(continuationToken) ? _firstPageFunc : _nextPageFunc; + AsyncPageFunc pageFunc = string.IsNullOrEmpty(continuationToken) ? _firstPageFunc : _nextPageFunc; if (pageFunc == null) { @@ -58,5 +57,4 @@ public override async IAsyncEnumerable> AsPages(string? continuationToke } } } -#pragma warning restore CS8632 // The annotation for nullable reference types should only be used in code within a '#nullable' annotations context. } diff --git a/iothub/service/src/Query/QueryClient.cs b/iothub/service/src/Query/QueryClient.cs index b2d7ce4473..87f6072306 100644 --- a/iothub/service/src/Query/QueryClient.cs +++ b/iothub/service/src/Query/QueryClient.cs @@ -8,9 +8,6 @@ using System.Threading; using System.Threading.Tasks; using Azure; -using Azure.Core; -using Microsoft.Azure.Devices.Query; -using static Microsoft.Azure.Devices.Amqp.AmqpClientHelper; namespace Microsoft.Azure.Devices { diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index d5147955c0..ce54ecbf64 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -9,7 +9,7 @@ using Azure; using Azure.Core; -namespace Microsoft.Azure.Devices.Query +namespace Microsoft.Azure.Devices { internal class QueryResponse : Response { From 733fb372dd21470a08633c13f79a87c7cbfc8617 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 14 Mar 2023 16:53:17 -0700 Subject: [PATCH 03/19] fixup --- iothub/service/src/Query/QueryOptions.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/iothub/service/src/Query/QueryOptions.cs b/iothub/service/src/Query/QueryOptions.cs index 45330727a5..4d9e9b5a0c 100644 --- a/iothub/service/src/Query/QueryOptions.cs +++ b/iothub/service/src/Query/QueryOptions.cs @@ -19,12 +19,11 @@ public class QueryOptions /// /// /// - /// QueryResponse<Twin> queriedTwins = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); - /// // This call will use the previous continuation token for you when it comes time to get the + /// AsyncEnumerable<Twin> twinQuery = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); + /// // This call will use the current continuation token for you when it comes time to get the /// // next page of results. - /// while (await queriedTwins.MoveNextAsync()) + /// await foreach (ClientTwin queriedTwin in twinQuery) /// { - /// Twin queriedTwin = queriedTwins.Current; /// Console.WriteLine(queriedTwin); /// } /// From 68f5887aca3e374e1776b520dc720124f8b9951b Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 14 Mar 2023 17:06:11 -0700 Subject: [PATCH 04/19] Remove QueryOptions --- .../iothub/service/QueryClientE2ETests.cs | 48 +++++++++---------- .../src/Jobs/Models/JobQueryOptions.cs | 2 +- iothub/service/src/Query/QueryClient.cs | 35 +++++--------- iothub/service/src/Query/QueryOptions.cs | 38 --------------- 4 files changed, 36 insertions(+), 87 deletions(-) delete mode 100644 iothub/service/src/Query/QueryOptions.cs diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index 6cc167f640..916c9c3d87 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -73,18 +73,13 @@ public async Task TwinQuery_CustomPaginationWorks() await using TestDevice testDevice3 = await TestDevice.GetTestDeviceAsync(_idPrefix).ConfigureAwait(false); string queryText = $"select * from devices where deviceId = '{testDevice1.Id}' OR deviceId = '{testDevice2.Id}' OR deviceId = '{testDevice3.Id}'"; - var firstPageOptions = new QueryOptions - { - PageSize = 1 - }; // act - await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false); AsyncPageable queryResponse = serviceClient.Query. - CreateAsync(queryText, firstPageOptions); - IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + CreateAsync(queryText); + IAsyncEnumerator> enumerator = queryResponse.AsPages(null, 1).GetAsyncEnumerator(); (await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs."); // assert @@ -133,25 +128,25 @@ public async Task TwinQuery_IterateByItemAcrossPages() string queryText = $"select * from devices where deviceId = '{testDevice1.Id}' OR deviceId = '{testDevice2.Id}' OR deviceId = '{testDevice3.Id}'"; - // For this test, we want the query logic to have to fetch multiple pages of results. To force - // that, set the page size to 1 when there are 3 total results to be queried. - var queryOptions = new QueryOptions - { - PageSize = 1 - }; - // act await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false); AsyncPageable twinQuery = serviceClient.Query. - CreateAsync(queryText, queryOptions); + CreateAsync(queryText); // assert + + // For this test, we want the query logic to have to fetch multiple pages of results. To force + // that, set the page size to 1 when there are 3 total results to be queried. + IAsyncEnumerable> twinPages = twinQuery.AsPages(null, 1); var returnedTwinDeviceIds = new List(); - await foreach (ClientTwin queriedTwin in twinQuery) + await foreach (Page queriedTwinPage in twinPages) { - returnedTwinDeviceIds.Add(queriedTwin.DeviceId); + foreach (ClientTwin queriedTwin in queriedTwinPage.Values) + { + returnedTwinDeviceIds.Add(queriedTwin.DeviceId); + } } var expectedDeviceIds = new List() { testDevice1.Id, testDevice2.Id, testDevice3.Id }; @@ -172,25 +167,26 @@ public async Task TwinQuery_IterateByItemWorksWithinPage() string queryText = $"select * from devices where deviceId = '{testDevice1.Id}' OR deviceId = '{testDevice2.Id}' OR deviceId = '{testDevice3.Id}'"; - // For this test, we want the query logic to only fetch one page of results. To force - // that, set the page size to 3 when there are 3 total results to be queried. - var queryOptions = new QueryOptions - { - PageSize = 3 - }; // act await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false); AsyncPageable twinQuery = serviceClient.Query - .CreateAsync(queryText, queryOptions); + .CreateAsync(queryText); // assert + + // For this test, we want the query logic to only fetch one page of results. To force + // that, set the page size to 3 when there are 3 total results to be queried. + IAsyncEnumerable> twinPages = twinQuery.AsPages(null, 3); var returnedTwinDeviceIds = new List(); - await foreach (ClientTwin queriedTwin in twinQuery) + await foreach (Page queriedTwinPage in twinPages) { - returnedTwinDeviceIds.Add(queriedTwin.DeviceId); + foreach (ClientTwin queriedTwin in queriedTwinPage.Values) + { + returnedTwinDeviceIds.Add(queriedTwin.DeviceId); + } } var expectedDeviceIds = new List() { testDevice1.Id, testDevice2.Id, testDevice3.Id }; diff --git a/iothub/service/src/Jobs/Models/JobQueryOptions.cs b/iothub/service/src/Jobs/Models/JobQueryOptions.cs index 1f3b6d8f85..208ee0b2ee 100644 --- a/iothub/service/src/Jobs/Models/JobQueryOptions.cs +++ b/iothub/service/src/Jobs/Models/JobQueryOptions.cs @@ -6,7 +6,7 @@ namespace Microsoft.Azure.Devices /// /// Specifies the options associated with job queries. /// - public class JobQueryOptions : QueryOptions + public class JobQueryOptions { /// /// The job type to query. diff --git a/iothub/service/src/Query/QueryClient.cs b/iothub/service/src/Query/QueryClient.cs index 87f6072306..a622f40741 100644 --- a/iothub/service/src/Query/QueryClient.cs +++ b/iothub/service/src/Query/QueryClient.cs @@ -57,7 +57,6 @@ internal QueryClient( /// /// The query. See this document /// for more details on how to build this query. - /// The optional parameters to execute the query with. /// Task cancellation token. /// /// The type to deserialize the set of items into. For example, when running a query like "SELECT * FROM devices", @@ -92,7 +91,7 @@ internal QueryClient( /// } /// /// - public virtual AsyncPageable CreateAsync(string query, QueryOptions options = default, CancellationToken cancellationToken = default) + public virtual AsyncPageable CreateAsync(string query, CancellationToken cancellationToken = default) { if (Logging.IsEnabled) Logging.Enter(this, "Creating query.", nameof(CreateAsync)); @@ -111,7 +110,7 @@ async Task> nextPageFunc(string continuationToken, int? pageSizeHint) _credentialProvider, new QuerySpecification { Sql = query }); - return await BuildAndSendRequest(request, options, continuationToken, pageSizeHint, cancellationToken); + return await BuildAndSendRequest(request, continuationToken, pageSizeHint, cancellationToken); } async Task> firstPageFunc(int? pageSizeHint) @@ -122,10 +121,10 @@ async Task> firstPageFunc(int? pageSizeHint) _credentialProvider, new QuerySpecification { Sql = query }); - return await BuildAndSendRequest(request, options, null, pageSizeHint, cancellationToken); + return await BuildAndSendRequest(request, null, pageSizeHint, cancellationToken); } - return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc, options?.PageSize); + return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc, null); } catch (Exception ex) when (Logging.IsEnabled) { @@ -163,7 +162,7 @@ async Task> firstPageFunc(int? pageSizeHint) public virtual AsyncPageable CreateJobsQueryAsync(JobQueryOptions options = default, CancellationToken cancellationToken = default) { if (Logging.IsEnabled) - Logging.Enter(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus}, pageSize: {options?.PageSize}", nameof(CreateAsync)); + Logging.Enter(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus}", nameof(CreateAsync)); cancellationToken.ThrowIfCancellationRequested(); @@ -178,7 +177,7 @@ async Task> nextPageFunc(string continuationToken, int? pageS null, BuildQueryJobQueryString(options)); - return await BuildAndSendRequest(request, options, continuationToken, pageSizeHint, cancellationToken); + return await BuildAndSendRequest(request, continuationToken, pageSizeHint, cancellationToken); } async Task> firstPageFunc(int? pageSizeHint) @@ -190,10 +189,10 @@ async Task> firstPageFunc(int? pageSizeHint) null, BuildQueryJobQueryString(options)); - return await BuildAndSendRequest(request, options, null, pageSizeHint, cancellationToken); + return await BuildAndSendRequest(request, null, pageSizeHint, cancellationToken); } - return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc, options?.PageSize); + return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc); } catch (HttpRequestException ex) { @@ -205,32 +204,24 @@ async Task> firstPageFunc(int? pageSizeHint) } catch (Exception ex) when (Logging.IsEnabled) { - Logging.Error(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus}, pageSize: {options?.PageSize} threw an exception: {ex}", nameof(CreateAsync)); + Logging.Error(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus} threw an exception: {ex}", nameof(CreateAsync)); throw; } finally { if (Logging.IsEnabled) - Logging.Exit(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus}, pageSize: {options?.PageSize}", nameof(CreateAsync)); + Logging.Exit(this, $"Creating query with jobType: {options?.JobType}, jobStatus: {options?.JobStatus}", nameof(CreateAsync)); } } - private async Task> BuildAndSendRequest(HttpRequestMessage request, QueryOptions options, string continuationToken, int? pageSizeHint, CancellationToken cancellationToken) + private async Task> BuildAndSendRequest(HttpRequestMessage request, string continuationToken, int? pageSizeHint, CancellationToken cancellationToken) { - if (!string.IsNullOrWhiteSpace(options?.ContinuationToken)) - { - request.Headers.Add(ContinuationTokenHeader, options?.ContinuationToken); - } - else if (!string.IsNullOrWhiteSpace(continuationToken)) + if (!string.IsNullOrWhiteSpace(continuationToken)) { request.Headers.Add(ContinuationTokenHeader, continuationToken); } - if (options?.PageSize != null) - { - request.Headers.Add(PageSizeHeader, options.PageSize.ToString()); - } - else if (pageSizeHint != null) + if (pageSizeHint != null) { request.Headers.Add(PageSizeHeader, pageSizeHint.ToString()); } diff --git a/iothub/service/src/Query/QueryOptions.cs b/iothub/service/src/Query/QueryOptions.cs deleted file mode 100644 index 4d9e9b5a0c..0000000000 --- a/iothub/service/src/Query/QueryOptions.cs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -namespace Microsoft.Azure.Devices -{ - /// - /// Specifies the options associated with queries. - /// - public class QueryOptions - { - /// - /// The token to use for continuing the query enumeration. - /// - /// - /// By default, this library will fill in this value for you as needed. For example, if you run - /// a query of page size 5 that has 10 total items to return, this library will fetch the second - /// page of results even if you do not provide this value when calling MoveNextAsync() at the end - /// of the first page of results. - /// - /// - /// - /// AsyncEnumerable<Twin> twinQuery = await iotHubServiceClient.Query.CreateAsync<Twin>("SELECT * FROM devices"); - /// // This call will use the current continuation token for you when it comes time to get the - /// // next page of results. - /// await foreach (ClientTwin queriedTwin in twinQuery) - /// { - /// Console.WriteLine(queriedTwin); - /// } - /// - /// - public string ContinuationToken { get; set; } - - /// - /// The page size to request for each page of query results. - /// - public int? PageSize { get; set; } - } -} From 9f5a56528b62b659c091fc48a48c379c453524e3 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 14 Mar 2023 19:35:34 -0700 Subject: [PATCH 05/19] asd --- .../CleanupDevicesSample/CleanupDevicesSample.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs b/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs index cc47e8b047..9f0d4654b0 100644 --- a/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs +++ b/iothub/service/samples/how to guides/CleanupDevicesSample/CleanupDevicesSample.cs @@ -226,11 +226,13 @@ private async Task> GetDeviceIdsToDeleteAsync( } string queryText = queryTextSb.ToString(); Console.WriteLine($"Using query: {queryText}"); - var options = new QueryOptions { PageSize = 1000 }; - AsyncPageable devicesQuery = _hubClient.Query.CreateAsync(queryText, options); - await foreach (DeviceQueryResult queryResult in devicesQuery) + AsyncPageable devicesQuery = _hubClient.Query.CreateAsync(queryText); + await foreach (Page page in devicesQuery.AsPages(null, 1000)) { - devicesToDelete.Add(new ExportImportDevice(new Device(queryResult.DeviceId), ImportMode.Delete)); + foreach (DeviceQueryResult queryResult in page.Values) + { + devicesToDelete.Add(new ExportImportDevice(new Device(queryResult.DeviceId), ImportMode.Delete)); + } } return devicesToDelete; From 26b1bf56b9987682df6567a59bd8e16f987cce26 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 14 Mar 2023 20:23:34 -0700 Subject: [PATCH 06/19] cancellation token checks --- iothub/service/src/Query/QueryClient.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/iothub/service/src/Query/QueryClient.cs b/iothub/service/src/Query/QueryClient.cs index a622f40741..e03ad5a030 100644 --- a/iothub/service/src/Query/QueryClient.cs +++ b/iothub/service/src/Query/QueryClient.cs @@ -104,6 +104,7 @@ public virtual AsyncPageable CreateAsync(string query, CancellationToken c { async Task> nextPageFunc(string continuationToken, int? pageSizeHint) { + cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( HttpMethod.Post, s_queryUri, @@ -115,6 +116,7 @@ async Task> nextPageFunc(string continuationToken, int? pageSizeHint) async Task> firstPageFunc(int? pageSizeHint) { + cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( HttpMethod.Post, s_queryUri, @@ -170,6 +172,7 @@ public virtual AsyncPageable CreateJobsQueryAsync(JobQueryOptions { async Task> nextPageFunc(string continuationToken, int? pageSizeHint) { + cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( HttpMethod.Get, s_jobsQueryFormat, @@ -182,6 +185,7 @@ async Task> nextPageFunc(string continuationToken, int? pageS async Task> firstPageFunc(int? pageSizeHint) { + cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( HttpMethod.Get, s_jobsQueryFormat, From 7051ff106a1fcb2a92893cdfe234bc745723896c Mon Sep 17 00:00:00 2001 From: Tim Taylor Date: Wed, 15 Mar 2023 10:57:33 -0700 Subject: [PATCH 07/19] Apply suggestions from code review Co-authored-by: David R. Williamson --- .../iothub/service/QueryClientE2ETests.cs | 3 +- iothub/service/src/Query/QueryClient.cs | 32 +++++++++---------- iothub/service/src/Query/QueryResponse.cs | 4 ++- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index 916c9c3d87..047b79691e 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -45,8 +45,7 @@ public async Task TwinQuery_Works() await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 2).ConfigureAwait(false); - AsyncPageable queryResponse = serviceClient.Query - .CreateAsync(queryText); + AsyncPageable queryResponse = serviceClient.Query.CreateAsync(queryText); IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); // assert diff --git a/iothub/service/src/Query/QueryClient.cs b/iothub/service/src/Query/QueryClient.cs index e03ad5a030..5c6b260cdd 100644 --- a/iothub/service/src/Query/QueryClient.cs +++ b/iothub/service/src/Query/QueryClient.cs @@ -174,11 +174,11 @@ async Task> nextPageFunc(string continuationToken, int? pageS { cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( - HttpMethod.Get, - s_jobsQueryFormat, - _credentialProvider, - null, - BuildQueryJobQueryString(options)); + HttpMethod.Get, + s_jobsQueryFormat, + _credentialProvider, + null, + BuildQueryJobQueryString(options)); return await BuildAndSendRequest(request, continuationToken, pageSizeHint, cancellationToken); } @@ -187,13 +187,13 @@ async Task> firstPageFunc(int? pageSizeHint) { cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( - HttpMethod.Get, - s_jobsQueryFormat, - _credentialProvider, - null, - BuildQueryJobQueryString(options)); + HttpMethod.Get, + s_jobsQueryFormat, + _credentialProvider, + null, + BuildQueryJobQueryString(options)); - return await BuildAndSendRequest(request, null, pageSizeHint, cancellationToken); + return await BuildAndSendRequest(request, null, pageSizeHint, cancellationToken).ConfigureAwait(false); } return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc); @@ -218,7 +218,7 @@ async Task> firstPageFunc(int? pageSizeHint) } } - private async Task> BuildAndSendRequest(HttpRequestMessage request, string continuationToken, int? pageSizeHint, CancellationToken cancellationToken) + private async Task> BuildAndSendRequestAsync(HttpRequestMessage request, string continuationToken, int? pageSizeHint, CancellationToken cancellationToken) { if (!string.IsNullOrWhiteSpace(continuationToken)) { @@ -230,12 +230,10 @@ private async Task> BuildAndSendRequest(HttpRequestMessage request, s request.Headers.Add(PageSizeHeader, pageSizeHint.ToString()); } - if (request.Content != null) + (request.Content?.Headers.?ContentType ??= new MediaTypeHeaderValue("application/json") { - request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json") - { - CharSet = "utf-8" - }; + CharSet = "utf-8" + }; } HttpResponseMessage response = null; diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index ce54ecbf64..ed44a05461 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -60,7 +60,9 @@ protected override IEnumerable EnumerateHeaders() protected override bool TryGetHeader(string name, out string value) { IEnumerable outVariableHeaders = new List(); - bool found = _httpResponse.Headers.TryGetValues(name, out outVariableHeaders); + value = _httpResponse.Headers.TryGetValues(name, out outVariableHeaders) + ? outVariableHeaders.First() + : string.Empty; if (found) { value = outVariableHeaders.First(); From 2b7ce2780e58e1cc7f68aa55a329dd17af579208 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 11:26:00 -0700 Subject: [PATCH 08/19] PR comments --- iothub/service/src/Query/PageableHelpers.cs | 2 +- iothub/service/src/Query/QueryClient.cs | 37 ++++++++++++--------- iothub/service/src/Query/QueryResponse.cs | 29 +++++++--------- iothub/service/tests/QueryClientTests.cs | 6 +--- 4 files changed, 36 insertions(+), 38 deletions(-) diff --git a/iothub/service/src/Query/PageableHelpers.cs b/iothub/service/src/Query/PageableHelpers.cs index 19d7e983a5..99e657ebf0 100644 --- a/iothub/service/src/Query/PageableHelpers.cs +++ b/iothub/service/src/Query/PageableHelpers.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Devices /// Copy of a subset of the helper functions defined in the Azure.Core class by the same name: /// https://github.com/Azure/autorest.csharp/blob/main/src/assets/Generator.Shared/PageableHelpers.cs /// - internal class PageableHelpers + internal static class PageableHelpers { public static AsyncPageable CreateAsyncEnumerable(Func>> firstPageFunc, Func>> nextPageFunc, int? pageSize = default) where T : notnull { diff --git a/iothub/service/src/Query/QueryClient.cs b/iothub/service/src/Query/QueryClient.cs index 5c6b260cdd..117d49ba2b 100644 --- a/iothub/service/src/Query/QueryClient.cs +++ b/iothub/service/src/Query/QueryClient.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.IO; using System.Net; using System.Net.Http; using System.Net.Http.Headers; @@ -111,7 +112,7 @@ async Task> nextPageFunc(string continuationToken, int? pageSizeHint) _credentialProvider, new QuerySpecification { Sql = query }); - return await BuildAndSendRequest(request, continuationToken, pageSizeHint, cancellationToken); + return await BuildAndSendRequestAsync(request, continuationToken, pageSizeHint, cancellationToken).ConfigureAwait(false); } async Task> firstPageFunc(int? pageSizeHint) @@ -123,7 +124,7 @@ async Task> firstPageFunc(int? pageSizeHint) _credentialProvider, new QuerySpecification { Sql = query }); - return await BuildAndSendRequest(request, null, pageSizeHint, cancellationToken); + return await BuildAndSendRequestAsync(request, null, pageSizeHint, cancellationToken).ConfigureAwait(false); } return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc, null); @@ -174,13 +175,13 @@ async Task> nextPageFunc(string continuationToken, int? pageS { cancellationToken.ThrowIfCancellationRequested(); using HttpRequestMessage request = _httpRequestMessageFactory.CreateRequest( - HttpMethod.Get, - s_jobsQueryFormat, - _credentialProvider, - null, - BuildQueryJobQueryString(options)); + HttpMethod.Get, + s_jobsQueryFormat, + _credentialProvider, + null, + BuildQueryJobQueryString(options)); - return await BuildAndSendRequest(request, continuationToken, pageSizeHint, cancellationToken); + return await BuildAndSendRequestAsync(request, continuationToken, pageSizeHint, cancellationToken).ConfigureAwait(false); } async Task> firstPageFunc(int? pageSizeHint) @@ -193,7 +194,7 @@ async Task> firstPageFunc(int? pageSizeHint) null, BuildQueryJobQueryString(options)); - return await BuildAndSendRequest(request, null, pageSizeHint, cancellationToken).ConfigureAwait(false); + return await BuildAndSendRequestAsync(request, null, pageSizeHint, cancellationToken).ConfigureAwait(false); } return PageableHelpers.CreateAsyncEnumerable(firstPageFunc, nextPageFunc); @@ -220,6 +221,8 @@ async Task> firstPageFunc(int? pageSizeHint) private async Task> BuildAndSendRequestAsync(HttpRequestMessage request, string continuationToken, int? pageSizeHint, CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + if (!string.IsNullOrWhiteSpace(continuationToken)) { request.Headers.Add(ContinuationTokenHeader, continuationToken); @@ -230,10 +233,12 @@ private async Task> BuildAndSendRequestAsync(HttpRequestMessage reque request.Headers.Add(PageSizeHeader, pageSizeHint.ToString()); } - (request.Content?.Headers.?ContentType ??= new MediaTypeHeaderValue("application/json") - { - CharSet = "utf-8" - }; + if (request.Content != null) + { + request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json") + { + CharSet = "utf-8" + }; } HttpResponseMessage response = null; @@ -259,11 +264,13 @@ await _internalRetryHandler } await HttpMessageHelper.ValidateHttpResponseStatusAsync(HttpStatusCode.OK, response).ConfigureAwait(false); - string responsePayload = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + Stream bodyStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + using StreamReader bodyStreamReader = new StreamReader(bodyStream); + string responsePayload = await bodyStreamReader.ReadToEndAsync().ConfigureAwait(false); QueriedPage page = new QueriedPage(response, responsePayload); #pragma warning disable CA2000 // Dispose objects before losing scope // The disposable QueryResponse object is the user's responsibility, not the SDK's - return Page.FromValues(page.Items, page.ContinuationToken, new QueryResponse(response)); + return Page.FromValues(page.Items, page.ContinuationToken, new QueryResponse(response, bodyStream)); #pragma warning restore CA2000 // Dispose objects before losing scope } diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index ed44a05461..11d38879b4 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -11,14 +11,21 @@ namespace Microsoft.Azure.Devices { + /// + /// The local implementation of the Azure.Core Response type. Libraries in the azure-sdk-for-net repo have access to + /// helper functions to instantiate the abstract class Response, but this library is not in that repo yet. Because of that, + /// we need to implement the abstract class. + /// internal class QueryResponse : Response { private HttpResponseMessage _httpResponse; + private Stream _bodyStream; private List _httpHeaders; - internal QueryResponse(HttpResponseMessage httpResponse) + internal QueryResponse(HttpResponseMessage httpResponse, Stream bodyStream) { _httpResponse = httpResponse; + _bodyStream = bodyStream; _httpHeaders = new List(); foreach (var header in _httpResponse.Headers) @@ -33,8 +40,8 @@ internal QueryResponse(HttpResponseMessage httpResponse) public override Stream ContentStream { - get => _httpResponse.Content.ReadAsStreamAsync().Result; - set => throw new NotImplementedException(); //TODO who needs this? + get => _bodyStream; + set => _bodyStream = value; } public override string ClientRequestId { @@ -59,20 +66,8 @@ protected override IEnumerable EnumerateHeaders() protected override bool TryGetHeader(string name, out string value) { - IEnumerable outVariableHeaders = new List(); - value = _httpResponse.Headers.TryGetValues(name, out outVariableHeaders) - ? outVariableHeaders.First() - : string.Empty; - if (found) - { - value = outVariableHeaders.First(); - } - else - { - value = ""; - } - - return found; + value = _httpResponse.Headers.SafeGetValue(name); + return string.IsNullOrWhiteSpace(value); } protected override bool TryGetHeaderValues(string name, out IEnumerable values) diff --git a/iothub/service/tests/QueryClientTests.cs b/iothub/service/tests/QueryClientTests.cs index 8b33ecc87a..3575c2e4b8 100644 --- a/iothub/service/tests/QueryClientTests.cs +++ b/iothub/service/tests/QueryClientTests.cs @@ -7,15 +7,11 @@ using System.Net.Http; using System.Net; using System.Threading; -using System.Text; using System.Threading.Tasks; +using Azure; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; -using Newtonsoft.Json; using FluentAssertions; -using System.Collections; -using Newtonsoft.Json.Linq; -using Azure; namespace Microsoft.Azure.Devices.Tests { From 151053fab6333b2d27de6630f351aa18fa886b8e Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 11:32:17 -0700 Subject: [PATCH 09/19] this works --- iothub/service/src/Query/QueryResponse.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index 11d38879b4..1d816886ee 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -34,7 +34,7 @@ internal QueryResponse(HttpResponseMessage httpResponse, Stream bodyStream) } } - public override int Status => (int)_httpResponse.StatusCode; //TODO check this + public override int Status => (int)_httpResponse.StatusCode; public override string ReasonPhrase => _httpResponse.ReasonPhrase; @@ -45,8 +45,8 @@ public override Stream ContentStream } public override string ClientRequestId { - get => throw new NotImplementedException(); - set => throw new NotImplementedException(); + get => throw new NotImplementedException("This SDK does not define this feature"); + set => throw new NotImplementedException("This SDK does not define this feature"); } public override void Dispose() From 03ad08680092042fada1c990b7d90c77b2d599e9 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 11:34:24 -0700 Subject: [PATCH 10/19] spacing --- iothub/service/src/Query/QueryResponse.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index 1d816886ee..311d72662b 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -43,6 +43,7 @@ public override Stream ContentStream get => _bodyStream; set => _bodyStream = value; } + public override string ClientRequestId { get => throw new NotImplementedException("This SDK does not define this feature"); From b436cb449a2154bded23d84f9b3babde5ca31ebf Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 14:43:19 -0700 Subject: [PATCH 11/19] fix unit test --- iothub/service/tests/QueryClientTests.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iothub/service/tests/QueryClientTests.cs b/iothub/service/tests/QueryClientTests.cs index 3575c2e4b8..32418f018f 100644 --- a/iothub/service/tests/QueryClientTests.cs +++ b/iothub/service/tests/QueryClientTests.cs @@ -65,10 +65,11 @@ public async Task QueryClient_CreateAsync() // act AsyncPageable response = queryClient.CreateAsync(query); - await response.GetAsyncEnumerator().MoveNextAsync().ConfigureAwait(false); + IAsyncEnumerator enumerator = response.GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); // assert - response.GetAsyncEnumerator().Current.DeviceId.Should().Be("foo"); + enumerator.Current.DeviceId.Should().Be("foo"); } [TestMethod] From 7299b70e24871512e8802c96365c1ad03e9e49be Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 14:48:56 -0700 Subject: [PATCH 12/19] testing something out --- e2e/test/iothub/service/QueryClientE2ETests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index 047b79691e..09b8984ddb 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -343,6 +343,8 @@ private static async Task ScheduleJobToBeQueriedAsync(ScheduledJobsClient jobsCl // Each IoT hub has a low limit for the number of parallel jobs allowed. Because of that, // tests in this suite are written to work even if the queried job isn't the one they created. VerboseTestLogger.WriteLine("Throttled when creating job. Will use existing job(s) to test query"); + VerboseTestLogger.WriteLine(ex.Message); + VerboseTestLogger.WriteLine(ex.StackTrace); } } } From 6d30af05a4736a30256e771ef83664855a2a2c00 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 14:58:28 -0700 Subject: [PATCH 13/19] test --- .../iothub/service/QueryClientE2ETests.cs | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index 09b8984ddb..1fbaece9e0 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -29,6 +29,13 @@ public class QueryClientE2ETests : E2EMsTestBase // timeout is for how long to wait for this latency before failing the test. private readonly TimeSpan _queryableDelayTimeout = TimeSpan.FromMinutes(1); + private static readonly IIotHubServiceRetryPolicy s_scheduleJobRetryPolicy = new HubServiceTestRetryPolicy( + new() + { + IotHubServiceErrorCode.ThrottlingException, + IotHubServiceErrorCode.ThrottlingBacklogTimeout, + }); + [TestMethod] [Timeout(TestTimeoutMilliseconds)] public async Task TwinQuery_Works() @@ -329,23 +336,32 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int exp private static async Task ScheduleJobToBeQueriedAsync(ScheduledJobsClient jobsClient, string deviceId) { - try - { - var twinUpdate = new ClientTwin(); - twinUpdate.Properties.Desired["key"] = "value"; - - TwinScheduledJob scheduledJob = await jobsClient - .ScheduleTwinUpdateAsync("DeviceId IN ['" + deviceId + "']", twinUpdate, DateTimeOffset.UtcNow.AddMinutes(3)) - .ConfigureAwait(false); - } - catch (IotHubServiceException ex) when (ex.StatusCode is (HttpStatusCode)429) - { - // Each IoT hub has a low limit for the number of parallel jobs allowed. Because of that, - // tests in this suite are written to work even if the queried job isn't the one they created. - VerboseTestLogger.WriteLine("Throttled when creating job. Will use existing job(s) to test query"); - VerboseTestLogger.WriteLine(ex.Message); - VerboseTestLogger.WriteLine(ex.StackTrace); - } + // Attempt to schedule a job until it works or until hub complains there are too many active jobs + await RetryOperationHelper + .RunWithHubServiceRetryAsync( + async () => + { + try + { + var twinUpdate = new ClientTwin(); + twinUpdate.Properties.Desired["key"] = "value"; + + TwinScheduledJob scheduledJob = await jobsClient + .ScheduleTwinUpdateAsync("DeviceId IN ['" + deviceId + "']", twinUpdate, DateTimeOffset.UtcNow.AddMinutes(3)) + .ConfigureAwait(false); + } + catch (IotHubServiceException ex) when (ex.StatusCode is (HttpStatusCode)429 && ex.Message.Contains("ThrottlingMaxActiveJobCountExceeded")) + { + // Each IoT hub has a low limit for the number of parallel jobs allowed. Because of that, + // tests in this suite are written to work even if the queried job isn't the one they created. + VerboseTestLogger.WriteLine("Throttled when creating job. Will use existing job(s) to test query"); + VerboseTestLogger.WriteLine(ex.Message); + VerboseTestLogger.WriteLine(ex.StackTrace); + } + + }, + s_scheduleJobRetryPolicy, + CancellationToken.None); } } } From e58b13f4efb9c8faede7f8ed693d2eef6c95749d Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 15:01:07 -0700 Subject: [PATCH 14/19] cr comments --- iothub/service/src/Query/PageableHelpers.cs | 4 ++-- iothub/service/src/Query/QueryResponse.cs | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/iothub/service/src/Query/PageableHelpers.cs b/iothub/service/src/Query/PageableHelpers.cs index 99e657ebf0..cee39e1beb 100644 --- a/iothub/service/src/Query/PageableHelpers.cs +++ b/iothub/service/src/Query/PageableHelpers.cs @@ -14,7 +14,7 @@ namespace Microsoft.Azure.Devices /// internal static class PageableHelpers { - public static AsyncPageable CreateAsyncEnumerable(Func>> firstPageFunc, Func>> nextPageFunc, int? pageSize = default) where T : notnull + internal static AsyncPageable CreateAsyncEnumerable(Func>> firstPageFunc, Func>> nextPageFunc, int? pageSize = default) where T : notnull { AsyncPageFunc first = (continuationToken, pageSizeHint) => firstPageFunc(pageSizeHint); AsyncPageFunc next = nextPageFunc != null ? new AsyncPageFunc(nextPageFunc) : null; @@ -30,7 +30,7 @@ internal class FuncAsyncPageable : AsyncPageable where T : notnull private readonly AsyncPageFunc _nextPageFunc; private readonly int? _defaultPageSize; - public FuncAsyncPageable(AsyncPageFunc firstPageFunc, AsyncPageFunc nextPageFunc, int? defaultPageSize = default) + internal FuncAsyncPageable(AsyncPageFunc firstPageFunc, AsyncPageFunc nextPageFunc, int? defaultPageSize = default) { _firstPageFunc = firstPageFunc; _nextPageFunc = nextPageFunc; diff --git a/iothub/service/src/Query/QueryResponse.cs b/iothub/service/src/Query/QueryResponse.cs index 311d72662b..5377f93bbb 100644 --- a/iothub/service/src/Query/QueryResponse.cs +++ b/iothub/service/src/Query/QueryResponse.cs @@ -53,10 +53,12 @@ public override string ClientRequestId public override void Dispose() { _httpResponse?.Dispose(); + _bodyStream?.Dispose(); } protected override bool ContainsHeader(string name) { + Argument.AssertNotNullOrWhiteSpace(name, nameof(name)); return _httpResponse.Headers.Contains(name); } @@ -67,12 +69,14 @@ protected override IEnumerable EnumerateHeaders() protected override bool TryGetHeader(string name, out string value) { + Argument.AssertNotNullOrWhiteSpace(name, nameof(name)); value = _httpResponse.Headers.SafeGetValue(name); return string.IsNullOrWhiteSpace(value); } protected override bool TryGetHeaderValues(string name, out IEnumerable values) { + Argument.AssertNotNullOrWhiteSpace(name, nameof(name)); return _httpResponse.Headers.TryGetValues(name, out values); } } From ce9e37f4ecd22bb5b76bdff486a676f7cb4b48cf Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 15:30:48 -0700 Subject: [PATCH 15/19] asdf --- .../iothub/service/QueryClientE2ETests.cs | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index 1fbaece9e0..c613ede9da 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -207,11 +207,9 @@ public async Task JobQuery_QueryWorks() IotHubServiceClient serviceClient = TestDevice.ServiceClient; await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_idPrefix).ConfigureAwait(false); - await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false); - string query = "SELECT * FROM devices.jobs"; - await WaitForJobToBeQueryableAsync(serviceClient.Query, query, 1).ConfigureAwait(false); + await WaitForJobToBeQueryableAsync(serviceClient, testDevice.Id, query, 1).ConfigureAwait(false); AsyncPageable queryResponse = serviceClient.Query.CreateAsync(query); IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); @@ -239,8 +237,7 @@ public async Task JobQuery_QueryByTypeWorks() var serviceClient = TestDevice.ServiceClient; await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_idPrefix).ConfigureAwait(false); - await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false); - await WaitForJobToBeQueryableAsync(serviceClient.Query, 1, null, null).ConfigureAwait(false); + await WaitForJobToBeQueryableAsync(serviceClient, testDevice.Id, 1, null, null).ConfigureAwait(false); AsyncPageable queryResponse = serviceClient.Query.CreateJobsQueryAsync(); IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); @@ -292,25 +289,30 @@ private async Task WaitForDevicesToBeQueryableAsync(QueryClient queryClient, str } } - private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, string query, int expectedCount) + private async Task WaitForJobToBeQueryableAsync(IotHubServiceClient serviceClient, string deviceId, string query, int expectedCount) { // There is some latency between the creation of the test devices and when they are queryable, // so keep executing the query until both devices are returned in the results or until a timeout. using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout); - AsyncPageable queryResponse = queryClient.CreateAsync(query); + AsyncPageable queryResponse = serviceClient.Query.CreateAsync(query); IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); while (enumerator.Current.Values.Count < expectedCount) { + // If this is just called once, there is a chance that it both fails to schedule a job because the + // hub is at it's max quota for concurrent jobs but then those jobs finish and become unqueriable before + // this function can query them. To avoid this, keep trying to schedule jobs until at least one is queriable. + await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, deviceId).ConfigureAwait(false); + cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = queryClient.CreateAsync(query); + queryResponse = serviceClient.Query.CreateAsync(query); enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); } } - private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int expectedCount, JobType? jobType = null, JobStatus? status = null) + private async Task WaitForJobToBeQueryableAsync(IotHubServiceClient serviceClient, string deviceId, int expectedCount, JobType? jobType = null, JobStatus? status = null) { // There is some latency between the creation of the test devices and when they are queryable, // so keep executing the query until both devices are returned in the results or until a timeout. @@ -321,14 +323,19 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int exp JobType = jobType, JobStatus = status, }; - AsyncPageable queryResponse = queryClient.CreateJobsQueryAsync(options); + AsyncPageable queryResponse = serviceClient.Query.CreateJobsQueryAsync(options); IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); while (enumerator.Current.Values.Count < expectedCount) { + // If this is just called once, there is a chance that it both fails to schedule a job because the + // hub is at it's max quota for concurrent jobs but then those jobs finish and become unqueriable before + // this function can query them. To avoid this, keep trying to schedule jobs until at least one is queriable. + await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, deviceId).ConfigureAwait(false); + cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = queryClient.CreateJobsQueryAsync(options); + queryResponse = serviceClient.Query.CreateJobsQueryAsync(options); enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); } From 8fdf56f045930e1dc014752a61309504a93fee6b Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 15 Mar 2023 16:07:32 -0700 Subject: [PATCH 16/19] Revert "asdf" This reverts commit ce9e37f4ecd22bb5b76bdff486a676f7cb4b48cf. --- .../iothub/service/QueryClientE2ETests.cs | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index c613ede9da..1fbaece9e0 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -207,9 +207,11 @@ public async Task JobQuery_QueryWorks() IotHubServiceClient serviceClient = TestDevice.ServiceClient; await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_idPrefix).ConfigureAwait(false); + await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false); + string query = "SELECT * FROM devices.jobs"; - await WaitForJobToBeQueryableAsync(serviceClient, testDevice.Id, query, 1).ConfigureAwait(false); + await WaitForJobToBeQueryableAsync(serviceClient.Query, query, 1).ConfigureAwait(false); AsyncPageable queryResponse = serviceClient.Query.CreateAsync(query); IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); @@ -237,7 +239,8 @@ public async Task JobQuery_QueryByTypeWorks() var serviceClient = TestDevice.ServiceClient; await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_idPrefix).ConfigureAwait(false); - await WaitForJobToBeQueryableAsync(serviceClient, testDevice.Id, 1, null, null).ConfigureAwait(false); + await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false); + await WaitForJobToBeQueryableAsync(serviceClient.Query, 1, null, null).ConfigureAwait(false); AsyncPageable queryResponse = serviceClient.Query.CreateJobsQueryAsync(); IAsyncEnumerator enumerator = queryResponse.GetAsyncEnumerator(); @@ -289,30 +292,25 @@ private async Task WaitForDevicesToBeQueryableAsync(QueryClient queryClient, str } } - private async Task WaitForJobToBeQueryableAsync(IotHubServiceClient serviceClient, string deviceId, string query, int expectedCount) + private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, string query, int expectedCount) { // There is some latency between the creation of the test devices and when they are queryable, // so keep executing the query until both devices are returned in the results or until a timeout. using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout); - AsyncPageable queryResponse = serviceClient.Query.CreateAsync(query); + AsyncPageable queryResponse = queryClient.CreateAsync(query); IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); while (enumerator.Current.Values.Count < expectedCount) { - // If this is just called once, there is a chance that it both fails to schedule a job because the - // hub is at it's max quota for concurrent jobs but then those jobs finish and become unqueriable before - // this function can query them. To avoid this, keep trying to schedule jobs until at least one is queriable. - await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, deviceId).ConfigureAwait(false); - cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = serviceClient.Query.CreateAsync(query); + queryResponse = queryClient.CreateAsync(query); enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); } } - private async Task WaitForJobToBeQueryableAsync(IotHubServiceClient serviceClient, string deviceId, int expectedCount, JobType? jobType = null, JobStatus? status = null) + private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int expectedCount, JobType? jobType = null, JobStatus? status = null) { // There is some latency between the creation of the test devices and when they are queryable, // so keep executing the query until both devices are returned in the results or until a timeout. @@ -323,19 +321,14 @@ private async Task WaitForJobToBeQueryableAsync(IotHubServiceClient serviceClien JobType = jobType, JobStatus = status, }; - AsyncPageable queryResponse = serviceClient.Query.CreateJobsQueryAsync(options); + AsyncPageable queryResponse = queryClient.CreateJobsQueryAsync(options); IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); while (enumerator.Current.Values.Count < expectedCount) { - // If this is just called once, there is a chance that it both fails to schedule a job because the - // hub is at it's max quota for concurrent jobs but then those jobs finish and become unqueriable before - // this function can query them. To avoid this, keep trying to schedule jobs until at least one is queriable. - await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, deviceId).ConfigureAwait(false); - cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = serviceClient.Query.CreateJobsQueryAsync(options); + queryResponse = queryClient.CreateJobsQueryAsync(options); enumerator = queryResponse.AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); } From fa242aea44d9634b21bd7321398e4b14793226dc Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 20 Mar 2023 11:14:22 -0700 Subject: [PATCH 17/19] Update migration guide --- SDK v2 migration guide.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/SDK v2 migration guide.md b/SDK v2 migration guide.md index fb8e50a207..65016455eb 100644 --- a/SDK v2 migration guide.md +++ b/SDK v2 migration guide.md @@ -241,8 +241,9 @@ These span across all clients. #### Notable breaking changes - Operations that offer concurrency protection using `ETag`s, now take a parameter `onlyIfUnchanged` that relies on the ETag property of the submitted entity. -- `IotHubServiceClient.Query.CreateAsync(...)` is now async. - - Call `QueryResponse.MoveNextAsync()` in a loop (end when it returns `false`) and access `QueryResponse.Current`. +- `IotHubServiceClient.Query.CreateAsync(...)` now returns an `AsyncPageable` of the queried results. + - Iterate on entries by using `await foreach (ClientTwin queriedTwin in client.Query.CreateAsync(queryText))`. + - Iterate on pages of entries by using `await foreach (Page queriedTwinPage in _client.Query.CreateAsync(queryText).AsPages())` - `JobProperties` properties that hold Azure Storage SAS URIs are now of type `System.Uri` instead of `string`. - `JobProperties` has been split into several classes with only the necessary properties for the specified operation. - See `ExportJobProperties`, `ImportJobProperties`, and `IotHubJobResponse`. @@ -335,7 +336,7 @@ These span across all clients. #### Notable breaking changes - `JobClient.ScheduleTwinUpdateAsync(...)` previously returned a `JobResponse`, now returns `ScheduledJob`. -- `ScheduleJobs.GetAsync()` return type has changed to `QueryResponse` from `IEnumerable`. +- `ScheduleJobs.GetAsync()` return type has changed to `AsyncPageable` from `IEnumerable`. | v1 API | Equivalent v2 API | Notes | |:---|:---|:---| From 4658c6f6a0854621ecb13afc9c26d4d09f3d79e8 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 20 Mar 2023 15:01:51 -0700 Subject: [PATCH 18/19] fixup --- e2e/test/iothub/service/QueryClientE2ETests.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index ea5fa72da5..3838544800 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -29,13 +29,6 @@ public class QueryClientE2ETests : E2EMsTestBase // timeout is for how long to wait for this latency before failing the test. private readonly TimeSpan _queryableDelayTimeout = TimeSpan.FromMinutes(1); - private static readonly IIotHubServiceRetryPolicy s_scheduleJobRetryPolicy = new HubServiceTestRetryPolicy( - new() - { - IotHubServiceErrorCode.ThrottlingException, - IotHubServiceErrorCode.ThrottlingBacklogTimeout, - }); - [TestMethod] [Timeout(TestTimeoutMilliseconds)] public async Task TwinQuery_Works() From 8a89868ddc755273d57743ed0e34d33ef4e3dbe2 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 20 Mar 2023 15:10:35 -0700 Subject: [PATCH 19/19] Simplify --- .../iothub/service/QueryClientE2ETests.cs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/e2e/test/iothub/service/QueryClientE2ETests.cs b/e2e/test/iothub/service/QueryClientE2ETests.cs index 3838544800..b6cf2ae9cf 100644 --- a/e2e/test/iothub/service/QueryClientE2ETests.cs +++ b/e2e/test/iothub/service/QueryClientE2ETests.cs @@ -271,14 +271,12 @@ private async Task WaitForDevicesToBeQueryableAsync(QueryClient queryClient, str // so keep executing the query until both devices are returned in the results or until a timeout. using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout); CancellationToken cancellationToken = cancellationTokenSource.Token; - AsyncPageable queryResponse = queryClient.CreateAsync(query); - IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + IAsyncEnumerator> enumerator = queryClient.CreateAsync(query).AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync(); while (enumerator.Current.Values.Count < expectedCount) { await Task.Delay(100).ConfigureAwait(false); - queryResponse = queryClient.CreateAsync(query); - enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + enumerator = queryClient.CreateAsync(query).AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); // timed out waiting for the devices to become queryable } @@ -289,15 +287,14 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, string // There is some latency between the creation of the test devices and when they are queryable, // so keep executing the query until both devices are returned in the results or until a timeout. using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout); - IAsyncEnumerable> queryResponse; IAsyncEnumerator> enumerator; do { await Task.Delay(100).ConfigureAwait(false); cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); - queryResponse = queryClient.CreateAsync(query).AsPages(); - enumerator = queryResponse.GetAsyncEnumerator(); - } while (await enumerator.MoveNextAsync().ConfigureAwait(false) && enumerator.Current.Values.Count < expectedCount); + enumerator = queryClient.CreateAsync(query).AsPages().GetAsyncEnumerator(); + await enumerator.MoveNextAsync().ConfigureAwait(false); + } while (enumerator.Current.Values.Count < expectedCount); } private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int expectedCount, JobType? jobType = null, JobStatus? status = null) @@ -311,15 +308,13 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int exp JobType = jobType, JobStatus = status, }; - AsyncPageable queryResponse = queryClient.CreateJobsQueryAsync(options); - IAsyncEnumerator> enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + IAsyncEnumerator> enumerator = queryClient.CreateJobsQueryAsync(options).AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); while (enumerator.Current.Values.Count < expectedCount) { cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable"); await Task.Delay(100).ConfigureAwait(false); - queryResponse = queryClient.CreateJobsQueryAsync(options); - enumerator = queryResponse.AsPages().GetAsyncEnumerator(); + enumerator = queryClient.CreateJobsQueryAsync(options).AsPages().GetAsyncEnumerator(); await enumerator.MoveNextAsync().ConfigureAwait(false); } }