Skip to content

Commit

Permalink
feat(spanner): Add DML, DQL, Mutation, Txn Actions and Utility method…
Browse files Browse the repository at this point in the history
…s for executor framework (#8976)

* feat(spanner): add code for executor framework

* feat(spanner): lint fix

* feat(spanner): check test flakiness

* feat(spanner): revert - check test flakiness

* feat(spanner): update context

* feat(spanner): add start and finish transaction actions

* feat(spanner): remove the fixed TODO

* feat(spanner): add input stream handling

* feat(spanner): code fixes
  • Loading branch information
harshachinta authored Nov 7, 2023
1 parent 1a16cbf commit ca76671
Show file tree
Hide file tree
Showing 12 changed files with 1,690 additions and 46 deletions.
64 changes: 64 additions & 0 deletions spanner/test/cloudexecutor/executor/actions/dml.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package actions

import (
"context"
"log"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// DmlActionHandler holds the necessary components required for DML action.
type DmlActionHandler struct {
Action *executorpb.DmlAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}

// ExecuteAction executes a DML update action request, store the results in the outputstream.OutcomeSender.
func (h *DmlActionHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
log.Printf("Executing dml update %s\n %v\n", h.FlowContext.transactionSeed, h.Action)
stmt, err := utility.BuildQuery(h.Action.GetUpdate())
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}

var iter *spanner.RowIterator
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForQuery()
iter = txn.Query(ctx, stmt)
defer iter.Stop()
log.Printf("Parsing DML result %s\n", h.FlowContext.transactionSeed)
err = processResults(iter, 0, h.OutcomeSender, h.FlowContext)
if err != nil {
if status.Code(err) == codes.Aborted {
return h.OutcomeSender.FinishWithTransactionRestarted()
}
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.AppendDmlRowsModified(iter.RowCount)
return h.OutcomeSender.FinishSuccessfully()
}
166 changes: 166 additions & 0 deletions spanner/test/cloudexecutor/executor/actions/dql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,179 @@
package actions

import (
"context"
"fmt"
"log"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// ReadActionHandler holds the necessary components required for executorpb.ReadAction.
type ReadActionHandler struct {
Action *executorpb.ReadAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}

// ExecuteAction executes a read action request, store the results in the OutcomeSender.
func (h *ReadActionHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
log.Printf("Executing read %s:\n %v", h.FlowContext.transactionSeed, h.Action)
action := h.Action
var err error

var typeList []*spannerpb.Type
if action.Index != nil {
typeList, err = extractTypes(action.GetTable(), action.GetColumn(), h.FlowContext.tableMetadata)
if err != nil {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err)))
}
} else {
typeList, err = h.FlowContext.tableMetadata.GetKeyColumnTypes(action.GetTable())
if err != nil {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't extract types from metadata: %s", err)))
}
}

keySet, err := utility.KeySetProtoToCloudKeySet(action.GetKeys(), typeList)
if err != nil {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, fmt.Sprintf("Can't convert rowSet: %s", err)))
}

var iter *spanner.RowIterator
if h.FlowContext.currentActiveTransaction == None {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "no active transaction"))
} else if h.FlowContext.currentActiveTransaction == Batch {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "can't execute regular read in a batch transaction"))
} else if h.FlowContext.currentActiveTransaction == Read {
txn, err := h.FlowContext.getTransactionForRead()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForRead(action.GetTable(), action.Index)
h.FlowContext.numPendingReads++
if action.Index != nil {
iter = txn.ReadUsingIndex(ctx, action.GetTable(), action.GetIndex(), keySet, action.GetColumn())
} else {
iter = txn.Read(ctx, action.GetTable(), keySet, action.GetColumn())
}
} else if h.FlowContext.currentActiveTransaction == ReadWrite {
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForRead(action.GetTable(), action.Index)
h.FlowContext.numPendingReads++
if action.Index != nil {
iter = txn.ReadUsingIndex(ctx, action.GetTable(), action.GetIndex(), keySet, action.GetColumn())
} else {
iter = txn.Read(ctx, action.GetTable(), keySet, action.GetColumn())
}
}
defer iter.Stop()
log.Printf("Parsing read result %s\n", h.FlowContext.transactionSeed)
err = processResults(iter, int64(action.GetLimit()), h.OutcomeSender, h.FlowContext)
if err != nil {
h.FlowContext.finishRead(status.Code(err))
if status.Code(err) == codes.Aborted {
return h.OutcomeSender.FinishWithTransactionRestarted()
}
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.finishRead(codes.OK)
return h.OutcomeSender.FinishSuccessfully()
}

// QueryActionHandler holds the necessary components required for executorpb.QueryAction.
type QueryActionHandler struct {
Action *executorpb.QueryAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}

// ExecuteAction executes a query action request, store the results in the OutcomeSender.
func (h *QueryActionHandler) ExecuteAction(ctx context.Context) error {
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
log.Printf("Executing query %s\n %v\n", h.FlowContext.transactionSeed, h.Action)
stmt, err := utility.BuildQuery(h.Action)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}

var iter *spanner.RowIterator
if h.FlowContext.currentActiveTransaction == None {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "no active transaction"))
} else if h.FlowContext.currentActiveTransaction == Batch {
return h.OutcomeSender.FinishWithError(status.Error(codes.InvalidArgument, "can't execute regular read in a batch transaction"))
} else if h.FlowContext.currentActiveTransaction == Read {
txn, err := h.FlowContext.getTransactionForRead()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForQuery()
h.FlowContext.numPendingReads++
iter = txn.Query(ctx, stmt)
} else if h.FlowContext.currentActiveTransaction == ReadWrite {
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
h.OutcomeSender.InitForQuery()
h.FlowContext.numPendingReads++
iter = txn.Query(ctx, stmt)
}
defer iter.Stop()
log.Printf("Parsing query result %s\n", h.FlowContext.transactionSeed)
err = processResults(iter, 0, h.OutcomeSender, h.FlowContext)
if err != nil {
h.FlowContext.finishRead(status.Code(err))
if status.Code(err) == codes.Aborted {
return h.OutcomeSender.FinishWithTransactionRestarted()
}
return h.OutcomeSender.FinishWithError(err)
}
h.FlowContext.finishRead(codes.OK)
return h.OutcomeSender.FinishSuccessfully()
}

// processResults processes a ResultSet from a read/query/dml and store the results in the OutcomeSender.
func processResults(iter *spanner.RowIterator, limit int64, outcomeSender *outputstream.OutcomeSender, flowContext *ExecutionFlowContext) error {
var rowCount int64 = 0
log.Printf("Iterating result set: %s\n", flowContext.transactionSeed)
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
spannerRow, rowType, err := utility.ConvertSpannerRow(row)
if err != nil {
return err
}
outcomeSender.SetRowType(rowType)
// outcomeSender.rowType = rowType
err = outcomeSender.AppendRow(spannerRow)
if err != nil {
return err
}
rowCount++
if limit > 0 && rowCount >= limit {
log.Printf("Stopping at row limit: %d", limit)
break
}
}
log.Printf("Successfully processed result: %s\n", flowContext.transactionSeed)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package actions
import (
"context"
"log"
"strings"
"sync"
"time"

"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -122,3 +125,71 @@ func (c *ExecutionFlowContext) initReadState() {
c.readAborted = false
c.numPendingReads = 0
}

// clear clears the transaction related variables.
func (c *ExecutionFlowContext) clear() {
c.roTxn = nil
c.rwTxn = nil
c.tableMetadata = nil
}

// finish attempts to finish the transaction by either committing it or exiting without committing.
// In order to follow the ExecuteActions protocol, we must distinguish Spanner-generated errors
// (e.g. RowNotFound) and internal errors (e.g. a precondition is not met). Errors returned from
// Spanner populate the status of SpannerActionOutcome. Internal errors, in contrast, break the
// stubby call. For this reason, finish() has two return values dedicated to errors. If any of
// these errors is not nil, other return values are undefined.
// Return values in order:
// 1. Whether or not the transaction is restarted. It will be true if commit has been attempted,
// but Spanner returned aborted and restarted instead. When that happens, all reads and writes
// should be replayed, followed by another commit attempt.
// 2. Commit timestamp. It's returned only if commit has succeeded.
// 3. Spanner error -- an error that Spanner client returned.
// 4. Internal error.
func (c *ExecutionFlowContext) finish(ctx context.Context, txnFinishMode executorpb.FinishTransactionAction_Mode) (bool, *time.Time, error, error) {
if txnFinishMode == executorpb.FinishTransactionAction_COMMIT {
var err error
ts, err := c.rwTxn.Commit(ctx)
if err != nil {
log.Printf("Transaction finished with error %v", err)
if status.Code(err) == codes.Aborted {
log.Println("Transaction Aborted. Sending status to outcome sender to restart the transaction.")
return true, nil, nil, nil
}
// Filter expected errors
if status.Code(err) == codes.Unknown && strings.Contains(err.Error(), "Transaction outcome unknown") {
return false, nil, spanner.ToSpannerError(status.Error(codes.DeadlineExceeded, "Transaction outcome unknown")), nil
}
// TODO(harsha): check if this is an internal or spanner error
return false, nil, err, nil
}
return false, &ts, nil, nil
} else if txnFinishMode == executorpb.FinishTransactionAction_ABANDON {
log.Printf("Transaction Abandoned")
c.rwTxn.Rollback(ctx)
return false, nil, nil, nil
} else {
return false, nil, nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported finish mode %s", txnFinishMode.String()))
}
}

// CloseOpenTransactions cleans up all the active transactions if the stubby call is closing.
func (c *ExecutionFlowContext) CloseOpenTransactions() {
c.mu.Lock()
defer c.mu.Unlock()
if c.roTxn != nil {
log.Println("A read only transaction was active when stubby call closed.")
c.roTxn.Close()
}
if c.rwTxn != nil {
log.Println("Abandon a read-write transaction that was active when stubby call closed.")
_, _, _, err := c.finish(context.Background(), executorpb.FinishTransactionAction_ABANDON)
if err != nil {
log.Printf("Failed to abandon a read-write transaction: %v\n", err)
}
}
if c.batchTxn != nil {
log.Println("A batch transaction was active when stubby call closed.")
c.batchTxn.Close()
}
}
Loading

0 comments on commit ca76671

Please sign in to comment.