From 5cf2947847b462f4c2ce8bed7393c32e96961fbc Mon Sep 17 00:00:00 2001 From: Ahmed Alouane Date: Sun, 20 Oct 2024 22:57:10 +0100 Subject: [PATCH] Acquire single timestamp lease per transaction-shaped batch --- .../src/main/conjure/timelock-api.yml | 29 +-- .../timelock/AsyncTimelockService.java | 7 +- .../timelock/AsyncTimelockServiceImpl.java | 34 ++-- ...ltiClientTimestampLeaseServiceAdapter.java | 4 +- .../RemotingTimestampLeaseServiceAdapter.java | 29 +-- .../timelock/lock/AsyncLockService.java | 14 +- .../AsyncTimelockServiceImplTest.java | 177 +++++++++++++----- ...ultiClientConjureTimelockResourceTest.java | 78 +++++--- 8 files changed, 250 insertions(+), 122 deletions(-) diff --git a/timelock-api/src/main/conjure/timelock-api.yml b/timelock-api/src/main/conjure/timelock-api.yml index 1c3b80e364..59e5038b56 100644 --- a/timelock-api/src/main/conjure/timelock-api.yml +++ b/timelock-api/src/main/conjure/timelock-api.yml @@ -228,27 +228,30 @@ types: LeaderTimes: fields: leaderTimes: map - TimestampLeaseRequest: - fields: - requestId: - type: uuid - safety: safe - numFreshTimestamps: - type: integer - safety: safe + RequestId: + alias: uuid + safety: safe TimestampLeaseRequests: - alias: map + fields: + requestsId: RequestId + numFreshTimestamps: map + TimestampLeasesRequest: + alias: list MultiClientTimestampLeaseRequest: - alias: map + alias: map TimestampLeaseResponse: fields: minLeased: Long - leaseGuarantee: LeaseGuarantee freshTimestamps: ConjureTimestampRange TimestampLeaseResponses: - alias: map + fields: + leaseGuarantee: LeaseGuarantee + timestampLeaseResponses: map + TimestampLeasesResponse: + fields: + alias: list MultiClientTimestampLeaseResponse: - alias: map + alias: map GetMinLeasedTimestampRequests: alias: list MultiClientGetMinLeasedTimestampRequest: diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java index e75af6cb91..9330a5f2b3 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java @@ -22,7 +22,7 @@ import com.palantir.atlasdb.timelock.api.ConjureStartTransactionsResponse; import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; import com.palantir.atlasdb.timelock.api.TimestampLeaseName; -import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse; +import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses; import com.palantir.atlasdb.timelock.lock.watch.LockWatchingService; import com.palantir.lock.client.IdentifiedLockRequest; import com.palantir.lock.v2.IdentifiedTimeLockRequest; @@ -44,6 +44,7 @@ import com.palantir.timestamp.TimestampRange; import com.palantir.tritium.annotations.Instrument; import java.io.Closeable; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -76,8 +77,8 @@ ListenableFuture getCommitTimestamps( ListenableFuture getFreshTimestampsAsync(int timestampsToRequest); - ListenableFuture acquireTimestampLease( - TimestampLeaseName timestampName, UUID requestId, int numFreshTimestamps); + ListenableFuture acquireTimestampLease( + UUID requestId, Map numFreshTimestamps); ListenableFuture getMinLeasedTimestamp(TimestampLeaseName timestampName); diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java index feb98eb057..2bd10ef038 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java @@ -29,6 +29,7 @@ import com.palantir.atlasdb.timelock.api.LockWatchRequest; import com.palantir.atlasdb.timelock.api.TimestampLeaseName; import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse; +import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses; import com.palantir.atlasdb.timelock.lock.AsyncLockService; import com.palantir.atlasdb.timelock.lock.AsyncResult; import com.palantir.atlasdb.timelock.lock.Leased; @@ -38,6 +39,7 @@ import com.palantir.atlasdb.timelock.lockwatches.RequestMetrics; import com.palantir.atlasdb.timelock.transaction.timestamp.DelegatingClientAwareManagedTimestampService; import com.palantir.atlasdb.timelock.transaction.timestamp.LeadershipGuardedClientAwareManagedTimestampService; +import com.palantir.common.streams.KeyedStream; import com.palantir.lock.LockDescriptor; import com.palantir.lock.client.IdentifiedLockRequest; import com.palantir.lock.v2.IdentifiedTimeLockRequest; @@ -60,6 +62,7 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.timestamp.ManagedTimestampService; import com.palantir.timestamp.TimestampRange; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -257,24 +260,22 @@ public ListenableFuture getFreshTimestampsAsync(int timestampsTo } @Override - public ListenableFuture acquireTimestampLease( - TimestampLeaseName timestampName, UUID requestId, int numFreshTimestamps) { + public ListenableFuture acquireTimestampLease( + UUID requestId, Map numFreshTimestamps) { long timestamp = timestampService.getFreshTimestamp(); Leased leasedLock = lockService - .acquireTimestampLease(timestampName, requestId, timestamp) + .acquireTimestampLease(requestId, numFreshTimestamps.keySet(), timestamp) .get(); - long minLeased = lockService.getMinLeasedTimestamp(timestampName).orElse(timestamp); - TimestampRange timestampRange = timestampService.getFreshTimestamps(numFreshTimestamps); - - ConjureTimestampRange freshTimestamps = - ConjureTimestampRange.of(timestampRange.getLowerBound(), timestampRange.size()); + Map responses = KeyedStream.stream(numFreshTimestamps) + .map((timestampName, numFreshTimestampsForName) -> + getMinLeasedAndFreshTimestamps(timestampName, numFreshTimestampsForName, timestamp)) + .collectToMap(); - return Futures.immediateFuture(TimestampLeaseResponse.builder() - .minLeased(minLeased) + return Futures.immediateFuture(TimestampLeaseResponses.builder() + .timestampLeaseResponses(responses) .leaseGuarantee(toConjure(leasedLock)) - .freshTimestamps(freshTimestamps) .build()); } @@ -337,6 +338,17 @@ public void logState() { lockService.getLockWatchingService().logState(); } + private TimestampLeaseResponse getMinLeasedAndFreshTimestamps( + TimestampLeaseName timestampName, int numFreshTimestamps, long lockedTimestamp) { + long minLeased = lockService.getMinLeasedTimestamp(timestampName).orElse(lockedTimestamp); + + TimestampRange timestampRange = timestampService.getFreshTimestamps(numFreshTimestamps); + ConjureTimestampRange freshTimestamps = + ConjureTimestampRange.of(timestampRange.getLowerBound(), timestampRange.size()); + + return TimestampLeaseResponse.of(minLeased, freshTimestamps); + } + private static LockWatchVersion fromConjure(ConjureIdentifiedVersion conjure) { return LockWatchVersion.of(conjure.getId(), conjure.getVersion()); } diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingMultiClientTimestampLeaseServiceAdapter.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingMultiClientTimestampLeaseServiceAdapter.java index 41c4250fe9..44e15b7082 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingMultiClientTimestampLeaseServiceAdapter.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingMultiClientTimestampLeaseServiceAdapter.java @@ -27,7 +27,7 @@ import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseRequest; import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseResponse; import com.palantir.atlasdb.timelock.api.Namespace; -import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses; +import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse; import com.palantir.common.streams.KeyedStream; import com.palantir.conjure.java.undertow.lib.RequestContext; import java.util.Map; @@ -46,7 +46,7 @@ private RemotingMultiClientTimestampLeaseServiceAdapter(RemotingTimestampLeaseSe ListenableFuture acquireTimestampLeases( MultiClientTimestampLeaseRequest requests, @Nullable RequestContext context) { - Map> futures = KeyedStream.stream(requests.get()) + Map> futures = KeyedStream.stream(requests.get()) .map((namespace, request) -> delegate.acquireTimestampLeases(namespace, request, context)) .collectToMap(); diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingTimestampLeaseServiceAdapter.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingTimestampLeaseServiceAdapter.java index ac8d46d1d6..35029459c3 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingTimestampLeaseServiceAdapter.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/batch/RemotingTimestampLeaseServiceAdapter.java @@ -26,14 +26,16 @@ import com.palantir.atlasdb.timelock.api.GetMinLeasedTimestampRequests; import com.palantir.atlasdb.timelock.api.GetMinLeasedTimestampResponses; import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.timelock.api.RequestId; import com.palantir.atlasdb.timelock.api.TimestampLeaseName; -import com.palantir.atlasdb.timelock.api.TimestampLeaseRequest; -import com.palantir.atlasdb.timelock.api.TimestampLeaseRequests; -import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse; import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses; +import com.palantir.atlasdb.timelock.api.TimestampLeasesRequest; +import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse; import com.palantir.common.streams.KeyedStream; import com.palantir.conjure.java.undertow.lib.RequestContext; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.annotation.Nullable; final class RemotingTimestampLeaseServiceAdapter { @@ -43,19 +45,18 @@ final class RemotingTimestampLeaseServiceAdapter { this.timelockServices = timelockServices; } - ListenableFuture acquireTimestampLeases( - Namespace namespace, TimestampLeaseRequests requests, @Nullable RequestContext context) { + ListenableFuture acquireTimestampLeases( + Namespace namespace, TimestampLeasesRequest requests, @Nullable RequestContext context) { AsyncTimelockService service = getServiceForNamespace(namespace, context); - Map> futures = KeyedStream.stream(requests.get()) - .map((timestampName, request) -> acquireTimestampLease(service, timestampName, request)) - .collectToMap(); + List> futures = requests.get().stream() + .map(request -> + acquireTimestampLease(service, request.getRequestsId(), request.getNumFreshTimestamps())) + .collect(Collectors.toList()); // TODO(aalouane): clean up lease resources in cases of partial failures return Futures.transform( - AtlasFutures.allAsMap(futures, MoreExecutors.directExecutor()), - TimestampLeaseResponses::of, - MoreExecutors.directExecutor()); + Futures.allAsList(futures), TimestampLeasesResponse::of, MoreExecutors.directExecutor()); } ListenableFuture getMinLeasedTimestamps( @@ -76,8 +77,8 @@ private AsyncTimelockService getServiceForNamespace(Namespace namespace, @Nullab return timelockServices.get(namespace.get(), TimelockNamespaces.toUserAgent(context)); } - private static ListenableFuture acquireTimestampLease( - AsyncTimelockService service, TimestampLeaseName timestampName, TimestampLeaseRequest request) { - return service.acquireTimestampLease(timestampName, request.getRequestId(), request.getNumFreshTimestamps()); + private static ListenableFuture acquireTimestampLease( + AsyncTimelockService service, RequestId requestsId, Map numFreshTimestamps) { + return service.acquireTimestampLease(requestsId.get(), numFreshTimestamps); } } diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java index d18877c44e..47e1ca3efb 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java @@ -29,11 +29,13 @@ import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; import java.io.Closeable; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class AsyncLockService implements Closeable { @@ -140,9 +142,9 @@ public AsyncResult> lockImmutableTimestamp(UUID requestId, lon } public AsyncResult> acquireTimestampLease( - TimestampLeaseName timestampName, UUID requestId, long timestamp) { + UUID requestId, Set timestampNames, long timestamp) { return heldLocks.getExistingOrAcquire( - requestId, () -> acquireNamedTimestampLockInternal(timestampName, requestId, timestamp)); + requestId, () -> acquireNamedTimestampLockInternal(requestId, timestampNames, timestamp)); } public AsyncResult waitForLocks(UUID requestId, Set lockDescriptors, TimeLimit timeout) { @@ -177,9 +179,11 @@ private AsyncResult acquireImmutableTimestampLock(UUID requestId, lon } private AsyncResult acquireNamedTimestampLockInternal( - TimestampLeaseName timestampName, UUID requestId, long timestamp) { - AsyncLock lock = lockManager.getNamedTimestampLock(timestampName, timestamp); - return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(lock), TimeLimit.zero()); + UUID requestId, Set timestampNames, long timestamp) { + List locks = timestampNames.stream() + .map(name -> lockManager.getNamedTimestampLock(name, timestamp)) + .collect(Collectors.toList()); + return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromOrderedList(locks), TimeLimit.zero()); } public boolean unlock(LockToken token) { diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImplTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImplTest.java index fbff9f6138..d42cc2163f 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImplTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImplTest.java @@ -24,6 +24,7 @@ import com.palantir.atlasdb.futures.AtlasFutures; import com.palantir.atlasdb.timelock.api.TimestampLeaseName; import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse; +import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses; import com.palantir.atlasdb.timelock.lock.AsyncLockService; import com.palantir.atlasdb.timelock.lock.LockLog; import com.palantir.atlasdb.timelock.lockwatches.BufferMetrics; @@ -32,14 +33,18 @@ import com.palantir.timestamp.InMemoryTimestampService; import com.palantir.timestamp.ManagedTimestampService; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.jmock.lib.concurrent.DeterministicScheduler; import org.junit.jupiter.api.Test; public final class AsyncTimelockServiceImplTest { - private static final TimestampLeaseName TIMESTAMP_NAME = TimestampLeaseName.of("ToyTimestamp"); + private static final TimestampLeaseName TIMESTAMP_NAME_1 = TimestampLeaseName.of("ToyTimestamp_1"); + private static final TimestampLeaseName TIMESTAMP_NAME_2 = TimestampLeaseName.of("ToyTimestamp_2"); private final DeterministicScheduler executor = new DeterministicScheduler(); private final ManagedTimestampService timestampService = new InMemoryTimestampService(); @@ -57,101 +62,171 @@ public void delegatesInitializationCheck() { assertThat(timelockService.isInitialized()).isTrue(); } + @Test + public void unlockingWithRequestIdFromAcquireTimestampLeaseResponseUnlocksForAllRelevantTimestampNames() { + TimestampedInvocation timestamp1MinLeased1 = getMinLeasedTimestampTimestamped(TIMESTAMP_NAME_1); + assertThatTimestampIsStrictlyWithinInvocationInterval(timestamp1MinLeased1.result, timestamp1MinLeased1); + TimestampedInvocation timestamp2MinLeased1 = getMinLeasedTimestampTimestamped(TIMESTAMP_NAME_2); + assertThatTimestampIsStrictlyWithinInvocationInterval(timestamp2MinLeased1.result, timestamp2MinLeased1); + + TimestampedInvocation responses = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 10, TIMESTAMP_NAME_2, 20); + assertThatTimestampsIsStrictlyWithinInvocationInterval(getMinLeasedTimestampsFrom(responses.result), responses); + + unlockForResponse(responses.result); + assertThatTimestampIsStrictlyWithinInvocationInterval(timestamp1MinLeased1.result, timestamp1MinLeased1); + assertThatTimestampIsStrictlyWithinInvocationInterval(timestamp2MinLeased1.result, timestamp2MinLeased1); + } + @Test public void acquireTimestampLeaseReturnsLeaseGuaranteeIdentifierWithGivenRequestId() { UUID requestId = UUID.randomUUID(); - TimestampLeaseResponse acquireResponse = acquireTimestampLease(requestId, 10); + TimestampLeaseResponses acquireResponse = + acquireTimestampLease(requestId, TIMESTAMP_NAME_1, 10, TIMESTAMP_NAME_2, 20); assertThat(acquireResponse.getLeaseGuarantee().getIdentifier().get()).isEqualTo(requestId); } @Test public void acquireTimestampLeaseReturnsMinLeasedAllThroughout() { - TimestampedInvocation response1 = acquireTimestampLeaseTimestamped(10); - assertThatTimestampIsStrictlyWithinInvocationInterval(response1.result.getMinLeased(), response1); + TimestampedInvocation response1 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 15); + assertThatTimestampsIsStrictlyWithinInvocationInterval(getMinLeasedTimestampsFrom(response1.result), response1); - TimestampedInvocation response2 = acquireTimestampLeaseTimestamped(5); - assertThatTimestampIsStrictlyWithinInvocationInterval(response2.result.getMinLeased(), response1); + TimestampedInvocation response2 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 5); + assertThatTimestampsIsStrictlyWithinInvocationInterval(getMinLeasedTimestampsFrom(response2.result), response1); - unlockForResponse(response1.result); - TimestampedInvocation response3 = acquireTimestampLeaseTimestamped(7); - assertThatTimestampIsStrictlyWithinInvocationInterval(response3.result.getMinLeased(), response2); + TimestampedInvocation response3 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 7); + assertThatTimestampsIsStrictlyWithinInvocationInterval(getMinLeasedTimestampsFrom(response3.result), response1); - unlockForResponse(response2.result); - TimestampedInvocation response4 = acquireTimestampLeaseTimestamped(19); - assertThatTimestampIsStrictlyWithinInvocationInterval(response4.result.getMinLeased(), response3); + unlockForResponse(response1.result); + unlockForResponse(response3.result); + TimestampedInvocation response4 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 19); + assertThatTimestampsIsStrictlyWithinInvocationInterval(getMinLeasedTimestampsFrom(response4.result), response2); } @Test public void acquireTimestampLeaseReturnsFreshTimestampsGreaterThanReturnedMinLeased() { - TimestampLeaseResponse response = acquireTimestampLease(10); - assertThat(response.getFreshTimestamps().getStart()).isGreaterThan(response.getMinLeased()); + TimestampLeaseResponses response = acquireTimestampLease(TIMESTAMP_NAME_1, 199, TIMESTAMP_NAME_2, 87); + + TimestampLeaseResponse timestampName1Response = + response.getTimestampLeaseResponses().get(TIMESTAMP_NAME_1); + TimestampLeaseResponse timestampName2Response = + response.getTimestampLeaseResponses().get(TIMESTAMP_NAME_2); + + assertThat(timestampName1Response.getFreshTimestamps().getStart()) + .isGreaterThan(timestampName1Response.getMinLeased()); + assertThat(timestampName2Response.getFreshTimestamps().getStart()) + .isGreaterThan(timestampName2Response.getMinLeased()); } @Test public void acquireTimestampLeaseReturnedFreshTimestampsAreFreshTimestamps() { - TimestampedInvocation response = acquireTimestampLeaseTimestamped(10); - long firstTimestamp = response.result.getFreshTimestamps().getStart(); - long lastTimestamp = - firstTimestamp + response.result.getFreshTimestamps().getCount() - 1; - assertThatTimestampIsStrictlyWithinInvocationInterval(firstTimestamp, response); - assertThatTimestampIsStrictlyWithinInvocationInterval(lastTimestamp, response); + TimestampedInvocation responses = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 10); + TimestampLeaseResponse response = + responses.result.getTimestampLeaseResponses().get(TIMESTAMP_NAME_1); + + long firstTimestamp = response.getFreshTimestamps().getStart(); + long lastTimestamp = firstTimestamp + response.getFreshTimestamps().getCount() - 1; + + assertThatTimestampIsStrictlyWithinInvocationInterval(firstTimestamp, responses); + assertThatTimestampIsStrictlyWithinInvocationInterval(lastTimestamp, responses); } @Test public void acquireTimestampLeaseLocksWithAFreshTimestamp() { - TimestampedInvocation response = acquireTimestampLeaseTimestamped(10); - assertThatTimestampIsStrictlyWithinInvocationInterval(response.result.getMinLeased(), response); + TimestampedInvocation responses = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 10); + TimestampLeaseResponse response = + responses.result.getTimestampLeaseResponses().get(TIMESTAMP_NAME_1); + + assertThatTimestampIsStrictlyWithinInvocationInterval(response.getMinLeased(), responses); } @Test public void getMinLeasedTimestampReturnsFreshTimestampWhenNoLeaseIsHeld() { - TimestampedInvocation response = getMinLeasedTimestampTimestamped(); + TimestampedInvocation response = getMinLeasedTimestampTimestamped(TIMESTAMP_NAME_1); assertThatTimestampIsStrictlyWithinInvocationInterval(response.result, response); } @Test public void getMinLeasedTimestampReturnsMinLeasedWhenLeaseIsHeld() { - TimestampedInvocation response1 = acquireTimestampLeaseTimestamped(10); - assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(), response1); + TimestampedInvocation response1 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 10); + assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(TIMESTAMP_NAME_1), response1); - TimestampedInvocation response2 = acquireTimestampLeaseTimestamped(5); - assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(), response1); + TimestampedInvocation response2 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 5); + assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(TIMESTAMP_NAME_1), response1); assertThat(unlockForResponse(response1.result)).isTrue(); - TimestampedInvocation response3 = acquireTimestampLeaseTimestamped(7); - assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(), response2); + TimestampedInvocation response3 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 7); + assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(TIMESTAMP_NAME_1), response2); - TimestampedInvocation response4 = acquireTimestampLeaseTimestamped(12); - assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(), response2); + TimestampedInvocation response4 = + acquireTimestampLeaseTimestamped(TIMESTAMP_NAME_1, 12); + assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(TIMESTAMP_NAME_1), response2); assertThat(unlockForResponse(response2.result)).isTrue(); - assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(), response3); + assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(TIMESTAMP_NAME_1), response3); assertThat(unlockForResponse(response3.result)).isTrue(); - assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(), response4); + assertThatTimestampIsStrictlyWithinInvocationInterval(getMinLeasedTimestamp(TIMESTAMP_NAME_1), response4); } - private TimestampedInvocation acquireTimestampLeaseTimestamped(int requested) { - return new TimestampedInvocation<>(() -> acquireTimestampLease(requested)); + private TimestampedInvocation acquireTimestampLeaseTimestamped( + TimestampLeaseName timestampName1, int requested1, TimestampLeaseName timestampName2, int requested2) { + return new TimestampedInvocation<>( + () -> acquireTimestampLease(timestampName1, requested1, timestampName2, requested2)); } - private TimestampLeaseResponse acquireTimestampLease(int numFreshTimestamps) { - return acquireTimestampLease(UUID.randomUUID(), numFreshTimestamps); + private TimestampedInvocation acquireTimestampLeaseTimestamped( + TimestampLeaseName timestampName, int requested) { + return new TimestampedInvocation<>(() -> acquireTimestampLease(timestampName, requested)); } - private TimestampLeaseResponse acquireTimestampLease(UUID requestId, int numFreshTimestamps) { - return unwrap(service.acquireTimestampLease(TIMESTAMP_NAME, requestId, numFreshTimestamps)); + private TimestampLeaseResponses acquireTimestampLease( + TimestampLeaseName timestampLeaseName, int numFreshTimestamps) { + return acquireTimestampLease(UUID.randomUUID(), timestampLeaseName, numFreshTimestamps); } - private TimestampedInvocation getMinLeasedTimestampTimestamped() { - return new TimestampedInvocation<>(this::getMinLeasedTimestamp); + private TimestampLeaseResponses acquireTimestampLease( + UUID requestId, TimestampLeaseName timestampName, int numFreshTimestamps) { + return unwrap(service.acquireTimestampLease(requestId, Map.of(timestampName, numFreshTimestamps))); } - private long getMinLeasedTimestamp() { - return unwrap(service.getMinLeasedTimestamp(TIMESTAMP_NAME)); + private TimestampLeaseResponses acquireTimestampLease( + TimestampLeaseName timestampName1, + int numFreshTimestamps1, + TimestampLeaseName timestampLeaseName2, + int numFreshTimestamps2) { + return acquireTimestampLease( + UUID.randomUUID(), timestampName1, numFreshTimestamps1, timestampLeaseName2, numFreshTimestamps2); } - private boolean unlockForResponse(TimestampLeaseResponse response) { + private TimestampLeaseResponses acquireTimestampLease( + UUID requestId, + TimestampLeaseName timestampName1, + int numFreshTimestamps1, + TimestampLeaseName timestampLeaseName2, + int numFreshTimestamps2) { + return unwrap(service.acquireTimestampLease( + requestId, Map.of(timestampName1, numFreshTimestamps1, timestampLeaseName2, numFreshTimestamps2))); + } + + private TimestampedInvocation getMinLeasedTimestampTimestamped(TimestampLeaseName name) { + return new TimestampedInvocation<>(() -> getMinLeasedTimestamp(name)); + } + + private long getMinLeasedTimestamp(TimestampLeaseName timestampName) { + return unwrap(service.getMinLeasedTimestamp(timestampName)); + } + + private boolean unlockForResponse(TimestampLeaseResponses response) { return lockService.unlock(createLockTokenFromResponse(response)); } @@ -160,12 +235,24 @@ private T unwrap(ListenableFuture future) { return AtlasFutures.getUnchecked(future); } + private static List getMinLeasedTimestampsFrom(TimestampLeaseResponses response) { + return response.getTimestampLeaseResponses().values().stream() + .map(TimestampLeaseResponse::getMinLeased) + .collect(Collectors.toList()); + } + + private static void assertThatTimestampsIsStrictlyWithinInvocationInterval( + List timestamps, TimestampedInvocation invocation) { + assertThat(timestamps) + .allSatisfy(timestamp -> assertThatTimestampIsStrictlyWithinInvocationInterval(timestamp, invocation)); + } + private static void assertThatTimestampIsStrictlyWithinInvocationInterval( long timestamp, TimestampedInvocation invocation) { assertThat(timestamp).isStrictlyBetween(invocation.timestampBefore, invocation.timestampAfter); } - private static LockToken createLockTokenFromResponse(TimestampLeaseResponse response) { + private static LockToken createLockTokenFromResponse(TimestampLeaseResponses response) { return LockToken.of(response.getLeaseGuarantee().getIdentifier().get()); } diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java index a6d24380d8..76c3045715 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java @@ -46,11 +46,13 @@ import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseRequest; import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseResponse; import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.timelock.api.RequestId; import com.palantir.atlasdb.timelock.api.TimestampLeaseName; -import com.palantir.atlasdb.timelock.api.TimestampLeaseRequest; import com.palantir.atlasdb.timelock.api.TimestampLeaseRequests; import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse; import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses; +import com.palantir.atlasdb.timelock.api.TimestampLeasesRequest; +import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse; import com.palantir.atlasdb.util.TimelockTestUtils; import com.palantir.common.streams.KeyedStream; import com.palantir.common.time.NanoTime; @@ -69,6 +71,7 @@ import com.palantir.tokens.auth.AuthHeader; import java.net.URL; import java.time.Duration; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -199,27 +202,28 @@ public void canAcquireNamedMinTimestampLease() { TimestampLeaseName timestampName2 = TimestampLeaseName.of("t2"); TimestampLeaseName timestampName3 = TimestampLeaseName.of("t"); - TimestampLeaseRequest request1 = createNamedMinTimestampLeaseRequest(); - TimestampLeaseRequest request2 = createNamedMinTimestampLeaseRequest(); - TimestampLeaseRequest request3 = createNamedMinTimestampLeaseRequest(); + TimestampLeaseRequests request1ForClient1 = createTimestampLeasesRequest(timestampName1, timestampName2); + TimestampLeaseRequests request2ForClient1 = createTimestampLeasesRequest(timestampName1, timestampName3); + TimestampLeaseRequests requestForClient2 = + createTimestampLeasesRequest(timestampName1, timestampName2, timestampName3); - TimestampLeaseResponse response1 = createNamedMinTimestampLeaseResponse(request1); - TimestampLeaseResponse response2 = createNamedMinTimestampLeaseResponse(request2); - TimestampLeaseResponse response3 = createNamedMinTimestampLeaseResponse(request3); + TimestampLeaseResponses response1ForClient1 = createTimestampLeasesResponseFor(request1ForClient1); + TimestampLeaseResponses response2ForClient1 = createTimestampLeasesResponseFor(request2ForClient1); + TimestampLeaseResponses responseForClient2 = createTimestampLeasesResponseFor(requestForClient2); AsyncTimelockService serviceForClient1 = getServiceForClient(client1.get()); - stubAcquireNamedMinTimestampLeaseInResource(serviceForClient1, timestampName1, request1, response1); - stubAcquireNamedMinTimestampLeaseInResource(serviceForClient1, timestampName2, request2, response2); + stubAcquireNamedMinTimestampLeaseInResource(serviceForClient1, request1ForClient1, response1ForClient1); + stubAcquireNamedMinTimestampLeaseInResource(serviceForClient1, request2ForClient1, response2ForClient1); AsyncTimelockService serviceForClient2 = getServiceForClient(client2.get()); - stubAcquireNamedMinTimestampLeaseInResource(serviceForClient2, timestampName3, request3, response3); + stubAcquireNamedMinTimestampLeaseInResource(serviceForClient2, requestForClient2, responseForClient2); MultiClientTimestampLeaseRequest request = MultiClientTimestampLeaseRequest.of(Map.of( - client1, TimestampLeaseRequests.of(Map.of(timestampName1, request1, timestampName2, request2)), - client2, TimestampLeaseRequests.of(Map.of(timestampName3, request3)))); + client1, TimestampLeasesRequest.of(List.of(request1ForClient1, request2ForClient1)), + client2, TimestampLeasesRequest.of(List.of(requestForClient2)))); MultiClientTimestampLeaseResponse response = MultiClientTimestampLeaseResponse.of(Map.of( - client1, TimestampLeaseResponses.of(Map.of(timestampName1, response1, timestampName2, response2)), - client2, TimestampLeaseResponses.of(Map.of(timestampName3, response3)))); + client1, TimestampLeasesResponse.of(List.of(response1ForClient1, response2ForClient1)), + client2, TimestampLeasesResponse.of(List.of(responseForClient2)))); assertThat(Futures.getUnchecked(resource.acquireTimestampLease(AUTH_HEADER, request, REQUEST_CONTEXT))) .isEqualTo(response); } @@ -325,11 +329,8 @@ private Integer getInclusiveLowerCommitTs(String namespace) { } private static void stubAcquireNamedMinTimestampLeaseInResource( - AsyncTimelockService service, - TimestampLeaseName timestampName, - TimestampLeaseRequest request, - TimestampLeaseResponse response) { - when(service.acquireTimestampLease(timestampName, request.getRequestId(), request.getNumFreshTimestamps())) + AsyncTimelockService service, TimestampLeaseRequests request, TimestampLeaseResponses response) { + when(service.acquireTimestampLease(request.getRequestsId().get(), request.getNumFreshTimestamps())) .thenReturn(Futures.immediateFuture(response)); } @@ -338,25 +339,44 @@ private static void stubGetMinLeasedNamedTimestampInResource( when(service.getMinLeasedTimestamp(timestampName)).thenReturn(Futures.immediateFuture(timestamp)); } - private static TimestampLeaseRequest createNamedMinTimestampLeaseRequest() { - return TimestampLeaseRequest.builder() - .requestId(UUID.randomUUID()) - .numFreshTimestamps(createRandomPositiveInteger()) + private static TimestampLeaseRequests createTimestampLeasesRequest(TimestampLeaseName... names) { + TimestampLeaseRequests.Builder builder = + TimestampLeaseRequests.builder().requestsId(RequestId.of(UUID.randomUUID())); + + Map numFreshTimestamps = KeyedStream.of(Arrays.stream(names)) + .map(_name -> createRandomPositiveInteger()) + .collectToMap(); + + return builder.numFreshTimestamps(numFreshTimestamps).build(); + } + + private static TimestampLeaseResponses createTimestampLeasesResponseFor(TimestampLeaseRequests request1ForClient1) { + return TimestampLeaseResponses.builder() + .leaseGuarantee(createRandomLeaseGuarantee(request1ForClient1.getRequestsId())) + .timestampLeaseResponses(KeyedStream.stream(request1ForClient1.getNumFreshTimestamps()) + .map(_name -> createTimestampLeaseResponse()) + .collectToMap()) .build(); } - private static TimestampLeaseResponse createNamedMinTimestampLeaseResponse(TimestampLeaseRequest request) { + private static TimestampLeaseResponse createTimestampLeaseResponse() { return TimestampLeaseResponse.builder() .minLeased(createRandomPositiveInteger()) - .leaseGuarantee(LeaseGuarantee.of( - LeaseIdentifier.of(request.getRequestId()), - Lease.of( - LeaderTime.of(LeadershipId.random(), NanoTime.createForTests(1234L)), - Duration.ofDays(2000)))) .freshTimestamps(ConjureTimestampRange.of(createRandomPositiveInteger(), createRandomPositiveInteger())) .build(); } + private static LeaseGuarantee createRandomLeaseGuarantee(RequestId requestId) { + return LeaseGuarantee.builder() + .lease(createRandomLease()) + .identifier(LeaseIdentifier.of(requestId.get())) + .build(); + } + + private static Lease createRandomLease() { + return Lease.of(LeaderTime.of(LeadershipId.random(), NanoTime.createForTests(1234L)), Duration.ofDays(2000)); + } + private static int createRandomPositiveInteger() { return ThreadLocalRandom.current().nextInt(0, 2500); }