This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[Named min timestamp leases] Acquire single lease per transaction-shaped batch #7385

Merged 1 commit on Oct 21, 2024
29 changes: 16 additions & 13 deletions timelock-api/src/main/conjure/timelock-api.yml
@@ -228,27 +228,30 @@ types:
LeaderTimes:
fields:
leaderTimes: map<Namespace, LeaderTime>
TimestampLeaseRequest:
fields:
requestId:
type: uuid
safety: safe
numFreshTimestamps:
type: integer
safety: safe
RequestId:
alias: uuid
safety: safe
TimestampLeaseRequests:
alias: map<TimestampLeaseName, TimestampLeaseRequest>
fields:
requestsId: RequestId
Contributor: requestId

Contributor: requestId! 😅

Contributor (Author): Fixed in one of the PRs that merged; might be worth going over the final yml API synchronously.

numFreshTimestamps: map<TimestampLeaseName, integer>
TimestampLeasesRequest:
Contributor: Dunno if we should do something like:

  • MultiClientAcquireTimestampLeasesRequest
  • NamespaceAcquireTimestampLeasesRequest
  • AcquireTimestampLeasesRequest

I think this sets the right hierarchy of Multi-Client -> Namespace -> Single request.

alias: list<TimestampLeaseRequests>
MultiClientTimestampLeaseRequest:
alias: map<Namespace, TimestampLeaseRequests>
alias: map<Namespace, TimestampLeasesRequest>
TimestampLeaseResponse:
fields:
minLeased: Long
leaseGuarantee: LeaseGuarantee
freshTimestamps: ConjureTimestampRange
TimestampLeaseResponses:
alias: map<TimestampLeaseName, TimestampLeaseResponse>
fields:
leaseGuarantee: LeaseGuarantee
timestampLeaseResponses: map<TimestampLeaseName, TimestampLeaseResponse>
TimestampLeasesResponse:
fields:
alias: list<TimestampLeaseResponses>
MultiClientTimestampLeaseResponse:
alias: map<Namespace, TimestampLeaseResponses>
alias: map<Namespace, TimestampLeasesResponse>
Contributor: Oof, yeah, this is a bit tricky to follow. We have LeasesResponse holding a bunch of LeaseResponses. Also, I do think LeasesResponse should probably be the internal one, and this one should be LeasesResponses. If you change the naming later, I'd be curious to see what you end up with.

GetMinLeasedTimestampRequests:
alias: list<TimestampLeaseName>
MultiClientGetMinLeasedTimestampRequest:
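Reassembling the interleaved hunk above, the request side of the new API reads roughly as follows (a best-effort reconstruction from the diff, not the file verbatim; `requestsId` is the spelling the reviewers flag as a typo):

```yaml
RequestId:
  alias: uuid
  safety: safe
TimestampLeaseRequests:
  fields:
    requestsId: RequestId  # reviewers note this should read requestId
    numFreshTimestamps: map<TimestampLeaseName, integer>
TimestampLeasesRequest:
  alias: list<TimestampLeaseRequests>
MultiClientTimestampLeaseRequest:
  alias: map<Namespace, TimestampLeasesRequest>
```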
@@ -22,7 +22,7 @@
import com.palantir.atlasdb.timelock.api.ConjureStartTransactionsResponse;
import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.lock.watch.LockWatchingService;
import com.palantir.lock.client.IdentifiedLockRequest;
import com.palantir.lock.v2.IdentifiedTimeLockRequest;
@@ -44,6 +44,7 @@
import com.palantir.timestamp.TimestampRange;
import com.palantir.tritium.annotations.Instrument;
import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -76,8 +77,8 @@ ListenableFuture<GetCommitTimestampsResponse> getCommitTimestamps(

ListenableFuture<TimestampRange> getFreshTimestampsAsync(int timestampsToRequest);

ListenableFuture<TimestampLeaseResponse> acquireTimestampLease(
TimestampLeaseName timestampName, UUID requestId, int numFreshTimestamps);
ListenableFuture<TimestampLeaseResponses> acquireTimestampLease(
UUID requestId, Map<TimestampLeaseName, Integer> numFreshTimestamps);

ListenableFuture<Long> getMinLeasedTimestamp(TimestampLeaseName timestampName);

@@ -29,6 +29,7 @@
import com.palantir.atlasdb.timelock.api.LockWatchRequest;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.lock.AsyncLockService;
import com.palantir.atlasdb.timelock.lock.AsyncResult;
import com.palantir.atlasdb.timelock.lock.Leased;
@@ -38,6 +39,7 @@
import com.palantir.atlasdb.timelock.lockwatches.RequestMetrics;
import com.palantir.atlasdb.timelock.transaction.timestamp.DelegatingClientAwareManagedTimestampService;
import com.palantir.atlasdb.timelock.transaction.timestamp.LeadershipGuardedClientAwareManagedTimestampService;
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.client.IdentifiedLockRequest;
import com.palantir.lock.v2.IdentifiedTimeLockRequest;
@@ -60,6 +62,7 @@
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.timestamp.ManagedTimestampService;
import com.palantir.timestamp.TimestampRange;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -257,24 +260,22 @@ public ListenableFuture<TimestampRange> getFreshTimestampsAsync(int timestampsTo
}

@Override
public ListenableFuture<TimestampLeaseResponse> acquireTimestampLease(
TimestampLeaseName timestampName, UUID requestId, int numFreshTimestamps) {
public ListenableFuture<TimestampLeaseResponses> acquireTimestampLease(
UUID requestId, Map<TimestampLeaseName, Integer> numFreshTimestamps) {
long timestamp = timestampService.getFreshTimestamp();

Leased<LockToken> leasedLock = lockService
.acquireTimestampLease(timestampName, requestId, timestamp)
.acquireTimestampLease(requestId, numFreshTimestamps.keySet(), timestamp)
.get();
long minLeased = lockService.getMinLeasedTimestamp(timestampName).orElse(timestamp);

Contributor: Let's add a comment here that it's crucial that the timestamps here are AFTER the fresh timestamp we're locking on.

TimestampRange timestampRange = timestampService.getFreshTimestamps(numFreshTimestamps);

ConjureTimestampRange freshTimestamps =
ConjureTimestampRange.of(timestampRange.getLowerBound(), timestampRange.size());
Map<TimestampLeaseName, TimestampLeaseResponse> responses = KeyedStream.stream(numFreshTimestamps)
.map((timestampName, numFreshTimestampsForName) ->
getMinLeasedAndFreshTimestamps(timestampName, numFreshTimestampsForName, timestamp))
.collectToMap();

return Futures.immediateFuture(TimestampLeaseResponse.builder()
.minLeased(minLeased)
return Futures.immediateFuture(TimestampLeaseResponses.builder()
.timestampLeaseResponses(responses)
.leaseGuarantee(toConjure(leasedLock))
.freshTimestamps(freshTimestamps)
.build());
}

@@ -337,6 +338,17 @@ public void logState() {
lockService.getLockWatchingService().logState();
}

private TimestampLeaseResponse getMinLeasedAndFreshTimestamps(
TimestampLeaseName timestampName, int numFreshTimestamps, long lockedTimestamp) {
Contributor: Is this the right factoring? There's value in calling getFreshTimestamps once for the N lease names and then portioning the timestamps out across the things that want them, after having gotten the min leased / locked timestamps for each of the namespaces. That way we save a bunch of volatile reads and writes; normally I wouldn't care as much, but this is timelock...
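The suggestion above can be sketched concretely. Assuming a single fresh-timestamp call returns a contiguous range starting at some lower bound, the batch could be portioned out across lease names like this (all names here are hypothetical, not the PR's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: issue ONE fresh-timestamp call sized to the whole
// batch, then hand each lease name a contiguous slice of the returned range.
final class FreshTimestampPartitioner {
    // Returns leaseName -> {inclusive lower bound, count}.
    static Map<String, long[]> partition(long batchLowerBound, Map<String, Integer> countsByName) {
        Map<String, long[]> slices = new LinkedHashMap<>();
        long next = batchLowerBound;
        for (Map.Entry<String, Integer> entry : countsByName.entrySet()) {
            slices.put(entry.getKey(), new long[] {next, entry.getValue()});
            next += entry.getValue();
        }
        return slices;
    }
}
```

A single call sized to the sum of the per-name counts would replace N separate calls, which is the volatile-read/write saving the reviewer is after.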

long minLeased = lockService.getMinLeasedTimestamp(timestampName).orElse(lockedTimestamp);

TimestampRange timestampRange = timestampService.getFreshTimestamps(numFreshTimestamps);
ConjureTimestampRange freshTimestamps =
ConjureTimestampRange.of(timestampRange.getLowerBound(), timestampRange.size());

return TimestampLeaseResponse.of(minLeased, freshTimestamps);
}

private static LockWatchVersion fromConjure(ConjureIdentifiedVersion conjure) {
return LockWatchVersion.of(conjure.getId(), conjure.getVersion());
}
@@ -27,7 +27,7 @@
import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseRequest;
import com.palantir.atlasdb.timelock.api.MultiClientTimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.undertow.lib.RequestContext;
import java.util.Map;
@@ -46,7 +46,7 @@ private RemotingMultiClientTimestampLeaseServiceAdapter(RemotingTimestampLeaseSe

ListenableFuture<MultiClientTimestampLeaseResponse> acquireTimestampLeases(
MultiClientTimestampLeaseRequest requests, @Nullable RequestContext context) {
Map<Namespace, ListenableFuture<TimestampLeaseResponses>> futures = KeyedStream.stream(requests.get())
Map<Namespace, ListenableFuture<TimestampLeasesResponse>> futures = KeyedStream.stream(requests.get())
Contributor: nice 👍

.map((namespace, request) -> delegate.acquireTimestampLeases(namespace, request, context))
.collectToMap();

@@ -26,14 +26,16 @@
import com.palantir.atlasdb.timelock.api.GetMinLeasedTimestampRequests;
import com.palantir.atlasdb.timelock.api.GetMinLeasedTimestampResponses;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.timelock.api.RequestId;
import com.palantir.atlasdb.timelock.api.TimestampLeaseName;
import com.palantir.atlasdb.timelock.api.TimestampLeaseRequest;
import com.palantir.atlasdb.timelock.api.TimestampLeaseRequests;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponse;
import com.palantir.atlasdb.timelock.api.TimestampLeaseResponses;
import com.palantir.atlasdb.timelock.api.TimestampLeasesRequest;
import com.palantir.atlasdb.timelock.api.TimestampLeasesResponse;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.undertow.lib.RequestContext;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

final class RemotingTimestampLeaseServiceAdapter {
@@ -43,19 +45,18 @@ final class RemotingTimestampLeaseServiceAdapter {
this.timelockServices = timelockServices;
}

ListenableFuture<TimestampLeaseResponses> acquireTimestampLeases(
Namespace namespace, TimestampLeaseRequests requests, @Nullable RequestContext context) {
ListenableFuture<TimestampLeasesResponse> acquireTimestampLeases(
Namespace namespace, TimestampLeasesRequest requests, @Nullable RequestContext context) {
AsyncTimelockService service = getServiceForNamespace(namespace, context);

Map<TimestampLeaseName, ListenableFuture<TimestampLeaseResponse>> futures = KeyedStream.stream(requests.get())
.map((timestampName, request) -> acquireTimestampLease(service, timestampName, request))
.collectToMap();
List<ListenableFuture<TimestampLeaseResponses>> futures = requests.get().stream()
.map(request ->
acquireTimestampLease(service, request.getRequestsId(), request.getNumFreshTimestamps()))
.collect(Collectors.toList());

// TODO(aalouane): clean up lease resources in cases of partial failures
return Futures.transform(
AtlasFutures.allAsMap(futures, MoreExecutors.directExecutor()),
TimestampLeaseResponses::of,
MoreExecutors.directExecutor());
Futures.allAsList(futures), TimestampLeasesResponse::of, MoreExecutors.directExecutor());
}
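The TODO above notes that lease resources are not yet cleaned up on partial failure. One possible shape for that cleanup, sketched with stdlib CompletableFuture rather than Guava's ListenableFuture (all names hypothetical, and the real fix would also need to release server-side lease state):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical sketch: if any per-lease future fails, release the leases
// that did succeed before surfacing the failure to the caller.
final class PartialFailureCleanup {
    static <T> CompletableFuture<List<T>> allOrRelease(
            List<CompletableFuture<T>> futures, Consumer<T> release) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .handle((ignored, failure) -> {
                    if (failure == null) {
                        return futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList());
                    }
                    // Partial failure: release only the leases actually acquired.
                    for (CompletableFuture<T> future : futures) {
                        if (future.isDone() && !future.isCompletedExceptionally()) {
                            release.accept(future.join());
                        }
                    }
                    throw new CompletionException(failure);
                });
    }
}
```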

ListenableFuture<GetMinLeasedTimestampResponses> getMinLeasedTimestamps(
@@ -76,8 +77,8 @@ private AsyncTimelockService getServiceForNamespace(Namespace namespace, @Nullab
return timelockServices.get(namespace.get(), TimelockNamespaces.toUserAgent(context));
}

private static ListenableFuture<TimestampLeaseResponse> acquireTimestampLease(
AsyncTimelockService service, TimestampLeaseName timestampName, TimestampLeaseRequest request) {
return service.acquireTimestampLease(timestampName, request.getRequestId(), request.getNumFreshTimestamps());
private static ListenableFuture<TimestampLeaseResponses> acquireTimestampLease(
AsyncTimelockService service, RequestId requestsId, Map<TimestampLeaseName, Integer> numFreshTimestamps) {
Contributor: pedantry: requestId

return service.acquireTimestampLease(requestsId.get(), numFreshTimestamps);
}
}
@@ -29,11 +29,13 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AsyncLockService implements Closeable {

@@ -140,9 +142,9 @@ public AsyncResult<Leased<LockToken>> lockImmutableTimestamp(UUID requestId, lon
}

public AsyncResult<Leased<LockToken>> acquireTimestampLease(
TimestampLeaseName timestampName, UUID requestId, long timestamp) {
UUID requestId, Set<TimestampLeaseName> timestampNames, long timestamp) {
return heldLocks.getExistingOrAcquire(
requestId, () -> acquireNamedTimestampLockInternal(timestampName, requestId, timestamp));
requestId, () -> acquireNamedTimestampLockInternal(requestId, timestampNames, timestamp));
}

public AsyncResult<Void> waitForLocks(UUID requestId, Set<LockDescriptor> lockDescriptors, TimeLimit timeout) {
@@ -177,9 +179,11 @@ private AsyncResult<HeldLocks> acquireImmutableTimestampLock(UUID requestId, lon
}

private AsyncResult<HeldLocks> acquireNamedTimestampLockInternal(
Contributor (@jkozlowski, Oct 21, 2024): acquireNamedTimestampLocksInternal

In general, let's make sure you do a naming-alignment pass once we're done with all of the changes. I think we've iterated quite hard, so the whole chain is out of whack.

TimestampLeaseName timestampName, UUID requestId, long timestamp) {
AsyncLock lock = lockManager.getNamedTimestampLock(timestampName, timestamp);
return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(lock), TimeLimit.zero());
UUID requestId, Set<TimestampLeaseName> timestampNames, long timestamp) {
Contributor (Author): This is a set because the ordering doesn't matter. In HeldLocks, the list is just iterated over in lock/unlock operations, and the lock descriptors are returned as a set.

Contributor: Ah, looks like we thought about the same thing. I agree it doesn't matter, but it definitely reads weird. So maybe, to allay these concerns, add an explicit comment OR handle the ordering.

List<AsyncLock> locks = timestampNames.stream()
.map(name -> lockManager.getNamedTimestampLock(name, timestamp))
.collect(Collectors.toList());
return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromOrderedList(locks), TimeLimit.zero());
Contributor (@jeremyk-91, Oct 29, 2024): Agree this works as written, because the lock descriptor has a fresh timestamp appended, and so there should never be contention. A comment here is IMO required though: if the locks provided here actually had contention, this is not OK because of the possibility of deadlock.

(Curious if there's more context elsewhere!)
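One cheap way to "handle the ordering", as suggested in both comment threads, is to acquire the per-name locks in a deterministic sorted order, so two concurrent requests touching overlapping names can never take them in opposite orders. A sketch using hypothetical String names in place of TimestampLeaseName:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: sorting lock names before acquisition rules out
// lock-ordering deadlocks even if the named-timestamp locks were contended,
// because every acquirer locks overlapping names in the same global order.
final class DeterministicLockOrdering {
    static List<String> acquisitionOrder(Set<String> timestampNames) {
        return timestampNames.stream()
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.toList());
    }
}
```

As the PR stands this is belt-and-braces, since the fresh-timestamp suffix makes contention impossible; the sort only matters if that property ever changes.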

}

public boolean unlock(LockToken token) {