Skip to content

Commit

Permalink
Merge #31358
Browse files Browse the repository at this point in the history
31358: workload: add pgx-based SQLRunner, use it in KV r=RaduBerinde a=RaduBerinde

SQLRunner is a common facility for issuing SQL queries through `pgx`.
It supports multiple issuing methods, specified by the `--method`
flag:
 - prepare: each statement is prepared once per session and then
   reused.
 - noprepare: each statement is executed directly (which internally
   is equivalent to preparing and executing each statement
   separately).
 - simple: parameters are formatted as text and replaced in the query
   before it is issued.

KV is switched to use the new runner. The default `prepare` method
should be similar to the old code (except that we go through `pgx`).

Benchmark numbers (ops/s for kv80, single node GCE worker):
  - before:            30990 (541 stddev)
  - prepare (default): 31478 (331 stddev)
  - noprepare:         25506 (530 stddev)
  - simple:            29187 (599 stddev)

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Oct 15, 2018
2 parents ae59e8e + 1abda3b commit 342312a
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 30 deletions.
4 changes: 4 additions & 0 deletions pkg/workload/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ConnFlags struct {
*pflag.FlagSet
DBOverride string
Concurrency int
// Method for issuing queries; see SQLRunner.
Method string
}

// NewConnFlags returns an initialized ConnFlags.
Expand All @@ -38,12 +40,14 @@ func NewConnFlags(genFlags *Flags) *ConnFlags {
`Override for the SQL database to use. If empty, defaults to the generator name`)
c.IntVar(&c.Concurrency, `concurrency`, 2*runtime.NumCPU(),
`Number of concurrent workers`)
c.StringVar(&c.Method, `method`, `prepare`, `SQL issue method (prepare, noprepare)`)
genFlags.AddFlagSet(c.FlagSet)
if genFlags.Meta == nil {
genFlags.Meta = make(map[string]FlagMeta)
}
genFlags.Meta[`db`] = FlagMeta{RuntimeOnly: true}
genFlags.Meta[`concurrency`] = FlagMeta{RuntimeOnly: true}
genFlags.Meta[`method`] = FlagMeta{RuntimeOnly: true}
return c
}

Expand Down
43 changes: 13 additions & 30 deletions pkg/workload/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package kv
import (
"context"
"crypto/sha1"
gosql "database/sql"
"encoding/binary"
"fmt"
"hash"
Expand Down Expand Up @@ -171,16 +170,13 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
if err != nil {
return workload.QueryLoad{}, err
}
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
mcp, err := workload.NewMultiConnPool(w.connFlags.Concurrency+1, urls...)
if err != nil {
return workload.QueryLoad{}, err
}
// Allow a maximum of concurrency+1 connections to the database.
db.SetMaxOpenConns(w.connFlags.Concurrency + 1)
db.SetMaxIdleConns(w.connFlags.Concurrency + 1)

if !w.useOpt {
_, err := db.Exec("SET optimizer=off")
_, err := mcp.Get().Exec("SET optimizer=off")
if err != nil {
return workload.QueryLoad{}, err
}
Expand Down Expand Up @@ -212,29 +208,16 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
seq := &sequence{config: w, val: int64(writeSeq)}
numEmptyResults := new(int64)
for i := 0; i < w.connFlags.Concurrency; i++ {
// Give each kvOp worker its own SQL connection and prepare statements
// using this connection. This avoids lock contention in the sql.Rows
// objects they produce.
conn, err := db.Conn(ctx)
if err != nil {
return workload.QueryLoad{}, err
}
readStmt, err := conn.PrepareContext(ctx, readStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
writeStmt, err := conn.PrepareContext(ctx, writeStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
op := kvOp{
op := &kvOp{
config: w,
hists: reg.GetHandle(),
conn: conn,
readStmt: readStmt,
writeStmt: writeStmt,
numEmptyResults: numEmptyResults,
}
op.readStmt = op.sr.Define(readStmtStr)
op.writeStmt = op.sr.Define(writeStmtStr)
if err := op.sr.Init(ctx, "kv", mcp, w.connFlags); err != nil {
return workload.QueryLoad{}, err
}
if w.sequential {
op.g = newSequentialGenerator(seq)
} else {
Expand All @@ -249,9 +232,9 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
type kvOp struct {
config *kv
hists *workload.Histograms
conn *gosql.Conn
readStmt *gosql.Stmt
writeStmt *gosql.Stmt
sr workload.SQLRunner
readStmt workload.StmtHandle
writeStmt workload.StmtHandle
g keyGenerator
numEmptyResults *int64 // accessed atomically
}
Expand All @@ -263,7 +246,7 @@ func (o *kvOp) run(ctx context.Context) error {
args[i] = o.g.readKey()
}
start := timeutil.Now()
rows, err := o.readStmt.QueryContext(ctx, args...)
rows, err := o.readStmt.Query(ctx, args...)
if err != nil {
return err
}
Expand All @@ -286,7 +269,7 @@ func (o *kvOp) run(ctx context.Context) error {
args[j+1] = randomBlock(o.config, o.g.rand())
}
start := timeutil.Now()
_, err := o.writeStmt.ExecContext(ctx, args...)
_, err := o.writeStmt.Exec(ctx, args...)
elapsed := timeutil.Since(start)
o.hists.Get(`write`).Record(elapsed)
return err
Expand Down
110 changes: 110 additions & 0 deletions pkg/workload/pgx_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package workload

import (
"context"
"sync/atomic"

"github.com/jackc/pgx"
"golang.org/x/sync/errgroup"
)

// MultiConnPool maintains a set of pgx ConnPools (to different servers).
//
// NewMultiConnPool builds one pool per URL; Get hands the pools out in
// round-robin order, PrepareEx prepares a statement on every pool, and Close
// closes all of them.
type MultiConnPool struct {
// Pools holds one pgx pool per URL, stored at the same index as the URL
// that was passed to NewMultiConnPool.
Pools []*pgx.ConnPool
// Atomic counter used by Get(). It is incremented on every call and
// reduced modulo len(Pools) to select the pool to return.
counter uint32
}

// NewMultiConnPool creates a new MultiConnPool (with one pool per url).
// The pools have approximately the same number of max connections, adding up to
// maxTotalConnections.
//
// Each pool is "warmed up" before the function returns, so that connections
// do not have to be established later (which would affect the observed
// latencies of the first requests).
//
// If parsing a URL, creating a pool, or warming a pool up fails, every pool
// created so far is closed and the error is returned; no partially
// constructed MultiConnPool is handed back.
func NewMultiConnPool(maxTotalConnections int, urls ...string) (*MultiConnPool, error) {
	m := &MultiConnPool{
		Pools: make([]*pgx.ConnPool, 0, len(urls)),
	}
	// fail closes whatever pools exist so far, so that an error does not leak
	// their connections, and returns the error to the caller.
	fail := func(err error) (*MultiConnPool, error) {
		for _, p := range m.Pools {
			p.Close()
		}
		return nil, err
	}

	for i, u := range urls {
		cfg, err := pgx.ParseConnectionString(u)
		if err != nil {
			return fail(err)
		}
		// Use the average number of remaining connections (this handles
		// rounding).
		numConn := maxTotalConnections / (len(urls) - i)
		maxTotalConnections -= numConn
		p, err := pgx.NewConnPool(pgx.ConnPoolConfig{
			ConnConfig:     cfg,
			MaxConnections: numConn,
		})
		if err != nil {
			return fail(err)
		}
		// Register the pool before warming it up so that fail() also closes it.
		m.Pools = append(m.Pools, p)

		if err := warmUpConnPool(p, numConn); err != nil {
			return fail(err)
		}
	}
	return m, nil
}

// warmUpConnPool acquires numConn connections from p in parallel and then
// releases them back to the pool. Connections that were acquired successfully
// are released even when another acquisition fails; the first error reported
// by the errgroup is returned.
func warmUpConnPool(p *pgx.ConnPool, numConn int) error {
	conns := make([]*pgx.Conn, numConn)
	var g errgroup.Group
	for j := range conns {
		j := j
		g.Go(func() error {
			// Keep the error local to the goroutine: sharing one error
			// variable between the workers would be a data race.
			c, err := p.Acquire()
			if err != nil {
				return err
			}
			conns[j] = c
			return nil
		})
	}
	err := g.Wait()
	for _, c := range conns {
		if c != nil {
			p.Release(c)
		}
	}
	return err
}

// Get hands out the pools in rotation. Each call atomically advances the
// shared counter and returns the pool found at the pre-increment counter
// value, taken modulo the number of pools.
func (m *MultiConnPool) Get() *pgx.ConnPool {
	next := atomic.AddUint32(&m.counter, 1)
	numPools := uint32(len(m.Pools))
	return m.Pools[(next-1)%numPools]
}

// PrepareEx prepares the statement `sql` under `name` on every pool, in
// order, stopping at the first pool that reports an error. On success it
// returns the PreparedStatement produced by the last pool.
func (m *MultiConnPool) PrepareEx(
	ctx context.Context, name, sql string, opts *pgx.PrepareExOptions,
) (*pgx.PreparedStatement, error) {
	var last *pgx.PreparedStatement
	for idx := range m.Pools {
		stmt, err := m.Pools[idx].PrepareEx(ctx, name, sql, opts)
		if err != nil {
			return nil, err
		}
		// Any of the per-pool statements would do; they should all describe
		// the same query, so simply remember the most recent one.
		last = stmt
	}
	return last, nil
}

// Close shuts down every pool held by the MultiConnPool, in order.
func (m *MultiConnPool) Close() {
	for idx := 0; idx < len(m.Pools); idx++ {
		m.Pools[idx].Close()
	}
}
Loading

0 comments on commit 342312a

Please sign in to comment.