Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Add DML, DQL, Mutation, Txn Actions and Utility methods for executor framework #8976

Merged
64 changes: 64 additions & 0 deletions spanner/test/cloudexecutor/executor/actions/dml.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package actions

import (
"context"
"log"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// DmlActionHandler holds the necessary components required for DML action.
type DmlActionHandler struct {
Action *executorpb.DmlAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}

// ExecuteAction executes a DML update action request, store the results in the outputstream.OutcomeSender.
func (h *DmlActionHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
log.Printf("Executing dml update %s\n %v\n", h.FlowContext.transactionSeed, h.Action)
stmt, err := utility.BuildQuery(h.Action.GetUpdate())
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}

var iter *spanner.RowIterator
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForQuery()
iter = txn.Query(ctx, stmt)
defer iter.Stop()
log.Printf("Parsing DML result %s\n", h.FlowContext.transactionSeed)
err = processResults(iter, 0, h.OutcomeSender, h.FlowContext)
if err != nil {
if status.Code(err) == codes.Aborted {
return h.OutcomeSender.FinishWithTransactionRestarted()
}
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.AppendDmlRowsModified(iter.RowCount)
return h.OutcomeSender.FinishSuccessfully()
}
166 changes: 166 additions & 0 deletions spanner/test/cloudexecutor/executor/actions/dql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,179 @@
package actions

import (
"context"
"fmt"
"log"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// ReadActionHandler holds the necessary components required for executorpb.ReadAction.
type ReadActionHandler struct {
Action *executorpb.ReadAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}

// ExecuteAction executes a read action request, store the results in the OutcomeSender.
func (h *ReadActionHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
log.Printf("Executing read %s:\n %v", h.FlowContext.transactionSeed, h.Action)
action := h.Action
var err error

var typeList []*spannerpb.Type
if action.Index != nil {
typeList, err = extractTypes(action.GetTable(), action.GetColumn(), h.FlowContext.tableMetadata)
if err != nil {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err)))
}
} else {
typeList, err = h.FlowContext.tableMetadata.GetKeyColumnTypes(action.GetTable())
if err != nil {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err)))
}
}

keySet, err := utility.KeySetProtoToCloudKeySet(action.GetKeys(), typeList)
if err != nil {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't convert rowSet: %s", err)))
}

var iter *spanner.RowIterator
if h.FlowContext.currentActiveTransaction == None {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "no active transaction"))
} else if h.FlowContext.currentActiveTransaction == Batch {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "can't execute regular read in a batch transaction"))
} else if h.FlowContext.currentActiveTransaction == Read {
txn, err := h.FlowContext.getTransactionForRead()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForRead(action.GetTable(), action.Index)
h.FlowContext.numPendingReads++
if action.Index != nil {
iter = txn.ReadUsingIndex(ctx, action.GetTable(), action.GetIndex(), keySet, action.GetColumn())
} else {
iter = txn.Read(ctx, action.GetTable(), keySet, action.GetColumn())
}
} else if h.FlowContext.currentActiveTransaction == ReadWrite {
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForRead(action.GetTable(), action.Index)
h.FlowContext.numPendingReads++
if action.Index != nil {
iter = txn.ReadUsingIndex(ctx, action.GetTable(), action.GetIndex(), keySet, action.GetColumn())
} else {
iter = txn.Read(ctx, action.GetTable(), keySet, action.GetColumn())
}
}
defer iter.Stop()
log.Printf("Parsing read result %s\n", h.FlowContext.transactionSeed)
err = processResults(iter, int64(action.GetLimit()), h.OutcomeSender, h.FlowContext)
if err != nil {
h.FlowContext.finishRead(status.Code(err))
if status.Code(err) == codes.Aborted {
return h.OutcomeSender.FinishWithTransactionRestarted()
}
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.finishRead(codes.OK)
return h.OutcomeSender.FinishSuccessfully()
}

// QueryActionHandler holds the necessary components required for executorpb.QueryAction.
type QueryActionHandler struct {
Action *executorpb.QueryAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}

// ExecuteAction executes a query action request, store the results in the OutcomeSender.
func (h *QueryActionHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
log.Printf("Executing query %s\n %v\n", h.FlowContext.transactionSeed, h.Action)
stmt, err := utility.BuildQuery(h.Action)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}

var iter *spanner.RowIterator
if h.FlowContext.currentActiveTransaction == None {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "no active transaction"))
} else if h.FlowContext.currentActiveTransaction == Batch {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "can't execute regular read in a batch transaction"))
} else if h.FlowContext.currentActiveTransaction == Read {
txn, err := h.FlowContext.getTransactionForRead()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForQuery()
h.FlowContext.numPendingReads++
iter = txn.Query(ctx, stmt)
} else if h.FlowContext.currentActiveTransaction == ReadWrite {
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForQuery()
h.FlowContext.numPendingReads++
iter = txn.Query(ctx, stmt)
}
defer iter.Stop()
log.Printf("Parsing query result %s\n", h.FlowContext.transactionSeed)
err = processResults(iter, 0, h.OutcomeSender, h.FlowContext)
if err != nil {
h.FlowContext.finishRead(status.Code(err))
if status.Code(err) == codes.Aborted {
return h.OutcomeSender.FinishWithTransactionRestarted()
}
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.finishRead(codes.OK)
return h.OutcomeSender.FinishSuccessfully()
}

// processResults processes a ResultSet from a read/query/dml and store the results in the OutcomeSender.
func processResults(iter *spanner.RowIterator, limit int64, outcomeSender *outputstream.OutcomeSender, flowContext *ExecutionFlowContext) error {
var rowCount int64 = 0
log.Printf("Iterating result set: %s\n", flowContext.transactionSeed)
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
spannerRow, rowType, err := utility.ConvertSpannerRow(row)
if err != nil {
return err
}
outcomeSender.SetRowType(rowType)
// outcomeSender.rowType = rowType
err = outcomeSender.AppendRow(spannerRow)
if err != nil {
return err
}
rowCount++
if limit > 0 && rowCount >= limit {
log.Printf("Stopping at row limit: %d", limit)
break
}
}
log.Printf("Successfully processed result: %s\n", flowContext.transactionSeed)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package actions
import (
"context"
"log"
"strings"
"sync"
"time"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -122,3 +125,71 @@ func (c *ExecutionFlowContext) initReadState() {
c.readAborted = false
c.numPendingReads = 0
}

// clear clears the transaction related variables.
func (c *ExecutionFlowContext) clear() {
c.roTxn = nil
c.rwTxn = nil
c.tableMetadata = nil
}

// finish attempts to finish the transaction by either committing it or exiting without committing.
// In order to follow the ExecuteActions protocol, we must distinguish Spanner-generated errors
// (e.g. RowNotFound) and internal errors (e.g. a precondition is not met). Errors returned from
// Spanner populate the status of SpannerActionOutcome. Internal errors, in contrast, break the
// stubby call. For this reason, finish() has two return values dedicated to errors. If any of
// these errors is not nil, other return values are undefined.
// Return values in order:
// 1. Whether or not the transaction is restarted. It will be true if commit has been attempted,
// but Spanner returned aborted and restarted instead. When that happens, all reads and writes
// should be replayed, followed by another commit attempt.
// 2. Commit timestamp. It's returned only if commit has succeeded.
// 3. Spanner error -- an error that Spanner client returned.
// 4. Internal error.
func (c *ExecutionFlowContext) finish(ctx context.Context, txnFinishMode executorpb.FinishTransactionAction_Mode) (bool, *time.Time, error, error) {
if txnFinishMode == executorpb.FinishTransactionAction_COMMIT {
var err error
ts, err := c.rwTxn.Commit(ctx)
if err != nil {
log.Printf("Transaction finished with error %v", err)
if status.Code(err) == codes.Aborted {
log.Println("Transaction Aborted. Sending status to outcome sender to restart the transaction.")
return true, nil, nil, nil
}
// Filter expected errors
if status.Code(err) == codes.Unknown && strings.Contains(err.Error(), "Transaction outcome unknown") {
return false, nil, spanner.ToSpannerError(status.Error(codes.DeadlineExceeded, "Transaction outcome unknown")), nil
}
// TODO(harsha): check if this is an internal or spanner error
return false, nil, err, nil
}
return false, &ts, nil, nil
} else if txnFinishMode == executorpb.FinishTransactionAction_ABANDON {
log.Printf("Transaction Abandoned")
c.rwTxn.Rollback(ctx)
return false, nil, nil, nil
} else {
return false, nil, nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported finish mode %s", txnFinishMode.String()))
}
}

// CloseOpenTransactions cleans up all the active transactions if the stubby call is closing.
func (c *ExecutionFlowContext) CloseOpenTransactions() {
c.mu.Lock()
defer c.mu.Unlock()
if c.roTxn != nil {
log.Println("A read only transaction was active when stubby call closed.")
c.roTxn.Close()
}
if c.rwTxn != nil {
log.Println("Abandon a read-write transaction that was active when stubby call closed.")
_, _, _, err := c.finish(context.Background(), executorpb.FinishTransactionAction_ABANDON)
if err != nil {
log.Printf("Failed to abandon a read-write transaction: %v\n", err)
}
}
if c.batchTxn != nil {
log.Println("A batch transaction was active when stubby call closed.")
c.batchTxn.Close()
}
}
Loading
Loading