Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Adds ODE continuation token support for non-ODE pipelines #4009

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ internal static class CosmosQueryExecutionContextFactory
{
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string OptimisticDirectExecutionToken = "OptimisticDirectExecutionToken";
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
Expand Down Expand Up @@ -307,6 +308,19 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
if (inputParameters.InitialUserContinuationToken != null)
{
CosmosObject cosmosObjectContinuationToken = inputParameters.InitialUserContinuationToken as CosmosObject;
if (cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken))
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
{
return TryCatch<IQueryPipelineStage>.FromException(
new MalformedContinuationTokenException(
$"Operation cannot be resumed with the following token, as it requires the Optimistic Direct Execution pipeline which has been disabled on your SDK. " +
$"Enable Optimistic Direct Execution from QueryRequestOptions to avoid this issue." +
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
$"{inputParameters.InitialUserContinuationToken}."));
}
}

if (createPassthroughQuery)
{
SetTestInjectionPipelineType(inputParameters, Passthrough);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents.Collections;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -786,6 +785,61 @@ public async Task QueryActivityIdWithContinuationTokenAndTraceTest()

}

[TestMethod]
public async Task TesOdeTokenCompatibilityWithNonOdePipeline()
{
using (ITrace rootTrace = Trace.GetRootTrace("Root Trace"))
{
ResponseMessage responseMessage = null;
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
string query = "select top 200 * from c";
string expectedErrorMessage = "Operation cannot be resumed with the following token, as it requires Optimistic Direct Execution pipeline, which has been disabled for your SDK";

CosmosClient client = DirectCosmosClient;
Container container = client.GetContainer(DatabaseId, ContainerId);

// Create items
for (int i = 0; i < 500; i++)
{
await container.CreateItemAsync<ToDoActivity>(ToDoActivity.CreateRandomToDoActivity());
}

QueryRequestOptions queryRequestOptions = new QueryRequestOptions
{
MaxItemCount = 50,
EnableOptimisticDirectExecution = true
};

FeedIteratorInternal feedIterator =
akotalwar marked this conversation as resolved.
Show resolved Hide resolved
(FeedIteratorInternal)container.GetItemQueryStreamIterator(
query,
null,
queryRequestOptions);

responseMessage = await feedIterator.ReadNextAsync(rootTrace, CancellationToken.None);
string continuationToken = responseMessage.ContinuationToken;

QueryRequestOptions newQueryRequestOptions = new QueryRequestOptions
{
MaxItemCount = 50,
EnableOptimisticDirectExecution = false
};

// use Continuation Token to create new iterator and use same trace
FeedIteratorInternal feedIteratorNew =
(FeedIteratorInternal)container.GetItemQueryStreamIterator(
query,
continuationToken,
newQueryRequestOptions);

while (feedIteratorNew.HasMoreResults)
{
responseMessage = await feedIteratorNew.ReadNextAsync(rootTrace, CancellationToken.None);
}

Assert.IsTrue(responseMessage.CosmosException.ToString().Contains(expectedErrorMessage));
}
}

private class CustomHandler : RequestHandler
{
string correlatedActivityId;
Expand Down