Skip to content

Commit

Permalink
opt: fetch minimal set of columns on returning mutations
Browse files Browse the repository at this point in the history
Previously, we used to fetch all columns when a mutation contained
a `RETURNING` clause. This is an issue because it forces us to
retrieve unnecessary data and creates extra contention.
This change adds logic to compute the minimal set of required columns
and fetches only those.

This change passes down the minimal required return cols
to SQL so it knows to only return columns that's requested.
This fixes the output column calculation done by opt and
makes sure the execPlan's output columns are the same as
output cols of the opt plan.

Fixes cockroachdb#30618.
Unblocks cockroachdb#30624.

Release note: None
  • Loading branch information
Ridwan Sharif committed Jul 14, 2019
1 parent 02d52a6 commit c4a93b6
Show file tree
Hide file tree
Showing 22 changed files with 848 additions and 183 deletions.
22 changes: 20 additions & 2 deletions pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (p *planner) Delete(
requestedCols = desc.Columns
}

// Since all columns are being returned, use the 1:1 mapping. See todo above.
rowIdxToRetIdx := mutationRowIdxToReturnIdx(requestedCols, requestedCols)

// Create the table deleter, which does the bulk of the work.
rd, err := row.MakeDeleter(
p.txn, desc, fkTables, requestedCols, row.CheckFKs, p.EvalContext(), &p.alloc,
Expand Down Expand Up @@ -175,6 +178,7 @@ func (p *planner) Delete(
td: tableDeleter{rd: rd, alloc: &p.alloc},
rowsNeeded: rowsNeeded,
fastPathInterleaved: canDeleteFastInterleaved(desc, fkTables),
rowIdxToRetIdx: rowIdxToRetIdx,
},
}

Expand Down Expand Up @@ -208,6 +212,13 @@ type deleteRun struct {

// traceKV caches the current KV tracing flag.
traceKV bool

// rowIdxToRetIdx is the mapping from the columns returned by the deleter
// to the columns in the resultRowBuffer. A value of -1 is used to indicate
// that the column at that index is not part of the resultRowBuffer
// of the mutation. Otherwise, the value an the i-th index refers to the
// index of the resultRowBuffer where the i-th column is to be returned.
rowIdxToRetIdx []int
}

// maxDeleteBatchSize is the max number of entries in the KV batch for
Expand Down Expand Up @@ -331,9 +342,16 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
// contain additional columns for every newly dropped column not
// visible. We do not want them to be available for RETURNING.
//
// d.columns is guaranteed to only contain the requested
// d.run.rows.NumCols() is guaranteed to only contain the requested
// public columns.
resultValues := sourceVals[:len(d.columns)]
resultValues := make(tree.Datums, d.run.rows.NumCols())
for i := range d.run.rowIdxToRetIdx {
retIdx := d.run.rowIdxToRetIdx[i]
if retIdx >= 0 {
resultValues[retIdx] = sourceVals[i]
}
}

if _, err := d.run.rows.AddRow(params.ctx, resultValues); err != nil {
return err
}
Expand Down
43 changes: 31 additions & 12 deletions pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ func (p *planner) Insert(
columns = sqlbase.ResultColumnsFromColDescs(desc.Columns)
}

// Since all columns are being returned, use the 1:1 mapping.
tabIdxToRetIdx := make([]int, len(desc.Columns))
for i := range tabIdxToRetIdx {
tabIdxToRetIdx[i] = i
}

// At this point, everything is ready for either an insertNode or an upserNode.

var node batchedPlanNode
Expand Down Expand Up @@ -315,8 +321,9 @@ func (p *planner) Insert(
Cols: desc.Columns,
Mapping: ri.InsertColIDtoRowIndex,
},
defaultExprs: defaultExprs,
insertCols: ri.InsertCols,
defaultExprs: defaultExprs,
insertCols: ri.InsertCols,
tabIdxToRetIdx: tabIdxToRetIdx,
},
}
node = in
Expand Down Expand Up @@ -368,12 +375,21 @@ type insertRun struct {
// into the row container above, when rowsNeeded is set.
resultRowBuffer tree.Datums

// rowIdxToRetIdx is the mapping from the ordering of rows in
// insertCols to the ordering in the result rows, used when
// rowIdxToTabColIdx is the mapping from the ordering of rows in
// insertCols to the ordering in the rows in the table, used when
// rowsNeeded is set to populate resultRowBuffer and the row
// container. The return index is -1 if the column for the row
// index is not public.
rowIdxToRetIdx []int
// index is not public. This is used in conjunction with tabIdxToRetIdx
// to populate the resultRowBuffer.
rowIdxToTabColIdx []int

// tabIdxToRetIdx is the mapping from the columns in the table to the
// columns in the resultRowBuffer. A value of -1 is used to indicate
// that the table column at that index is not part of the resultRowBuffer
// of the mutation. Otherwise, the value an the i-th index refers to the
// index of the resultRowBuffer where the i-th column of the table is
// to be returned.
tabIdxToRetIdx []int

// traceKV caches the current KV tracing flag.
traceKV bool
Expand Down Expand Up @@ -406,7 +422,7 @@ func (n *insertNode) startExec(params runParams) error {
//
// Also we need to re-order the values in the source, ordered by
// insertCols, when writing them to resultRowBuffer, ordered by
// n.columns. This uses the rowIdxToRetIdx mapping.
// n.columns. This uses the rowIdxToTabColIdx mapping.

n.run.resultRowBuffer = make(tree.Datums, len(n.columns))
for i := range n.run.resultRowBuffer {
Expand All @@ -419,13 +435,13 @@ func (n *insertNode) startExec(params runParams) error {
colIDToRetIndex[cols[i].ID] = i
}

n.run.rowIdxToRetIdx = make([]int, len(n.run.insertCols))
n.run.rowIdxToTabColIdx = make([]int, len(n.run.insertCols))
for i, col := range n.run.insertCols {
if idx, ok := colIDToRetIndex[col.ID]; !ok {
// Column must be write only and not public.
n.run.rowIdxToRetIdx[i] = -1
n.run.rowIdxToTabColIdx[i] = -1
} else {
n.run.rowIdxToRetIdx[i] = idx
n.run.rowIdxToTabColIdx[i] = idx
}
}
}
Expand Down Expand Up @@ -567,10 +583,13 @@ func (n *insertNode) processSourceRow(params runParams, sourceVals tree.Datums)
// The downstream consumer will want the rows in the order of
// the table descriptor, not that of insertCols. Reorder them
// and ignore non-public columns.
if idx := n.run.rowIdxToRetIdx[i]; idx >= 0 {
n.run.resultRowBuffer[idx] = val
if tabIdx := n.run.rowIdxToTabColIdx[i]; tabIdx >= 0 {
if retIdx := n.run.tabIdxToRetIdx[tabIdx]; retIdx >= 0 {
n.run.resultRowBuffer[retIdx] = val
}
}
}

if _, err := n.run.rows.AddRow(params.ctx, n.run.resultRowBuffer); err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/opt/bench/stub_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ func (f *stubFactory) ConstructInsert(
input exec.Node,
table cat.Table,
insertCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
rowsNeeded bool,
skipFKChecks bool,
) (exec.Node, error) {
return struct{}{}, nil
Expand All @@ -234,8 +234,8 @@ func (f *stubFactory) ConstructUpdate(
table cat.Table,
fetchCols exec.ColumnOrdinalSet,
updateCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
rowsNeeded bool,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand All @@ -247,14 +247,17 @@ func (f *stubFactory) ConstructUpsert(
insertCols exec.ColumnOrdinalSet,
fetchCols exec.ColumnOrdinalSet,
updateCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
rowsNeeded bool,
) (exec.Node, error) {
return struct{}{}, nil
}

func (f *stubFactory) ConstructDelete(
input exec.Node, table cat.Table, fetchCols exec.ColumnOrdinalSet, rowsNeeded bool,
input exec.Node,
table cat.Table,
fetchCols exec.ColumnOrdinalSet,
returnCols exec.ColumnOrdinalSet,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func (b *Builder) buildInsert(ins *memo.InsertExpr) (execPlan, error) {
tab := b.mem.Metadata().Table(ins.Table)
insertOrds := ordinalSetFromColList(ins.InsertCols)
checkOrds := ordinalSetFromColList(ins.CheckCols)
returnOrds := ordinalSetFromColList(ins.ReturnCols)
// If we planned FK checks, disable the execution code for FK checks.
disableExecFKs := len(ins.Checks) > 0
node, err := b.factory.ConstructInsert(
input.root,
tab,
insertOrds,
returnOrds,
checkOrds,
ins.NeedResults(),
disableExecFKs,
)
if err != nil {
Expand Down Expand Up @@ -106,14 +107,15 @@ func (b *Builder) buildUpdate(upd *memo.UpdateExpr) (execPlan, error) {
tab := md.Table(upd.Table)
fetchColOrds := ordinalSetFromColList(upd.FetchCols)
updateColOrds := ordinalSetFromColList(upd.UpdateCols)
returnColOrds := ordinalSetFromColList(upd.ReturnCols)
checkOrds := ordinalSetFromColList(upd.CheckCols)
node, err := b.factory.ConstructUpdate(
input.root,
tab,
fetchColOrds,
updateColOrds,
returnColOrds,
checkOrds,
upd.NeedResults(),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -177,6 +179,7 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
insertColOrds := ordinalSetFromColList(ups.InsertCols)
fetchColOrds := ordinalSetFromColList(ups.FetchCols)
updateColOrds := ordinalSetFromColList(ups.UpdateCols)
returnColOrds := ordinalSetFromColList(ups.ReturnCols)
checkOrds := ordinalSetFromColList(ups.CheckCols)
node, err := b.factory.ConstructUpsert(
input.root,
Expand All @@ -185,8 +188,8 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
insertColOrds,
fetchColOrds,
updateColOrds,
returnColOrds,
checkOrds,
ups.NeedResults(),
)
if err != nil {
return execPlan{}, err
Expand Down Expand Up @@ -230,7 +233,8 @@ func (b *Builder) buildDelete(del *memo.DeleteExpr) (execPlan, error) {
md := b.mem.Metadata()
tab := md.Table(del.Table)
fetchColOrds := ordinalSetFromColList(del.FetchCols)
node, err := b.factory.ConstructDelete(input.root, tab, fetchColOrds, del.NeedResults())
returnColOrds := ordinalSetFromColList(del.ReturnCols)
node, err := b.factory.ConstructDelete(input.root, tab, fetchColOrds, returnColOrds)
if err != nil {
return execPlan{}, err
}
Expand Down Expand Up @@ -310,6 +314,9 @@ func appendColsWhenPresent(dst, src opt.ColList) opt.ColList {
// indicating columns that are not involved in the mutation.
func ordinalSetFromColList(colList opt.ColList) exec.ColumnOrdinalSet {
var res util.FastIntSet
if colList == nil {
return res
}
for i, col := range colList {
if col != 0 {
res.Add(i)
Expand Down
30 changes: 14 additions & 16 deletions pkg/sql/opt/exec/execbuilder/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,17 @@ COMMIT
query TTTTT colnames
EXPLAIN (VERBOSE) SELECT * FROM v
----
tree field description columns ordering
render · · (k) ·
│ render 0 k · ·
└── run · · (k, v, z) ·
└── update · · (k, v, z) ·
│ table kv · ·
│ set v · ·
│ strategy updater · ·
└── render · · (k, v, z, column7) ·
│ render 0 k · ·
│ render 1 v · ·
│ render 2 z · ·
│ render 3 444 · ·
└── scan · · (k, v, z) ·
· table kv@primary · ·
· spans /1- · ·
tree field description columns ordering
run · · (k) ·
└── update · · (k) ·
│ table kv · ·
│ set v · ·
│ strategy updater · ·
└── render · · (k, v, z, column7) ·
│ render 0 k · ·
│ render 1 v · ·
│ render 2 z · ·
│ render 3 444 · ·
└── scan · · (k, v, z) ·
· table kv@primary · ·
· spans /1- · ·
19 changes: 8 additions & 11 deletions pkg/sql/opt/exec/execbuilder/testdata/delete
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,11 @@ count · ·
query TTT
EXPLAIN DELETE FROM indexed WHERE value = 5 LIMIT 10 RETURNING id
----
render · ·
└── run · ·
└── delete · ·
│ from indexed
│ strategy deleter
└── index-join · ·
│ table indexed@primary
└── scan · ·
· table indexed@indexed_value_idx
· spans /5-/6
· limit 10
run · ·
└── delete · ·
│ from indexed
│ strategy deleter
└── scan · ·
· table indexed@indexed_value_idx
· spans /5-/6
· limit 10
28 changes: 14 additions & 14 deletions pkg/sql/opt/exec/execbuilder/testdata/insert
Original file line number Diff line number Diff line change
Expand Up @@ -480,20 +480,20 @@ CREATE TABLE xyz (x INT, y INT, z INT)
query TTTTT
EXPLAIN (VERBOSE) SELECT * FROM [INSERT INTO xyz SELECT a, b, c FROM abc RETURNING z] ORDER BY z
----
render · · (z) +z
│ render 0 z · ·
└── run · · (x, y, z, rowid[hidden]) ·
└── insert · · (x, y, z, rowid[hidden]) ·
│ into xyz(x, y, z, rowid) · ·
│ strategy inserter · ·
└── render · · (a, b, c, column9) +c
│ render 0 a · ·
│ render 1 b · ·
│ render 2 c · ·
│ render 3 unique_rowid() · ·
└── scan · · (a, b, c) +c
· table abc@abc_c_idx · ·
· spans ALL · ·
render · · (z) +z
│ render 0 z · ·
└── run · · (z, rowid[hidden]) ·
└── insert · · (z, rowid[hidden]) ·
│ into xyz(x, y, z, rowid) · ·
│ strategy inserter · ·
└── render · · (a, b, c, column9) +c
│ render 0 a · ·
│ render 1 b · ·
│ render 2 c · ·
│ render 3 unique_rowid() · ·
└── scan · · (a, b, c) +c
· table abc@abc_c_idx · ·
· spans ALL · ·

# ------------------------------------------------------------------------------
# Regression for #35364. This tests behavior that is different between the CBO
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/opt/exec/execbuilder/testdata/orderby
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ EXPLAIN (VERBOSE) INSERT INTO t(a, b) SELECT * FROM (SELECT 1 AS x, 2 AS y) ORDE
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── insert · · (a, b, c) ·
└── run · · (a, b) ·
└── insert · · (a, b) ·
│ into t(a, b, c) · ·
│ strategy inserter · ·
└── values · · (x, y, column6) ·
Expand All @@ -496,23 +496,23 @@ render · · (b) ·
query TTTTT
EXPLAIN (VERBOSE) DELETE FROM t WHERE a = 3 RETURNING b
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── delete · · (a, b, c) ·
│ from t · ·
│ strategy deleter · ·
└── scan · · (a, b, c) ·
· table t@primary · ·
· spans /3-/3/# · ·
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b) ·
└── delete · · (a, b) ·
│ from t · ·
│ strategy deleter · ·
└── scan · · (a, b) ·
· table t@primary · ·
· spans /3-/3/# · ·

query TTTTT
EXPLAIN (VERBOSE) UPDATE t SET c = TRUE RETURNING b
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── update · · (a, b, c) ·
└── run · · (a, b) ·
└── update · · (a, b) ·
│ table t · ·
│ set c · ·
│ strategy updater · ·
Expand Down
Loading

0 comments on commit c4a93b6

Please sign in to comment.