48842: sql: fix portals after exhausting rows r=yuzefovich a=yuzefovich

Previously, we would erroneously restart the execution from the very
beginning of empty, unclosed portals after they had been fully
exhausted, when we should instead return no rows or an error in such
scenarios. This is now fixed by tracking whether a portal is exhausted
and intercepting the calls to `execStmt` when the conn executor's
state machine is in an open state.
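
Below is a self-contained Go sketch of the idea (the names are illustrative,
not the actual CRDB code - the real implementation lives in `execPortal` in
`pkg/sql/conn_executor_exec.go`, shown in the diff below):

```go
package main

import "fmt"

// portal is a toy stand-in for a pgwire portal: once it has been run to
// completion it is marked exhausted, and re-executing it yields no rows
// instead of restarting the statement from scratch.
type portal struct {
	rows      []string
	exhausted bool
}

func (p *portal) execute() []string {
	if p.exhausted {
		// Previously, the statement would erroneously re-run here.
		return nil
	}
	p.exhausted = true
	return p.rows
}

func main() {
	p := &portal{rows: []string{"row1", "row2"}}
	fmt.Println(p.execute()) // [row1 row2]
	fmt.Println(p.execute()) // [] - the exhausted portal returns 0 rows
}
```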

Note that the current solution has known deviations from Postgres:
- when attempting to execute portals of statements that don't return row
sets, PG returns an error on the second and subsequent attempts, while we
silently do nothing (meaning we do not run the statement at all and
return 0 rows)
- we incorrectly populate the "command tag" field of pgwire messages for some
rows-returning statements after a portal suspension (for example,
a suspended UPDATE RETURNING in PG will return the total count of "rows
affected", while we will return the count since the last suspension).

These deviations are deemed acceptable since this commit fixes a much
worse problem - re-executing an exhausted portal (which could be
a mutation, meaning that, previously, we could have executed a mutation
multiple times).

The reasons why this commit does not address these deviations are:
- Postgres has a concept of "portal strategy"
(see https://github.com/postgres/postgres/blob/2f9661311b83dc481fc19f6e3bda015392010a40/src/include/utils/portal.h#L89).
- Postgres has a concept of "command" type (these are things like
SELECTs, UPDATEs, INSERTs, etc,
see https://github.com/postgres/postgres/blob/1aac32df89eb19949050f6f27c268122833ad036/src/include/nodes/nodes.h#L672).

CRDB doesn't have these concepts, and without them any attempt to
simulate Postgres results in very error-prone and brittle code.

Fixes: #48448.

Release note (bug fix): Previously, CockroachDB would erroneously
restart the execution of empty, unclosed portals after they had been
fully exhausted; this has been fixed.

52098: sql: better distribute distinct processors r=yuzefovich a=yuzefovich

**sql: better distribute distinct processors**

The distinct processors are planned in two stages: first, a "local"
stage is planned on the same nodes as the previous stage; then
a "global" stage is added. Previously, the global stage would be planned
on the gateway, and this commit makes it distributed instead, by
planning the "global" distinct processors on the same nodes as the "local"
ones and connecting them via a hash router that hashes the distinct columns.
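
As a rough, self-contained model of the resulting plan shape (illustrative
Go, not the actual DistSQL planning code): each node first deduplicates its
own stream, the partial results are hash-routed on the distinct columns so
that duplicates meet on the same node, and a second distinct pass removes
the cross-node duplicates.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// distinct removes duplicates while preserving order; it plays the role
// of both the "local" and the "global" distinct processor.
func distinct(rows []string) []string {
	seen := make(map[string]struct{})
	var out []string
	for _, r := range rows {
		if _, ok := seen[r]; !ok {
			seen[r] = struct{}{}
			out = append(out, r)
		}
	}
	return out
}

// hashRoute partitions rows by a hash of the distinct column(s), so that
// equal rows always land in the same bucket (i.e., on the same node).
func hashRoute(rows []string, numNodes int) [][]string {
	buckets := make([][]string, numNodes)
	for _, r := range rows {
		h := fnv.New32a()
		h.Write([]byte(r))
		i := int(h.Sum32() % uint32(numNodes))
		buckets[i] = append(buckets[i], r)
	}
	return buckets
}

func main() {
	// Stage 1: "local" distinct runs on each node's own stream.
	node0 := distinct([]string{"a", "a", "b"}) // [a b]
	node1 := distinct([]string{"b", "c", "c"}) // [b c]

	// Stage 2: hash-route the partial results, then run the "global"
	// distinct on each node to remove cross-node duplicates.
	for i, bucket := range hashRoute(append(node0, node1...), 2) {
		fmt.Printf("node %d: %v -> %v\n", i, bucket, distinct(bucket))
	}
}
```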

Release note: None

**sql: implement ConstructDistinct in the new factory**

Addresses: #47473.

Release note: None

52358: engine: set the MVCC timestamp on reads due to historical intents r=ajwerner a=ajwerner

Before this commit, we'd return a zero-value MVCC timestamp when reading an
intent from the intent history. This was problematic because elsewhere in the
code we assume that we will always get a non-zero MVCC timestamp when a read
returns a value. This is especially bizarre given that a read of the latest
intent will return its write timestamp.

The semantics here are such that we'll return the timestamp of the MVCC
metadata for the row. I think this is the most reasonable thing to do, as
that timestamp ought to reflect the timestamp we return when we return the
current intent, and furthermore it is the only timestamp we really have
around. We could return the transaction's current read or write timestamp,
but both of those seem like worse choices.
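
A toy Go sketch of that rule (illustrative only - the real change is in the
C++ `mvccScanner` diff below): a value read from the intent history with
a zero MVCC timestamp gets stamped with the timestamp of the intent's MVCC
metadata.

```go
package main

import "fmt"

// hlc is a minimal stand-in for an MVCC/HLC timestamp.
type hlc struct{ wallTime, logical int64 }

func (t hlc) isZero() bool { return t.wallTime == 0 && t.logical == 0 }

// timestampForRead returns the timestamp to attach to a value produced
// by a read: the value's own timestamp if it has one, and otherwise the
// timestamp of the intent's MVCC metadata.
func timestampForRead(valueTS, metaTS hlc) hlc {
	if valueTS.isZero() {
		return metaTS
	}
	return valueTS
}

func main() {
	meta := hlc{wallTime: 42, logical: 1}
	fmt.Println(timestampForRead(hlc{}, meta))            // {42 1}
	fmt.Println(timestampForRead(hlc{wallTime: 7}, meta)) // {7 0}
}
```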

It's also worth noting that in the case where we need to read a non-zero
value, we don't really care what that value is, and the fact that we are
reading this intent at all is somewhat suspect. That being said, I still
think we should make this change in addition to any change made to prevent
the code referenced in #49266 from needing it.

Fixes #49266.
Informs #50102.

Release note: None

52384: sql: properly reset extraTxnState in COPY r=ajwerner a=ajwerner

Apparently we support some sort of COPY protocol that I know nothing about.
We allow operations in that protocol in the open state and in the noTxn state
in the connExecutor. In the noTxn state we let the `copyMachine` handle its
transaction lifecycle all on its own. In that case, we are also handed the
`connExecutor` in a fresh state when calling `execCopyIn()`. During the
execution of `COPY`, it seems like sometimes we can pick up table descriptor
leases. In the noTxn case we'd like to drop those leases and generally leave
the executor in the fresh state in which it was handed to us. To deal with
that, we call `resetExtraTxnState` before returning in the happy noTxn case.
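
The cleanup hook follows the usual deferred-cleanup pattern; here is a
minimal sketch (with a simplified stand-in for `errors.CombineErrors` from
`github.com/cockroachdb/errors`, which the real code uses):

```go
package main

import (
	"context"
	"fmt"
)

// combine is a simplified stand-in for errors.CombineErrors: it returns
// nil if both errors are nil, and otherwise prefers the first non-nil one.
func combine(err1, err2 error) error {
	if err1 != nil {
		return err1
	}
	return err2
}

// runCopyBatch runs a batch and, via defer, always invokes the reset
// closure - on both the success and the error path - merging its error
// with whatever the body returned.
func runCopyBatch(ctx context.Context, reset func(context.Context) error) (err error) {
	defer func() {
		err = combine(err, reset(ctx))
	}()
	// ... ingest a batch of COPY data; errors fall through to the defer ...
	return nil
}

func main() {
	err := runCopyBatch(context.Background(), func(context.Context) error {
		fmt.Println("dropping leases, resetting extraTxnState")
		return nil
	})
	fmt.Println("err:", err)
}
```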

Fixes #52065.

Release note (bug fix): Fixed a bug when using the COPY protocol that could
prevent schema changes for up to 5 minutes.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Aug 5, 2020
5 parents 49c37d7 + cf0d5e4 + 229df27 + 55467a8 + f5b7b67 commit e86282e
Showing 22 changed files with 748 additions and 173 deletions.
18 changes: 16 additions & 2 deletions c-deps/libroach/mvcc.h
@@ -255,7 +255,21 @@ template <bool reverse> class mvccScanner {

rocksdb::Slice value = intent.value();
if (value.size() > 0 || tombstones_) {
kvs_->Put(cur_raw_key_, value);
// If we're adding a value due to a previous intent, as indicated by the
// zero-valued timestamp, we want to populate the timestamp with the current
// metaTimestamp. Note that this may be controversial, as it is not
// necessarily the write timestamp at which this intent was written. However,
// this was the only case in which a value could have been returned from
// a read without an MVCC timestamp.
if (cur_timestamp_ == kZeroTimestamp) {
auto meta_timestamp = meta_.timestamp();
auto key = EncodeKey(cur_key_,
meta_timestamp.wall_time(),
meta_timestamp.logical());
kvs_->Put(key, value);
} else {
kvs_->Put(cur_raw_key_, value);
}
}
return true;
}
@@ -296,7 +310,7 @@ template <bool reverse> class mvccScanner {
// version's timestamp and the scanner has been configured
// to throw a write too old error on more recent versions.
// Merge the current timestamp with the maximum timestamp
// we've seen so we know to return an error, but then keep
// scanning so that we can return the largest possible time.
if (cur_timestamp_ > most_recent_timestamp_) {
most_recent_timestamp_ = cur_timestamp_;
65 changes: 42 additions & 23 deletions pkg/sql/conn_executor.go
@@ -634,12 +634,13 @@ func (s *Server) newConnExecutor(
ex.phaseTimes[sessionInit] = timeutil.Now()
ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]*PreparedPortal),
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]*PreparedPortal),
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount()
ex.extraTxnState.descCollection = descs.MakeCollection(s.cfg.LeaseManager,
s.cfg.Settings, s.dbCache.getDatabaseCache(), s.dbCache)
ex.extraTxnState.txnRewindPos = -1
@@ -856,8 +857,13 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {

if closeType != panicClose {
// Close all statements and prepared portals.
ex.extraTxnState.prepStmtsNamespace.resetTo(ctx, prepStmtNamespace{})
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(ctx, prepStmtNamespace{})
ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, prepStmtNamespace{}, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, prepStmtNamespace{}, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceMemAcc.Close(ctx)
}

if ex.sessionTracing.Enabled() {
@@ -992,6 +998,12 @@ type connExecutor struct {
// collections, but these collections are periodically reconciled.
prepStmtsNamespaceAtTxnRewindPos prepStmtNamespace

// prepStmtsNamespaceMemAcc is the memory account that is shared
// between prepStmtsNamespace and prepStmtsNamespaceAtTxnRewindPos. It
// tracks the memory usage of portals and should be closed upon
// connExecutor's closure.
prepStmtsNamespaceMemAcc mon.BoundAccount

// onTxnFinish (if non-nil) will be called when txn is finished (either
// committed or aborted). It is set when txn is started but can remain
// unset when txn is executed within another higher-level txn.
@@ -1138,7 +1150,7 @@ type prepStmtNamespace struct {
// session.
prepStmts map[string]*PreparedStatement
// portals contains the portals currently available on the session.
portals map[string]*PreparedPortal
portals map[string]PreparedPortal
}

func (ns prepStmtNamespace) String() string {
@@ -1158,13 +1170,15 @@ func (ns prepStmtNamespace) String() string {
// references are released and all the to's references are duplicated.
//
// An empty `to` can be passed in to deallocate everything.
func (ns *prepStmtNamespace) resetTo(ctx context.Context, to prepStmtNamespace) {
func (ns *prepStmtNamespace) resetTo(
ctx context.Context, to prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
for name, p := range ns.prepStmts {
p.decRef(ctx)
delete(ns.prepStmts, name)
}
for name, p := range ns.portals {
p.decRef(ctx)
p.decRef(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}

@@ -1195,7 +1209,7 @@ func (ex *connExecutor) resetExtraTxnState(

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
p.decRef(ctx)
p.decRef(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

@@ -1400,10 +1414,11 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
// ExecPortal is handled like ExecStmt, except that the placeholder info
// is taken from the portal.

portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[tcmd.Name]
portalName := tcmd.Name
portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]
if !ok {
err := pgerror.Newf(
pgcode.InvalidCursorName, "unknown portal %q", tcmd.Name)
pgcode.InvalidCursorName, "unknown portal %q", portalName)
ev = eventNonRetriableErr{IsCommit: fsm.False}
payload = eventNonRetriableErrPayload{err: err}
res = ex.clientComm.CreateErrorResult(pos)
@@ -1443,18 +1458,11 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
pos, portal.OutFormats,
ex.sessionData.DataConversion,
tcmd.Limit,
tcmd.Name,
portalName,
ex.implicitTxn(),
)
res = stmtRes
curStmt := Statement{
Statement: portal.Stmt.Statement,
Prepared: portal.Stmt,
ExpectedTypes: portal.Stmt.Columns,
AnonymizedStr: portal.Stmt.AnonymizedStr,
}
stmtCtx := withStatement(ctx, ex.curStmt)
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, pinfo)
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo)
if err != nil {
return err
}
@@ -1775,14 +1783,21 @@ func (ex *connExecutor) execCopyIn(
}

// If we're in an explicit txn, then the copying will be done within that
// txn. Otherwise, we tell the copyMachine to manage its own transactions.
// txn. Otherwise, we tell the copyMachine to manage its own transactions
// and give it a closure to reset the accumulated extraTxnState.
var txnOpt copyTxnOpt
if isOpen {
txnOpt = copyTxnOpt{
txn: ex.state.mu.txn,
txnTimestamp: ex.state.sqlTimestamp,
stmtTimestamp: ex.server.cfg.Clock.PhysicalTime(),
}
} else {
txnOpt = copyTxnOpt{
resetExtraTxnState: func(ctx context.Context) error {
return ex.resetExtraTxnState(ctx, ex.server.dbCache, noEvent)
},
}
}

var monToStop *mon.BytesMonitor
@@ -1860,14 +1875,16 @@ func (ex *connExecutor) generateID() ClusterWideID {
// prepStmtsNamespaceAtTxnRewindPos that's not part of prepStmtsNamespace.
func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespace)
ctx, ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// rewindPrepStmtNamespace deallocates everything in prepStmtsNamespace that's
// not part of prepStmtsNamespaceAtTxnRewindPos.
func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos)
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// getRewindTxnCapability checks whether rewinding to the position previously
@@ -2581,7 +2598,9 @@ func (ps connExPrepStmtsAccessor) Delete(ctx context.Context, name string) bool

// DeleteAll is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) DeleteAll(ctx context.Context) {
ps.ex.extraTxnState.prepStmtsNamespace.resetTo(ctx, prepStmtNamespace{})
ps.ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, prepStmtNamespace{}, &ps.ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// contextStatementKey is an empty type for the handle associated with the
55 changes: 53 additions & 2 deletions pkg/sql/conn_executor_exec.go
@@ -137,6 +137,57 @@ func (ex *connExecutor) recordFailure() {
ex.metrics.EngineMetrics.FailureCount.Inc(1)
}

// execPortal executes a prepared statement. It is a "wrapper" around the
// execStmt method that performs additional work to track the portal's state.
func (ex *connExecutor) execPortal(
ctx context.Context,
portal PreparedPortal,
portalName string,
stmtRes CommandResult,
pinfo *tree.PlaceholderInfo,
) (ev fsm.Event, payload fsm.EventPayload, err error) {
curStmt := Statement{
Statement: portal.Stmt.Statement,
Prepared: portal.Stmt,
ExpectedTypes: portal.Stmt.Columns,
AnonymizedStr: portal.Stmt.AnonymizedStr,
}
stmtCtx := withStatement(ctx, ex.curStmt)
switch ex.machine.CurState().(type) {
case stateOpen:
// We're about to execute the statement in an open state which
// could trigger the dispatch to the execution engine. However, it
// is possible that we're trying to execute an already exhausted
// portal - in such a scenario we should return no rows, but the
// execution engine is not aware of that and would run the
// statement as if it were being run for the first time. In order
// to prevent such behavior, we check whether the portal has been
// exhausted and execute the statement only if it hasn't. If it has
// been exhausted, then we do not dispatch the query for execution,
// but connExecutor will still perform the necessary state transitions,
// which will emit CommandComplete messages and the like (in a sense,
// by not calling execStmt we "execute" the portal in such a way
// that it returns 0 rows).
// Note that here we deviate from Postgres, which returns an error
// when attempting to execute an exhausted portal that has a
// StatementType() different from "Rows".
if !portal.exhausted {
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, pinfo)
// Portal suspension is supported via a "side" state machine
// (see pgwire.limitedCommandResult for details), so when
// execStmt returns, we know for sure that the portal has been
// executed to completion, thus, it is exhausted.
// Note that the portal is considered exhausted regardless of
// whether an error occurred or not - if it did, we
// still don't want to re-execute the portal from scratch.
ex.exhaustPortal(portalName)
}
default:
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, pinfo)
}
return
}

// execStmtInOpenState executes one statement in the context of the session's
// current transaction.
// It handles statements that affect the transaction state (BEGIN, COMMIT)
@@ -476,10 +527,10 @@ func (ex *connExecutor) execStmtInOpenState(
// is re-configured, re-set etc without using NewTxnWithSteppingEnabled().
//
// Manually hunting them down and calling ConfigureStepping() each
// time would be error prone (and increase the change that a future
// time would be error prone (and increase the chance that a future
// change would forget to add the call).
//
// TODO(andrei): really the code should be re-architectued to ensure
// TODO(andrei): really the code should be rearchitected to ensure
// that all uses of SQL execution initialize the client.Txn using a
// single/common function. That would be where the stepping mode
// gets enabled once for all SQL statements executed "underneath".
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_internal_test.go
@@ -41,8 +41,8 @@ import (
)

// Test portal implicit destruction. Unless destroying a portal is explicitly
// requested, portals live until the end of the transaction in which
// they'recreated. If they're created outside of a transaction, they live until
// requested, portals live until the end of the transaction in which they're
// created. If they're created outside of a transaction, they live until
// the next transaction completes (so until the next statement is executed,
// which statement is expected to be the execution of the portal that was just
// created).
20 changes: 14 additions & 6 deletions pkg/sql/conn_executor_prepare.go
@@ -358,9 +358,7 @@ func (ex *connExecutor) execBind(
}

// Create the new PreparedPortal.
if err := ex.addPortal(
ctx, portalName, bindCmd.PreparedStatementName, ps, qargs, columnFormatCodes,
); err != nil {
if err := ex.addPortal(ctx, portalName, ps, qargs, columnFormatCodes); err != nil {
return retErr(err)
}

@@ -379,7 +377,6 @@
func (ex *connExecutor) addPortal(
ctx context.Context,
portalName string,
psName string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
outFormats []pgwirebase.FormatCode,
@@ -388,7 +385,7 @@
panic(errors.AssertionFailedf("portal already exists: %q", portalName))
}

portal, err := ex.newPreparedPortal(ctx, portalName, stmt, qargs, outFormats)
portal, err := ex.makePreparedPortal(ctx, portalName, stmt, qargs, outFormats)
if err != nil {
return err
}
@@ -397,6 +394,17 @@
return nil
}

// exhaustPortal marks a portal with the provided name as "exhausted" and
// panics if there is no portal with such a name.
func (ex *connExecutor) exhaustPortal(portalName string) {
portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]
if !ok {
panic(errors.AssertionFailedf("portal %s doesn't exist", portalName))
}
portal.exhausted = true
ex.extraTxnState.prepStmtsNamespace.portals[portalName] = portal
}

func (ex *connExecutor) deletePreparedStmt(ctx context.Context, name string) {
ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[name]
if !ok {
@@ -411,7 +419,7 @@ func (ex *connExecutor) deletePortal(ctx context.Context, name string) {
if !ok {
return
}
portal.decRef(ctx)
portal.decRef(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

21 changes: 17 additions & 4 deletions pkg/sql/copy.go
@@ -158,6 +158,10 @@ type copyTxnOpt struct {
txnTimestamp time.Time
stmtTimestamp time.Time
resetPlanner func(ctx context.Context, p *planner, txn *kv.Txn, txnTS time.Time, stmtTS time.Time)

// resetExtraTxnState should be called upon completing a batch from the copy
// machine when the copy machine handles its own transaction.
resetExtraTxnState func(ctx context.Context) error
}

// run consumes all the copy-in data from the network connection and inserts it
@@ -309,8 +313,17 @@ func (p *planner) preparePlannerForCopy(
txnOpt.resetPlanner(ctx, p, txn, txnTs, stmtTs)
p.autoCommit = autoCommit

return func(ctx context.Context, err error) error {
if err == nil {
return func(ctx context.Context, prevErr error) (err error) {
// Ensure that we clean up any state accumulated in extraTxnState if we've
// been handed a mechanism to do so.
if txnOpt.resetExtraTxnState != nil {
defer func() {
// Note: CombineErrors returns nil if both errors are nil, and returns the
// non-nil error in the case that there's just one.
err = errors.CombineErrors(err, txnOpt.resetExtraTxnState(ctx))
}()
}
if prevErr == nil {
// Ensure that the txn is committed if the copyMachine is in charge of
// committing its transactions and the execution didn't already commit it
// (through the planner.autoCommit optimization).
Expand All @@ -319,8 +332,8 @@ func (p *planner) preparePlannerForCopy(
}
return nil
}
txn.CleanupOnError(ctx, err)
return err
txn.CleanupOnError(ctx, prevErr)
return prevErr
}
}
