Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[Named min timestamp Leases] Acquire single lease per transaction-sha…
Browse files Browse the repository at this point in the history
…ped batch (#7385)
  • Loading branch information
ergo14 authored Oct 21, 2024
1 parent 395fbf0 commit aa8665f
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 122 deletions.
29 changes: 16 additions & 13 deletions timelock-api/src/main/conjure/timelock-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,27 +228,30 @@ types:
LeaderTimes:
fields:
leaderTimes: map<Namespace, LeaderTime>
TimestampLeaseRequest:
fields:
requestId:
type: uuid
safety: safe
numFreshTimestamps:
type: integer
safety: safe
RequestId:
alias: uuid
safety: safe
TimestampLeaseRequests:
alias: map<TimestampLeaseName, TimestampLeaseRequest>
fields:
requestsId: RequestId
numFreshTimestamps: map<TimestampLeaseName, integer>
TimestampLeasesRequest:
alias: list<TimestampLeaseRequests>
MultiClientTimestampLeaseRequest:
alias: map<Namespace, TimestampLeaseRequests>
alias: map<Namespace, TimestampLeasesRequest>
TimestampLeaseResponse:
fields:
minLeased: Long
leaseGuarantee: LeaseGuarantee
freshTimestamps: ConjureTimestampRange
TimestampLeaseResponses:
alias: map<TimestampLeaseName, TimestampLeaseResponse>
fields:
leaseGuarantee: LeaseGuarantee
timestampLeaseResponses: map<TimestampLeaseName, TimestampLeaseResponse>
TimestampLeasesResponse:
fields:
alias: list<TimestampLeaseResponses>
MultiClientTimestampLeaseResponse:
alias: map<Namespace, TimestampLeaseResponses>
alias: map<Namespace, TimestampLeasesResponse>
GetMinLeasedTimestampRequests:
alias: list<TimestampLeaseName>
MultiClientGetMinLeasedTimestampRequest:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.palantir.atlasdb.timelock.api.ConjureStartTransactionsResponse;
import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.lock.watch.LockWatchingService;
import com.palantir.lock.client.IdentifiedLockRequest;
import com.palantir.lock.v2.IdentifiedTimeLockRequest;
Expand All @@ -44,6 +44,7 @@
import com.palantir.timestamp.TimestampRange;
import com.palantir.tritium.annotations.Instrument;
import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -76,8 +77,8 @@ ListenableFuture<GetCommitTimestampsResponse> getCommitTimestamps(

ListenableFuture<TimestampRange> getFreshTimestampsAsync(int timestampsToRequest);

ListenableFuture<TimestampLeaseResponse> acquireTimestampLease(
TimestampLeaseName timestampName, UUID requestId, int numFreshTimestamps);
ListenableFuture<TimestampLeaseResponses> acquireTimestampLease(
UUID requestId, Map<TimestampLeaseName, Integer> numFreshTimestamps);

ListenableFuture<Long> getMinLeasedTimestamp(TimestampLeaseName timestampName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.palantir.atlasdb.timelock.api.LockWatchRequest;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.lock.AsyncLockService;
import com.palantir.atlasdb.timelock.lock.AsyncResult;
import com.palantir.atlasdb.timelock.lock.Leased;
Expand All @@ -38,6 +39,7 @@
import com.palantir.atlasdb.timelock.lockwatches.RequestMetrics;
import com.palantir.atlasdb.timelock.transaction.timestamp.DelegatingClientAwareManagedTimestampService;
import com.palantir.atlasdb.timelock.transaction.timestamp.LeadershipGuardedClientAwareManagedTimestampService;
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.client.IdentifiedLockRequest;
import com.palantir.lock.v2.IdentifiedTimeLockRequest;
Expand All @@ -60,6 +62,7 @@
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.timestamp.ManagedTimestampService;
import com.palantir.timestamp.TimestampRange;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -257,24 +260,22 @@ public ListenableFuture<TimestampRange> getFreshTimestampsAsync(int timestampsTo
}

@Override
public ListenableFuture<TimestampLeaseResponse> acquireTimestampLease(
TimestampLeaseName timestampName, UUID requestId, int numFreshTimestamps) {
public ListenableFuture<TimestampLeaseResponses> acquireTimestampLease(
UUID requestId, Map<TimestampLeaseName, Integer> numFreshTimestamps) {
long timestamp = timestampService.getFreshTimestamp();

Leased<LockToken> leasedLock = lockService
.acquireTimestampLease(timestampName, requestId, timestamp)
.acquireTimestampLease(requestId, numFreshTimestamps.keySet(), timestamp)
.get();
long minLeased = lockService.getMinLeasedTimestamp(timestampName).orElse(timestamp);

TimestampRange timestampRange = timestampService.getFreshTimestamps(numFreshTimestamps);

ConjureTimestampRange freshTimestamps =
ConjureTimestampRange.of(timestampRange.getLowerBound(), timestampRange.size());
Map<TimestampLeaseName, TimestampLeaseResponse> responses = KeyedStream.stream(numFreshTimestamps)
.map((timestampName, numFreshTimestampsForName) ->
getMinLeasedAndFreshTimestamps(timestampName, numFreshTimestampsForName, timestamp))
.collectToMap();

return Futures.immediateFuture(TimestampLeaseResponse.builder()
.minLeased(minLeased)
return Futures.immediateFuture(TimestampLeaseResponses.builder()
.timestampLeaseResponses(responses)
.leaseGuarantee(toConjure(leasedLock))
.freshTimestamps(freshTimestamps)
.build());
}

Expand Down Expand Up @@ -337,6 +338,17 @@ public void logState() {
lockService.getLockWatchingService().logState();
}

private TimestampLeaseResponse getMinLeasedAndFreshTimestamps(
TimestampLeaseName timestampName, int numFreshTimestamps, long lockedTimestamp) {
long minLeased = lockService.getMinLeasedTimestamp(timestampName).orElse(lockedTimestamp);

TimestampRange timestampRange = timestampService.getFreshTimestamps(numFreshTimestamps);
ConjureTimestampRange freshTimestamps =
ConjureTimestampRange.of(timestampRange.getLowerBound(), timestampRange.size());

return TimestampLeaseResponse.of(minLeased, freshTimestamps);
}

private static LockWatchVersion fromConjure(ConjureIdentifiedVersion conjure) {
return LockWatchVersion.of(conjure.getId(), conjure.getVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseRequest;
import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.undertow.lib.RequestContext;
import java.util.Map;
Expand All @@ -46,7 +46,7 @@ private RemotingMultiClientTimestampLeaseServiceAdapter(RemotingTimestampLeaseSe

ListenableFuture<MultiClientTimestampLeaseResponse> acquireTimestampLeases(
MultiClientTimestampLeaseRequest requests, @Nullable RequestContext context) {
Map<Namespace, ListenableFuture<TimestampLeaseResponses>> futures = KeyedStream.stream(requests.get())
Map<Namespace, ListenableFuture<TimestampLeasesResponse>> futures = KeyedStream.stream(requests.get())
.map((namespace, request) -> delegate.acquireTimestampLeases(namespace, request, context))
.collectToMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@
import com.palantir.atlasdb.timelock.api.GetMinLeasedTimestampRequests;
import com.palantir.atlasdb.timelock.api.GetMinLeasedTimestampResponses;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.timelock.api.RequestId;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.api.TimestampLeaseRequest;
import com.palantir.atlasdb.timelock.api.TimestampLeaseRequests;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.api.TimestampLeasesRequest;
import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.undertow.lib.RequestContext;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

final class RemotingTimestampLeaseServiceAdapter {
Expand All @@ -43,19 +45,18 @@ final class RemotingTimestampLeaseServiceAdapter {
this.timelockServices = timelockServices;
}

ListenableFuture<TimestampLeaseResponses> acquireTimestampLeases(
Namespace namespace, TimestampLeaseRequests requests, @Nullable RequestContext context) {
ListenableFuture<TimestampLeasesResponse> acquireTimestampLeases(
Namespace namespace, TimestampLeasesRequest requests, @Nullable RequestContext context) {
AsyncTimelockService service = getServiceForNamespace(namespace, context);

Map<TimestampLeaseName, ListenableFuture<TimestampLeaseResponse>> futures = KeyedStream.stream(requests.get())
.map((timestampName, request) -> acquireTimestampLease(service, timestampName, request))
.collectToMap();
List<ListenableFuture<TimestampLeaseResponses>> futures = requests.get().stream()
.map(request ->
acquireTimestampLease(service, request.getRequestsId(), request.getNumFreshTimestamps()))
.collect(Collectors.toList());

// TODO(aalouane): clean up lease resources in cases of partial failures
return Futures.transform(
AtlasFutures.allAsMap(futures, MoreExecutors.directExecutor()),
TimestampLeaseResponses::of,
MoreExecutors.directExecutor());
Futures.allAsList(futures), TimestampLeasesResponse::of, MoreExecutors.directExecutor());
}

ListenableFuture<GetMinLeasedTimestampResponses> getMinLeasedTimestamps(
Expand All @@ -76,8 +77,8 @@ private AsyncTimelockService getServiceForNamespace(Namespace namespace, @Nullab
return timelockServices.get(namespace.get(), TimelockNamespaces.toUserAgent(context));
}

private static ListenableFuture<TimestampLeaseResponse> acquireTimestampLease(
AsyncTimelockService service, TimestampLeaseName timestampName, TimestampLeaseRequest request) {
return service.acquireTimestampLease(timestampName, request.getRequestId(), request.getNumFreshTimestamps());
private static ListenableFuture<TimestampLeaseResponses> acquireTimestampLease(
AsyncTimelockService service, RequestId requestsId, Map<TimestampLeaseName, Integer> numFreshTimestamps) {
return service.acquireTimestampLease(requestsId.get(), numFreshTimestamps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AsyncLockService implements Closeable {

Expand Down Expand Up @@ -140,9 +142,9 @@ public AsyncResult<Leased<LockToken>> lockImmutableTimestamp(UUID requestId, lon
}

public AsyncResult<Leased<LockToken>> acquireTimestampLease(
TimestampLeaseName timestampName, UUID requestId, long timestamp) {
UUID requestId, Set<TimestampLeaseName> timestampNames, long timestamp) {
return heldLocks.getExistingOrAcquire(
requestId, () -> acquireNamedTimestampLockInternal(timestampName, requestId, timestamp));
requestId, () -> acquireNamedTimestampLockInternal(requestId, timestampNames, timestamp));
}

public AsyncResult<Void> waitForLocks(UUID requestId, Set<LockDescriptor> lockDescriptors, TimeLimit timeout) {
Expand Down Expand Up @@ -177,9 +179,11 @@ private AsyncResult<HeldLocks> acquireImmutableTimestampLock(UUID requestId, lon
}

private AsyncResult<HeldLocks> acquireNamedTimestampLockInternal(
TimestampLeaseName timestampName, UUID requestId, long timestamp) {
AsyncLock lock = lockManager.getNamedTimestampLock(timestampName, timestamp);
return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(lock), TimeLimit.zero());
UUID requestId, Set<TimestampLeaseName> timestampNames, long timestamp) {
List<AsyncLock> locks = timestampNames.stream()
.map(name -> lockManager.getNamedTimestampLock(name, timestamp))
.collect(Collectors.toList());
return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromOrderedList(locks), TimeLimit.zero());
}

public boolean unlock(LockToken token) {
Expand Down
Loading

0 comments on commit aa8665f

Please sign in to comment.