Skip to content

Commit

Permalink
workload: use SQLRunner for queries without tuples
Browse files Browse the repository at this point in the history
Switch TPCC to use pgx; use SQLRunner for queries that have a fixed
number of arguments.

Release note: None
  • Loading branch information
RaduBerinde committed Oct 16, 2018
1 parent e4510ee commit d15637c
Show file tree
Hide file tree
Showing 11 changed files with 537 additions and 265 deletions.
30 changes: 30 additions & 0 deletions pkg/workload/pgx_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package workload

import (
"context"
gosql "database/sql"
"sync/atomic"

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/jackc/pgx"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -81,6 +83,9 @@ func NewMultiConnPool(maxTotalConnections int, urls ...string) (*MultiConnPool,

// Get returns one of the pools, in round-robin manner.
func (m *MultiConnPool) Get() *pgx.ConnPool {
if len(m.Pools) == 1 {
return m.Pools[0]
}
i := atomic.AddUint32(&m.counter, 1) - 1
return m.Pools[i%uint32(len(m.Pools))]
}
Expand Down Expand Up @@ -108,3 +113,28 @@ func (m *MultiConnPool) Close() {
p.Close()
}
}

// PgxTx is a thin wrapper that implements the crdb.Tx interface, allowing pgx
// transactions to be used with ExecuteInTx.
type PgxTx pgx.Tx

var _ crdb.Tx = &PgxTx{}

// ExecContext is part of the crdb.Tx interface.
func (tx *PgxTx) ExecContext(
ctx context.Context, sql string, args ...interface{},
) (gosql.Result, error) {
_, err := (*pgx.Tx)(tx).ExecEx(ctx, sql, nil /* QueryExOptions */, args...)
// crdb.ExecuteInTx doesn't actually care about the Result, just the error.
return nil, err
}

// Commit is part of the crdb.Tx interface.
func (tx *PgxTx) Commit() error {
return (*pgx.Tx)(tx).Commit()
}

// Rollback is part of the crdb.Tx interface.
func (tx *PgxTx) Rollback() error {
return (*pgx.Tx)(tx).Rollback()
}
72 changes: 67 additions & 5 deletions pkg/workload/sql_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type SQLRunner struct {

// The fields below are set by Init.
initialized bool
name string
method method
mcp *MultiConnPool
}
Expand Down Expand Up @@ -85,9 +84,8 @@ func (sr *SQLRunner) Define(sql string) StmtHandle {
// Init initializes the runner; must be called after calls to Define and before
// the StmtHandles are used.
//
// The name of the runner is used for naming prepared statements. Multiple
// workers that use the same set of defined queries can and should use the same
// name.
// The name is used for naming prepared statements. Multiple workers that use
// the same set of defined queries can and should use the same name.
//
// The way we issue queries is set by flags.Method:
//
Expand All @@ -112,7 +110,6 @@ func (sr *SQLRunner) Init(
if sr.initialized {
panic("already initialized")
}
sr.name = name

var ok bool
sr.method, ok = stringToMethod[strings.ToLower(flags.Method)]
Expand Down Expand Up @@ -179,6 +176,28 @@ func (h StmtHandle) Exec(ctx context.Context, args ...interface{}) (pgx.CommandT
}
}

// ExecTx executes a query that doesn't return rows, inside a transaction.
//
// See pgx.Conn.Exec.
func (h StmtHandle) ExecTx(
ctx context.Context, tx *pgx.Tx, args ...interface{},
) (pgx.CommandTag, error) {
h.check()
switch h.s.sr.method {
case prepare:
return tx.ExecEx(ctx, h.s.prepared.Name, nil /* options */, args...)

case noprepare:
return tx.ExecEx(ctx, h.s.sql, nil /* options */, args...)

case simple:
return tx.ExecEx(ctx, h.s.sql, simpleProtocolOpt, args...)

default:
panic("invalid method")
}
}

// Query executes a query that returns rows.
//
// See pgx.Conn.Query.
Expand All @@ -200,6 +219,28 @@ func (h StmtHandle) Query(ctx context.Context, args ...interface{}) (*pgx.Rows,
}
}

// QueryTx executes a query that returns rows, inside a transaction.
//
// See pgx.Tx.Query.
func (h StmtHandle) QueryTx(
ctx context.Context, tx *pgx.Tx, args ...interface{},
) (*pgx.Rows, error) {
h.check()
switch h.s.sr.method {
case prepare:
return tx.QueryEx(ctx, h.s.prepared.Name, nil /* options */, args...)

case noprepare:
return tx.QueryEx(ctx, h.s.sql, nil /* options */, args...)

case simple:
return tx.QueryEx(ctx, h.s.sql, simpleProtocolOpt, args...)

default:
panic("invalid method")
}
}

// QueryRow executes a query that is expected to return at most one row.
//
// See pgx.Conn.QueryRow.
Expand All @@ -221,5 +262,26 @@ func (h StmtHandle) QueryRow(ctx context.Context, args ...interface{}) *pgx.Row
}
}

// QueryRowTx executes a query that is expected to return at most one row,
// inside a transaction.
//
// See pgx.Conn.QueryRow.
func (h StmtHandle) QueryRowTx(ctx context.Context, tx *pgx.Tx, args ...interface{}) *pgx.Row {
h.check()
switch h.s.sr.method {
case prepare:
return tx.QueryRowEx(ctx, h.s.prepared.Name, nil /* options */, args...)

case noprepare:
return tx.QueryRowEx(ctx, h.s.sql, nil /* options */, args...)

case simple:
return tx.QueryRowEx(ctx, h.s.sql, simpleProtocolOpt, args...)

default:
panic("invalid method")
}
}

// Appease the linter.
var _ = StmtHandle.QueryRow
8 changes: 4 additions & 4 deletions pkg/workload/tpcc/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
package tpcc

import (
gosql "database/sql"
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/jackc/pgx"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -160,7 +160,7 @@ const (
tpccOrderLineSchemaInterleave = ` interleave in parent "order" (ol_w_id, ol_d_id, ol_o_id)`
)

func maybeDisableMergeQueue(db *gosql.DB) error {
func maybeDisableMergeQueue(db *pgx.ConnPool) error {
var ok bool
if err := db.QueryRow(
`SELECT count(*) > 0 FROM [ SHOW ALL CLUSTER SETTINGS ] AS _ (v) WHERE v = 'kv.range_merge.queue_enabled'`,
Expand All @@ -173,7 +173,7 @@ func maybeDisableMergeQueue(db *gosql.DB) error {

// NB: Since we always split at the same points (specific warehouse IDs and
// item IDs), splitting is idempotent.
func splitTables(db *gosql.DB, warehouses int) {
func splitTables(db *pgx.ConnPool, warehouses int) {
// Prevent the merge queue from immediately discarding our splits.
if err := maybeDisableMergeQueue(db); err != nil {
panic(err)
Expand Down Expand Up @@ -241,7 +241,7 @@ func splitTables(db *gosql.DB, warehouses int) {
}
}

func scatterRanges(db *gosql.DB) {
func scatterRanges(db *pgx.ConnPool) {
tables := []string{
`customer`,
`district`,
Expand Down
113 changes: 77 additions & 36 deletions pkg/workload/tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/pkg/errors"
)

Expand All @@ -47,16 +48,40 @@ import (

type delivery struct {
config *tpcc
db *gosql.DB
mcp *workload.MultiConnPool
sr workload.SQLRunner

selectNewOrder workload.StmtHandle
sumAmount workload.StmtHandle
}

var _ tpccTx = &delivery{}

func createDelivery(ctx context.Context, config *tpcc, db *gosql.DB) (tpccTx, error) {
func createDelivery(
ctx context.Context, config *tpcc, mcp *workload.MultiConnPool,
) (tpccTx, error) {
del := &delivery{
config: config,
db: db,
mcp: mcp,
}

del.selectNewOrder = del.sr.Define(`
SELECT no_o_id
FROM new_order
WHERE no_w_id = $1 AND no_d_id = $2
ORDER BY no_o_id ASC
LIMIT 1`,
)

del.sumAmount = del.sr.Define(`
SELECT sum(ol_amount) FROM order_line
WHERE ol_w_id = $1 AND ol_d_id = $2 AND ol_o_id = $3`,
)

if err := del.sr.Init(ctx, "delivery", mcp, config.connFlags); err != nil {
return nil, err
}

return del, nil
}

Expand All @@ -68,23 +93,19 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
oCarrierID := rng.Intn(10) + 1
olDeliveryD := timeutil.Now()

err := crdb.ExecuteTx(
ctx,
del.db,
del.config.txOpts,
func(tx *gosql.Tx) error {
tx, err := del.mcp.Get().BeginEx(ctx, del.config.txOpts)
if err != nil {
return nil, err
}
err = crdb.ExecuteInTx(
ctx, (*workload.PgxTx)(tx),
func() error {
// 2.7.4.2. For each district:
dIDoIDPairs := make(map[int]int)
dIDolTotalPairs := make(map[int]float64)
for dID := 1; dID <= 10; dID++ {
var oID int
if err := tx.QueryRowContext(ctx, fmt.Sprintf(`
SELECT no_o_id
FROM new_order
WHERE no_w_id = %d AND no_d_id = %d
ORDER BY no_o_id ASC
LIMIT 1`,
wID, dID)).Scan(&oID); err != nil {
if err := del.selectNewOrder.QueryRowTx(ctx, tx, wID, dID).Scan(&oID); err != nil {
// If no matching order is found, the delivery of this order is skipped.
if err != gosql.ErrNoRows {
atomic.AddUint64(&del.config.auditor.skippedDelivieries, 1)
Expand All @@ -95,22 +116,26 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
dIDoIDPairs[dID] = oID

var olTotal float64
if err := tx.QueryRowContext(ctx, fmt.Sprintf(`
SELECT sum(ol_amount) FROM order_line
WHERE ol_w_id = %d AND ol_d_id = %d AND ol_o_id = %d`,
wID, dID, oID)).Scan(&olTotal); err != nil {
if err := del.sumAmount.QueryRowTx(
ctx, tx, wID, dID, oID,
).Scan(&olTotal); err != nil {
return err
}
dIDolTotalPairs[dID] = olTotal
}
dIDoIDPairsStr := makeInTuples(dIDoIDPairs)

rows, err := tx.QueryContext(ctx, fmt.Sprintf(`
rows, err := tx.QueryEx(
ctx,
fmt.Sprintf(`
UPDATE "order"
SET o_carrier_id = %d
WHERE o_w_id = %d AND (o_d_id, o_id) IN (%s)
RETURNING o_d_id, o_c_id`,
oCarrierID, wID, dIDoIDPairsStr))
oCarrierID, wID, dIDoIDPairsStr,
),
nil, /* options */
)
if err != nil {
return err
}
Expand All @@ -134,25 +159,41 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
dIDcIDPairsStr := makeInTuples(dIDcIDPairs)
dIDToOlTotalStr := makeWhereCases(dIDolTotalPairs)

if _, err := tx.ExecContext(ctx, fmt.Sprintf(`
UPDATE customer
SET c_delivery_cnt = c_delivery_cnt + 1,
c_balance = c_balance + CASE c_d_id %s END
WHERE c_w_id = %d AND (c_d_id, c_id) IN (%s)`,
dIDToOlTotalStr, wID, dIDcIDPairsStr)); err != nil {
if _, err := tx.ExecEx(
ctx,
fmt.Sprintf(`
UPDATE customer
SET c_delivery_cnt = c_delivery_cnt + 1,
c_balance = c_balance + CASE c_d_id %s END
WHERE c_w_id = %d AND (c_d_id, c_id) IN (%s)`,
dIDToOlTotalStr, wID, dIDcIDPairsStr,
),
nil, /* options */
); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`
DELETE FROM new_order
WHERE no_w_id = %d AND (no_d_id, no_o_id) IN (%s)`,
wID, dIDoIDPairsStr)); err != nil {
if _, err := tx.ExecEx(
ctx,
fmt.Sprintf(`
DELETE FROM new_order
WHERE no_w_id = %d AND (no_d_id, no_o_id) IN (%s)`,
wID, dIDoIDPairsStr,
),
nil, /* options */
); err != nil {
return err
}
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
UPDATE order_line
SET ol_delivery_d = '%s'
WHERE ol_w_id = %d AND (ol_d_id, ol_o_id) IN (%s)`,
olDeliveryD.Format("2006-01-02 15:04:05"), wID, dIDoIDPairsStr))

_, err = tx.ExecEx(
ctx,
fmt.Sprintf(`
UPDATE order_line
SET ol_delivery_d = '%s'
WHERE ol_w_id = %d AND (ol_d_id, ol_o_id) IN (%s)`,
olDeliveryD.Format("2006-01-02 15:04:05"), wID, dIDoIDPairsStr,
),
nil, /* options */
)
return err
})
return nil, err
Expand Down
Loading

0 comments on commit d15637c

Please sign in to comment.