Skip to content

Commit

Permalink
feat(spanner): add databoost property for batch transactions (#8152)
Browse files Browse the repository at this point in the history
* feat(spanner): add databoost property for batch transactions

* feat(spanner): add databoost property

* feat(spanner): disable databoost property in tests

* feat(spanner): comment refactor
  • Loading branch information
harshachinta authored Jun 20, 2023
1 parent 005d2df commit fc49c78
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 60 deletions.
68 changes: 36 additions & 32 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,14 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
DataBoostEnabled: readOptions.DataBoostEnabled,
}
// Generate partitions.
for _, p := range resp.GetPartitions() {
Expand Down Expand Up @@ -212,13 +213,14 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
DataBoostEnabled: qOpts.DataBoostEnabled,
}

// generate Partitions
Expand Down Expand Up @@ -308,15 +310,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.rreq.DataBoostEnabled,
})
if err != nil {
return client, err
Expand All @@ -332,15 +335,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.qreq.DataBoostEnabled,
})
if err != nil {
return client, err
Expand Down
4 changes: 2 additions & 2 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3141,7 +3141,7 @@ func TestIntegration_BatchQuery(t *testing.T) {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil {
if partitions, err = txn.PartitionQueryWithOptions(ctx, stmt, PartitionOptions{0, 3}, QueryOptions{DataBoostEnabled: false}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -3224,7 +3224,7 @@ func TestIntegration_BatchRead(t *testing.T) {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
if partitions, err = txn.PartitionReadWithOptions(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}, ReadOptions{DataBoostEnabled: false}); err != nil {
t.Fatal(err)
}

Expand Down
74 changes: 48 additions & 26 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,21 @@ type ReadOptions struct {

// The request tag to use for this request.
RequestTag string

// If this is for a partitioned read and DataBoostEnabled field is set to true, the request will be executed
// via Spanner independent compute resources. Setting this option for regular read operations has no effect.
DataBoostEnabled bool
}

// merge combines two ReadOptions that the input parameter will have higher
// order of precedence.
func (ro ReadOptions) merge(opts ReadOptions) ReadOptions {
merged := ReadOptions{
Index: ro.Index,
Limit: ro.Limit,
Priority: ro.Priority,
RequestTag: ro.RequestTag,
Index: ro.Index,
Limit: ro.Limit,
Priority: ro.Priority,
RequestTag: ro.RequestTag,
DataBoostEnabled: ro.DataBoostEnabled,
}
if opts.Index != "" {
merged.Index = opts.Index
Expand All @@ -188,6 +193,9 @@ func (ro ReadOptions) merge(opts ReadOptions) ReadOptions {
if opts.RequestTag != "" {
merged.RequestTag = opts.RequestTag
}
if opts.DataBoostEnabled {
merged.DataBoostEnabled = opts.DataBoostEnabled
}
return merged
}

Expand Down Expand Up @@ -218,13 +226,17 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
limit := t.ro.Limit
prio := t.ro.Priority
requestTag := t.ro.RequestTag
dataBoostEnabled := t.ro.DataBoostEnabled
if opts != nil {
index = opts.Index
if opts.Limit > 0 {
limit = opts.Limit
}
prio = opts.Priority
requestTag = opts.RequestTag
if opts.DataBoostEnabled {
dataBoostEnabled = opts.DataBoostEnabled
}
}
var setTransactionID func(transactionID)
if _, ok := ts.Selector.(*sppb.TransactionSelector_Begin); ok {
Expand All @@ -238,15 +250,16 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.StreamingRead(ctx,
&sppb.ReadRequest{
Session: t.sh.getID(),
Transaction: t.getTransactionSelector(),
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
ResumeToken: resumeToken,
Limit: int64(limit),
RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag),
Session: t.sh.getID(),
Transaction: t.getTransactionSelector(),
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
ResumeToken: resumeToken,
Limit: int64(limit),
RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag),
DataBoostEnabled: dataBoostEnabled,
})
if err != nil {
if _, ok := t.getTransactionSelector().GetSelector().(*sppb.TransactionSelector_Begin); ok {
Expand Down Expand Up @@ -357,16 +370,21 @@ type QueryOptions struct {

// The request tag to use for this request.
RequestTag string

// If this is for a partitioned query and DataBoostEnabled field is set to true, the request will be executed
// via Spanner independent compute resources. Setting this option for regular query operations has no effect.
DataBoostEnabled bool
}

// merge combines two QueryOptions that the input parameter will have higher
// order of precedence.
func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
merged := QueryOptions{
Mode: qo.Mode,
Options: &sppb.ExecuteSqlRequest_QueryOptions{},
RequestTag: qo.RequestTag,
Priority: qo.Priority,
Mode: qo.Mode,
Options: &sppb.ExecuteSqlRequest_QueryOptions{},
RequestTag: qo.RequestTag,
Priority: qo.Priority,
DataBoostEnabled: qo.DataBoostEnabled,
}
if opts.Mode != nil {
merged.Mode = opts.Mode
Expand All @@ -377,6 +395,9 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.Priority = opts.Priority
}
if opts.DataBoostEnabled {
merged.DataBoostEnabled = opts.DataBoostEnabled
}
proto.Merge(merged.Options, qo.Options)
proto.Merge(merged.Options, opts.Options)
return merged
Expand Down Expand Up @@ -517,15 +538,16 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti
mode = *options.Mode
}
req := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: stmt.SQL,
QueryMode: mode,
Seqno: atomic.AddInt64(&t.sequenceNumber, 1),
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag),
Session: sid,
Transaction: ts,
Sql: stmt.SQL,
QueryMode: mode,
Seqno: atomic.AddInt64(&t.sequenceNumber, 1),
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag),
DataBoostEnabled: options.DataBoostEnabled,
}
return req, sh, nil
}
Expand Down

0 comments on commit fc49c78

Please sign in to comment.