diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 3120e651feb2..372b0de3a6f2 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -135,7 +135,7 @@ public interface DatabaseClient { ReadOnlyTransaction singleUseReadOnlyTransaction(); /** -   * Returns a read-only transaction context in which a single read or query can be performed at the + * Returns a read-only transaction context in which a single read or query can be performed at * given timestamp bound. This method differs from {@link #singleUse(TimestampBound)} in that the * read timestamp used may be inspected after the read has returned data or finished successfully. * @@ -269,4 +269,53 @@ public interface DatabaseClient { * */ TransactionManager transactionManager(); + + /** + * Returns the lower bound of rows modified by this DML statement. + * + *

The method will block until the update is complete. Running a DML statement with this method + * does not offer exactly once semantics, and therfore the DML statement should be idempotent. The + * DML statement must be fully-partitionable. Specifically, the statement must be expressible as + * the union of many statements which each access only a single row of the table. This is a + * Partitioned DML transaction in which a single Partitioned DML statement is executed. + * Partitioned DML partitions the key space and runs the DML statement over each partition in + * parallel using separate, internal transactions that commit independently. Partitioned DML + * transactions do not need to be committed. + * + *

Partitioned DML updates are used to execute a single DML statement with a different + * execution strategy that provides different, and often better, scalability properties for large, + * table-wide operations than DML in a {@link #readWriteTransaction()} transaction. Smaller scoped + * statements, such as an OLTP workload, should prefer using {@link + * TransactionContext#executeUpdate(Statement)} with {@link #readWriteTransaction()}. + * + *

That said, Partitioned DML is not a drop-in replacement for standard DML used in {@link + * #readWriteTransaction()}.

+ * + * + * + *

Given the above, Partitioned DML is good fit for large, database-wide, operations that are + * idempotent, such as deleting old rows from a very large table. + */ + long executePartitionedUpdate(Statement stmt); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index b8c51849d5a6..46a90afac120 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -28,10 +28,11 @@ class DatabaseClientImpl implements DatabaseClient { private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; + private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction"; private static final Tracer tracer = Tracing.getTracer(); static { - TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION); + TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION); } private final SessionPool pool; @@ -155,6 +156,17 @@ public TransactionManager transactionManager() { } } + @Override + public long executePartitionedUpdate(Statement stmt) { + Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + return pool.getReadWriteSession().executePartitionedUpdate(stmt); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } + } + ListenableFuture closeAsync() { return pool.closeAsync(); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java index d3359df6d24b..5d396d8ec730 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java @@ -16,7 +16,9 @@ package com.google.cloud.spanner; +import com.google.cloud.spanner.Options.QueryOption; import com.google.spanner.v1.ResultSetStats; +import javax.annotation.Nullable; /** * Provides access to the data returned by a Cloud Spanner read or query. {@code ResultSet} allows a @@ -59,13 +61,17 @@ public interface ResultSet extends AutoCloseable, StructReader { @Override void close(); + /** * Returns the {@link ResultSetStats} for the query only if the query was executed in either the * {@code PLAN} or the {@code PROFILE} mode via the {@link ReadContext#analyzeQuery(Statement, - * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method. Attempts to call this method on - * a {@code ResultSet} not obtained from {@code analyzeQuery} result in an {@code - * UnsupportedOperationException}. This method must be called after {@link #next()} has - * returned @{code false}. Calling it before that will result in an {@code IllegalStateException}. + * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method or for DML statements in + * {@link ReadContext#executeQuery(Statement, QueryOption...)}. Attempts to call this method on + * a {@code ResultSet} not obtained from {@code analyzeQuery} or {@code executeQuery} will return + * a {@code null} {@code ResultSetStats}. This method must be called after {@link #next()} has + * returned @{code false}. Calling it before that will result in {@code null} + * {@code ResultSetStats} too. */ + @Nullable ResultSetStats getStats(); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index ced479151253..3aec9e8ce47b 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -312,6 +312,18 @@ public Timestamp write(Iterable mutations) throws SpannerException { } } + @Override + public long executePartitionedUpdate(Statement stmt) throws SpannerException { + try { + markUsed(); + return delegate.executePartitionedUpdate(stmt); + } catch (SpannerException e) { + throw lastException = e; + } finally { + close(); + } + } + @Override public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { try { diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index b997103dbaf2..d89471a2b8de 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -102,6 +102,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -768,6 +769,13 @@ public String getName() { return options; } + @Override + public long executePartitionedUpdate(Statement stmt) { + setActive(null); + PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, rpc); + return txn.executePartitionedUpdate(stmt); + } + @Override public Timestamp write(Iterable mutations) throws SpannerException { TransactionRunner runner = readWriteTransaction(); @@ -957,6 +965,11 @@ abstract static class AbstractReadContext @GuardedBy("lock") private boolean isClosed = false; + + // A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML, + // ignored for query by the server. + private AtomicLong seqNo = new AtomicLong(); + // Allow up to 512MB to be buffered (assuming 1MB chunks). In practice, restart tokens are sent // much more frequently. private static final int MAX_BUFFERED_CHUNKS = 512; @@ -973,6 +986,10 @@ private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPref this.span = span; } + long getSeqNo() { + return seqNo.incrementAndGet(); + } + @Override public final ResultSet read( String table, KeySet keys, Iterable columns, ReadOption... options) { @@ -1033,17 +1050,14 @@ private ResultSet executeQueryInternal( statement, queryMode, readOptions, null /*partitionToken*/); } - ResultSet executeQueryInternalWithOptions( + ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( Statement statement, - com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode, - Options readOptions, - ByteString partitionToken) { - beforeReadOrQuery(); + QueryMode queryMode) { ExecuteSqlRequest.Builder builder = ExecuteSqlRequest.newBuilder() .setSql(statement.getSql()) .setQueryMode(queryMode) - .setSession(session.getName()); + .setSession(session.name); Map stmtParameters = statement.getParameters(); if (!stmtParameters.isEmpty()) { com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder(); @@ -1056,10 +1070,21 @@ ResultSet executeQueryInternalWithOptions( if (selector != null) { builder.setTransaction(selector); } + builder.setSeqno(getSeqNo()); + return builder; + } + + ResultSet executeQueryInternalWithOptions( + Statement statement, + com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode, + Options readOptions, + ByteString partitionToken) { + beforeReadOrQuery(); + final ExecuteSqlRequest.Builder request = + getExecuteSqlRequestBuilder(statement, queryMode); if (partitionToken != null) { - builder.setPartitionToken(partitionToken); + request.setPartitionToken(partitionToken); } - final ExecuteSqlRequest request = builder.build(); final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = @@ -1067,13 +1092,11 @@ ResultSet executeQueryInternalWithOptions( @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); + if (resumeToken != null) { + request.setResumeToken(resumeToken); + } SpannerRpc.StreamingCall call = - rpc.executeQuery( - resumeToken == null - ? request - : request.toBuilder().setResumeToken(resumeToken).build(), - stream.consumer(), - session.options); + rpc.executeQuery(request.build(), stream.consumer(), session.options); // We get one message for free. if (prefetchChunks > 1) { call.request(prefetchChunks - 1); @@ -1083,7 +1106,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } }; return new GrpcResultSet(stream, this, queryMode); - } + } /** * Called before any read or query is started to perform state checks and initializations. @@ -1152,7 +1175,7 @@ ResultSet readInternalWithOptions( Options readOptions, ByteString partitionToken) { beforeReadOrQuery(); - ReadRequest.Builder builder = + final ReadRequest.Builder builder = ReadRequest.newBuilder() .setSession(session.name) .setTable(checkNotNull(table)) @@ -1172,7 +1195,6 @@ ResultSet readInternalWithOptions( if (partitionToken != null) { builder.setPartitionToken(partitionToken); } - final ReadRequest request = builder.build(); final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = @@ -1180,13 +1202,11 @@ ResultSet readInternalWithOptions( @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); + if (resumeToken != null) { + builder.setResumeToken(resumeToken); + } SpannerRpc.StreamingCall call = - rpc.read( - resumeToken == null - ? request - : request.toBuilder().setResumeToken(resumeToken).build(), - stream.consumer(), - session.options); + rpc.read(builder.build(), stream.consumer(), session.options); // We get one message for free. if (prefetchChunks > 1) { call.request(prefetchChunks - 1); @@ -1362,6 +1382,83 @@ private void backoff(Context context, BackOff backoff) { } } + static class PartitionedDMLTransaction implements SessionTransaction { + private final ByteString transactionId; + private final SessionImpl session; + private final SpannerRpc rpc; + private volatile boolean isValid = true; + + PartitionedDMLTransaction( + SessionImpl session, + SpannerRpc rpc) { + this.session = session; + this.rpc = rpc; + this.transactionId = initTransaction(); + } + + ByteString initTransaction() { + final BeginTransactionRequest request = + BeginTransactionRequest.newBuilder() + .setSession(session.getName()) + .setOptions( + TransactionOptions.newBuilder() + .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) + .build(); + Transaction txn = + runWithRetries( + new Callable() { + @Override + public Transaction call() throws Exception { + return rpc.beginTransaction(request, session.options); + } + }); + if (txn.getId().isEmpty()) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, + "Failed to init transaction, missing transaction id\n" + session.getName()); + } + return txn.getId(); + } + + public long executePartitionedUpdate(Statement statement) { + checkState( + isValid, "Partitioned DML has been invalidated by a new operation on the session"); + final ExecuteSqlRequest.Builder builder = + ExecuteSqlRequest.newBuilder() + .setSql(statement.getSql()) + .setQueryMode(QueryMode.NORMAL) + .setSession(session.name) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId).build()); + Map stmtParameters = statement.getParameters(); + if (!stmtParameters.isEmpty()) { + com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder(); + for (Map.Entry param : stmtParameters.entrySet()) { + paramsBuilder.putFields(param.getKey(), param.getValue().toProto()); + builder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); + } + } + com.google.spanner.v1.ResultSet resultSet = + runWithRetries( + new Callable() { + @Override + public com.google.spanner.v1.ResultSet call() throws Exception { + return rpc.executeQuery(builder.build(), session.options); + } + }); + if (!resultSet.hasStats()) { + throw new IllegalArgumentException( + "Partitioned DML response missing stats possibly due to non-DML statement as input"); + } + // For partitioned DML, using the row count lower bound. + return resultSet.getStats().getRowCountLowerBound(); + } + + @Override + public void invalidate() { + isValid = false; + } + } + @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { @GuardedBy("lock") @@ -1536,6 +1633,29 @@ public void buffer(Iterable mutations) { } } } + + @Override + public long executeUpdate(Statement statement) { + beforeReadOrQuery(); + final ExecuteSqlRequest.Builder builder = + getExecuteSqlRequestBuilder( + statement, + QueryMode.NORMAL); + com.google.spanner.v1.ResultSet resultSet = + runWithRetries( + new Callable() { + @Override + public com.google.spanner.v1.ResultSet call() throws Exception { + return rpc.executeQuery(builder.build(), session.options); + } + }); + if (!resultSet.hasStats()) { + throw new IllegalArgumentException( + "DML response missing stats possibly due to non-DML statement as input"); + } + // For standard DML, using the exact row count. + return resultSet.getStats().getRowCountExact(); + } } /** @@ -1901,7 +2021,7 @@ public boolean next() throws SpannerException { currRow = new GrpcStruct(iterator.type(), new ArrayList<>()); } boolean hasNext = currRow.consumeRow(iterator); - if (queryMode != QueryMode.NORMAL && !hasNext) { + if (!hasNext) { statistics = iterator.getStats(); } return hasNext; @@ -1911,13 +2031,8 @@ public boolean next() throws SpannerException { } @Override + @Nullable public ResultSetStats getStats() { - if (queryMode == QueryMode.NORMAL) { - throw new UnsupportedOperationException( - "ResultSetStats are available only in PLAN and PROFILE execution modes"); - } - checkState( - statistics != null, "ResultSetStats requested before consuming the entire ResultSet"); return statistics; } @@ -2637,13 +2752,10 @@ ResultSetMetadata getMetadata() throws SpannerException { /* * Get the query statistics. Query statistics are delivered with the last PartialResultSet * in the stream. Any attempt to call this method before the caller has finished consuming the - * results will throw an exception. + * results will return null. */ - ResultSetStats getStats() { - if (statistics == null) { - throw newSpannerException( - ErrorCode.INTERNAL, "Stream closed without sending query statistics"); - } + @Nullable + ResultSetStats getStats() { return statistics; } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java index 540d9a276772..5ddefcf73728 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java @@ -94,4 +94,11 @@ public interface TransactionContext extends ReadContext { * mutations will be applied atomically. */ void buffer(Iterable mutations); + + /** + * Executes the DML statement(s) and returns the number of rows modified. For non-DML statements, + * it will result in an {@code IllegalArgumentException}. The effects of the DML statement + * will be visible to subsequent operations in the transaction. + */ + long executeUpdate(Statement statement); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java index e983ef6303f9..0cbdf614d45e 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java @@ -66,6 +66,7 @@ import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; import com.google.spanner.v1.SpannerGrpc; @@ -377,6 +378,17 @@ public StreamingCall read( Option.CHANNEL_HINT.getLong(options)); } + @Override + public ResultSet executeQuery( + ExecuteSqlRequest request, @Nullable Map options) { + return get( + doUnaryCall( + SpannerGrpc.METHOD_EXECUTE_SQL, + request, + request.getSession(), + Option.CHANNEL_HINT.getLong(options))); + } + @Override public StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 9d7066c5e55c..c07a80af8ae8 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -34,6 +34,7 @@ import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; @@ -200,6 +201,8 @@ Session createSession(String databaseName, @Nullable Map labels, StreamingCall read( ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options); + ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options); + StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options); diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 1d0a6683401d..048b5a9550b9 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -500,7 +500,7 @@ public void planResult() { } @Test - public void statsUnavailableError() { + public void statsUnavailable() { ResultSetStats stats = ResultSetStats.newBuilder().build(); consumer.onPartialResultSet( PartialResultSet.newBuilder() @@ -510,27 +510,7 @@ public void statsUnavailableError() { .build()); resultSet = resultSetWithMode(QueryMode.PROFILE); consumer.onCompleted(); - expectedException.expect(IllegalStateException.class); - expectedException.expectMessage( - "ResultSetStats requested before consuming the entire ResultSet"); - resultSet.getStats(); - } - - @Test - public void statsNotSupportedError() { - ResultSetStats stats = ResultSetStats.newBuilder().build(); - consumer.onPartialResultSet( - PartialResultSet.newBuilder() - .setMetadata(makeMetadata(Type.struct(new ArrayList()))) - .setChunkedValue(false) - .setStats(stats) - .build()); - resultSet = resultSetWithMode(QueryMode.NORMAL); - consumer.onCompleted(); - expectedException.expect(UnsupportedOperationException.class); - expectedException.expectMessage( - "ResultSetStats are available only in PLAN and PROFILE execution modes"); - resultSet.getStats(); + assertThat(resultSet.getStats()).isNull(); } private void doArrayTest( diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java new file mode 100644 index 000000000000..35ba83e93203 --- /dev/null +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java @@ -0,0 +1,298 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.IntegrationTest; +import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeyRange; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionRunner; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import java.util.Arrays; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for DML. */ +@Category(IntegrationTest.class) +@RunWith(JUnit4.class) +public final class ITDMLTest { + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); + private static Database db; + private static DatabaseClient client; + /** Sequence for assigning unique keys to test cases. */ + private static int seq; + + private static final String INSERT_DML = + "INSERT INTO T (k, v) VALUES ('boo1', 1), ('boo2', 2), ('boo3', 3), ('boo4', 4);"; + private static final String UPDATE_DML = "UPDATE T SET T.V = 100 WHERE T.K LIKE 'boo%';"; + private static final String DELETE_DML = "DELETE FROM T WHERE T.K like 'boo%';"; + private static final long DML_COUNT = 4; + + @BeforeClass + public static void setUpDatabase() { + db = + env.getTestHelper() + .createTestDatabase( + "CREATE TABLE T (" + + " K STRING(MAX) NOT NULL," + + " V INT64," + + ") PRIMARY KEY (K)"); + client = env.getTestHelper().getDatabaseClient(db); + } + + private static String uniqueKey() { + return "k" + seq++; + } + + private void executeUpdate(long expectedCount, final String... stmts) { + final TransactionCallable callable = + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) { + long rowCount = 0; + for (String stmt : stmts) { + rowCount += transaction.executeUpdate(Statement.of(stmt)); + } + return rowCount; + } + }; + TransactionRunner runner = client.readWriteTransaction(); + Long rowCount = runner.run(callable); + assertThat(rowCount).isEqualTo(expectedCount); + } + + @Test + public void partitionedDML() { + executeUpdate(DML_COUNT, INSERT_DML); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V")) + .getLong(0)) + .isEqualTo(1); + + long rowCount = client.executePartitionedUpdate(Statement.of(UPDATE_DML)); + // Note: With PDML there is a possibility of network replay or partial update to occur, causing + // this assert to fail. We should remove this assert if it is a recurring failure in IT tests. + assertThat(rowCount).isEqualTo(DML_COUNT); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V")) + .getLong(0)) + .isEqualTo(100); + + rowCount = client.executePartitionedUpdate(Statement.of(DELETE_DML)); + assertThat(rowCount).isEqualTo(DML_COUNT); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V"))) + .isNull(); + } + + @Test + public void standardDML() { + executeUpdate(DML_COUNT, INSERT_DML); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V")) + .getLong(0)) + .isEqualTo(1); + executeUpdate(DML_COUNT, UPDATE_DML); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V")) + .getLong(0)) + .isEqualTo(100); + executeUpdate(DML_COUNT, DELETE_DML); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V"))) + .isNull(); + } + + @Test + public void standardDMLWithError() { + try { + executeUpdate(0, "SELECT * FROM T;"); + fail("Expected illegal argument exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.UNKNOWN); + assertThat(e.getMessage()) + .contains("DML response missing stats possibly due to non-DML statement as input"); + assertThat(e.getCause()).isInstanceOf(IllegalArgumentException.class); + } + } + + @Test + public void standardDMLWithDuplicates() { + executeUpdate(DML_COUNT, INSERT_DML); + + executeUpdate( + 4, + "UPDATE T SET v = 200 WHERE k = 'boo1';", + "UPDATE T SET v = 300 WHERE k = 'boo1';", + "UPDATE T SET v = 400 WHERE k = 'boo1';", + "UPDATE T SET v = 500 WHERE k = 'boo1';"); + assertThat( + client + .singleUse(TimestampBound.strong()) + .readRow("T", Key.of("boo1"), Arrays.asList("V")) + .getLong(0)) + .isEqualTo(500); + + executeUpdate(DML_COUNT, DELETE_DML, DELETE_DML); + } + + @Test + public void standardDMLReadYourWrites() { + executeUpdate(DML_COUNT, INSERT_DML); + + final TransactionCallable callable = + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) { + long rowCount = + transaction.executeUpdate(Statement.of("UPDATE T SET v = v * 2 WHERE k = 'boo2';")); + assertThat(rowCount).isEqualTo(1); + assertThat(transaction.readRow("T", Key.of("boo2"), Arrays.asList("v")).getLong(0)) + .isEqualTo(2 * 2); + return null; + } + }; + TransactionRunner runner = client.readWriteTransaction(); + runner.run(callable); + + executeUpdate(DML_COUNT, DELETE_DML); + } + + @Test + public void standardDMLRollback() { + class UserException extends Exception { + UserException(String message) { + super(message); + } + } + final TransactionCallable callable = + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws UserException { + long rowCount = transaction.executeUpdate(Statement.of(INSERT_DML)); + assertThat(rowCount).isEqualTo(DML_COUNT); + throw new UserException("failing to commit"); + } + }; + + try { + TransactionRunner runner = client.readWriteTransaction(); + runner.run(callable); + fail("Expected user exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.UNKNOWN); + assertThat(e.getMessage()).contains("failing to commit"); + assertThat(e.getCause()).isInstanceOf(UserException.class); + } + + ResultSet resultSet = + client + .singleUse(TimestampBound.strong()) + .read("T", KeySet.range(KeyRange.prefix(Key.of("boo"))), Arrays.asList("K")); + assertThat(resultSet.next()).isFalse(); + } + + @Test + public void standardDMLAndMutations() { + final String key1 = uniqueKey(); + final String key2 = uniqueKey(); + final TransactionCallable callable = + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) { + // DML + long rowCount = + transaction.executeUpdate( + Statement.of("INSERT INTO T (k, v) VALUES ('" + key1 + "', 1)")); + assertThat(rowCount).isEqualTo(1); + + // Mutations + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("T").set("K").to(key2).set("V").to(2).build()); + return null; + } + }; + TransactionRunner runner = client.readWriteTransaction(); + runner.run(callable); + + KeySet.Builder keys = KeySet.newBuilder(); + keys.addKey(Key.of(key1)).addKey(Key.of(key2)); + ResultSet resultSet = + client.singleUse(TimestampBound.strong()).read("T", keys.build(), Arrays.asList("K")); + int rowCount = 0; + while (resultSet.next()) { + rowCount++; + } + assertThat(rowCount).isEqualTo(2); + } + + private void executeQuery(long expectedCount, final String... stmts) { + final TransactionCallable callable = + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) { + long rowCount = 0; + for (final String stmt : stmts) { + ResultSet resultSet = transaction.executeQuery(Statement.of(stmt)); + assertThat(resultSet.next()).isFalse(); + assertThat(resultSet.getStats()).isNotNull(); + rowCount += resultSet.getStats().getRowCountExact(); + } + return rowCount; + } + }; + TransactionRunner runner = client.readWriteTransaction(); + Long rowCount = runner.run(callable); + assertThat(rowCount).isEqualTo(expectedCount); + } + + @Test + public void standardDMLWithExecuteSQL() { + executeQuery(DML_COUNT, INSERT_DML); + // checks for multi-stmts within a txn, therefore also verifying seqNo. + executeQuery(DML_COUNT * 2, UPDATE_DML, DELETE_DML); + } +}