Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixChangeFeedHangWhenUsingStaleContainerRid #43729

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package com.azure.cosmos;

import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.implementation.Utils;
Expand All @@ -18,6 +19,7 @@
import com.azure.cosmos.implementation.guava25.collect.Multimap;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.throughputControl.TestItem;
import com.azure.cosmos.models.ChangeFeedPolicy;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
Expand Down Expand Up @@ -147,6 +149,15 @@ public static Object[][] changeFeedSplitHandlingDataProvider() {
};
}

@DataProvider(name = "changeFeedWithStaleContainerRidDataProvider")
public static Object[][] changeFeedWithStaleContainerRidDataProvider() {
return new Object[][]{
// re-created container RU, multi-partition container
{ 400, false },
{ 10100, true }
};
}

@Factory(dataProvider = "simpleClientBuildersWithDirect")
public CosmosContainerChangeFeedTest(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
Expand Down Expand Up @@ -1029,6 +1040,161 @@ public void changeFeedQueryCompleteAfterAvailableNow(
}
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedWithStaleContainerRidDataProvider", timeOut = 4 * TIMEOUT)
public void changeFeedQueryWithStaleCollectionRidInContinuationToken(
int throughput,
boolean isMultiPartitionContainer
) throws InterruptedException {
// this test is to validate when using stale container rid in the continuationToken, query change feed will return BadRequestException and does not hang
String testContainerId = UUID.randomUUID().toString();

try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
400);

String testContainerRid = testContainer.read().block().getProperties().getResourceId();

// create items
for (int i = 0; i < 10; i++) {
testContainer.createItem(TestItem.createNewItem()).block();
}

// using query changeFeed
logger.info("Doing initial changeFeed query on the container " + testContainerId);
AtomicReference<String> continuationToken = new AtomicReference<>();
CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.doOnNext(response -> {
continuationToken.set(response.getContinuationToken());
})
.blockLast();

logger.info("Delete the container");
testContainer.delete().block();

Thread.sleep(Duration.ofSeconds(5).toMillis());
logger.info("Re-create the container");
testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
throughput);
Thread.sleep(Duration.ofSeconds(5).toMillis());
List<FeedRange> feedRanges = testContainer.getFeedRanges().block();
if (isMultiPartitionContainer) {
assertThat(feedRanges.size()).isGreaterThan(1);
} else {
assertThat(feedRanges.size()).isEqualTo(1);
}

String reCreatedContainerRid = testContainer.read().block().getProperties().getResourceId();
assertThat(testContainerRid).isNotEqualTo(reCreatedContainerRid);

logger.info("Using continuation token with incorrect containerRid");
changeFeedRequestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuationToken.get());
try {
testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.blockLast();
fail("ChangeFeed query request should fail when using incorrect collectionRid in the continuation token");
} catch (CosmosException e) {
assertThat(e.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.BADREQUEST);
assertThat(e.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.INCORRECT_CONTAINER_RID_SUB_STATUS);
}
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
}
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedWithStaleContainerRidDataProvider", timeOut = 4 * TIMEOUT)
public void changeFeedQueryWithCorrectContainerRidWithStaledClient(
int throughput,
boolean isMultiPartitionContainer
) throws InterruptedException {
// this test is to validate when container re-created, using correct continuation token on client with stale cache, the request will succeed
String testContainerId = UUID.randomUUID().toString();
CosmosAsyncClient newClient = null;

try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
400);

String testContainerRid = testContainer.read().block().getProperties().getResourceId();

// create items
for (int i = 0; i < 10; i++) {
testContainer.createItem(TestItem.createNewItem()).block();
}

// using query changeFeed
logger.info("Doing initial changeFeed query on the container " + testContainerId);
CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
testContainer.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.blockLast();

logger.info("Delete the container");
testContainer.delete().block();

Thread.sleep(Duration.ofSeconds(5).toMillis());
logger.info("Re-create the container through a different client");
newClient = this.getClientBuilder().buildAsyncClient();
CosmosAsyncDatabase databaseWithNewClient = newClient.getDatabase(this.createdAsyncDatabase.getId());
CosmosAsyncContainer testContainerWithNewClient =
createCollection(
databaseWithNewClient,
containerProperties,
new CosmosContainerRequestOptions(),
throughput);
Thread.sleep(Duration.ofSeconds(5).toMillis());
List<FeedRange> feedRanges = testContainerWithNewClient.getFeedRanges().block();
if (isMultiPartitionContainer) {
assertThat(feedRanges.size()).isGreaterThan(1);
} else {
assertThat(feedRanges.size()).isEqualTo(1);
}

String reCreatedContainerRid = testContainerWithNewClient.read().block().getProperties().getResourceId();
assertThat(testContainerRid).isNotEqualTo(reCreatedContainerRid);

logger.info("Creating items in the re-created container");
for (int i = 0; i < 10; i++) {
testContainerWithNewClient.createItem(TestItem.createNewItem()).block();
}
logger.info("query changeFeed from re-created container");
AtomicReference<String> continuationToken = new AtomicReference<>();
testContainerWithNewClient
.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.doOnNext(response -> continuationToken.set(response.getContinuationToken()))
.blockLast();

logger.info("Using the continuation token from the re-created container on previous stale client");
changeFeedRequestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(continuationToken.get());
testContainer
.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
.byPage()
.blockLast();
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
safeClose(newClient);
}
}

void insertDocuments(
int partitionCount,
int documentCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
package com.azure.cosmos.implementation.feedranges;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
Expand All @@ -23,6 +27,7 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
Expand Down Expand Up @@ -192,6 +197,11 @@ public void feedRangeCompositeContinuation_split() {
RxDocumentClientImpl clientMock = Mockito.mock(RxDocumentClientImpl.class);
RxPartitionKeyRangeCache cacheMock = Mockito.mock(RxPartitionKeyRangeCache.class);
Mockito.when(clientMock.getPartitionKeyRangeCache()).thenReturn(cacheMock);
RxDocumentServiceRequest rxDocumentServiceRequest =
RxDocumentServiceRequest.create(
null,
OperationType.ReadFeed,
ResourceType.Document);

List<PartitionKeyRange> childRanges = new ArrayList<>();
childRanges.add(new PartitionKeyRange("1", "AA", "BB"));
Expand All @@ -205,7 +215,7 @@ public void feedRangeCompositeContinuation_split() {
eq(true),
isNull())).thenReturn(Mono.just(new Utils.ValueHolder<>(childRanges)));

continuation.handleFeedRangeGone(clientMock, goneException).block();
continuation.handleFeedRangeGone(clientMock, goneException, rxDocumentServiceRequest).block();
assertThat(continuation.getCompositeContinuationTokens().size()).isEqualTo(2);
CompositeContinuationToken token1 = continuation.getCompositeContinuationTokens().poll();
CompositeContinuationToken token2 = continuation.getCompositeContinuationTokens().poll();
Expand Down Expand Up @@ -258,6 +268,11 @@ public void feedRangeCompositeContinuation_merge() {

List<PartitionKeyRange> parentRanges = new ArrayList<>();
parentRanges.add(new PartitionKeyRange("3", "AA", "DD"));
RxDocumentServiceRequest rxDocumentServiceRequest =
RxDocumentServiceRequest.create(
null,
OperationType.ReadFeed,
ResourceType.Document);

Mockito.when(
cacheMock.tryGetOverlappingRangesAsync(
Expand All @@ -267,7 +282,7 @@ public void feedRangeCompositeContinuation_merge() {
eq(true),
isNull())).thenReturn(Mono.just(new Utils.ValueHolder<>(parentRanges)));

continuation.handleFeedRangeGone(clientMock, goneException).block();
continuation.handleFeedRangeGone(clientMock, goneException, rxDocumentServiceRequest).block();
assertThat(continuation.getCompositeContinuationTokens().size()).isEqualTo(2);
CompositeContinuationToken token1 = continuation.getCompositeContinuationTokens().poll();
CompositeContinuationToken token2 = continuation.getCompositeContinuationTokens().poll();
Expand All @@ -289,6 +304,56 @@ public void feedRangeCompositeContinuation_merge() {
continuationDummy);
}

@Test(groups = "unit")
public void feedRangeCompositeContinuation_staleContainerRid() {
String continuationDummy = UUID.randomUUID().toString();
PartitionKeyInternal partitionKey = PartitionKeyInternalUtils.createPartitionKeyInternal(
"Test");
FeedRangePartitionKeyImpl feedRange = new FeedRangePartitionKeyImpl(partitionKey);

List<Range<String>> ranges = new ArrayList<>();
ranges.add(new Range<>("AA", "DD", true, false));

String containerRid = "/cols/" + UUID.randomUUID();

FeedRangeCompositeContinuationImpl continuation = new FeedRangeCompositeContinuationImpl(
containerRid,
feedRange,
ranges,
continuationDummy
);

GoneException goneException = new GoneException("Test");
BridgeInternal.setSubStatusCode(
goneException,
HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE);

RxDocumentClientImpl clientMock = Mockito.mock(RxDocumentClientImpl.class);
RxPartitionKeyRangeCache cacheMock = Mockito.mock(RxPartitionKeyRangeCache.class);
Mockito.when(clientMock.getPartitionKeyRangeCache()).thenReturn(cacheMock);
RxDocumentServiceRequest rxDocumentServiceRequest =
RxDocumentServiceRequest.create(
null,
OperationType.ReadFeed,
ResourceType.Document);

Mockito.when(
cacheMock.tryGetOverlappingRangesAsync(
isNull(),
eq(containerRid),
any(),
eq(true),
isNull())).thenReturn(Mono.just(new Utils.ValueHolder<>(null)));

try {
continuation.handleFeedRangeGone(clientMock, goneException, rxDocumentServiceRequest).block();
fail("handleFeedRangeGone should fail with BadRequestException when no child ranges can be found");
} catch (CosmosException e) {
assertThat(e.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.BADREQUEST);
assertThat(e.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.INCORRECT_CONTAINER_RID_SUB_STATUS);
}
}

private void validateCompositeContinuationToken(CompositeContinuationToken token, Range<String> matchedRange, String continuationToken) {
assertThat(token.getRange().getMin()).isEqualTo(matchedRange.getMin());
assertThat(token.getRange().getMax()).isEqualTo(matchedRange.getMax());
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue where queryChangeFeed will return incorrect results or hang when using continuationToken with stale container resourceId. - [PR 43114](https://github.com/Azure/azure-sdk-for-java/pull/43114)

#### Other Changes
* Added support to enable http2 for gateway mode with system property `COSMOS.HTTP2_ENABLED` and system variable `COSMOS_HTTP2_ENABLED`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,9 @@ public BadRequestException(String message, HttpHeaders headers, URI requestUri)
HttpConstants.StatusCodes.BADREQUEST,
requestUrlString);
}

public BadRequestException(String message, int subStatusCode) {
this(message);
BridgeInternal.setSubStatusCode(this, subStatusCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,32 @@ private RxDocumentServiceRequest createDocumentServiceRequest() {
headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, this.client.getConsistencyLevel().toString());
}

// always populate the collectionRid header
// in case of container has been recreated, this will allow correct error being returned to SDK
// Mono<String> collectionRidMono = this.client
// .getCollectionCache()
// .resolveByNameAsync(
// null,
// this.getLinkWithoutTrailingSlash(),
// null)
// .flatMap(
// collection -> {
// if (collection == null) {
// return Mono.error(new IllegalStateException("Collection cannot be null"));
// }
// return Mono.just(collection.getResourceId());
// });
//
// collectionRidMono.subscribe(collectionRid -> {
//
// if (!collectionRid.equals(this.changeFeedState.getContainerRid())) {
// throw new BadRequestException("Incorrect continuation token for this collection.",
// HttpConstants.SubStatusCodes.INCORRECT_CONTAINER_RID_SUB_STATUS);
// }
// headers.put(HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER, collectionRid);
// });
headers.put(HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER, this.changeFeedState.getContainerRid());

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(clientContext,
OperationType.ReadFeed,
resourceType,
Expand All @@ -159,6 +185,14 @@ private RxDocumentServiceRequest createDocumentServiceRequest() {
return request;
}

String getLinkWithoutTrailingSlash() {
if (this.collectionLink.startsWith("/")) {
return this.collectionLink.substring(1);
}

return this.collectionLink;
}

private Mono<FeedResponse<T>> executeRequestAsync(RxDocumentServiceRequest request) {
if (this.operationContextAndListener == null) {
return handlePartitionLevelCircuitBreakingPrerequisites(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract String applyServerResponseContinuation(

public abstract String getContainerRid();

public abstract void setContainerRid(String containerRid);

public static ChangeFeedState fromString(String base64EncodedJson) {
checkNotNull(base64EncodedJson, "Argument 'base64EncodedJson' must not be null");

Expand Down
Loading
Loading