From adfd07276a3010ea8eb4007104ac7e1d76c59ad5 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 26 Dec 2024 12:35:17 +0000 Subject: [PATCH 1/6] chore(Spanner): fix mutation only case ibn rw using mux with aborted errors --- .../spanner/AsyncTransactionManagerImpl.java | 15 +- .../MultiplexedSessionDatabaseClient.java | 13 +- .../cloud/spanner/SessionPoolOptions.java | 26 +- .../cloud/spanner/TransactionManagerImpl.java | 11 +- .../cloud/spanner/TransactionRunnerImpl.java | 19 +- .../cloud/spanner/MockSpannerServiceImpl.java | 9 +- ...edSessionDatabaseClientMockServerTest.java | 275 ++++++++++++++++++ 7 files changed, 357 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 0057bb15bea..7192a3664dc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -82,6 +82,8 @@ public TransactionContextFutureImpl beginAsync() { private ApiFuture internalBeginAsync(boolean firstAttempt) { txnState = TransactionState.STARTED; + boolean isMutationsOnlyTransaction = false; + // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; if (txn != null && session.getIsMultiplexed() && !firstAttempt) { @@ -89,6 +91,7 @@ private ApiFuture internalBeginAsync(boolean firstAttempt) { // transactionId. multiplexedSessionPreviousTransactionId = txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId(); + isMutationsOnlyTransaction = txn.mutationsOnlyTransaction; } txn = @@ -99,7 +102,17 @@ private ApiFuture internalBeginAsync(boolean firstAttempt) { } final SettableApiFuture res = SettableApiFuture.create(); final ApiFuture fut; - if (firstAttempt) { + + /* + If the transaction contains only mutations and is using a multiplexed session, perform a + `BeginTransaction` after the user operation completes during a retry. + + This ensures that a random mutation from the mutations list is chosen when invoking + `BeginTransaction`. If `BeginTransaction` is performed before the user operation, + the mutations are not sent, and the precommit token is not received, resulting in + an INVALID_ARGUMENT error (missing precommit token) during commit. + */ + if (firstAttempt || isMutationsOnlyTransaction) { fut = ApiFutures.immediateFuture(null); } else { fut = txn.ensureTxnAsync(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 01f41a2dfdc..89371a21c51 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) { // initiate a begin transaction request to verify if read-write transactions are // supported using multiplexed sessions. if (sessionClient - .getSpanner() - .getOptions() - .getSessionPoolOptions() - .getUseMultiplexedSessionForRW()) { + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getUseMultiplexedSessionForRW() + && !sessionClient + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getSkipVerifyBeginTransactionForMuxRW()) { verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName()); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 36a4e5fe208..9bdccc7f25e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -83,6 +83,7 @@ public class SessionPoolOptions { // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; + private final boolean skipVerifyingBeginTransactionForMuxRW; private SessionPoolOptions(Builder builder) { // minSessions > maxSessions is only possible if the user has only set a value for maxSessions. @@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) { ? useMultiplexedSessionFromEnvVariablePartitionedOps : builder.useMultiplexedSessionPartitionedOps; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; + this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW; } @Override @@ -169,8 +171,10 @@ public boolean equals(Object o) { && Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession) && Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW) && Objects.equals( - this.multiplexedSessionMaintenanceDuration, - other.multiplexedSessionMaintenanceDuration); + this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration) + && Objects.equals( + this.skipVerifyingBeginTransactionForMuxRW, + other.skipVerifyingBeginTransactionForMuxRW); } @Override @@ -199,7 +203,8 @@ public int hashCode() { this.poolMaintainerClock, this.useMultiplexedSession, this.useMultiplexedSessionForRW, - this.multiplexedSessionMaintenanceDuration); + this.multiplexedSessionMaintenanceDuration, + this.skipVerifyingBeginTransactionForMuxRW); } public Builder toBuilder() { @@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() { return multiplexedSessionMaintenanceDuration; } + @VisibleForTesting + @InternalApi + boolean getSkipVerifyBeginTransactionForMuxRW() { + return skipVerifyingBeginTransactionForMuxRW; + } + public static Builder newBuilder() { return new Builder(); } @@ -607,6 +618,7 @@ public static class Builder { private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; + private boolean skipVerifyingBeginTransactionForMuxRW = false; private static Position getReleaseToPositionFromSystemProperty() { // NOTE: This System property is a beta feature. Support for it can be removed in the future. @@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) { this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; + this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW; } /** @@ -872,6 +885,13 @@ Builder setMultiplexedSessionMaintenanceDuration( return this; } + @VisibleForTesting + Builder setSkipVerifyingBeginTransactionForMuxRW( + boolean skipVerifyingBeginTransactionForMuxRW) { + this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW; + return this; + } + /** * Sets whether the client should automatically execute a background query to detect the dialect * that is used by the database or not. Set this option to true if you do not know what the diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index cafb27ba6b7..e1c68c19d7a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -102,7 +102,16 @@ public TransactionContext resetForRetry() { "resetForRetry can only be called if the previous attempt" + " aborted"); } try (IScope s = tracer.withSpan(span)) { - boolean useInlinedBegin = txn.transactionId != null; + /* + If the transaction contains only mutations and is using a multiplexed session, perform a + `BeginTransaction` after the user operation completes during a retry. + + This ensures that a random mutation from the mutations list is chosen when invoking + `BeginTransaction`. If `BeginTransaction` is performed before the user operation, + the mutations are not sent, and the precommit token is not received, resulting in + an INVALID_ARGUMENT error (missing precommit token) during commit. + */ + boolean useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null; // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 9e9fe62304a..6157720af22 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -222,6 +222,9 @@ public void removeListener(Runnable listener) { private final Map channelHint; + // This field indicates whether the read-write transaction contains only mutation operations. + boolean mutationsOnlyTransaction = false; + private TransactionContextImpl(Builder builder) { super(builder); this.transactionId = builder.transactionId; @@ -402,6 +405,11 @@ ApiFuture commitAsync() { synchronized (lock) { if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) { finishOps = SettableApiFuture.create(); + // At this point, it is ensured that the transaction contains only mutations. Adding a + // safeguard to apply this only for multiplexed sessions. + if (session.getIsMultiplexed()) { + mutationsOnlyTransaction = true; + } createTxnAsync(finishOps, randomMutation); } else { finishOps = finishedAsyncOperations; @@ -1229,7 +1237,16 @@ private T runInternal(final TransactionCallable txCallable) { if (attempt.get() > 0) { // Do not inline the BeginTransaction during a retry if the initial attempt did not // actually start a transaction. - useInlinedBegin = txn.transactionId != null; + /* + If the transaction contains only mutations and is using a multiplexed session, perform a + `BeginTransaction` after the user operation completes during a retry. + + This ensures that a random mutation from the mutations list is chosen when invoking + `BeginTransaction`. If `BeginTransaction` is performed before the user operation, + the mutations are not sent, and the precommit token is not received, resulting in + an INVALID_ARGUMENT error (missing precommit token) during commit. + */ + useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null; // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 676cb05eb07..7febf734119 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1914,7 +1914,8 @@ private Transaction beginTransaction( } if (session.getMultiplexed() && options.getModeCase() == ModeCase.READ_WRITE - && mutationKey != null) { + && mutationKey != null + && mutationKey != com.google.spanner.v1.Mutation.getDefaultInstance()) { // Mutation only case in a read-write transaction. builder.setPrecommitToken(getTransactionPrecommitToken(transactionId)); } @@ -2013,6 +2014,12 @@ public void commit(CommitRequest request, StreamObserver respons return; } sessionLastUsed.put(session.getName(), Instant.now()); + if (session.getMultiplexed() && !request.hasPrecommitToken()) { + throw Status.INVALID_ARGUMENT + .withDescription( + "A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.") + .asRuntimeException(); + } try { commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Find or start a transaction diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index c808bbe1110..d4dbc2d525c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -58,6 +58,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1291,6 +1292,280 @@ public void testMutationOnlyUsingAsyncTransactionManager() { request.getPrecommitToken().getPrecommitToken()); } + // The following 4 tests validate mutation-only cases where the BeginTransaction RPC fails with an + // ABORTED or retryable error + @Test + public void testMutationOnlyCaseAbortedDuringBeginTransaction() { + // This test verifies that in the case of mutations-only, when a transaction is retried after an + // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction + // request + // and precommit token is set in Commit request. + + Spanner spanner1 = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared + // after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + client + .readWriteTransaction() + .run( + transaction -> { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + return null; + }); + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertEquals(2, beginTransactionRequests.size()); + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertTrue(request.hasMutationKey()); + assertTrue(request.getMutationKey().hasInsert()); + } + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1L, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertNotNull(request.getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("TransactionPrecommitToken"), + request.getPrecommitToken().getPrecommitToken()); + } + + spanner1.close(); + } + + @Test + public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction() { + // This test verifies that in the case of mutations-only, when a transaction is retried after an + // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction + // request + // and precommit token is set in Commit request. + + Spanner spanner1 = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared + // after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertEquals(2, beginTransactionRequests.size()); + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertTrue(request.hasMutationKey()); + assertTrue(request.getMutationKey().hasInsert()); + } + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1L, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertNotNull(request.getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("TransactionPrecommitToken"), + request.getPrecommitToken().getPrecommitToken()); + } + + spanner1.close(); + } + + @Test + public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { + // This test verifies that in the case of mutations-only, when a transaction is retried after an + // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction + // request + // and precommit token is set in Commit request. + + Spanner spanner1 = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared + // after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + AsyncRunner runner = client.runAsync(); + get( + runner.runAsync( + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + MoreExecutors.directExecutor())); + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertEquals(2, beginTransactionRequests.size()); + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertTrue(request.hasMutationKey()); + assertTrue(request.getMutationKey().hasDelete()); + } + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1L, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertNotNull(request.getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("TransactionPrecommitToken"), + request.getPrecommitToken().getPrecommitToken()); + } + + spanner1.close(); + } + + @Test + public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransaction() + throws InterruptedException, ExecutionException { + // This test verifies that in the case of mutations-only, when a transaction is retried after an + // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction + // request + // and precommit token is set in Commit request. + Spanner spanner1 = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared + // after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture transaction = manager.beginAsync(); + while (true) { + CommitTimestampFuture commitTimestamp = + transaction + .then( + (txn, input) -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + MoreExecutors.directExecutor()) + .commitAsync(); + try { + assertThat(commitTimestamp.get()).isNotNull(); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetryAsync(); + } + } + } + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertEquals(2, beginTransactionRequests.size()); + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertTrue(request.hasMutationKey()); + assertTrue(request.getMutationKey().hasDelete()); + } + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1L, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertNotNull(request.getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("TransactionPrecommitToken"), + request.getPrecommitToken().getPrecommitToken()); + } + + spanner1.close(); + } + // Tests the behavior of the server-side kill switch for read-write multiplexed sessions.. @Test public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToRegularSession() { From 875b49f53a6c3aeec4615c7992486bd6ad5adb42 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 26 Dec 2024 12:48:02 +0000 Subject: [PATCH 2/6] chore(Spanner): fix mockspanner --- .../java/com/google/cloud/spanner/MockSpannerServiceImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 7febf734119..c59ad34dc69 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -2014,7 +2014,9 @@ public void commit(CommitRequest request, StreamObserver respons return; } sessionLastUsed.put(session.getName(), Instant.now()); - if (session.getMultiplexed() && !request.hasPrecommitToken()) { + if (session.getMultiplexed() + && !request.hasPrecommitToken() + && !request.hasSingleUseTransaction()) { throw Status.INVALID_ARGUMENT .withDescription( "A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.") From b6cee665febbb83fca1d3964ff250229c1590532 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 29 Jan 2025 12:07:07 +0000 Subject: [PATCH 3/6] chore(spanner): update logic for mutations only --- .../cloud/spanner/SessionPoolOptions.java | 2 +- .../cloud/spanner/TransactionManagerImpl.java | 10 +- .../cloud/spanner/TransactionRunnerImpl.java | 13 +- ...edSessionDatabaseClientMockServerTest.java | 246 ++++++------------ 4 files changed, 101 insertions(+), 170 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index a32f25b6e5f..03551640b43 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -884,7 +884,7 @@ Builder setMultiplexedSessionMaintenanceDuration( this.multiplexedSessionMaintenanceDuration = multiplexedSessionMaintenanceDuration; return this; } - + // The additional BeginTransaction RPC for multiplexed session read-write is causing // unexpected behavior in mock Spanner tests that rely on mocking the BeginTransaction RPC. // Invoking this method with `true` skips sending the BeginTransaction RPC when the multiplexed diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index 6f59918e375..2dd7fbb8530 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -103,15 +103,17 @@ public TransactionContext resetForRetry() { } try (IScope s = tracer.withSpan(span)) { /* + In case of regular session, explicitBeginBeforeUserOperation field is always true and hence there is no change in behaviour. + If the transaction contains only mutations and is using a multiplexed session, perform a `BeginTransaction` after the user operation completes during a retry. - - This ensures that a random mutation from the mutations list is chosen when invoking + This ensures that the mutations from the user callable is available before invoking `BeginTransaction`. If `BeginTransaction` is performed before the user operation, the mutations are not sent, and the precommit token is not received, resulting in an INVALID_ARGUMENT error (missing precommit token) during commit. */ - boolean useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null; + boolean useInlinedBegin = txn.transactionId != null; + boolean explicitBeginBeforeUserOperation = !txn.mutationsOnlyTransaction; // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; @@ -124,7 +126,7 @@ an INVALID_ARGUMENT error (missing precommit token) during commit. txn = session.newTransaction( options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId); - if (!useInlinedBegin) { + if (!useInlinedBegin && explicitBeginBeforeUserOperation) { txn.ensureTxn(); } txnState = TransactionState.STARTED; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 6157720af22..4f7a755c8e2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -223,6 +223,7 @@ public void removeListener(Runnable listener) { private final Map channelHint; // This field indicates whether the read-write transaction contains only mutation operations. + // This field is set only in case of multiplexed sessions. boolean mutationsOnlyTransaction = false; private TransactionContextImpl(Builder builder) { @@ -1234,19 +1235,23 @@ private T runInternal(final TransactionCallable txCallable) { Callable retryCallable = () -> { boolean useInlinedBegin = true; + boolean explicitBeginBeforeUserOperation = true; if (attempt.get() > 0) { // Do not inline the BeginTransaction during a retry if the initial attempt did not // actually start a transaction. + useInlinedBegin = txn.transactionId != null; + /* + In case of regular session, explicitBeginBeforeUserOperation field is always true and hence there is no change in behaviour. + If the transaction contains only mutations and is using a multiplexed session, perform a `BeginTransaction` after the user operation completes during a retry. - - This ensures that a random mutation from the mutations list is chosen when invoking + This ensures that the mutations from the user callable is available before invoking `BeginTransaction`. If `BeginTransaction` is performed before the user operation, the mutations are not sent, and the precommit token is not received, resulting in an INVALID_ARGUMENT error (missing precommit token) during commit. */ - useInlinedBegin = txn.mutationsOnlyTransaction || txn.transactionId != null; + explicitBeginBeforeUserOperation = !txn.mutationsOnlyTransaction; // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; @@ -1267,7 +1272,7 @@ an INVALID_ARGUMENT error (missing precommit token) during commit. span.addAnnotation("Starting Transaction Attempt", "Attempt", attempt.longValue()); // Only ensure that there is a transaction if we should not inline the beginTransaction // with the first statement. - if (!useInlinedBegin) { + if (!useInlinedBegin && explicitBeginBeforeUserOperation) { txn.ensureTxn(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index d4dbc2d525c..810753ca11a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -58,7 +58,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1292,50 +1291,30 @@ public void testMutationOnlyUsingAsyncTransactionManager() { request.getPrecommitToken().getPrecommitToken()); } - // The following 4 tests validate mutation-only cases where the BeginTransaction RPC fails with an - // ABORTED or retryable error - @Test - public void testMutationOnlyCaseAbortedDuringBeginTransaction() { - // This test verifies that in the case of mutations-only, when a transaction is retried after an - // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction - // request - // and precommit token is set in Commit request. - - Spanner spanner1 = - SpannerOptions.newBuilder() - .setProjectId("test-project") - .setChannelProvider(channelProvider) - .setCredentials(NoCredentials.getInstance()) - .setSessionPoolOption( - SessionPoolOptions.newBuilder() - .setUseMultiplexedSession(true) - .setUseMultiplexedSessionForRW(true) - .setSkipVerifyingBeginTransactionForMuxRW(true) - .build()) - .build() - .getService(); - DatabaseClientImpl client = - (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); - + private Spanner setupSpannerForAbortedBeginTransactionTests() { // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception // is cleared // after the first call, so the retry should succeed. mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); - client - .readWriteTransaction() - .run( - transaction -> { - Mutation mutation = - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); - transaction.buffer(mutation); - return null; - }); - // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. - List beginTransactionRequests = - mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + return SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + } + + private void verifyMutationKeySetInBeginTransactionRequests( + List beginTransactionRequests) { assertEquals(2, beginTransactionRequests.size()); // Verify the requests are executed using multiplexed sessions for (BeginTransactionRequest request : beginTransactionRequests) { @@ -1343,9 +1322,9 @@ public void testMutationOnlyCaseAbortedDuringBeginTransaction() { assertTrue(request.hasMutationKey()); assertTrue(request.getMutationKey().hasInsert()); } + } - // Verify that the latest precommit token is set in the CommitRequest - List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + private void verifyPreCommitTokenSetInCommitRequest(List commitRequests) { assertEquals(1L, commitRequests.size()); for (CommitRequest request : commitRequests) { assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); @@ -1354,39 +1333,52 @@ public void testMutationOnlyCaseAbortedDuringBeginTransaction() { ByteString.copyFromUtf8("TransactionPrecommitToken"), request.getPrecommitToken().getPrecommitToken()); } + } + + // The following 4 tests validate mutation-only cases where the BeginTransaction RPC fails with an + // ABORTED or retryable error + @Test + public void testMutationOnlyCaseAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + return null; + }); - spanner1.close(); + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); } @Test public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction() { - // This test verifies that in the case of mutations-only, when a transaction is retried after an - // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction - // request - // and precommit token is set in Commit request. - - Spanner spanner1 = - SpannerOptions.newBuilder() - .setProjectId("test-project") - .setChannelProvider(channelProvider) - .setCredentials(NoCredentials.getInstance()) - .setSessionPoolOption( - SessionPoolOptions.newBuilder() - .setUseMultiplexedSession(true) - .setUseMultiplexedSessionForRW(true) - .setSkipVerifyingBeginTransactionForMuxRW(true) - .build()) - .build() - .getService(); + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); DatabaseClientImpl client = - (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception - // is cleared - // after the first call, so the retry should succeed. - mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofException( - mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { @@ -1406,62 +1398,32 @@ public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. List beginTransactionRequests = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); - assertEquals(2, beginTransactionRequests.size()); - // Verify the requests are executed using multiplexed sessions - for (BeginTransactionRequest request : beginTransactionRequests) { - assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); - assertTrue(request.hasMutationKey()); - assertTrue(request.getMutationKey().hasInsert()); - } + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); // Verify that the latest precommit token is set in the CommitRequest List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); - assertEquals(1L, commitRequests.size()); - for (CommitRequest request : commitRequests) { - assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); - assertNotNull(request.getPrecommitToken()); - assertEquals( - ByteString.copyFromUtf8("TransactionPrecommitToken"), - request.getPrecommitToken().getPrecommitToken()); - } + verifyPreCommitTokenSetInCommitRequest(commitRequests); - spanner1.close(); + spanner.close(); } @Test public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { - // This test verifies that in the case of mutations-only, when a transaction is retried after an - // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction - // request - // and precommit token is set in Commit request. + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. - Spanner spanner1 = - SpannerOptions.newBuilder() - .setProjectId("test-project") - .setChannelProvider(channelProvider) - .setCredentials(NoCredentials.getInstance()) - .setSessionPoolOption( - SessionPoolOptions.newBuilder() - .setUseMultiplexedSession(true) - .setUseMultiplexedSessionForRW(true) - .setSkipVerifyingBeginTransactionForMuxRW(true) - .build()) - .build() - .getService(); + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); DatabaseClientImpl client = - (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception - // is cleared - // after the first call, so the retry should succeed. - mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofException( - mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); AsyncRunner runner = client.runAsync(); get( runner.runAsync( txn -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); + txn.buffer( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()); return ApiFutures.immediateFuture(null); }, MoreExecutors.directExecutor())); @@ -1469,57 +1431,26 @@ public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. List beginTransactionRequests = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); - assertEquals(2, beginTransactionRequests.size()); - // Verify the requests are executed using multiplexed sessions - for (BeginTransactionRequest request : beginTransactionRequests) { - assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); - assertTrue(request.hasMutationKey()); - assertTrue(request.getMutationKey().hasDelete()); - } + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); // Verify that the latest precommit token is set in the CommitRequest List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); - assertEquals(1L, commitRequests.size()); - for (CommitRequest request : commitRequests) { - assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); - assertNotNull(request.getPrecommitToken()); - assertEquals( - ByteString.copyFromUtf8("TransactionPrecommitToken"), - request.getPrecommitToken().getPrecommitToken()); - } + verifyPreCommitTokenSetInCommitRequest(commitRequests); - spanner1.close(); + spanner.close(); } @Test public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransaction() - throws InterruptedException, ExecutionException { + throws Exception { // This test verifies that in the case of mutations-only, when a transaction is retried after an // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction // request // and precommit token is set in Commit request. - Spanner spanner1 = - SpannerOptions.newBuilder() - .setProjectId("test-project") - .setChannelProvider(channelProvider) - .setCredentials(NoCredentials.getInstance()) - .setSessionPoolOption( - SessionPoolOptions.newBuilder() - .setUseMultiplexedSession(true) - .setUseMultiplexedSessionForRW(true) - .setSkipVerifyingBeginTransactionForMuxRW(true) - .build()) - .build() - .getService(); + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); DatabaseClientImpl client = - (DatabaseClientImpl) spanner1.getDatabaseClient(DatabaseId.of("p", "i", "d")); + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception - // is cleared - // after the first call, so the retry should succeed. - mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofException( - mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); try (AsyncTransactionManager manager = client.transactionManagerAsync()) { TransactionContextFuture transaction = manager.beginAsync(); while (true) { @@ -1527,7 +1458,13 @@ public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransa transaction .then( (txn, input) -> { - txn.buffer(Mutation.delete("TEST", KeySet.all())); + txn.buffer( + Mutation.newInsertBuilder("FOO") + .set("ID") + .to(1L) + .set("NAME") + .to("Bar") + .build()); return ApiFutures.immediateFuture(null); }, MoreExecutors.directExecutor()) @@ -1544,26 +1481,13 @@ public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransa // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. List beginTransactionRequests = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); - assertEquals(2, beginTransactionRequests.size()); - // Verify the requests are executed using multiplexed sessions - for (BeginTransactionRequest request : beginTransactionRequests) { - assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); - assertTrue(request.hasMutationKey()); - assertTrue(request.getMutationKey().hasDelete()); - } + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); // Verify that the latest precommit token is set in the CommitRequest List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); - assertEquals(1L, commitRequests.size()); - for (CommitRequest request : commitRequests) { - assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); - assertNotNull(request.getPrecommitToken()); - assertEquals( - ByteString.copyFromUtf8("TransactionPrecommitToken"), - request.getPrecommitToken().getPrecommitToken()); - } + verifyPreCommitTokenSetInCommitRequest(commitRequests); - spanner1.close(); + spanner.close(); } // Tests the behavior of the server-side kill switch for read-write multiplexed sessions.. From 7d19004ddf671446630324fe3d9cb37549433773 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 29 Jan 2025 12:12:04 +0000 Subject: [PATCH 4/6] chore(spanner): lint fix --- .../java/com/google/cloud/spanner/TransactionManagerImpl.java | 3 ++- .../java/com/google/cloud/spanner/TransactionRunnerImpl.java | 3 ++- .../MultiplexedSessionDatabaseClientMockServerTest.java | 3 +-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index 2dd7fbb8530..19311e875b5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -103,7 +103,8 @@ public TransactionContext resetForRetry() { } try (IScope s = tracer.withSpan(span)) { /* - In case of regular session, explicitBeginBeforeUserOperation field is always true and hence there is no change in behaviour. + In case of regular session, explicitBeginBeforeUserOperation field is always true and + hence there is no change in behaviour. If the transaction contains only mutations and is using a multiplexed session, perform a `BeginTransaction` after the user operation completes during a retry. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 4f7a755c8e2..b3ff124291c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -1242,7 +1242,8 @@ private T runInternal(final TransactionCallable txCallable) { useInlinedBegin = txn.transactionId != null; /* - In case of regular session, explicitBeginBeforeUserOperation field is always true and hence there is no change in behaviour. + In case of regular session, explicitBeginBeforeUserOperation field is always true and hence + there is no change in behaviour. If the transaction contains only mutations and is using a multiplexed session, perform a `BeginTransaction` after the user operation completes during a retry. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 810753ca11a..ee75da460e1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1293,8 +1293,7 @@ public void testMutationOnlyUsingAsyncTransactionManager() { private Spanner setupSpannerForAbortedBeginTransactionTests() { // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception - // is cleared - // after the first call, so the retry should succeed. + // is cleared after the first call, so the retry should succeed. mockSpanner.setBeginTransactionExecutionTime( SimulatedExecutionTime.ofException( mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); From 638f94f519fcef8dda2bc264eca18b012be4d42c Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 30 Jan 2025 10:57:04 +0000 Subject: [PATCH 5/6] chore(spanner): update logic to handle begin txn retry only for aborted case --- .../spanner/AsyncTransactionManagerImpl.java | 15 +------- .../cloud/spanner/TransactionManagerImpl.java | 14 +------- .../cloud/spanner/TransactionRunnerImpl.java | 35 ++++++------------- 3 files changed, 13 insertions(+), 51 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 7192a3664dc..0057bb15bea 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -82,8 +82,6 @@ public TransactionContextFutureImpl beginAsync() { private ApiFuture internalBeginAsync(boolean firstAttempt) { txnState = TransactionState.STARTED; - boolean isMutationsOnlyTransaction = false; - // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; if (txn != null && session.getIsMultiplexed() && !firstAttempt) { @@ -91,7 +89,6 @@ private ApiFuture internalBeginAsync(boolean firstAttempt) { // transactionId. multiplexedSessionPreviousTransactionId = txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId(); - isMutationsOnlyTransaction = txn.mutationsOnlyTransaction; } txn = @@ -102,17 +99,7 @@ private ApiFuture internalBeginAsync(boolean firstAttempt) { } final SettableApiFuture res = SettableApiFuture.create(); final ApiFuture fut; - - /* - If the transaction contains only mutations and is using a multiplexed session, perform a - `BeginTransaction` after the user operation completes during a retry. - - This ensures that a random mutation from the mutations list is chosen when invoking - `BeginTransaction`. If `BeginTransaction` is performed before the user operation, - the mutations are not sent, and the precommit token is not received, resulting in - an INVALID_ARGUMENT error (missing precommit token) during commit. - */ - if (firstAttempt || isMutationsOnlyTransaction) { + if (firstAttempt) { fut = ApiFutures.immediateFuture(null); } else { fut = txn.ensureTxnAsync(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index 19311e875b5..b1d37f3e4cd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -102,19 +102,7 @@ public TransactionContext resetForRetry() { "resetForRetry can only be called if the previous attempt aborted"); } try (IScope s = tracer.withSpan(span)) { - /* - In case of regular session, explicitBeginBeforeUserOperation field is always true and - hence there is no change in behaviour. - - If the transaction contains only mutations and is using a multiplexed session, perform a - `BeginTransaction` after the user operation completes during a retry. - This ensures that the mutations from the user callable is available before invoking - `BeginTransaction`. If `BeginTransaction` is performed before the user operation, - the mutations are not sent, and the precommit token is not received, resulting in - an INVALID_ARGUMENT error (missing precommit token) during commit. - */ boolean useInlinedBegin = txn.transactionId != null; - boolean explicitBeginBeforeUserOperation = !txn.mutationsOnlyTransaction; // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; @@ -127,7 +115,7 @@ an INVALID_ARGUMENT error (missing precommit token) during commit. txn = session.newTransaction( options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId); - if (!useInlinedBegin && explicitBeginBeforeUserOperation) { + if (!useInlinedBegin) { txn.ensureTxn(); } txnState = TransactionState.STARTED; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index b3ff124291c..8eca6975240 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -222,10 +222,6 @@ public void removeListener(Runnable listener) { private final Map channelHint; - // This field indicates whether the read-write transaction contains only mutation operations. - // This field is set only in case of multiplexed sessions. - boolean mutationsOnlyTransaction = false; - private TransactionContextImpl(Builder builder) { super(builder); this.transactionId = builder.transactionId; @@ -330,6 +326,16 @@ private void createTxnAsync( } res.set(null); } catch (ExecutionException e) { + SpannerException spannerException = SpannerExceptionFactory.asSpannerException(e); + if (spannerException.getErrorCode() == ErrorCode.ABORTED + && session.getIsMultiplexed() + && mutation != null) { + // Begin transaction can return ABORTED errors. This can only happen if it included + // a mutation key, which again means that this is a mutation-only transaction on a multiplexed + // session. + createTxnAsync(res, mutation); + return; + } span.addAnnotation( "Transaction Creation Failed", e.getCause() == null ? e : e.getCause()); res.setException(e.getCause() == null ? e : e.getCause()); @@ -406,11 +412,6 @@ ApiFuture commitAsync() { synchronized (lock) { if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) { finishOps = SettableApiFuture.create(); - // At this point, it is ensured that the transaction contains only mutations. Adding a - // safeguard to apply this only for multiplexed sessions. - if (session.getIsMultiplexed()) { - mutationsOnlyTransaction = true; - } createTxnAsync(finishOps, randomMutation); } else { finishOps = finishedAsyncOperations; @@ -1235,25 +1236,11 @@ private T runInternal(final TransactionCallable txCallable) { Callable retryCallable = () -> { boolean useInlinedBegin = true; - boolean explicitBeginBeforeUserOperation = true; if (attempt.get() > 0) { // Do not inline the BeginTransaction during a retry if the initial attempt did not // actually start a transaction. useInlinedBegin = txn.transactionId != null; - /* - In case of regular session, explicitBeginBeforeUserOperation field is always true and hence - there is no change in behaviour. - - If the transaction contains only mutations and is using a multiplexed session, perform a - `BeginTransaction` after the user operation completes during a retry. - This ensures that the mutations from the user callable is available before invoking - `BeginTransaction`. If `BeginTransaction` is performed before the user operation, - the mutations are not sent, and the precommit token is not received, resulting in - an INVALID_ARGUMENT error (missing precommit token) during commit. - */ - explicitBeginBeforeUserOperation = !txn.mutationsOnlyTransaction; - // Determine the latest transactionId when using a multiplexed session. ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; if (session.getIsMultiplexed()) { @@ -1273,7 +1260,7 @@ an INVALID_ARGUMENT error (missing precommit token) during commit. span.addAnnotation("Starting Transaction Attempt", "Attempt", attempt.longValue()); // Only ensure that there is a transaction if we should not inline the beginTransaction // with the first statement. - if (!useInlinedBegin && explicitBeginBeforeUserOperation) { + if (!useInlinedBegin) { txn.ensureTxn(); } From cce319e7150060c86202ceada751161fe1f2d047 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 30 Jan 2025 11:00:39 +0000 Subject: [PATCH 6/6] chore(spanner): add span for abort --- .../com/google/cloud/spanner/TransactionRunnerImpl.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 8eca6975240..fad4ce564ab 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -331,8 +331,11 @@ private void createTxnAsync( && session.getIsMultiplexed() && mutation != null) { // Begin transaction can return ABORTED errors. This can only happen if it included - // a mutation key, which again means that this is a mutation-only transaction on a multiplexed - // session. + // a mutation key, which again means that this is a mutation-only transaction on a + // multiplexed session. + span.addAnnotation( + "Transaction Creation Failed with ABORT. Retrying", + e.getCause() == null ? e : e.getCause()); createTxnAsync(res, mutation); return; }