diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java index f9a35821729f..0c6cd8fccf9b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java @@ -271,13 +271,10 @@ public Mono release(Lease lease) { this.requestOptionsFactory.createItemRequestOptions(lease), serverLease -> { - if (serverLease.getOwner() != null) { - if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { - logger.info("Partition {} no need to release lease. The lease was already taken by another host '{}'.", lease.getLeaseToken(), serverLease.getOwner()); - throw new LeaseLostException(lease); - } + if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + logger.info("Partition {} no need to release lease. The lease was already taken by another host '{}'.", lease.getLeaseToken(), serverLease.getOwner()); + throw new LeaseLostException(lease); } - serverLease.setOwner(null); return serverLease; @@ -317,7 +314,7 @@ public Mono renew(Lease lease) { this.requestOptionsFactory.createItemRequestOptions(lease), serverLease -> { - if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); throw new LeaseLostException(lease); } @@ -333,7 +330,7 @@ public Mono updateProperties(Lease lease) { throw new IllegalArgumentException("lease"); } - if (!lease.getOwner().equalsIgnoreCase(this.settings.getHostName())) { + if (lease.getOwner() != null && !lease.getOwner().equalsIgnoreCase(this.settings.getHostName())) { logger.info("Partition '{}' lease was taken over by owner '{}' before lease item update", lease.getLeaseToken(), lease.getOwner()); throw new LeaseLostException(lease); } @@ -344,7 +341,7 @@ public Mono updateProperties(Lease lease) { new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), serverLease -> { - if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { logger.info("Partition '{}' lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); throw new LeaseLostException(lease); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java index 7132e5197577..b13dbdaff1be 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java @@ -96,7 +96,7 @@ public Mono run(CancellationToken cancellationToken) { if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); final String continuationToken = documentFeedResponse.getContinuationToken(); - final ChangeFeedState continuationState = ChangeFeedState.fromString(documentFeedResponse.getContinuationToken()); + final ChangeFeedState continuationState = ChangeFeedState.fromString(continuationToken); checkNotNull(continuationState, "Argument 'continuationState' must not be null."); checkArgument( continuationState diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSynchronizerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSynchronizerImpl.java index d4af0c33fd5e..6b16ce6cba95 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSynchronizerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSynchronizerImpl.java @@ -77,12 +77,7 @@ public Flux splitPartition(Lease lease) { throw new IllegalArgumentException("lease"); } - String leaseToken = lease.getLeaseToken(); - - ChangeFeedState lastContinuationState = lease.getContinuationState( - this.collectionResourceId, - new FeedRangePartitionKeyRangeImpl(leaseToken) - ); + final String leaseToken = lease.getLeaseToken(); // TODO fabianm - this needs more elaborate processing in case the initial // FeedRangeContinuation has continuation state for multiple feed Ranges @@ -93,11 +88,23 @@ public Flux splitPartition(Lease lease) { // or at least Service - this will be part of the next set of changes // For now - no merge just simple V0 of lease contract // this simplification will work - final String lastContinuationToken = lastContinuationState.getContinuation() != null ? - lastContinuationState.getContinuation().getCurrentContinuationToken().getToken() : - null; - - logger.info("Partition {} is gone due to split.", leaseToken); + // + //ChangeFeedState lastContinuationState = lease.getContinuationState( + // this.collectionResourceId, + // new FeedRangePartitionKeyRangeImpl(leaseToken) + //); + // + //final String lastContinuationToken = lastContinuationState.getContinuation() != null ? + // lastContinuationState.getContinuation().getCurrentContinuationToken().getToken() : + // null; + + // "Push" ChangeFeedProcessor is not merge-proof currently. For such cases we need a specific handler that can + // take multiple leases and "converge" them in a thread safe manner while also merging the various continuation + // tokens for each merged lease. + // We will directly reuse the original/parent continuation token as the seed for the new leases until then. + final String lastContinuationToken = lease.getContinuationToken(); + + logger.info("Partition {} is gone due to split; will attempt to resume using continuation token {}.", leaseToken, lastContinuationToken); // After a split, the children are either all or none available return this.enumPartitionKeyRanges() @@ -116,7 +123,7 @@ public Flux splitPartition(Lease lease) { return this.leaseManager.createLeaseIfNotExist(addedRangeId, lastContinuationToken); }, this.degreeOfParallelism) .map(newLease -> { - logger.info("Partition {} split into new partition with lease token {}.", leaseToken, newLease.getLeaseToken()); + logger.info("Partition {} split into new partition with lease token {} and continuation token {}.", leaseToken, newLease.getLeaseToken(), lastContinuationToken); return newLease; }); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index 38301e66ac44..34fbbc695e6a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -474,7 +474,7 @@ public void staledLeaseAcquiring() throws InterruptedException { .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); }); }) - .subscribe(); + .subscribe(); }) .subscribe(); } catch (Exception ex) { @@ -510,11 +510,28 @@ public void staledLeaseAcquiring() throws InterruptedException { @Test(groups = { "simple" }, timeOut = 160 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void readFeedDocumentsAfterSplit() throws InterruptedException { CosmosAsyncContainer createdFeedCollectionForSplit = createFeedCollection(FEED_COLLECTION_THROUGHPUT_FOR_SPLIT); - CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(2 * LEASE_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseMonitorCollection = createLeaseMonitorCollection(LEASE_COLLECTION_THROUGHPUT); try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); + LeaseStateMonitor leaseStateMonitor = new LeaseStateMonitor(); + + // create a monitoring CFP for ensuring the leases are updating as expected + ChangeFeedProcessor leaseMonitoringChangeFeedProcessor = new ChangeFeedProcessorBuilder() + .hostName(hostName) + .handleChanges(leasesChangeFeedProcessorHandler(leaseStateMonitor)) + .feedContainer(createdLeaseCollection) + .leaseContainer(createdLeaseMonitorCollection) + .options(new ChangeFeedProcessorOptions() + .setLeasePrefix("MONITOR") + .setStartFromBeginning(true) + .setMaxItemCount(100) + .setLeaseExpirationInterval(Duration.ofMillis(10 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setFeedPollDelay(Duration.ofMillis(200)) + ) + .buildChangeFeedProcessor(); // generate a first batch of documents setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollectionForSplit, FEED_COUNT); @@ -528,39 +545,82 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { .setLeasePrefix("TEST") .setStartFromBeginning(true) .setMaxItemCount(10) + .setLeaseRenewInterval(Duration.ofSeconds(2)) ) .buildChangeFeedProcessor(); - changeFeedProcessor.start().subscribeOn(Schedulers.elastic()) + leaseMonitoringChangeFeedProcessor.start().subscribeOn(Schedulers.elastic()) .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) .onErrorResume(throwable -> { - log.error("Change feed processor did not start in the expected time", throwable); + log.error("Change feed processor for lease monitoring did not start in the expected time", throwable); return Mono.error(throwable); }) - .doOnSuccess(aVoid -> { - // Wait for the feed processor to receive and process the first batch of documents. - try { - waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - }) .then( - // increase throughput to force a single partition collection to go through a split - createdFeedCollectionForSplit - .readThroughput().subscribeOn(Schedulers.elastic()) - .flatMap(currentThroughput -> + changeFeedProcessor.start().subscribeOn(Schedulers.elastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + log.error("Change feed processor did not start in the expected time", throwable); + return Mono.error(throwable); + }) + .doOnSuccess(aVoid -> { + // Wait for the feed processor to receive and process the first batch of documents. + try { + waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted exception", e); + } + }) + .then( + // increase throughput to force a single partition collection to go through a split createdFeedCollectionForSplit - .replaceThroughput(ThroughputProperties.createManualThroughput(FEED_COLLECTION_THROUGHPUT)) - .subscribeOn(Schedulers.elastic()) + .readThroughput().subscribeOn(Schedulers.elastic()) + .flatMap(currentThroughput -> + createdFeedCollectionForSplit + .replaceThroughput(ThroughputProperties.createManualThroughput(FEED_COLLECTION_THROUGHPUT)) + .subscribeOn(Schedulers.elastic()) + ) + .then() ) - .then() ) .subscribe(); // Wait for the feed processor to receive and process the first batch of documents and apply throughput change. Thread.sleep(4 * CHANGE_FEED_PROCESSOR_TIMEOUT); + // Retrieve the latest continuation token value. + long continuationToken = Long.MAX_VALUE; + for (JsonNode item : leaseStateMonitor.receivedLeases.values()) { + JsonNode tempToken = item.get("ContinuationToken"); + long continuationTokenValue = 0; + if (tempToken != null && !tempToken.asText().replaceAll("[^0-9]", "").isEmpty()) { + continuationTokenValue = Long.parseLong(tempToken.asText().replaceAll("[^0-9]", "")); + } + if (tempToken == null || continuationTokenValue == 0) { + log.error("Found unexpected lease with continuation token value of null or 0"); + try { + log.info("ERROR LEASE FOUND {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(item)); + } catch (JsonProcessingException e) { + log.error("Failure in processing json [{}]", e.getMessage(), e); + } + leaseStateMonitor.isContinuationTokenAdvancing = false; + } + else { + // keep the lowest continuation token value + if (continuationToken > continuationTokenValue) { + continuationToken = continuationTokenValue; + } + } + } + if (continuationToken == Long.MAX_VALUE) { + // something went wrong; we could not find any valid leases. + log.error("Could not find any valid lease documents"); + leaseStateMonitor.isContinuationTokenAdvancing = false; + } + else { + leaseStateMonitor.parentContinuationToken = continuationToken; + } + leaseStateMonitor.isAfterLeaseInitialization = true; + // Loop through reading the current partition count until we get a split // This can take up to two minute or more. String partitionKeyRangesPath = extractContainerSelfLink(createdFeedCollectionForSplit); @@ -580,18 +640,22 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { } return count; }) - // this will timeout approximately after 3 minutes + // this will timeout approximately after 30 minutes .retryWhen(Retry.max(40).filter(throwable -> { try { log.warn("Retrying..."); // Splits are taking longer, so increasing sleep time between retries - Thread.sleep(60 * CHANGE_FEED_PROCESSOR_TIMEOUT); + Thread.sleep(10 * CHANGE_FEED_PROCESSOR_TIMEOUT); } catch (InterruptedException e) { throw new RuntimeException("Interrupted exception", e); } return true; })) - .last().block(); + .last() + .doOnSuccess(partitionCount -> { + leaseStateMonitor.isAfterSplits = true; + }) + .block(); assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); @@ -602,6 +666,7 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT * 2); changeFeedProcessor.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); + leaseMonitoringChangeFeedProcessor.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); int leaseCount = changeFeedProcessor.getCurrentState() .map(List::size).block(); assertThat(leaseCount > 1).as("Found %d leases", leaseCount).isTrue(); @@ -611,8 +676,12 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); } + // check the continuation tokens have advanced after splits + assertThat(leaseStateMonitor.isContinuationTokenAdvancing && leaseStateMonitor.parentContinuationToken > 0) + .as("Continuation tokens for the leases after split should advance from parent value; parent: %d", leaseStateMonitor.parentContinuationToken).isTrue(); + // Wait for the feed processor to shutdown. - Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); } finally { safeDeleteCollection(createdFeedCollectionForSplit); @@ -643,6 +712,48 @@ private void waitToReceiveDocuments(Map receivedDocuments, lon assertThat(remainingWork >= 0).as("Failed to receive all the feed documents").isTrue(); } + private Consumer> leasesChangeFeedProcessorHandler(LeaseStateMonitor leaseStateMonitor) { + return docs -> { + log.info("LEASES processing from thread in test {}", Thread.currentThread().getId()); + for (JsonNode item : docs) { + try { + log + .debug("LEASE RECEIVED {}", OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(item)); + } catch (JsonProcessingException e) { + log.error("Failure in processing json [{}]", e.getMessage(), e); + } + + JsonNode leaseToken = item.get("LeaseToken"); + + if (leaseToken != null) { + JsonNode continuationTokenNode = item.get("ContinuationToken"); + if (continuationTokenNode == null) { + // Something catastrophic went wrong and the lease is malformed. + log.error("Found invalid lease document"); + leaseStateMonitor.isContinuationTokenAdvancing = false; + } + else { + log.info("LEASE {} with continuation {}", leaseToken.asText(), continuationTokenNode.asText()); + if (leaseStateMonitor.isAfterLeaseInitialization) { + String value = continuationTokenNode.asText().replaceAll("[^0-9]", ""); + if (value.isEmpty()) { + log.error("Found unexpected continuation token that does not conform to the expected format"); + leaseStateMonitor.isContinuationTokenAdvancing = false; + } + long continuationToken = Long.parseLong(value); + if (leaseStateMonitor.parentContinuationToken > continuationToken) { + log.error("Found unexpected continuation token that did not advance after the split; parent: {}, current: {}"); + leaseStateMonitor.isContinuationTokenAdvancing = false; + } + } + } + leaseStateMonitor.receivedLeases.put(item.get("id").asText(), item); + } + } + log.info("LEASES processing from thread {}", Thread.currentThread().getId()); + }; + } + @BeforeMethod(groups = { "emulator", "simple" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true) public void beforeMethod() { } @@ -738,6 +849,14 @@ private CosmosAsyncContainer createLeaseCollection(int provisionedThroughput) { return createCollection(createdDatabase, collectionDefinition, options, provisionedThroughput); } + private CosmosAsyncContainer createLeaseMonitorCollection(int provisionedThroughput) { + CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); + CosmosContainerProperties collectionDefinition = new CosmosContainerProperties( + "monitor_" + UUID.randomUUID(), + "/id"); + return createCollection(createdDatabase, collectionDefinition, options, provisionedThroughput); + } + private static synchronized void processItem(JsonNode item, Map receivedDocuments) { try { ChangeFeedProcessorTest.log @@ -747,4 +866,12 @@ private static synchronized void processItem(JsonNode item, Map receivedLeases = new ConcurrentHashMap<>(); + public volatile boolean isAfterLeaseInitialization = false; + public volatile boolean isAfterSplits = false; + public volatile long parentContinuationToken = 0; + public volatile boolean isContinuationTokenAdvancing = true; + } }