From 1abda3bca2e72b2d2a50a2d2f746e582fe2a0328 Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Thu, 4 Oct 2018 14:39:30 -0400
Subject: [PATCH] workload: add pgx-based SQLRunner, use it in KV

SQLRunner is a common facility for issuing SQL queries through `pgx`. It
supports multiple issuing methods, specified by the `--method` flag:
 - prepare: each statement is prepared once per session and then reused.
 - noprepare: each statement is executed directly (which internally is
   equivalent to preparing and executing each statement separately).
 - simple: parameters are formatted as text and replaced in the query
   before it is issued.

KV is switched to use the new runner. The default `prepare` method should be
similar to the old code (except that we go through `pgx`).

Benchmark numbers (ops/s for kv80, single node GCE worker):
 - before: 30990 (541 stddev)
 - prepare (default): 31478 (331 stddev)
 - noprepare: 25506 (530 stddev)
 - simple: 29187 (599 stddev)

Release note: None
---
 pkg/workload/connection.go  |   4 +
 pkg/workload/kv/kv.go       |  43 +++----
 pkg/workload/pgx_helpers.go | 110 ++++++++++++++++++
 pkg/workload/sql_runner.go  | 225 ++++++++++++++++++++++++++++++++++++
 4 files changed, 352 insertions(+), 30 deletions(-)
 create mode 100644 pkg/workload/pgx_helpers.go
 create mode 100644 pkg/workload/sql_runner.go

diff --git a/pkg/workload/connection.go b/pkg/workload/connection.go
index 5f12b562f535..e452ac2d8651 100644
--- a/pkg/workload/connection.go
+++ b/pkg/workload/connection.go
@@ -28,6 +28,8 @@ type ConnFlags struct {
 	*pflag.FlagSet
 	DBOverride  string
 	Concurrency int
+	// Method is the SQL issue method (prepare, noprepare, simple); see SQLRunner.
+	Method string
 }
 
 // NewConnFlags returns an initialized ConnFlags.
@@ -38,12 +40,14 @@ func NewConnFlags(genFlags *Flags) *ConnFlags {
 		`Override for the SQL database to use. If empty, defaults to the generator name`)
 	c.IntVar(&c.Concurrency, `concurrency`, 2*runtime.NumCPU(),
 		`Number of concurrent workers`)
+	c.StringVar(&c.Method, `method`, `prepare`, `SQL issue method (prepare, noprepare, simple)`)
 	genFlags.AddFlagSet(c.FlagSet)
 	if genFlags.Meta == nil {
 		genFlags.Meta = make(map[string]FlagMeta)
 	}
 	genFlags.Meta[`db`] = FlagMeta{RuntimeOnly: true}
 	genFlags.Meta[`concurrency`] = FlagMeta{RuntimeOnly: true}
+	genFlags.Meta[`method`] = FlagMeta{RuntimeOnly: true}
 	return c
 }
 
diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go
index 0b5fd144b70f..3120c8f5516d 100644
--- a/pkg/workload/kv/kv.go
+++ b/pkg/workload/kv/kv.go
@@ -18,7 +18,6 @@ package kv
 import (
 	"context"
 	"crypto/sha1"
-	gosql "database/sql"
 	"encoding/binary"
 	"fmt"
 	"hash"
@@ -171,16 +170,13 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
-	db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
+	mcp, err := workload.NewMultiConnPool(w.connFlags.Concurrency+1, urls...)
 	if err != nil {
 		return workload.QueryLoad{}, err
 	}
-	// Allow a maximum of concurrency+1 connections to the database.
-	db.SetMaxOpenConns(w.connFlags.Concurrency + 1)
-	db.SetMaxIdleConns(w.connFlags.Concurrency + 1)
 
 	if !w.useOpt {
-		_, err := db.Exec("SET optimizer=off")
+		_, err := mcp.Get().Exec("SET optimizer=off")
 		if err != nil {
 			return workload.QueryLoad{}, err
 		}
@@ -212,29 +208,16 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
 	seq := &sequence{config: w, val: int64(writeSeq)}
 	numEmptyResults := new(int64)
 	for i := 0; i < w.connFlags.Concurrency; i++ {
-		// Give each kvOp worker its own SQL connection and prepare statements
-		// using this connection. This avoids lock contention in the sql.Rows
-		// objects they produce.
-		conn, err := db.Conn(ctx)
-		if err != nil {
-			return workload.QueryLoad{}, err
-		}
-		readStmt, err := conn.PrepareContext(ctx, readStmtStr)
-		if err != nil {
-			return workload.QueryLoad{}, err
-		}
-		writeStmt, err := conn.PrepareContext(ctx, writeStmtStr)
-		if err != nil {
-			return workload.QueryLoad{}, err
-		}
-		op := kvOp{
+		op := &kvOp{
 			config:          w,
 			hists:           reg.GetHandle(),
-			conn:            conn,
-			readStmt:        readStmt,
-			writeStmt:       writeStmt,
 			numEmptyResults: numEmptyResults,
 		}
+		op.readStmt = op.sr.Define(readStmtStr)
+		op.writeStmt = op.sr.Define(writeStmtStr)
+		if err := op.sr.Init(ctx, "kv", mcp, w.connFlags); err != nil {
+			return workload.QueryLoad{}, err
+		}
 		if w.sequential {
 			op.g = newSequentialGenerator(seq)
 		} else {
@@ -249,9 +232,9 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
 type kvOp struct {
 	config          *kv
 	hists           *workload.Histograms
-	conn            *gosql.Conn
-	readStmt        *gosql.Stmt
-	writeStmt       *gosql.Stmt
+	sr              workload.SQLRunner
+	readStmt        workload.StmtHandle
+	writeStmt       workload.StmtHandle
 	g               keyGenerator
 	numEmptyResults *int64 // accessed atomically
 }
@@ -263,7 +246,7 @@ func (o *kvOp) run(ctx context.Context) error {
 			args[i] = o.g.readKey()
 		}
 		start := timeutil.Now()
-		rows, err := o.readStmt.QueryContext(ctx, args...)
+		rows, err := o.readStmt.Query(ctx, args...)
 		if err != nil {
 			return err
 		}
@@ -286,7 +269,7 @@ func (o *kvOp) run(ctx context.Context) error {
 		args[j+1] = randomBlock(o.config, o.g.rand())
 	}
 	start := timeutil.Now()
-	_, err := o.writeStmt.ExecContext(ctx, args...)
+	_, err := o.writeStmt.Exec(ctx, args...)
 	elapsed := timeutil.Since(start)
 	o.hists.Get(`write`).Record(elapsed)
 	return err
diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go
new file mode 100644
index 000000000000..fa999da9e35a
--- /dev/null
+++ b/pkg/workload/pgx_helpers.go
@@ -0,0 +1,110 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License. See the AUTHORS file
+// for names of contributors.
+
+package workload
+
+import (
+	"context"
+	"sync/atomic"
+
+	"github.com/jackc/pgx"
+	"golang.org/x/sync/errgroup"
+)
+
+// MultiConnPool maintains a set of pgx ConnPools (to different servers).
+type MultiConnPool struct {
+	Pools []*pgx.ConnPool
+	// Atomic counter used by Get().
+	counter uint32
+}
+
+// NewMultiConnPool creates a new MultiConnPool (with one pool per url).
+// The pools have approximately the same number of max connections, adding up to
+// maxTotalConnections. Every pool is warmed up before this function returns.
+func NewMultiConnPool(maxTotalConnections int, urls ...string) (*MultiConnPool, error) {
+	m := &MultiConnPool{
+		Pools: make([]*pgx.ConnPool, len(urls)),
+	}
+	for i := range urls {
+		cfg, err := pgx.ParseConnectionString(urls[i])
+		if err != nil {
+			return nil, err
+		}
+		// Use the average of the remaining connections (this handles rounding).
+		numConn := maxTotalConnections / (len(urls) - i)
+		maxTotalConnections -= numConn
+		p, err := pgx.NewConnPool(pgx.ConnPoolConfig{
+			ConnConfig:     cfg,
+			MaxConnections: numConn,
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		// "Warm up" the pool so we don't have to establish connections later (which
+		// would affect the observed latencies of the first requests). We do this by
+		// acquiring all connections (in parallel), then releasing them back to the
+		// pool.
+		conns := make([]*pgx.Conn, numConn)
+		var g errgroup.Group
+		for j := range conns {
+			j := j
+			// The named result keeps err goroutine-local (no race on the outer err).
+			g.Go(func() (err error) {
+				conns[j], err = p.Acquire()
+				return err
+			})
+		}
+		if err := g.Wait(); err != nil {
+			return nil, err
+		}
+		for _, c := range conns {
+			p.Release(c)
+		}
+
+		m.Pools[i] = p
+	}
+	return m, nil
+}
+
+// Get returns one of the pools, in round-robin manner.
+func (m *MultiConnPool) Get() *pgx.ConnPool {
+	i := atomic.AddUint32(&m.counter, 1) - 1
+	return m.Pools[i%uint32(len(m.Pools))]
+}
+
+// PrepareEx prepares the given statement on all the pools.
+func (m *MultiConnPool) PrepareEx(
+	ctx context.Context, name, sql string, opts *pgx.PrepareExOptions,
+) (*pgx.PreparedStatement, error) {
+	var ps *pgx.PreparedStatement
+	for _, p := range m.Pools {
+		var err error
+		ps, err = p.PrepareEx(ctx, name, sql, opts)
+		if err != nil {
+			return nil, err
+		}
+	}
+	// It doesn't matter which PreparedStatement we return, they should be the
+	// same.
+	return ps, nil
+}
+
+// Close closes all the pools.
+func (m *MultiConnPool) Close() {
+	for _, p := range m.Pools {
+		p.Close()
+	}
+}
diff --git a/pkg/workload/sql_runner.go b/pkg/workload/sql_runner.go
new file mode 100644
index 000000000000..922c08975037
--- /dev/null
+++ b/pkg/workload/sql_runner.go
@@ -0,0 +1,225 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License. See the AUTHORS file
+// for names of contributors.
+
+package workload
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/jackc/pgx"
+	"github.com/pkg/errors"
+)
+
+// SQLRunner is a helper for issuing SQL statements; it supports multiple
+// methods for issuing queries.
+//
+// Queries need to first be defined using calls to Define. Then the runner
+// must be initialized, after which we can use the handles returned by Define.
+//
+// Sample usage:
+//   sr := &workload.SQLRunner{}
+//   sel := sr.Define("SELECT x FROM t WHERE y = $1")
+//   ins := sr.Define("INSERT INTO t(x, y) VALUES ($1, $2)")
+//
+//   err := sr.Init(ctx, "my-workload", mcp, flags)
+//   // [handle err]
+//
+//   row := sel.QueryRow(ctx, 1)
+//   // [use row]
+//
+//   _, err = ins.Exec(ctx, 5, 6)
+//   // [handle err]
+//
+// A runner should typically be associated with a single worker.
+type SQLRunner struct {
+	// The fields below are used by Define.
+	stmts []*stmt
+
+	// The fields below are set by Init.
+	initialized bool
+	name        string
+	method      method
+	mcp         *MultiConnPool
+}
+
+// method identifies how statements are sent to the server; see SQLRunner.Init.
+type method int
+
+const (
+	prepare method = iota
+	noprepare
+	simple
+)
+
+// stringToMethod maps the (lowercase) value of the --method flag to a method.
+var stringToMethod = map[string]method{
+	"prepare":   prepare,
+	"noprepare": noprepare,
+	"simple":    simple,
+}
+
+// Define creates a handle for the given statement. The handle can be used after
+// Init is called.
+func (sr *SQLRunner) Define(sql string) StmtHandle {
+	if sr.initialized {
+		panic("Define can't be called after Init")
+	}
+	s := &stmt{sr: sr, sql: sql}
+	sr.stmts = append(sr.stmts, s)
+	return StmtHandle{s: s}
+}
+
+// Init initializes the runner; must be called after calls to Define and before
+// the StmtHandles are used.
+//
+// The name of the runner is used for naming prepared statements. Multiple
+// workers that use the same set of defined queries can and should use the same
+// name.
+//
+// The way we issue queries is set by flags.Method:
+//
+//  - "prepare": we prepare the query once during Init, then we reuse it for
+//    each execution. This results in a Bind and Execute on the server each time
+//    we run a query (on one of the given pools). Note that it's important to
+//    prepare on separate connections if there are many parallel workers; this
+//    avoids lock contention in the sql.Rows objects they produce. See #30811.
+//
+//  - "noprepare": each query is issued separately (on one of the given pools),
+//    which results in Parse, Bind, Execute on the server for every query.
+//
+//  - "simple": each query is issued in a single string; parameters are rendered
+//    inside the string. This results in a single SimpleExecute request to the
+//    server for each query. Note that only a few parameter types are supported.
+func (sr *SQLRunner) Init(
+	ctx context.Context, name string, mcp *MultiConnPool, flags *ConnFlags,
+) error {
+	if sr.initialized {
+		panic("already initialized")
+	}
+	sr.name = name
+
+	var ok bool
+	sr.method, ok = stringToMethod[strings.ToLower(flags.Method)]
+	if !ok {
+		return errors.Errorf("unknown method %s", flags.Method)
+	}
+
+	if sr.method == prepare {
+		for i, s := range sr.stmts {
+			stmtName := fmt.Sprintf("%s-%d", name, i+1)
+			var err error
+			s.prepared, err = mcp.PrepareEx(ctx, stmtName, s.sql, nil /* opts */)
+			if err != nil {
+				return errors.Wrapf(err, "preparing %s", s.sql)
+			}
+		}
+	}
+
+	sr.mcp = mcp
+	sr.initialized = true
+	return nil
+}
+
+// check panics if the runner that owns the handle has not been initialized.
+func (h StmtHandle) check() {
+	if !h.s.sr.initialized {
+		panic("SQLRunner.Init not called")
+	}
+}
+
+// simpleProtocolOpt is passed to pgx to issue a statement with the "simple" method.
+var simpleProtocolOpt = &pgx.QueryExOptions{SimpleProtocol: true}
+
+type stmt struct {
+	sr  *SQLRunner
+	sql string
+	// prepared is only used for the prepare method.
+	prepared *pgx.PreparedStatement
+}
+
+// StmtHandle is associated with a (possibly prepared) statement; created by
+// SQLRunner.Define.
+type StmtHandle struct {
+	s *stmt
+}
+
+// Exec executes a query that doesn't return rows. The query is executed on one
+// of the pools of the MultiConnPool that was passed to SQLRunner.Init.
+//
+// See pgx.Conn.Exec.
+func (h StmtHandle) Exec(ctx context.Context, args ...interface{}) (pgx.CommandTag, error) {
+	h.check()
+	// Get is called before the method dispatch, so every call advances the
+	// round-robin counter of the MultiConnPool, even if the dispatch panics.
+	pool := h.s.sr.mcp.Get()
+	query, opts := h.s.target()
+	return pool.ExecEx(ctx, query, opts, args...)
+}
+
+// Query executes a query that returns rows. The pool that runs it is the one
+// returned by MultiConnPool.Get for this call.
+//
+// See pgx.Conn.Query.
+func (h StmtHandle) Query(ctx context.Context, args ...interface{}) (*pgx.Rows, error) {
+	h.check()
+	pool := h.s.sr.mcp.Get()
+	query, opts := h.s.target()
+	return pool.QueryEx(ctx, query, opts, args...)
+}
+
+// QueryRow executes a query that is expected to return at most one row. The
+// pool that runs it is the one returned by MultiConnPool.Get for this call.
+//
+// See pgx.Conn.QueryRow.
+func (h StmtHandle) QueryRow(ctx context.Context, args ...interface{}) *pgx.Row {
+	h.check()
+	pool := h.s.sr.mcp.Get()
+	query, opts := h.s.target()
+	return pool.QueryRowEx(ctx, query, opts, args...)
+}
+
+// target returns the statement argument and the options to hand to pgx, as
+// dictated by the method of the runner that owns the statement:
+//
+//  - prepare: the name of the prepared statement, without options;
+//  - noprepare: the SQL text, without options;
+//  - simple: the SQL text, with the simple protocol option.
+//
+// The SQLRunner.Init documentation describes what each method means for the
+// server. It panics on any other method.
+func (s *stmt) target() (string, *pgx.QueryExOptions) {
+	switch s.sr.method {
+	case prepare:
+		// The statement was prepared by Init; refer to it by name.
+		return s.prepared.Name, nil /* options */
+
+	case noprepare:
+		// The SQL text is sent as is, with no options.
+		return s.sql, nil /* options */
+
+	case simple:
+		// Same SQL text, with the option that selects the simple protocol.
+		return s.sql, simpleProtocolOpt
+
+	default:
+		panic("invalid method")
+	}
+}
+
+// Appease the linter.
+var _ = StmtHandle.QueryRow