Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utilize AsyncPageable for query return types #3165

Merged
merged 20 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions SDK v2 migration guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ These span across all service clients.
#### Notable breaking changes

- Operations that offer concurrency protection using `ETag`s, now take a parameter `onlyIfUnchanged` that relies on the ETag property of the submitted entity.
- `IotHubServiceClient.Query.CreateAsync<T>(...)` is now async.
- Call `QueryResponse<T>.MoveNextAsync()` in a loop (end when it returns `false`) and access `QueryResponse<T>.Current`.
- `IotHubServiceClient.Query.CreateAsync<T>(...)` now returns an `AsyncPageable` of the queried results.
- Iterate on entries by using `await foreach (ClientTwin queriedTwin in client.Query.CreateAsync<ClientTwin>(queryText))`.
- Iterate on pages of entries by using `await foreach (Page<ClientTwin> queriedTwinPage in _client.Query.CreateAsync<ClientTwin>(queryText).AsPages())`
- `JobProperties` properties that hold Azure Storage SAS URIs are now of type `System.Uri` instead of `string`.
- `JobProperties` has been split into several classes with only the necessary properties for the specified operation.
- See `ExportJobProperties`, `ImportJobProperties`, and `IotHubJobResponse`.
Expand Down Expand Up @@ -378,7 +379,7 @@ These span across all service clients.

#### Notable breaking changes
- `JobClient.ScheduleTwinUpdateAsync(...)` previously returned a `JobResponse`, now returns `ScheduledJob`.
- `ScheduleJobs.GetAsync()` return type has changed to `QueryResponse<ScheduledJob>` from `IEnumerable<JobResponse>`.
- `ScheduleJobs.GetAsync()` return type has changed to `AsyncPageable<ScheduledJob>` from `IEnumerable<JobResponse>`.

| v1 API | Equivalent v2 API | Notes |
|:---|:---|:---|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net;
using System.Security.Authentication;
using System.Threading.Tasks;
using Azure;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -23,7 +24,7 @@ public async Task ServiceClient_QueryDevicesInvalidServiceCertificateHttp_Fails(
using var sc = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionStringInvalidServiceCertificate);

// act
Func<Task> act = async () => await sc.Query.CreateAsync<ClientTwin>("select * from devices").ConfigureAwait(false);
Func<Task> act = async () => await sc.Query.CreateAsync<ClientTwin>("select * from devices").GetAsyncEnumerator().MoveNextAsync().ConfigureAwait(false);

// assert
var error = await act.Should().ThrowAsync<IotHubServiceException>();
Expand Down
162 changes: 84 additions & 78 deletions e2e/test/iothub/service/QueryClientE2ETests.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Azure;

namespace Microsoft.Azure.Devices.Samples.JobsSample
{
Expand Down Expand Up @@ -67,11 +68,9 @@ public async Task RunSampleAsync()
}

// *************************************** Get all Jobs ***************************************
QueryResponse<ScheduledJob> queryResults = await _jobClient.ScheduledJobs.CreateQueryAsync();
AsyncPageable<ScheduledJob> queryResults = _jobClient.ScheduledJobs.CreateQueryAsync();

IEnumerable<ScheduledJob> getJobs = queryResults.CurrentPage;

foreach (ScheduledJob job in getJobs)
await foreach (ScheduledJob job in queryResults)
{
Console.WriteLine(JsonSerializer.Serialize(job, new JsonSerializerOptions { WriteIndented = true }));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;
using Newtonsoft.Json;
Expand Down Expand Up @@ -173,14 +174,15 @@ private async Task<int> PrintDeviceCountAsync()
try
{
string countSqlQuery = "select count() AS numberOfDevices from devices";
QueryResponse<Dictionary<string, int>> countQuery = await _hubClient.Query.CreateAsync<Dictionary<string, int>>(countSqlQuery);
if (!await countQuery.MoveNextAsync())
AsyncPageable<Dictionary<string, int>> countQuery = _hubClient.Query.CreateAsync<Dictionary<string, int>>(countSqlQuery);
IAsyncEnumerator<Dictionary<string, int>> enumerator = countQuery.GetAsyncEnumerator();
if (!await enumerator.MoveNextAsync())
{
Console.WriteLine($"Failed to run device count query.");
return 0;
}

if (!countQuery.Current.TryGetValue("numberOfDevices", out deviceCount))
if (!enumerator.Current.TryGetValue("numberOfDevices", out deviceCount))
{
Console.WriteLine($"Failed to get device count from query result.");
return 0;
Expand Down Expand Up @@ -224,11 +226,13 @@ private async Task<IReadOnlyList<ExportImportDevice>> GetDeviceIdsToDeleteAsync(
}
string queryText = queryTextSb.ToString();
Console.WriteLine($"Using query: {queryText}");
var options = new QueryOptions { PageSize = 1000 };
QueryResponse<DeviceQueryResult> devicesQuery = await _hubClient.Query.CreateAsync<DeviceQueryResult>(queryText, options);
while (await devicesQuery.MoveNextAsync())
AsyncPageable<DeviceQueryResult> devicesQuery = _hubClient.Query.CreateAsync<DeviceQueryResult>(queryText);
await foreach (Page<DeviceQueryResult> page in devicesQuery.AsPages(null, 1000))
{
devicesToDelete.Add(new ExportImportDevice(new Device(devicesQuery.Current.DeviceId), ImportMode.Delete));
foreach (DeviceQueryResult queryResult in page.Values)
{
devicesToDelete.Add(new ExportImportDevice(new Device(queryResult.DeviceId), ImportMode.Delete));
}
}

return devicesToDelete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure;

namespace Microsoft.Azure.Devices.Samples
{
Expand Down Expand Up @@ -158,20 +159,19 @@ private async Task EnumerateTwinsAsync()
string queryText = $"SELECT * FROM devices WHERE STARTSWITH(id, '{_parameters.DevicePrefix}')";
Console.WriteLine($"Using query text of: {queryText}");

QueryResponse<ClientTwin> query = await _client.Query.CreateAsync<ClientTwin>(queryText);
AsyncPageable<ClientTwin> query = _client.Query.CreateAsync<ClientTwin>(queryText);

while (await query.MoveNextAsync())
await foreach (ClientTwin queriedTwin in query)
{
ClientTwin twin = query.Current;
Console.WriteLine($"{twin.DeviceId}");
Console.WriteLine($"\tIs edge: {twin.Capabilities.IsIotEdge}");
if (!string.IsNullOrWhiteSpace(twin.DeviceScope))
Console.WriteLine($"{queriedTwin.DeviceId}");
Console.WriteLine($"\tIs edge: {queriedTwin.Capabilities.IsIotEdge}");
if (!string.IsNullOrWhiteSpace(queriedTwin.DeviceScope))
{
Console.WriteLine($"\tDevice scope: {twin.DeviceScope}");
Console.WriteLine($"\tDevice scope: {queriedTwin.DeviceScope}");
}
if (twin.ParentScopes?.Any() ?? false)
if (queriedTwin.ParentScopes?.Any() ?? false)
{
Console.WriteLine($"\tParent scope: {twin.ParentScopes[0]}");
Console.WriteLine($"\tParent scope: {queriedTwin.ParentScopes[0]}");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion iothub/service/src/Jobs/Models/JobQueryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Microsoft.Azure.Devices
/// <summary>
/// Specifies the options associated with job queries.
/// </summary>
public class JobQueryOptions : QueryOptions
public class JobQueryOptions
{
/// <summary>
/// The job type to query.
Expand Down
3 changes: 2 additions & 1 deletion iothub/service/src/Jobs/ScheduledJobsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Azure;

namespace Microsoft.Azure.Devices
{
Expand Down Expand Up @@ -121,7 +122,7 @@ await _internalRetryHandler
/// For a complete list of possible error cases, see <see cref="IotHubServiceErrorCode"/>.
/// </exception>
/// <exception cref="OperationCanceledException">If the provided <paramref name="cancellationToken"/> has requested cancellation.</exception>
public virtual Task<QueryResponse<ScheduledJob>> CreateQueryAsync(JobQueryOptions options = null, CancellationToken cancellationToken = default)
public virtual AsyncPageable<ScheduledJob> CreateQueryAsync(JobQueryOptions options = null, CancellationToken cancellationToken = default)
{
return _queryClient.CreateJobsQueryAsync(options, cancellationToken);
}
Expand Down
4 changes: 2 additions & 2 deletions iothub/service/src/Query/Models/QueriedPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ internal sealed class QueriedPage<T>
// in an async function.
internal QueriedPage(HttpResponseMessage response, string payload)
{
Items = JsonConvert.DeserializeObject<IEnumerable<T>>(payload);
Items = JsonConvert.DeserializeObject<IReadOnlyList<T>>(payload);
ContinuationToken = response.Headers.SafeGetValue(ContinuationTokenHeader);
}

[JsonProperty("items")]
internal IEnumerable<T> Items { get; set; }
internal IReadOnlyList<T> Items { get; set; }

[JsonProperty("continuationToken")]
internal string ContinuationToken { get; set; }
Expand Down
60 changes: 60 additions & 0 deletions iothub/service/src/Query/PageableHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure;

namespace Microsoft.Azure.Devices
{
/// <summary>
/// Copy of a subset of the helper functions defined in the Azure.Core class by the same name:
/// https://github.com/Azure/autorest.csharp/blob/main/src/assets/Generator.Shared/PageableHelpers.cs
/// </summary>
internal static class PageableHelpers
{
internal static AsyncPageable<T> CreateAsyncEnumerable<T>(Func<int?, Task<Page<T>>> firstPageFunc, Func<string, int?, Task<Page<T>>> nextPageFunc, int? pageSize = default) where T : notnull
{
AsyncPageFunc<T> first = (continuationToken, pageSizeHint) => firstPageFunc(pageSizeHint);
AsyncPageFunc<T> next = nextPageFunc != null ? new AsyncPageFunc<T>(nextPageFunc) : null;
return new FuncAsyncPageable<T>(first, next, pageSize);
}

internal delegate Task<Page<T>> AsyncPageFunc<T>(string continuationToken = default, int? pageSizeHint = default);
internal delegate Page<T> PageFunc<T>(string continuationToken = default, int? pageSizeHint = default);

internal class FuncAsyncPageable<T> : AsyncPageable<T> where T : notnull
{
private readonly AsyncPageFunc<T> _firstPageFunc;
private readonly AsyncPageFunc<T> _nextPageFunc;
private readonly int? _defaultPageSize;

internal FuncAsyncPageable(AsyncPageFunc<T> firstPageFunc, AsyncPageFunc<T> nextPageFunc, int? defaultPageSize = default)
{
_firstPageFunc = firstPageFunc;
_nextPageFunc = nextPageFunc;
_defaultPageSize = defaultPageSize;
}

public override async IAsyncEnumerable<Page<T>> AsPages(string continuationToken = default, int? pageSizeHint = default)
{
AsyncPageFunc<T> pageFunc = string.IsNullOrEmpty(continuationToken) ? _firstPageFunc : _nextPageFunc;

if (pageFunc == null)
{
yield break;
}

int? pageSize = pageSizeHint ?? _defaultPageSize;
do
{
Page<T> pageResponse = await pageFunc(continuationToken, pageSize).ConfigureAwait(false);
yield return pageResponse;
continuationToken = pageResponse.ContinuationToken;
pageFunc = _nextPageFunc;
} while (!string.IsNullOrEmpty(continuationToken) && pageFunc != null);
}
}
}
}
Loading