diff --git a/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java b/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java index 7c608648c64d6..008bff668bbce 100644 --- a/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java +++ b/sdk/cosmos/azure-cosmos-test/src/test/java/com/azure/cosmos/test/faultinjection/FaultInjectionServerErrorRuleTests.java @@ -176,7 +176,7 @@ public void faultInjectionServerErrorRuleTests_OperationType(OperationType opera cosmosDiagnostics, operationType, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, serverGoneRuleId, true); @@ -294,7 +294,7 @@ public void faultInjectionServerErrorRuleTests_OperationTypeImpactAddresses(Oper cosmosDiagnostics, operationType, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, writeRegionServerGoneRuleId, true); } else { @@ -394,7 +394,7 @@ public void faultInjectionServerErrorRuleTests_Region() throws JsonProcessingExc cosmosDiagnostics, OperationType.Read, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, localRegionRuleId, true ); @@ -456,7 +456,7 @@ public void faultInjectionServerErrorRuleTests_Partition() throws JsonProcessing cosmosDiagnostics, OperationType.Query, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, feedRangeRuleId, true ); @@ -798,7 +798,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit() throws JsonProcessingE cosmosDiagnostics, OperationType.Read, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, hitLimitRuleId, true ); @@ -922,7 +922,7 @@ public void faultInjectionServerErrorRuleTests_includePrimary() throws JsonProce cosmosDiagnostics, OperationType.Create, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, serverGoneIncludePrimaryRuleId, true); @@ -932,7 +932,7 @@ public void faultInjectionServerErrorRuleTests_includePrimary() throws JsonProce cosmosDiagnostics, OperationType.Upsert, HttpConstants.StatusCodes.GONE, - HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, serverGoneIncludePrimaryRuleId, true); } finally { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeOneMergeHandlerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneMergeHandlerTests.java similarity index 59% rename from sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeOneMergeHandlerTests.java rename to sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneMergeHandlerTests.java index f878450b1e537..b8758fcbd8fb6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeOneMergeHandlerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneMergeHandlerTests.java @@ 
-7,13 +7,41 @@ import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneMergeHandler; import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; import com.azure.cosmos.implementation.routing.Range; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import reactor.test.StepVerifier; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; -public class FeedRangeOneMergeHandlerTests { +public class FeedRangeGoneMergeHandlerTests { + @DataProvider(name = "maxScaleCountArgProvider") + public static Object[][] maxScaleCountArgProvider() { + return new Object[][]{ + // maxScaleCount + { 0 }, + { 1 } + }; + } + + @Test(groups = "unit") + public void feedRangeGoneMergeHandler_constructor() { + FeedRangeEpkImpl feedRangeForLeaseWithGoneException = new FeedRangeEpkImpl( + new Range<>("AA", "BB", true, false)); + + ServiceItemLeaseV1 leaseWithGoneException = + new ServiceItemLeaseV1() + .withLeaseToken("AA-BB") + .withFeedRange(feedRangeForLeaseWithGoneException); + leaseWithGoneException.setId("TestLease-" + UUID.randomUUID()); + + FeedRangeGoneMergeHandler mergeHandler = new FeedRangeGoneMergeHandler( + leaseWithGoneException, + new PartitionKeyRange("1", "AA", "CC")); + + assertThat(mergeHandler.shouldSkipDirectLeaseAssignment()).isFalse(); + assertThat(mergeHandler.shouldDeleteCurrentLease()).isFalse(); + } @Test(groups = "unit") public void mergeHandlerForEpkBasedLease() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneSplitHandlerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneSplitHandlerTests.java index 7d019ec5b9655..3ddef46d9c2f7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneSplitHandlerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeGoneSplitHandlerTests.java @@ -34,6 +34,44 @@ public Object[][] secondChildLeaseSuccessArgProvider() { }; } + @DataProvider(name = "maxScaleCountArgProvider") + public static Object[][] maxScaleCountArgProvider() { + return new Object[][]{ + // maxScaleCount + { 0 }, + { 1 } + }; + } + + @Test(groups = "unit", dataProvider = "maxScaleCountArgProvider") + public void feedRangeGoneSplitHandler_constructor(int maxScaleCount) { + ServiceItemLeaseV1 leaseWithGoneException = + new ServiceItemLeaseV1() + .withLeaseToken("AA-CC") + .withFeedRange(new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false))); + leaseWithGoneException.setId("TestLease-" + UUID.randomUUID()); + + List childRanges = new ArrayList<>(); + // using a min less than AA to check we are using the min of the lease with gone exception + childRanges.add(new PartitionKeyRange("1", "", "BB")); + // using a max larger than CC to check we are using the max of the lease with gone exception + childRanges.add(new PartitionKeyRange("2", "BB", "FF")); + + FeedRangeGoneSplitHandler splitHandler = new FeedRangeGoneSplitHandler( + leaseWithGoneException, + childRanges, + Mockito.mock(LeaseManager.class), + maxScaleCount); + + if (maxScaleCount > 0) { + assertThat(splitHandler.shouldSkipDirectLeaseAssignment()).isTrue(); + } else { + assertThat(splitHandler.shouldSkipDirectLeaseAssignment()).isFalse(); + } + + assertThat(splitHandler.shouldDeleteCurrentLease()).isTrue(); + } + @Test(groups = "unit", dataProvider 
= "secondChildLeaseSuccessArgProvider") public void splitHandlerForEpkBasedLease(boolean secondChildLeaseSuccess) { // Testing an imaginary scenario FeedRange "AA-CC" has been split into "''-BB", "BB-FF" @@ -74,8 +112,8 @@ public void splitHandlerForEpkBasedLease(boolean secondChildLeaseSuccess) { FeedRangeGoneSplitHandler splitHandler = new FeedRangeGoneSplitHandler( leaseWithGoneException, childRanges, - leaseManagerMock - ); + leaseManagerMock, + 0); if (secondChildLeaseSuccess) { StepVerifier diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImplTests.java index d8f46f2a65373..cb50f008065a4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImplTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImplTests.java @@ -13,6 +13,7 @@ import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; import com.azure.cosmos.implementation.routing.Range; import org.mockito.Mockito; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -25,13 +26,24 @@ import java.util.UUID; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class PartitionControllerImplTests { - @Test(groups = "unit") - public void handleSplit() throws InterruptedException { + + @DataProvider(name = "shouldSkipDirectLeaseAssignmentArgProvider") + public static Object[][] shouldSkipDirectLeaseAssignmentArgProvider() { + return new Object[][]{ + // shouldSkipDirectLeaseAssignment + { true }, + { false } + }; + } + + @Test(groups = "unit", dataProvider = "shouldSkipDirectLeaseAssignmentArgProvider") + public void handleSplit(boolean shouldSkipDirectLeaseAssignment) throws InterruptedException { LeaseContainer leaseContainer = Mockito.mock(LeaseContainer.class); when(leaseContainer.getOwnedLeases()).thenReturn(Flux.empty()); @@ -76,7 +88,8 @@ public void handleSplit() throws InterruptedException { synchronizer, lease, Arrays.asList(childLease1, childLease2), - true); + true, + shouldSkipDirectLeaseAssignment); StepVerifier.create(partitionController.initialize()).verifyComplete(); StepVerifier.create(partitionController.addOrUpdateLease(lease)) @@ -87,28 +100,53 @@ public void handleSplit() throws InterruptedException { // add some waiting time here so that we can capture all the calls Thread.sleep(500); - // Verify total three leases are acquired - verify(leaseManager, times(1)).acquire(lease); - verify(leaseManager, times(1)).acquire(childLease1); - verify(leaseManager, times(1)).acquire(childLease2); - - // Verify partitionSupervisor is created for each lease - verify(partitionSupervisorFactory, times(1)).create(lease); - verify(partitionSupervisorFactory, times(1)).create(childLease1); - verify(partitionSupervisorFactory, times(1)).create(childLease2); - - // Verify only the lease with feedRangeGone exception will be deleted from lease container - verify(leaseManager, times(1)).delete(lease); - verify(leaseManager, Mockito.never()).delete(childLease1); - verify(leaseManager, 
Mockito.never()).delete(childLease2); - - // Verify at the end, all the leases will be released - verify(leaseManager, times(1)).release(lease); - verify(leaseManager, times(1)).release(childLease1); - verify(leaseManager, times(1)).release(childLease2); - - verify(leaseManager, Mockito.never()).updateProperties(Mockito.any()); - verify(feedRangeGoneHandler, times(1)).handlePartitionGone(); + if (shouldSkipDirectLeaseAssignment) { + // Verify only parent lease is acquired + verify(leaseManager, times(1)).acquire(lease); + verify(leaseManager, never()).acquire(childLease1); + verify(leaseManager, never()).acquire(childLease2); + + // Verify partitionSupervisor is created for parent lease + verify(partitionSupervisorFactory, times(1)).create(lease); + verify(partitionSupervisorFactory, never()).create(childLease1); + verify(partitionSupervisorFactory, never()).create(childLease2); + + // Verify only the lease with feedRangeGone exception will be deleted from lease container + verify(leaseManager, times(1)).delete(lease); + verify(leaseManager, Mockito.never()).delete(childLease1); + verify(leaseManager, Mockito.never()).delete(childLease2); + + // Verify at the end, only parent lease will be released + verify(leaseManager, times(1)).release(lease); + verify(leaseManager, never()).release(childLease1); + verify(leaseManager, never()).release(childLease2); + + verify(leaseManager, Mockito.never()).updateProperties(Mockito.any()); + verify(feedRangeGoneHandler, times(1)).handlePartitionGone(); + } else { + // Verify total three leases are acquired + verify(leaseManager, times(1)).acquire(lease); + verify(leaseManager, times(1)).acquire(childLease1); + verify(leaseManager, times(1)).acquire(childLease2); + + // Verify partitionSupervisor is created for each lease + verify(partitionSupervisorFactory, times(1)).create(lease); + verify(partitionSupervisorFactory, times(1)).create(childLease1); + verify(partitionSupervisorFactory, times(1)).create(childLease2); + + // Verify only the lease with feedRangeGone exception will be deleted from lease container + verify(leaseManager, times(1)).delete(lease); + verify(leaseManager, Mockito.never()).delete(childLease1); + verify(leaseManager, Mockito.never()).delete(childLease2); + + // Verify at the end, all the leases will be released + verify(leaseManager, times(1)).release(lease); + verify(leaseManager, times(1)).release(childLease1); + verify(leaseManager, times(1)).release(childLease2); + + verify(leaseManager, Mockito.never()).updateProperties(Mockito.any()); + verify(feedRangeGoneHandler, times(1)).handlePartitionGone(); + } } @@ -151,6 +189,7 @@ public void handleMerge() throws InterruptedException { synchronizer, lease, Arrays.asList(lease), // For merge with epkBased lease, we are going to reuse the lease + false, false); StepVerifier.create(partitionController.initialize()).verifyComplete(); @@ -256,10 +295,12 @@ private FeedRangeGoneHandler setDefaultFeedRangeGoneHandlerBehavior( PartitionSynchronizer partitionSynchronizer, ServiceItemLeaseV1 leaseWithException, List newLeases, - boolean shouldDeleteLeaseWithException){ + boolean shouldDeleteLeaseWithException, + boolean shouldSkipDirectLeaseAssignment){ FeedRangeGoneHandler feedRangeGoneHandler = Mockito.mock(FeedRangeGoneHandler.class); when(feedRangeGoneHandler.handlePartitionGone()).thenReturn(Flux.fromIterable(newLeases)); when(feedRangeGoneHandler.shouldDeleteCurrentLease()).thenReturn(shouldDeleteLeaseWithException); + 
when(feedRangeGoneHandler.shouldSkipDirectLeaseAssignment()).thenReturn(shouldSkipDirectLeaseAssignment); when(partitionSynchronizer.getFeedRangeGoneHandler(leaseWithException)).thenReturn(Mono.just(feedRangeGoneHandler)); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java new file mode 100644 index 0000000000000..4fa0b1a417b2c --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.changefeed.epkversion; + +import com.azure.cosmos.implementation.changefeed.Lease; +import com.azure.cosmos.implementation.changefeed.LeaseContainer; +import com.azure.cosmos.implementation.changefeed.PartitionController; +import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy; +import org.mockito.Mockito; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +public class PartitionLoadBalancerImplTests { + private final int PARTITION_LOAD_BALANCER_TIMEOUT = 5000; + + @DataProvider(name = "loadBalancingSucceededArgProvider") + public static Object[][] loadBalancingSucceededArgProvider() { + return new Object[][]{ + // load balancing call succeeded + { false }, + { true } + }; + } + + @Test(groups = "unit", dataProvider = "loadBalancingSucceededArgProvider") + public void run(boolean loadBalancingSucceeded) throws InterruptedException { + PartitionController partitionControllerMock = Mockito.mock(PartitionController.class); + LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class); + PartitionLoadBalancingStrategy partitionLoadBalancingStrategyMock = Mockito.mock(PartitionLoadBalancingStrategy.class); + + ServiceItemLeaseV1 lease = new ServiceItemLeaseV1().withLeaseToken("1"); + lease.setId("TestLease-" + UUID.randomUUID()); + + List allLeases = Arrays.asList(lease); + Mockito.when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(allLeases)); + + if (loadBalancingSucceeded) { + Mockito + .when(partitionLoadBalancingStrategyMock.selectLeasesToTake(allLeases)) + .thenReturn(allLeases) + .thenReturn(Arrays.asList()); + } else { + Mockito + .when(partitionLoadBalancingStrategyMock.selectLeasesToTake(allLeases)) + .thenThrow(new IllegalArgumentException("Something is wrong")); + } + + Mockito.when(partitionControllerMock.shutdown()).thenReturn(Mono.empty()); + + PartitionLoadBalancerImpl partitionLoadBalancerImpl = + new PartitionLoadBalancerImpl( + partitionControllerMock, + leaseContainerMock, + partitionLoadBalancingStrategyMock, + Duration.ofSeconds(2), + Schedulers.boundedElastic() + ); + + partitionLoadBalancerImpl + .start() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + Thread.sleep(Duration.ofSeconds(5).toMillis()); + Mockito.verify(leaseContainerMock, Mockito.atMost(3)).getAllLeases(); + partitionLoadBalancerImpl + .stop() + 
.timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImplTests.java index 2848f8c8132a0..111fab8293835 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImplTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImplTests.java @@ -3,12 +3,14 @@ package com.azure.cosmos.implementation.changefeed.pkversion; +import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.changefeed.LeaseContainer; import com.azure.cosmos.implementation.changefeed.LeaseManager; import com.azure.cosmos.implementation.changefeed.PartitionSupervisor; import com.azure.cosmos.implementation.changefeed.PartitionSupervisorFactory; import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException; import org.mockito.Mockito; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -21,13 +23,24 @@ import java.util.UUID; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class PartitionControllerImplTests { - @Test(groups = "unit") - public void handleSplit() throws InterruptedException { + + @DataProvider(name = "maxScaleCountArgProvider") + public static Object[][] maxScaleCountArgProvider() { + return new Object[][]{ + // maxScaleCount + { 0 }, + { 1 } + }; + } + + @Test(groups = "unit", dataProvider = "maxScaleCountArgProvider") + public void handleSplit(int maxScaleCount) throws InterruptedException { LeaseContainer leaseContainer = Mockito.mock(LeaseContainer.class); when(leaseContainer.getOwnedLeases()).thenReturn(Flux.empty()); @@ -42,7 +55,8 @@ public void handleSplit() throws InterruptedException { leaseManager, partitionSupervisorFactory, synchronizer, - scheduler); + scheduler, + maxScaleCount); ServiceItemLease lease = new ServiceItemLease().withLeaseToken("1"); lease.setId("TestLease-" + UUID.randomUUID()); @@ -69,27 +83,53 @@ public void handleSplit() throws InterruptedException { // add some waiting time here so that we can capture all the calls Thread.sleep(500); - // Verify total three leases are acquired - verify(leaseManager, times(1)).acquire(lease); - verify(leaseManager, times(1)).acquire(childLease1); - verify(leaseManager, times(1)).acquire(childLease2); - - // Verify partitionSupervisor is created for each lease - verify(partitionSupervisorFactory, times(1)).create(lease); - verify(partitionSupervisorFactory, times(1)).create(childLease1); - verify(partitionSupervisorFactory, times(1)).create(childLease2); - - // Verify only the lease with feedRangeGone exception will be deleted from lease container - verify(leaseManager, times(1)).delete(lease); - verify(leaseManager, Mockito.never()).delete(childLease1); - verify(leaseManager, Mockito.never()).delete(childLease2); - - // Verify at the end, all the leases will be released - verify(leaseManager, times(1)).release(lease); - verify(leaseManager, 
times(1)).release(childLease1); - verify(leaseManager, times(1)).release(childLease2); - - verify(leaseManager, Mockito.never()).updateProperties(Mockito.any()); + if (maxScaleCount > 0) { + // when maxScaleCount > 0, there will be no direct lease assignment, all lease assignments will go through load balancing stage + + // Verify only parent lease is acquired + verify(leaseManager, times(1)).acquire(lease); + verify(leaseManager, never()).acquire(childLease1); + verify(leaseManager, never()).acquire(childLease2); + + // Verify partitionSupervisor is created only for parent lease + verify(partitionSupervisorFactory, times(1)).create(lease); + verify(partitionSupervisorFactory, never()).create(childLease1); + verify(partitionSupervisorFactory, never()).create(childLease2); + + // Verify only the lease with feedRangeGone exception will be deleted from lease container + verify(leaseManager, times(1)).delete(lease); + verify(leaseManager, Mockito.never()).delete(childLease1); + verify(leaseManager, Mockito.never()).delete(childLease2); + + // Verify at the end, all the leases will be released + verify(leaseManager, times(1)).release(lease); + verify(leaseManager, never()).release(childLease1); + verify(leaseManager, never()).release(childLease2); + + verify(leaseManager, Mockito.never()).updateProperties(Mockito.any()); + } else { + // Verify total three leases are acquired + verify(leaseManager, times(1)).acquire(lease); + verify(leaseManager, times(1)).acquire(childLease1); + verify(leaseManager, times(1)).acquire(childLease2); + + // Verify partitionSupervisor is created for each lease + verify(partitionSupervisorFactory, times(1)).create(lease); + verify(partitionSupervisorFactory, times(1)).create(childLease1); + verify(partitionSupervisorFactory, times(1)).create(childLease2); + + // Verify only the lease with feedRangeGone exception will be deleted from lease container + verify(leaseManager, times(1)).delete(lease); + verify(leaseManager, Mockito.never()).delete(childLease1); + verify(leaseManager, Mockito.never()).delete(childLease2); + + // Verify at the end, all the leases will be released + verify(leaseManager, times(1)).release(lease); + verify(leaseManager, times(1)).release(childLease1); + verify(leaseManager, times(1)).release(childLease2); + + verify(leaseManager, Mockito.never()).updateProperties(Mockito.any()); + } } private void setDefaultLeaseManagerBehavior(LeaseManager leaseManager, List leases) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java new file mode 100644 index 0000000000000..f9fa4f069a511 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImplTests.java @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.cosmos.implementation.changefeed.pkversion; + +import com.azure.cosmos.implementation.changefeed.Lease; +import com.azure.cosmos.implementation.changefeed.LeaseContainer; +import com.azure.cosmos.implementation.changefeed.PartitionController; +import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy; +import org.mockito.Mockito; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +public class PartitionLoadBalancerImplTests { + private final int PARTITION_LOAD_BALANCER_TIMEOUT = 5000; + + @DataProvider(name = "loadBalancingSucceededArgProvider") + public static Object[][] loadBalancingSucceededArgProvider() { + return new Object[][]{ + // load balancing call succeeded + { false }, + { true } + }; + } + + @Test(groups = "unit", dataProvider = "loadBalancingSucceededArgProvider") + public void run(boolean loadBalancingSucceeded) throws InterruptedException { + PartitionController partitionControllerMock = Mockito.mock(PartitionController.class); + LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class); + PartitionLoadBalancingStrategy partitionLoadBalancingStrategyMock = Mockito.mock(PartitionLoadBalancingStrategy.class); + + ServiceItemLease lease = new ServiceItemLease().withLeaseToken("1"); + lease.setId("TestLease-" + UUID.randomUUID()); + + List allLeases = Arrays.asList(lease); + Mockito.when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(allLeases)); + + if (loadBalancingSucceeded) { + Mockito + .when(partitionLoadBalancingStrategyMock.selectLeasesToTake(allLeases)) + .thenReturn(allLeases) + .thenReturn(Arrays.asList()); + } else { + Mockito + .when(partitionLoadBalancingStrategyMock.selectLeasesToTake(allLeases)) + .thenThrow(new IllegalArgumentException("Something is wrong")); + } + + Mockito.when(partitionControllerMock.shutdown()).thenReturn(Mono.empty()); + + PartitionLoadBalancerImpl partitionLoadBalancerImpl = + new PartitionLoadBalancerImpl( + partitionControllerMock, + leaseContainerMock, + partitionLoadBalancingStrategyMock, + Duration.ofSeconds(2), + Schedulers.boundedElastic() + ); + + partitionLoadBalancerImpl + .start() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + Thread.sleep(Duration.ofSeconds(5).toMillis()); + Mockito.verify(leaseContainerMock, Mockito.atMost(3)).getAllLeases(); + partitionLoadBalancerImpl + .stop() + .timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index af354d9dea5a2..acef05ff572cf 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -27,6 +27,7 @@ import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.models.ThroughputProperties; 
+import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.rx.TestSuiteBase; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -808,6 +809,148 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { } } + @Test(groups = { "simple" }, timeOut = 160 * CHANGE_FEED_PROCESSOR_TIMEOUT) + public void readFeedDocumentsAfterSplit_maxScaleCount() throws InterruptedException { + CosmosAsyncContainer createdFeedCollectionForSplit = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(2 * LEASE_COLLECTION_THROUGHPUT); + + ChangeFeedProcessor changeFeedProcessor1; + ChangeFeedProcessor changeFeedProcessor2; + String changeFeedProcessor1HostName = RandomStringUtils.randomAlphabetic(6); + String changeFeedProcessor2HostName = RandomStringUtils.randomAlphabetic(6); + + try { + // Set up the maxScaleCount to be equal to the current partition count + int partitionCountBeforeSplit = createdFeedCollectionForSplit.getFeedRanges().block().size(); + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + + // generate a first batch of documents + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollectionForSplit, FEED_COUNT); + + changeFeedProcessor1 = new ChangeFeedProcessorBuilder() + .hostName(changeFeedProcessor1HostName) + .handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments)) + .feedContainer(createdFeedCollectionForSplit) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeasePrefix("TEST") + .setStartFromBeginning(true) + .setMaxItemCount(10) + .setLeaseAcquireInterval(Duration.ofSeconds(1)) + .setMaxScaleCount(partitionCountBeforeSplit) // set to match the partition count + .setLeaseRenewInterval(Duration.ofSeconds(2)) + ) + .buildChangeFeedProcessor(); + + changeFeedProcessor1 + .start() + .subscribeOn(Schedulers.boundedElastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.error("Change feed processor did not start in the expected time", throwable); + return Mono.error(throwable); + }) + .block(); + + // Wait for the feed processor to receive and process the second batch of documents. 
+ waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT); + + // increase throughput to force a single partition collection to go through a split + createdFeedCollectionForSplit + .readThroughput() + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(currentThroughput -> + createdFeedCollectionForSplit + .replaceThroughput(ThroughputProperties.createManualThroughput(FEED_COLLECTION_THROUGHPUT_FOR_SPLIT)) + .subscribeOn(Schedulers.boundedElastic()) + ) + .block(); + + // wait for the split to finish + ThroughputResponse throughputResponse = createdFeedCollectionForSplit.readThroughput().block(); + while (true) { + assert throughputResponse != null; + if (!throughputResponse.isReplacePending()) { + break; + } + logger.info("Waiting for split to complete"); + Thread.sleep(10 * 1000); + throughputResponse = createdFeedCollectionForSplit.readThroughput().block(); + } + + // generate the second batch of documents + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollectionForSplit, FEED_COUNT); + + // wait for the change feed processor to receive some documents + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); + + String leaseQuery = "select * from c where not contains(c.id, \"info\")"; + List leaseDocuments = + createdLeaseCollection + .queryItems(leaseQuery, JsonNode.class) + .byPage() + .blockFirst() + .getResults(); + + long host1Leases = leaseDocuments.stream().filter(lease -> lease.get("Owner").asText().equals(changeFeedProcessor1HostName)).count(); + assertThat(host1Leases).isEqualTo(partitionCountBeforeSplit); + + // now starts a new change feed processor + changeFeedProcessor2 = new ChangeFeedProcessorBuilder() + .hostName(changeFeedProcessor2HostName) + .handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments)) + .feedContainer(createdFeedCollectionForSplit) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeasePrefix("TEST") + .setStartFromBeginning(true) + .setMaxItemCount(10) + .setLeaseAcquireInterval(Duration.ofSeconds(1)) + .setMaxScaleCount(partitionCountBeforeSplit) // set to match the partition count + .setLeaseRenewInterval(Duration.ofSeconds(2)) + ) + .buildChangeFeedProcessor(); + + changeFeedProcessor2 + .start() + .subscribeOn(Schedulers.boundedElastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.error("Change feed processor did not start in the expected time", throwable); + return Mono.error(throwable); + }) + .subscribe(); + + // Wait for the feed processor to receive and process the second batch of documents. + waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT*2); + + changeFeedProcessor1 + .stop() + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.error("Change feed processor1 did not stop in the expected time", throwable); + return Mono.empty(); + }) + .block(); + changeFeedProcessor2 + .stop() + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.error("Change feed processor2 did not stop in the expected time", throwable); + return Mono.empty(); + }) + .block(); + + } finally { + safeDeleteCollection(createdFeedCollectionForSplit); + safeDeleteCollection(createdLeaseCollection); + + // Allow some time for the collections to be deleted before exiting. 
+ Thread.sleep(500); + } + } + @Test(groups = { "simple" }, timeOut = 20 * TIMEOUT) public void inactiveOwnersRecovery() throws InterruptedException { CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index cdf9bfbefdf7c..442a2d7b44474 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -23,6 +23,7 @@ import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.rx.TestSuiteBase; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -756,13 +757,15 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(2 * LEASE_COLLECTION_THROUGHPUT); CosmosAsyncContainer createdLeaseMonitorCollection = createLeaseMonitorCollection(LEASE_COLLECTION_THROUGHPUT); + ChangeFeedProcessor leaseMonitoringChangeFeedProcessor = null; + try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); LeaseStateMonitor leaseStateMonitor = new LeaseStateMonitor(); // create a monitoring CFP for ensuring the leases are updating as expected - ChangeFeedProcessor leaseMonitoringChangeFeedProcessor = new ChangeFeedProcessorBuilder() + leaseMonitoringChangeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .handleChanges(leasesChangeFeedProcessorHandler(leaseStateMonitor)) .feedContainer(createdLeaseCollection) @@ -927,6 +930,180 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); } finally { + if (leaseMonitoringChangeFeedProcessor != null && leaseMonitoringChangeFeedProcessor.isStarted()) { + leaseMonitoringChangeFeedProcessor + .stop() + .timeout(Duration.ofMinutes(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.warn("Stop leaseMonitoringChangeFeedProcessor failed", throwable); + return Mono.empty(); + }) + .block(); + } + + if (changeFeedProcessor != null && changeFeedProcessor.isStarted()) { + changeFeedProcessor + .stop() + .timeout(Duration.ofMinutes(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.warn("Stop changeFeedProcessor failed", throwable); + return Mono.empty(); + }) + .block(); + } + + safeDeleteCollection(createdFeedCollectionForSplit); + safeDeleteCollection(createdLeaseCollection); + + // Allow some time for the collections to be deleted before exiting. 
+ Thread.sleep(500); + } + } + + @Test(groups = { "simple" }, timeOut = 160 * CHANGE_FEED_PROCESSOR_TIMEOUT) + public void readFeedDocumentsAfterSplit_maxScaleCount() throws InterruptedException { + CosmosAsyncContainer createdFeedCollectionForSplit = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(2 * LEASE_COLLECTION_THROUGHPUT); + + ChangeFeedProcessor changeFeedProcessor1 = null; + ChangeFeedProcessor changeFeedProcessor2 = null; + String changeFeedProcessor1HostName = RandomStringUtils.randomAlphabetic(6); + String changeFeedProcessor2HostName = RandomStringUtils.randomAlphabetic(6); + logger.info("readFeedDocumentsAfterSplit_maxScaleCount changeFeedProcessor1 name {}", changeFeedProcessor1HostName); + logger.info("readFeedDocumentsAfterSplit_maxScaleCount changeFeedProcessor2 name {}", changeFeedProcessor2HostName); + + try { + // Set up the maxScaleCount to be equal to the current partition count + int partitionCountBeforeSplit = createdFeedCollectionForSplit.getFeedRanges().block().size(); + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + + // generate a first batch of documents + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollectionForSplit, FEED_COUNT); + + changeFeedProcessor1 = new ChangeFeedProcessorBuilder() + .hostName(changeFeedProcessor1HostName) + .handleChanges(changeFeedProcessorHandler(receivedDocuments)) + .feedContainer(createdFeedCollectionForSplit) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeasePrefix("TEST") + .setStartFromBeginning(true) + .setMaxItemCount(10) + .setLeaseAcquireInterval(Duration.ofSeconds(1)) + .setMaxScaleCount(partitionCountBeforeSplit) // set to match the partition count + .setLeaseRenewInterval(Duration.ofSeconds(2)) + ) + .buildChangeFeedProcessor(); + + changeFeedProcessor1 + .start() + .subscribeOn(Schedulers.boundedElastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + log.error("Change feed processor did not start in the expected time", throwable); + return Mono.error(throwable); + }) + .block(); + + // Wait for the feed processor to receive and process the second batch of documents. 
+ waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT); + + // increase throughput to force a single partition collection to go through a split + createdFeedCollectionForSplit + .readThroughput() + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(currentThroughput -> + createdFeedCollectionForSplit + .replaceThroughput(ThroughputProperties.createManualThroughput(FEED_COLLECTION_THROUGHPUT_FOR_SPLIT)) + .subscribeOn(Schedulers.boundedElastic()) + ) + .block(); + + // wait for the split to finish + ThroughputResponse throughputResponse = createdFeedCollectionForSplit.readThroughput().block(); + while (true) { + assert throughputResponse != null; + if (!throughputResponse.isReplacePending()) { + break; + } + logger.info("Waiting for split to complete"); + Thread.sleep(10 * 1000); + throughputResponse = createdFeedCollectionForSplit.readThroughput().block(); + } + + // generate the second batch of documents + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollectionForSplit, FEED_COUNT); + + // wait for the change feed processor to receive some documents + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); + + String leaseQuery = "select * from c where not contains(c.id, \"info\")"; + List leaseDocuments = + createdLeaseCollection + .queryItems(leaseQuery, JsonNode.class) + .byPage() + .blockFirst() + .getResults(); + + long host1Leases = leaseDocuments.stream().filter(lease -> lease.get("Owner").asText().equals(changeFeedProcessor1HostName)).count(); + for (JsonNode lease : leaseDocuments) { + logger.info("readFeedDocumentsAfterSplit_maxScaleCount lease {} {}", lease.get("Owner").asText(), lease); + } + assertThat(host1Leases).isEqualTo(partitionCountBeforeSplit); + + // now starts a new change feed processor + changeFeedProcessor2 = new ChangeFeedProcessorBuilder() + .hostName(changeFeedProcessor2HostName) + .handleChanges(changeFeedProcessorHandler(receivedDocuments)) + .feedContainer(createdFeedCollectionForSplit) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeasePrefix("TEST") + .setStartFromBeginning(true) + .setMaxItemCount(10) + .setLeaseAcquireInterval(Duration.ofSeconds(1)) + .setMaxScaleCount(partitionCountBeforeSplit) // set to match the partition count + .setLeaseRenewInterval(Duration.ofSeconds(2)) + ) + .buildChangeFeedProcessor(); + + changeFeedProcessor2 + .start() + .subscribeOn(Schedulers.boundedElastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + log.error("Change feed processor did not start in the expected time", throwable); + return Mono.error(throwable); + }) + .subscribe(); + + // Wait for the feed processor to receive and process the second batch of documents. 
+ waitToReceiveDocuments(receivedDocuments, 2 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT*2); + + } finally { + if (changeFeedProcessor1 != null && changeFeedProcessor1.isStarted()) { + changeFeedProcessor1 + .stop() + .timeout(Duration.ofMinutes(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.warn("Stop changeFeedProcessor1 failed", throwable); + return Mono.empty(); + }) + .block(); + } + + if (changeFeedProcessor2 != null && changeFeedProcessor2.isStarted()) { + changeFeedProcessor2 + .stop() + .timeout(Duration.ofMinutes(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .onErrorResume(throwable -> { + logger.warn("Stop changeFeedProcessor2 failed", throwable); + return Mono.empty(); + }) + .block(); + } + safeDeleteCollection(createdFeedCollectionForSplit); safeDeleteCollection(createdLeaseCollection); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index e9d450f597307..e08a6de9912ca 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed `IllegalArgumentException` in changeFeedProcessor when `maxScaleCount` is configured - See [PR 34618](https://github.com/Azure/azure-sdk-for-java/pull/34618) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseManager.java index 5267efda59f9b..7dbe0bb029435 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseManager.java @@ -6,6 +6,8 @@ import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; import reactor.core.publisher.Mono; +import java.util.Map; + /** * It defines a way to perform operations with {@link Lease}. */ @@ -19,6 +21,16 @@ public interface LeaseManager { */ Mono createLeaseIfNotExist(String leaseToken, String continuationToken); + /** + * Checks whether the lease exists and creates it if it does not exist. + * + * @param leaseToken the lease token to work with. + * @param continuationToken the continuation token if it exists. + * @param properties the properties. + * @return the lease. + */ + Mono createLeaseIfNotExist(String leaseToken, String continuationToken, Map properties); + /** * Checks whether the lease exists and creates it if it does not exist. * @@ -28,6 +40,16 @@ public interface LeaseManager { */ Mono createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken); + /** + * Checks whether the lease exists and creates it if it does not exist. + * + * @param feedRange the epk range for the lease. + * @param continuationToken the continuation token if it exists. + * @param properties the properties. + * @return the lease. + */ + Mono createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken, Map properties); + /** * Deletes the lease. 
* diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java index fa10bf1a771bd..a345699202afa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java @@ -9,6 +9,7 @@ import java.time.Duration; import java.util.List; +import java.util.Map; /** * Defines an interface for operations with {@link Lease}. @@ -52,6 +53,16 @@ interface LeaseStoreManagerBuilderDefinition { */ Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationToken); + /** + * Checks whether the lease exists and creates it if it does not exist. + * + * @param leaseToken the partition to work on. + * @param continuationToken the continuation token if it exists. + * @param properties the properties. + * @return the lease. + */ + Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationToken, Map<String, String> properties); + /** * Checks whether the lease exists and creates it if it does not exist. * @@ -61,6 +72,16 @@ interface LeaseStoreManagerBuilderDefinition { */ Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken); + /** + * Checks whether the lease exists and creates it if it does not exist. + * + * @param feedRange the epk range for the lease. + * @param continuationToken the continuation token if it exists. + * @param properties the properties. + * @return the lease. + */ + Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken, Map<String, String> properties); + /** * DELETE the lease. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java index 7fd5b76cb6f43..53827479d4171 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/EqualPartitionsBalancingStrategy.java @@ -78,6 +78,13 @@ public List selectLeasesToTake(List allLeases) { } } + // If we reach here with partitionsNeededForMe <= 0, it means this change feed processor instance already owns at least maxScaleCount leases. + // In that case, the change feed processor instance will not pick up any new leases.
+ if (partitionsNeededForMe <= 0) + { + return new ArrayList<>(); + } + return expiredLeases.subList(0, Math.min(partitionsNeededForMe, expiredLeases.size())); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseStoreManagerImpl.java index 64e65fcb035d3..6472a93ebd53f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseStoreManagerImpl.java @@ -142,11 +142,21 @@ public Flux getOwnedLeases() { @Override public Mono createLeaseIfNotExist(String leaseToken, String continuationToken) { + return this.createLeaseIfNotExist(leaseToken, continuationToken, null); + } + + @Override + public Mono createLeaseIfNotExist(String leaseToken, String continuationToken, Map properties) { throw new UnsupportedOperationException("partition key based leases are not supported for Change Feed V1 wire format"); } @Override public Mono createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken) { + return this.createLeaseIfNotExist(feedRange, continuationToken, null); + } + + @Override + public Mono createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken, Map properties) { checkNotNull(feedRange, "Argument 'feedRanges' should not be null"); String leaseToken = feedRange.getRange().getMin() + "-" + feedRange.getRange().getMax(); @@ -157,33 +167,34 @@ public Mono createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String cont .withId(leaseDocId) .withLeaseToken(leaseToken) .withFeedRange(feedRange) - .withContinuationToken(continuationToken); + .withContinuationToken(continuationToken) + .withProperties(properties); return this.leaseDocumentClient.createItem(this.settings.getLeaseCollectionLink(), documentServiceLease, null, false) - .onErrorResume( ex -> { - if (ex instanceof CosmosException) { - CosmosException e = (CosmosException) ex; - if (Exceptions.isConflict(e)) { - logger.info("Some other host created lease for {}.", leaseToken); - return Mono.empty(); - } - } - - return Mono.error(ex); - }) - .map(documentResourceResponse -> { - if (documentResourceResponse == null) { - return null; - } - - InternalObjectNode document = BridgeInternal.getProperties(documentResourceResponse); - - return documentServiceLease - .withId(document.getId()) - .withETag(document.getETag()) - .withTs(ModelBridgeInternal.getStringFromJsonSerializable(document, - Constants.Properties.LAST_MODIFIED)); - }); + .onErrorResume( ex -> { + if (ex instanceof CosmosException) { + CosmosException e = (CosmosException) ex; + if (Exceptions.isConflict(e)) { + logger.info("Some other host created lease for {}.", leaseToken); + return Mono.empty(); + } + } + + return Mono.error(ex); + }) + .map(documentResourceResponse -> { + if (documentResourceResponse == null) { + return null; + } + + InternalObjectNode document = BridgeInternal.getProperties(documentResourceResponse); + + return documentServiceLease + .withId(document.getId()) + .withETag(document.getETag()) + .withTs(ModelBridgeInternal.getStringFromJsonSerializable(document, + Constants.Properties.LAST_MODIFIED)); + }); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImpl.java 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImpl.java index 80b058d67084d..d4e63734fa317 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImpl.java @@ -168,8 +168,12 @@ private Mono handleFeedRangeGone(Lease lease, String lastContinuationToken .flatMap(partitionGoneHandler -> { return partitionGoneHandler.handlePartitionGone() .flatMap(l -> { - l.setProperties(lease.getProperties()); - return this.addOrUpdateLease(l); + if (partitionGoneHandler.shouldSkipDirectLeaseAssignment()) { + return Mono.empty(); + } else { + l.setProperties(lease.getProperties()); + return this.addOrUpdateLease(l); + } }) .then(this.tryDeleteGoneLease(lease, partitionGoneHandler.shouldDeleteCurrentLease())); }) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImpl.java index 47d6debb76204..2034d0a15d71b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImpl.java @@ -111,30 +111,30 @@ private Mono run(CancellationToken cancellationToken) { if (cancellationToken.isCancellationRequested()) return Mono.empty(); return this.partitionController.addOrUpdateLease(lease); }) - .then(Mono.just(this) - .flatMap(value -> { - if (cancellationToken.isCancellationRequested()) { - return Mono.empty(); - } - - Instant stopTimer = Instant.now().plus(this.leaseAcquireInterval); - return Mono.just(value) - .delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL) - .repeat( () -> { - Instant currentTime = Instant.now(); - return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer); - }).last(); - }) - ); + .then(); }) .onErrorResume(throwable -> { // "catch all" exception handler to keep the loop going until the user stops the change feed processor logger.warn("Unexpected exception thrown while trying to acquire available leases", throwable); return Mono.empty(); }) - .repeat(() -> { - return !cancellationToken.isCancellationRequested(); - }) + .then( + Mono.just(this) + .flatMap(value -> { + if (cancellationToken.isCancellationRequested()) { + return Mono.empty(); + } + Instant stopTimer = Instant.now().plus(this.leaseAcquireInterval); + return Mono.just(value) + .delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL) + .repeat(() -> { + Instant currentTime = Instant.now(); + return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer); + }) + .then(); + }) + ) + .repeat(() -> !cancellationToken.isCancellationRequested()) .then() .onErrorResume(throwable -> { // We should not get here. 
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImpl.java index 9d125d56b8286..ffde9263cf88f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImpl.java @@ -111,7 +111,7 @@ public Mono getFeedRangeGoneHandler(Lease lease) { if (pkRangeList.size() > 1) { // Split: More than two children spanning the pkRange - return Mono.just(new FeedRangeGoneSplitHandler(lease, pkRangeList, this.leaseManager)); + return Mono.just(new FeedRangeGoneSplitHandler(lease, pkRangeList, this.leaseManager, this.changeFeedProcessorOptions.getMaxScaleCount())); } // Merge diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneHandler.java index 96fba31ba31ab..68187ab4dbe53 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneHandler.java @@ -12,4 +12,5 @@ public interface FeedRangeGoneHandler { Flux handlePartitionGone(); boolean shouldDeleteCurrentLease(); + boolean shouldSkipDirectLeaseAssignment(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneMergeHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneMergeHandler.java index 00f7a47f8c08c..921e42a029508 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneMergeHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneMergeHandler.java @@ -43,4 +43,11 @@ public Flux handlePartitionGone() { public boolean shouldDeleteCurrentLease() { return this.removeCurrentLease.get(); } + + @Override + public boolean shouldSkipDirectLeaseAssignment() { + // For merge, the same lease will be reused, + // so the current instance can acquire ownership of the lease right away + return false; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneSplitHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneSplitHandler.java index 9980767c4d151..573e54333a7a6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneSplitHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/feedRangeGoneHandler/FeedRangeGoneSplitHandler.java @@ -28,8 +28,13 @@ public class FeedRangeGoneSplitHandler implements FeedRangeGoneHandler { private final List overlappingRanges; private final LeaseManager 
leaseManager; private final boolean removeCurrentLease; + private final boolean shouldSkipDirectLeaseAssignment; - public FeedRangeGoneSplitHandler(Lease lease, List<PartitionKeyRange> overlappingRanges, LeaseManager leaseManager) { + public FeedRangeGoneSplitHandler( + Lease lease, + List<PartitionKeyRange> overlappingRanges, + LeaseManager leaseManager, + int maxScaleCount) { checkNotNull(lease, "Argument 'lease' can not be null"); checkNotNull(overlappingRanges, "Argument 'overlappingRanges' can not be null"); checkNotNull(leaseManager, "Argument 'leaseManager' can not be null"); @@ -41,6 +46,10 @@ public FeedRangeGoneSplitHandler(Lease lease, List overlappin // A flag to indicate to upstream whether the current lease which we get FeedRangeGoneException should be removed. // For split, the parent lease will be replaced by the child leases. so we need to remove the current lease in the end. this.removeCurrentLease = true; + + // If maxScaleCount is configured, then all lease assignments should go through the load balancer + // It will make sure the change feed processor instance always honors the maxScaleCount config + this.shouldSkipDirectLeaseAssignment = maxScaleCount > 0; } @Override @@ -118,4 +127,9 @@ private String getEffectiveChildLeaseContinuationToken(FeedRangeEpkImpl childLea public boolean shouldDeleteCurrentLease() { return this.removeCurrentLease; } + + @Override + public boolean shouldSkipDirectLeaseAssignment() { + return this.shouldSkipDirectLeaseAssignment; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java index 1fb17ef5d6e42..3bb9bdc411592 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java @@ -449,7 +449,14 @@ private Mono<PartitionManager> buildPartitionManager(LeaseStoreManager leaseStor this.changeFeedProcessorOptions.getLeaseExpirationInterval()); } - PartitionController partitionController = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactory, synchronizer, scheduler); + PartitionController partitionController = + new PartitionControllerImpl( + leaseStoreManager, + leaseStoreManager, + partitionSupervisorFactory, + synchronizer, + scheduler, + this.changeFeedProcessorOptions.getMaxScaleCount()); if (this.healthMonitor == null) { this.healthMonitor = new TraceHealthMonitor(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseStoreManagerImpl.java index de44e9415bc86..edc8d5ab43540 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/LeaseStoreManagerImpl.java @@ -139,11 +139,21 @@ public Flux<Lease> getOwnedLeases() { @Override public Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String continuationToken) { + return this.createLeaseIfNotExist(feedRange, continuationToken, null); + } + + @Override + public Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String
continuationToken, Map<String, String> properties) { throw new UnsupportedOperationException("FeedRangeEpkImpl based leases are not supported for Change Feed V0 wire format"); } @Override public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationToken) { + return this.createLeaseIfNotExist(leaseToken, continuationToken, null); + } + + @Override + public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationToken, Map<String, String> properties) { if (leaseToken == null) { throw new IllegalArgumentException("leaseToken"); } @@ -152,7 +162,8 @@ public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationT ServiceItemLease documentServiceLease = new ServiceItemLease() .withId(leaseDocId) .withLeaseToken(leaseToken) - .withContinuationToken(continuationToken); + .withContinuationToken(continuationToken) + .withProperties(properties); return this.leaseDocumentClient.createItem(this.settings.getLeaseCollectionLink(), documentServiceLease, null, false) .onErrorResume( ex -> { @@ -173,8 +184,6 @@ public Mono<Lease> createLeaseIfNotExist(String leaseToken, String continuationT InternalObjectNode document = BridgeInternal.getProperties(documentResourceResponse); -// logger.info("Created lease for partition {}.", leaseToken); - return documentServiceLease .withId(document.getId()) .withETag(document.getETag()) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImpl.java index ca7f575399f73..1f2379392e9e5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImpl.java @@ -32,6 +32,7 @@ class PartitionControllerImpl implements PartitionController { private final LeaseManager leaseManager; private final PartitionSupervisorFactory partitionSupervisorFactory; private final PartitionSynchronizer synchronizer; + private final boolean shouldSkipDirectLeaseAssignment; private CancellationTokenSource shutdownCts; private final Scheduler scheduler; @@ -41,13 +42,18 @@ public PartitionControllerImpl( LeaseManager leaseManager, PartitionSupervisorFactory partitionSupervisorFactory, PartitionSynchronizer synchronizer, - Scheduler scheduler) { + Scheduler scheduler, + int maxScaleCount) { this.leaseContainer = leaseContainer; this.leaseManager = leaseManager; this.partitionSupervisorFactory = partitionSupervisorFactory; this.synchronizer = synchronizer; this.scheduler = scheduler; + + // If maxScaleCount is configured, then all lease assignments should go through the load balancer + // It will make sure the change feed processor instance always honors the maxScaleCount config + this.shouldSkipDirectLeaseAssignment = maxScaleCount > 0; } @Override @@ -165,8 +171,12 @@ private Mono<Lease> handleSplit(Lease lease, String lastContinuationToken) { lease.setContinuationToken(lastContinuationToken); return this.synchronizer.splitPartition(lease) .flatMap(l -> { - l.setProperties(lease.getProperties()); - return this.addOrUpdateLease(l); + if (this.shouldSkipDirectLeaseAssignment) { + return Mono.empty(); + } else { + l.setProperties(lease.getProperties()); + return this.addOrUpdateLease(l); + } }).then(this.leaseManager.delete(lease)) .onErrorResume(throwable -> { logger.warn("Partition {}: failed to split", lease.getLeaseToken(), throwable); diff --git
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImpl.java index 24007e4760948..d05fe13f01b6d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionLoadBalancerImpl.java @@ -119,30 +119,30 @@ private Mono<Void> run(CancellationToken cancellationToken) { if (cancellationToken.isCancellationRequested()) return Mono.empty(); return this.partitionController.addOrUpdateLease(lease); }) - .then(Mono.just(this) + .then(); + }) + .onErrorResume(throwable -> { + // "catch all" exception handler to keep the loop going until the user stops the change feed processor + logger.warn("Unexpected exception thrown while trying to acquire available leases", throwable); + return Mono.empty(); + }) + .then( + Mono.just(this) .flatMap(value -> { if (cancellationToken.isCancellationRequested()) { return Mono.empty(); } - Instant stopTimer = Instant.now().plus(this.leaseAcquireInterval); return Mono.just(value) .delayElement(Duration.ofMillis(100), CosmosSchedulers.COSMOS_PARALLEL) - .repeat( () -> { + .repeat(() -> { Instant currentTime = Instant.now(); return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer); - }).last(); + }) + .then(); }) - ); - }) - .onErrorResume(throwable -> { - // "catch all" exception handler to keep the loop going until the user stops the change feed processor - logger.warn("Unexpected exception thrown while trying to acquire available leases", throwable); - return Mono.empty(); - }) - .repeat(() -> { - return !cancellationToken.isCancellationRequested(); - }) + ) + .repeat(() -> !cancellationToken.isCancellationRequested()) .then() .onErrorResume(throwable -> { // We should not get here. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.java index cde7ded5ecf4d..0c535a636e48d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionSynchronizerImpl.java @@ -118,7 +118,7 @@ public Flux<Lease> splitPartition(Lease lease) { }) .flatMap(addedRangeId -> { // Creating new lease.
- return this.leaseManager.createLeaseIfNotExist(addedRangeId, lastContinuationToken); + return this.leaseManager.createLeaseIfNotExist(addedRangeId, lastContinuationToken, lease.getProperties()); }, this.degreeOfParallelism) .map(newLease -> { logger.info("Partition {} split into new partition and continuation token {}.", leaseToken, newLease.getLeaseToken(), lastContinuationToken); diff --git a/sdk/cosmos/platform-matrix.json b/sdk/cosmos/platform-matrix.json index a99d5336674a7..91a32b296581e 100644 --- a/sdk/cosmos/platform-matrix.json +++ b/sdk/cosmos/platform-matrix.json @@ -9,7 +9,7 @@ "-DargLine=\"-Dazure.cosmos.directModeProtocol=Tcp\"": "TCP", "Session": "", "ubuntu": "", - "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enablePartitionMerge = $true }": "", + "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session' }": "", "@{ enableMultipleWriteLocations = $true; defaultConsistencyLevel = 'Session' }": "", "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Strong' }": "", "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enablePartitionMerge = $true }": "", @@ -42,7 +42,7 @@ "ACCOUNT_CONSISTENCY": "Session", "PROTOCOLS": "[\"Tcp\"]", "ProfileFlag": [ "-Pfast", "-Pdirect" ], - "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session'; enablePartitionMerge = $true }", + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session' }", "Agent": { "ubuntu": { "OSVmImage": "MMSUbuntu20.04", "Pool": "azsdk-pool-mms-ubuntu-2004-general" } }
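For context on the maxScaleCount plumbing above: shouldSkipDirectLeaseAssignment() is driven entirely by ChangeFeedProcessorOptions.getMaxScaleCount(), so the new behavior only applies when a caller sets a positive maxScaleCount; at 0 the split handlers keep the existing direct-assignment path. Below is a hypothetical wiring sketch showing where that option is set; the two CosmosAsyncContainer parameters and the host name are placeholders assumed to exist, and only the options call is the point.

    import com.azure.cosmos.ChangeFeedProcessor;
    import com.azure.cosmos.ChangeFeedProcessorBuilder;
    import com.azure.cosmos.CosmosAsyncContainer;
    import com.azure.cosmos.models.ChangeFeedProcessorOptions;

    public final class MaxScaleCountWiringSketch {

        // feedContainer and leaseContainer are assumed to be pre-built CosmosAsyncContainer instances.
        public static ChangeFeedProcessor buildProcessor(
            CosmosAsyncContainer feedContainer,
            CosmosAsyncContainer leaseContainer) {

            ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions()
                // With a positive maxScaleCount, FeedRangeGoneSplitHandler reports
                // shouldSkipDirectLeaseAssignment() == true, so child leases created on a split
                // are only handed out through the load balancer, which enforces the scale limit.
                .setMaxScaleCount(2);

            return new ChangeFeedProcessorBuilder()
                .hostName("sample-host")
                .feedContainer(feedContainer)
                .leaseContainer(leaseContainer)
                .options(options)
                .handleChanges(changes -> changes.forEach(System.out::println))
                .buildChangeFeedProcessor();
        }
    }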