Skip to content

Commit

Permalink
feat(spanner): BatchWrite (#8652)
Browse files Browse the repository at this point in the history
* feat(spanner): BatchWrite

* vet: apply recommendations

* Update comment

Co-authored-by: Sri Harsha CH <[email protected]>

* Add BatchWriteOptions

* remove redundant int conversion

---------

Co-authored-by: Sri Harsha CH <[email protected]>
  • Loading branch information
Sunny Singh and harshachinta authored Oct 27, 2023
1 parent aa41de6 commit 507d232
Show file tree
Hide file tree
Showing 7 changed files with 815 additions and 0 deletions.
219 changes: 219 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package spanner
import (
"context"
"fmt"
"io"
"log"
"os"
"regexp"
"time"

"cloud.google.com/go/internal/trace"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
Expand Down Expand Up @@ -97,6 +100,7 @@ type Client struct {
ro ReadOptions
ao []ApplyOption
txo TransactionOptions
bwo BatchWriteOptions
ct *commonTags
disableRouteToLeader bool
}
Expand Down Expand Up @@ -138,6 +142,9 @@ type ClientConfig struct {
// TransactionOptions is the configuration for a transaction.
TransactionOptions TransactionOptions

// BatchWriteOptions is the configuration for a BatchWrite request.
BatchWriteOptions BatchWriteOptions

// CallOptions is the configuration for providing custom retry settings that
// override the default values.
CallOptions *vkit.CallOptions
Expand Down Expand Up @@ -281,6 +288,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
ro: config.ReadOptions,
ao: config.ApplyOptions,
txo: config.TransactionOptions,
bwo: config.BatchWriteOptions,
ct: getCommonTags(sc),
disableRouteToLeader: config.DisableRouteToLeader,
}
Expand Down Expand Up @@ -669,6 +677,217 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
return t.applyAtLeastOnce(ctx, ms...)
}

// BatchWriteOptions provides options for a BatchWriteRequest.
type BatchWriteOptions struct {
// Priority is the RPC priority to use for this request.
Priority sppb.RequestOptions_Priority

// The transaction tag to use for this request.
TransactionTag string
}

// merge combines two BatchWriteOptions such that the input parameter will have higher
// order of precedence.
func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
merged := BatchWriteOptions{
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
}
if opts.TransactionTag != "" {
merged.TransactionTag = opts.TransactionTag
}
if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.Priority = opts.Priority
}
return merged
}

// BatchWriteResponseIterator is an iterator over BatchWriteResponse structures returned from BatchWrite RPC.
type BatchWriteResponseIterator struct {
ctx context.Context
stream sppb.Spanner_BatchWriteClient
err error
dataReceived bool
replaceSession func(ctx context.Context) error
rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error)
release func(error)
cancel func()
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *BatchWriteResponseIterator) Next() (*sppb.BatchWriteResponse, error) {
for {
// Stream finished or in error state.
if r.err != nil {
return nil, r.err
}

// RPC not made yet.
if r.stream == nil {
r.stream, r.err = r.rpc(r.ctx)
continue
}

// Read from the stream.
var response *sppb.BatchWriteResponse
response, r.err = r.stream.Recv()

// Return an item.
if r.err == nil {
r.dataReceived = true
return response, nil
}

// Stream finished.
if r.err == io.EOF {
r.err = iterator.Done
return nil, r.err
}

// Retry request on session not found error only if no data has been received before.
if !r.dataReceived && r.replaceSession != nil && isSessionNotFoundError(r.err) {
r.err = r.replaceSession(r.ctx)
r.stream = nil
}
}
}

// Stop terminates the iteration. It should be called after you finish using the
// iterator.
func (r *BatchWriteResponseIterator) Stop() {
if r.stream != nil {
err := r.err
if err == iterator.Done {
err = nil
}
defer trace.EndSpan(r.ctx, err)
}
if r.cancel != nil {
r.cancel()
r.cancel = nil
}
if r.release != nil {
r.release(r.err)
r.release = nil
}
if r.err == nil {
r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
}
}

// Do calls the provided function once in sequence for each item in the
// iteration. If the function returns a non-nil error, Do immediately returns
// that error.
//
// If there are no items in the iterator, Do will return nil without calling the
// provided function.
//
// Do always calls Stop on the iterator.
func (r *BatchWriteResponseIterator) Do(f func(r *sppb.BatchWriteResponse) error) error {
defer r.Stop()
for {
row, err := r.Next()
switch err {
case iterator.Done:
return nil
case nil:
if err = f(row); err != nil {
return err
}
default:
return err
}
}
}

// BatchWrite applies a list of mutation groups in a collection of efficient
// transactions. The mutation groups are applied non-atomically in an
// unspecified order and thus, they must be independent of each other. Partial
// failure is possible, i.e., some mutation groups may have been applied
// successfully, while some may have failed. The results of individual batches
// are streamed into the response as the batches are applied.
//
// BatchWrite requests are not replay protected, meaning that each mutation
// group may be applied more than once. Replays of non-idempotent mutations
// may have undesirable effects. For example, replays of an insert mutation
// may produce an already exists error or if you use generated or commit
// timestamp-based keys, it may result in additional rows being added to the
// mutation's table. We recommend structuring your mutation groups to be
// idempotent to avoid this issue.
func (c *Client) BatchWrite(ctx context.Context, mgs []*MutationGroup) *BatchWriteResponseIterator {
return c.BatchWriteWithOptions(ctx, mgs, BatchWriteOptions{})
}

// BatchWriteWithOptions is same as BatchWrite. It accepts additional options to customize the request.
func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup, opts BatchWriteOptions) *BatchWriteResponseIterator {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWrite")

var err error
defer func() {
trace.EndSpan(ctx, err)
}()

opts = c.bwo.merge(opts)

mgsPb, err := mutationGroupsProto(mgs)
if err != nil {
return &BatchWriteResponseIterator{err: err}
}

var sh *sessionHandle
sh, err = c.idleSessions.take(ctx)
if err != nil {
return &BatchWriteResponseIterator{err: err}
}

rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
var md metadata.MD
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {
if metricErr := createContextAndCaptureGFELatencyMetrics(ct, c.ct, md, "BatchWrite"); metricErr != nil {
trace.TracePrintf(ct, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return stream, rpcErr
}

replaceSession := func(ct context.Context) error {
if sh != nil {
sh.destroy()
}
var sessionErr error
sh, sessionErr = c.idleSessions.take(ct)
return sessionErr
}

release := func(err error) {
if sh == nil {
return
}
if isSessionNotFoundError(err) {
sh.destroy()
}
sh.recycle()
}

ctx, cancel := context.WithCancel(ctx)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWriteResponseIterator")
return &BatchWriteResponseIterator{
ctx: ctx,
rpc: rpc,
replaceSession: replaceSession,
release: release,
cancel: cancel,
}
}

// logf logs the given message to the given logger, or the standard logger if
// the given logger is nil.
func logf(logger *log.Logger, format string, v ...interface{}) {
Expand Down
Loading

0 comments on commit 507d232

Please sign in to comment.