Skip to content

Commit

Permalink
Utilize AsyncPageable for DPS query return types (#3176)
Browse files Browse the repository at this point in the history
see #3165, but for the DPS service client this time.

This PR also removes some superfluous classes like ```ContractApiResponse``` since they were needless abstractions that made it harder to implement this change

I also change all the ```CreateAsync``` Query APIs to just ```Create``` since the function itself is not async. The first service request isn't made until the iteration begins.

I also removed the ```IContractApiHttp``` interface since it only had one implementation and we don't currently allow users to override this implementation in any way.
  • Loading branch information
timtay-microsoft authored Mar 23, 2023
1 parent 590f4f9 commit 99c559c
Show file tree
Hide file tree
Showing 36 changed files with 913 additions and 1,010 deletions.
24 changes: 18 additions & 6 deletions SDK v2 migration guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,9 @@ These span across all service clients.
#### Notable breaking changes

- Operations that offer concurrency protection using `ETag`s, now take a parameter `onlyIfUnchanged` that relies on the ETag property of the submitted entity.
- `IotHubServiceClient.Query.CreateAsync<T>(...)` now returns an `AsyncPageable` of the queried results.
- Iterate on entries by using `await foreach (ClientTwin queriedTwin in client.Query.CreateAsync<ClientTwin>(queryText))`.
- Iterate on pages of entries by using `await foreach (Page<ClientTwin> queriedTwinPage in _client.Query.CreateAsync<ClientTwin>(queryText).AsPages())`
- `IotHubServiceClient.Query.Create<T>(...)` now returns an `AsyncPageable` of the queried results.
- Iterate on entries by using `await foreach (ClientTwin queriedTwin in client.Query.Create<ClientTwin>(queryText))`.
- Iterate on pages of entries by using `await foreach (Page<ClientTwin> queriedTwinPage in _client.Query.Create<ClientTwin>(queryText).AsPages())`.
- `JobProperties` properties that hold Azure Storage SAS URIs are now of type `System.Uri` instead of `string`.
- `JobProperties` has been split into several classes with only the necessary properties for the specified operation.
- See `ExportJobProperties`, `ImportJobProperties`, and `IotHubJobResponse`.
Expand Down Expand Up @@ -326,7 +326,7 @@ These span across all service clients.
| `DeviceConnectionState` | `ClientConnectionState` | See² |
| `DeviceStatus` | `ClientStatus` | See² |
| `DeviceCapabilities` | `ClientCapabilities` | See² |
| `RegistryManager.CreateQuery(...)` | `IotHubServiceClient.Query.CreateAsync<T>(...)` | |
| `RegistryManager.CreateQuery(...)` | `IotHubServiceClient.Query.Create<T>(...)` | |
| `RegistryManager.AddConfigurationAsync(...)` | `IotHubServiceClient.Configurations.CreateAsync(...)` | |
| `RegistryManager.GetConfigurationsAsync(int maxCount)`| `IotHubServiceClient.Configurations.GetAsync(int maxCount)` | |
| `RegistryManager.RemoveConfigurationAsync(...)` | `IotHubServiceClient.Configurations.DeleteAsync(...)` | |
Expand Down Expand Up @@ -385,7 +385,7 @@ These span across all service clients.
|:---|:---|:---|
| `JobsClient` | `IotHubServiceClient`, subclients `ScheduledJobs` | |
| `JobClient.GetJobAsync(...)` | `IotHubServiceClient.ScheduledJobs.GetAsync(...)` | |
| `JobClient.CreateQuery()` | `IotHubServiceClient.ScheduledJobs.CreateQueryAsync()` | |
| `JobClient.CreateQuery()` | `IotHubServiceClient.ScheduledJobs.CreateQuery()` | |
| `JobsClient.ScheduleTwinUpdateAsync(...)` | `IotHubServiceClient.ScheduledJobs.ScheduledTwinUpdateAsync(...)` | |
| `JobType.ExportDevices` | `JobType.Export` | Matches the actual value expected by the service.¹ |
| `JobType.ImportDevices` | `JobType.Import` | See¹ |
Expand Down Expand Up @@ -449,7 +449,15 @@ These span across all service clients.

#### Notable breaking changes

- Query methods (like for individual and group enrollments) now take a query string (and optionally a page size parameter), and the `Query` result no longer requires disposing.
- `ProvisioningServiceClient.DeviceRegistrationStates.CreateEnrollmentGroupQuery(...)` now returns an `AsyncPageable` of the queried results.
- Iterate on entries by using `await foreach (DeviceRegistrationState registrationState in client.DeviceRegistrationStates.CreateEnrollmentGroupQuery(queryText))`.
- Iterate on pages of entries by using `await foreach (Page<DeviceRegistrationState> registrationStatePage in client.DeviceRegistrationStates.CreateEnrollmentGroupQuery(queryText).AsPages())`.
- `ProvisioningServiceClient.IndividualEnrollments.CreateQuery(...)` now returns an `AsyncPageable` of the queried results.
- Iterate on entries by using `await foreach (IndividualEnrollment enrollment in client.IndividualEnrollments.CreateQuery(queryText))`.
- Iterate on pages of entries by using `await foreach (Page<IndividualEnrollment> enrollmentsPage in client.IndividualEnrollments.CreateQuery(queryText).AsPages())`.
- `ProvisioningServiceClient.EnrollmentGroups.CreateQuery(...)` now returns an `AsyncPageable` of the queried results.
- Iterate on entries by using `await foreach (EnrollmentGroup enrollment in client.EnrollmentGroups.CreateQuery(queryText))`.
- Iterate on pages of entries by using `await foreach (Page<EnrollmentGroup> enrollmentsPage in client.EnrollmentGroups.CreateQuery(queryText).AsPages())`.
- ETag fields on the classes `IndividualEnrollment`, `EnrollmentGroup`, and `DeviceRegistrationState` are now taken as the `Azure.ETag` type instead of strings.
- Twin.Tags is now of type `IDictionary<string, object>`.
- `CustomAllocationDefinition.WebhookUri` is now of type `System.Uri` instead of `System.String`.
Expand Down Expand Up @@ -505,6 +513,10 @@ These span across all service clients.
| `X509CertificateInfo.SHA256Thumbprint` | `X509CertificateInfo.Sha256Thumbprint` | See³ |
| `ProvisioningServiceClientException` | `ProvisioningServiceException` | |
| `ProvisioningClientCapabilities.IotEdge` | `InitialClientCapabilities.IsIotEdge` | Boolean properties should start with a verb, usually "Is". |
| `Query` | Class removed | `AsyncPageable` type replaces this type and is returned by all query functions now |
| `QueryResult` | Class removed | `AsyncPageable` type replaces this type and is returned by all query functions now |
| `QueryResultType` | Class removed | The `AsyncPageable` returned by each Query API has a hardcoded type now (`IndividualEnrollment`, `EnrollmentGroup`, or `DeviceRegistrationState`) |


### Security provider client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task ServiceClient_QueryDevicesInvalidServiceCertificateHttp_Fails(
using var sc = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionStringInvalidServiceCertificate);

// act
Func<Task> act = async () => await sc.Query.CreateAsync<ClientTwin>("select * from devices").GetAsyncEnumerator().MoveNextAsync().ConfigureAwait(false);
Func<Task> act = async () => await sc.Query.Create<ClientTwin>("select * from devices").GetAsyncEnumerator().MoveNextAsync().ConfigureAwait(false);

// assert
var error = await act.Should().ThrowAsync<IotHubServiceException>();
Expand Down
66 changes: 40 additions & 26 deletions e2e/test/iothub/service/QueryClientE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public async Task TwinQuery_Works()

await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 2).ConfigureAwait(false);

AsyncPageable<ClientTwin> queryResponse = serviceClient.Query.CreateAsync<ClientTwin>(queryText);
AsyncPageable<ClientTwin> queryResponse = serviceClient.Query.Create<ClientTwin>(queryText);
IAsyncEnumerator<ClientTwin> enumerator = queryResponse.GetAsyncEnumerator();

// assert
Expand Down Expand Up @@ -77,8 +77,8 @@ public async Task TwinQuery_CustomPaginationWorks()
await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false);

AsyncPageable<ClientTwin> queryResponse = serviceClient.Query.
CreateAsync<ClientTwin>(queryText);
IAsyncEnumerator<Page<ClientTwin>> enumerator = queryResponse.AsPages(null, 1).GetAsyncEnumerator();
Create<ClientTwin>(queryText);
await using IAsyncEnumerator<Page<ClientTwin>> enumerator = queryResponse.AsPages(null, 1).GetAsyncEnumerator();
(await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs.");

// assert
Expand All @@ -89,13 +89,13 @@ public async Task TwinQuery_CustomPaginationWorks()

// restart the query, but with a page size of 3 this time
queryResponse = serviceClient.Query.
CreateAsync<ClientTwin>(queryText);
enumerator = queryResponse.AsPages(null, 3).GetAsyncEnumerator();
Create<ClientTwin>(queryText);
await using IAsyncEnumerator<Page<ClientTwin>> nextEnumerator = queryResponse.AsPages(null, 3).GetAsyncEnumerator();

// consume the first page of results so the next MoveNextAsync gets a new page
(await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs.");
(await nextEnumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs.");

currentPage = enumerator.Current;
currentPage = nextEnumerator.Current;
currentPage.Values.Count.Should().Be(3);
IEnumerator<ClientTwin> pageContentsEnumerator = currentPage.Values.GetEnumerator();
pageContentsEnumerator.MoveNext().Should().BeTrue();
Expand All @@ -111,7 +111,7 @@ public async Task TwinQuery_CustomPaginationWorks()
ClientTwin thirdQueriedTwin = pageContentsEnumerator.Current;
thirdQueriedTwin.DeviceId.Should().BeOneOf(testDevice1.Id, testDevice2.Id, testDevice3.Id);

(await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeFalse("After 3 query results in one page, there should not be a second page");
(await nextEnumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeFalse("After 3 query results in one page, there should not be a second page");
}

[TestMethod]
Expand All @@ -131,21 +131,23 @@ public async Task TwinQuery_IterateByItemAcrossPages()

await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false);

AsyncPageable<ClientTwin> twinQuery = serviceClient.Query.
CreateAsync<ClientTwin>(queryText);
// For this test, we want the query logic to have to fetch multiple pages of results. To force
// that, set the page size to 1 when there are 3 total results to be queried.
IAsyncEnumerable<Page<ClientTwin>> twinPages = serviceClient.Query.
Create<ClientTwin>(queryText)
.AsPages(null, 1);

// assert

// For this test, we want the query logic to have to fetch multiple pages of results. To force
// that, set the page size to 1 when there are 3 total results to be queried.
IAsyncEnumerable<Page<ClientTwin>> twinPages = twinQuery.AsPages(null, 1);
var returnedTwinDeviceIds = new List<string>();
await foreach (Page<ClientTwin> queriedTwinPage in twinPages)
{
foreach (ClientTwin queriedTwin in queriedTwinPage.Values)
{
returnedTwinDeviceIds.Add(queriedTwin.DeviceId);
}

queriedTwinPage.GetRawResponse().Dispose();
}

var expectedDeviceIds = new List<string>() { testDevice1.Id, testDevice2.Id, testDevice3.Id };
Expand All @@ -172,7 +174,7 @@ public async Task TwinQuery_IterateByItemWorksWithinPage()
await WaitForDevicesToBeQueryableAsync(serviceClient.Query, queryText, 3).ConfigureAwait(false);

AsyncPageable<ClientTwin> twinQuery = serviceClient.Query
.CreateAsync<ClientTwin>(queryText);
.Create<ClientTwin>(queryText);

// assert

Expand All @@ -186,6 +188,8 @@ public async Task TwinQuery_IterateByItemWorksWithinPage()
{
returnedTwinDeviceIds.Add(queriedTwin.DeviceId);
}

queriedTwinPage.GetRawResponse().Dispose();
}

var expectedDeviceIds = new List<string>() { testDevice1.Id, testDevice2.Id, testDevice3.Id };
Expand All @@ -205,7 +209,7 @@ public async Task JobQuery_QueryWorks()
string query = "SELECT * FROM devices.jobs";
await WaitForJobToBeQueryableAsync(serviceClient.Query, query, 1).ConfigureAwait(false);

AsyncPageable<ScheduledJob> queryResponse = serviceClient.Query.CreateAsync<ScheduledJob>(query);
AsyncPageable<ScheduledJob> queryResponse = serviceClient.Query.Create<ScheduledJob>(query);
IAsyncEnumerator<ScheduledJob> enumerator = queryResponse.GetAsyncEnumerator();
(await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs.");
ScheduledJob queriedJob = enumerator.Current;
Expand Down Expand Up @@ -234,7 +238,7 @@ public async Task JobQuery_QueryByTypeWorks()
await ScheduleJobToBeQueriedAsync(serviceClient.ScheduledJobs, testDevice.Id).ConfigureAwait(false);
await WaitForJobToBeQueryableAsync(serviceClient.Query, 1, null, null).ConfigureAwait(false);

AsyncPageable<ScheduledJob> queryResponse = serviceClient.Query.CreateJobsQueryAsync();
AsyncPageable<ScheduledJob> queryResponse = serviceClient.Query.CreateJobsQuery();
IAsyncEnumerator<ScheduledJob> enumerator = queryResponse.GetAsyncEnumerator();
(await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs.");
ScheduledJob queriedJob = enumerator.Current;
Expand All @@ -258,8 +262,8 @@ public async Task RawQuery_QueryWorks()

string query = "SELECT COUNT() as TotalNumberOfDevices FROM devices";

AsyncPageable<RawQuerySerializationClass> queryResponse = serviceClient.Query.CreateAsync<RawQuerySerializationClass>(query);
IAsyncEnumerator<RawQuerySerializationClass> enumerator = queryResponse.GetAsyncEnumerator();
AsyncPageable<RawQuerySerializationClass> queryResponse = serviceClient.Query.Create<RawQuerySerializationClass>(query);
await using IAsyncEnumerator<RawQuerySerializationClass> enumerator = queryResponse.GetAsyncEnumerator();
(await enumerator.MoveNextAsync().ConfigureAwait(false)).Should().BeTrue("Should have at least one page of jobs.");
RawQuerySerializationClass queriedJob = enumerator.Current;
queriedJob.TotalNumberOfDevices.Should().BeGreaterOrEqualTo(0);
Expand All @@ -271,30 +275,37 @@ private async Task WaitForDevicesToBeQueryableAsync(QueryClient queryClient, str
// so keep executing the query until both devices are returned in the results or until a timeout.
using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout);
CancellationToken cancellationToken = cancellationTokenSource.Token;
IAsyncEnumerator<Page<ClientTwin>> enumerator = queryClient.CreateAsync<ClientTwin>(query).AsPages().GetAsyncEnumerator();
IAsyncEnumerator<Page<ClientTwin>> enumerator = queryClient.Create<ClientTwin>(query).AsPages().GetAsyncEnumerator();
await enumerator.MoveNextAsync();
while (enumerator.Current.Values.Count < expectedCount)
{
await Task.Delay(100).ConfigureAwait(false);
enumerator = queryClient.CreateAsync<ClientTwin>(query).AsPages().GetAsyncEnumerator();
await enumerator.DisposeAsync().ConfigureAwait(false);
enumerator = queryClient.Create<ClientTwin>(query).AsPages().GetAsyncEnumerator();
await enumerator.MoveNextAsync().ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested(); // timed out waiting for the devices to become queryable
}

await enumerator.DisposeAsync().ConfigureAwait(false);
}

private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, string query, int expectedCount)
{
// There is some latency between the creation of the test devices and when they are queryable,
// so keep executing the query until both devices are returned in the results or until a timeout.
using var cancellationTokenSource = new CancellationTokenSource(_queryableDelayTimeout);
IAsyncEnumerator<Page<ScheduledJob>> enumerator;
do
IAsyncEnumerator<Page<ScheduledJob>> enumerator = queryClient.Create<ScheduledJob>(query).AsPages().GetAsyncEnumerator();
await enumerator.MoveNextAsync().ConfigureAwait(false);
while (enumerator.Current.Values.Count < expectedCount)
{
await Task.Delay(100).ConfigureAwait(false);
cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable");
enumerator = queryClient.CreateAsync<ScheduledJob>(query).AsPages().GetAsyncEnumerator();
await enumerator.DisposeAsync().ConfigureAwait(false);
enumerator = queryClient.Create<ScheduledJob>(query).AsPages().GetAsyncEnumerator();
await enumerator.MoveNextAsync().ConfigureAwait(false);
} while (enumerator.Current.Values.Count < expectedCount);
}

await enumerator.DisposeAsync().ConfigureAwait(false);
}

private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int expectedCount, JobType? jobType = null, JobStatus? status = null)
Expand All @@ -308,15 +319,18 @@ private async Task WaitForJobToBeQueryableAsync(QueryClient queryClient, int exp
JobType = jobType,
JobStatus = status,
};
IAsyncEnumerator<Page<ScheduledJob>> enumerator = queryClient.CreateJobsQueryAsync(options).AsPages().GetAsyncEnumerator();
IAsyncEnumerator<Page<ScheduledJob>> enumerator = queryClient.CreateJobsQuery(options).AsPages().GetAsyncEnumerator();
await enumerator.MoveNextAsync().ConfigureAwait(false);
while (enumerator.Current.Values.Count < expectedCount)
{
cancellationTokenSource.Token.IsCancellationRequested.Should().BeFalse("timed out waiting for the devices to become queryable");
await Task.Delay(100).ConfigureAwait(false);
enumerator = queryClient.CreateJobsQueryAsync(options).AsPages().GetAsyncEnumerator();
await enumerator.DisposeAsync().ConfigureAwait(false);
enumerator = queryClient.CreateJobsQuery(options).AsPages().GetAsyncEnumerator();
await enumerator.MoveNextAsync().ConfigureAwait(false);
}

await enumerator.DisposeAsync().ConfigureAwait(false);
}

private static async Task<string> ScheduleJobToBeQueriedAsync(ScheduledJobsClient jobsClient, string deviceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using Azure;
using FluentAssertions;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.Azure.Devices.Provisioning.Client;
Expand Down Expand Up @@ -37,10 +38,10 @@ public async Task ProvisioningServiceClient_QueryInvalidServiceCertificateHttp_F
{
// arrange
using var provisioningServiceClient = new ProvisioningServiceClient(TestConfiguration.Provisioning.ConnectionStringInvalidServiceCertificate);
Query q = provisioningServiceClient.EnrollmentGroups.CreateQuery("SELECT * FROM enrollmentGroups");
AsyncPageable<EnrollmentGroup> q = provisioningServiceClient.EnrollmentGroups.CreateQuery("SELECT * FROM enrollmentGroups");

// act
Func<Task> act = async () => await q.NextAsync();
Func<Task> act = async () => await q.GetAsyncEnumerator().MoveNextAsync();

// assert
var error = await act.Should().ThrowAsync<ProvisioningServiceException>().ConfigureAwait(false);
Expand Down
Loading

0 comments on commit 99c559c

Please sign in to comment.