From 294a8a3bd7132b66cfbe001e6acc81ce37496950 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 20 May 2020 10:32:07 -0700
Subject: [PATCH] colexec: fix premature calls to IdempotentClose in hash router outputs

Previously, we were passing the `toClose` slice of idempotent closers
to every outbox that serves as an output of a hash router. However, it
is possible for one stream to be closed before the others, which would
prompt the corresponding outbox to close all of the closers
prematurely. Other streams might still be active and ready to consume
more data, yet that single outbox would close everything. This is now
fixed by making the hash router responsible for closing the whole
`toClose` slice, which it does right before exiting its `Run` method.

Release note (bug fix): Previously, CockroachDB could return an
internal error or incorrect results on queries that were run via the
vectorized execution engine and had a hash router in the DistSQL plan.
This could only occur with `vectorize=on`.
---
 pkg/sql/colexec/routers.go                    | 17 ++++++++++++++++-
 pkg/sql/colexec/routers_test.go               | 13 ++++++++-----
 pkg/sql/colflow/vectorized_flow.go            |  7 +++++--
 .../colflow/vectorized_flow_shutdown_test.go  |  2 +-
 4 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go
index 867ebd4bcd94..5ae3d32b9f82 100644
--- a/pkg/sql/colexec/routers.go
+++ b/pkg/sql/colexec/routers.go
@@ -407,6 +407,9 @@ type HashRouter struct {
 
 	// One output for each stream.
 	outputs []routerOutput
+	// closers is a slice of IdempotentClosers that need to be closed when the
+	// hash router terminates.
+	closers []IdempotentCloser
 
 	// unblockedEventsChan is a channel shared between the HashRouter and its
 	// outputs. outputs send events on this channel when they are unblocked by a
@@ -443,6 +446,7 @@ func NewHashRouter(
 	diskQueueCfg colcontainer.DiskQueueCfg,
 	fdSemaphore semaphore.Semaphore,
 	diskAccounts []*mon.BoundAccount,
+	toClose []IdempotentCloser,
 ) (*HashRouter, []colexecbase.Operator) {
 	if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault {
 		colexecerror.InternalError(errors.Errorf("hash router instantiated with incompatible disk queue cache mode: %d", diskQueueCfg.CacheMode))
@@ -463,7 +467,7 @@ func NewHashRouter(
 		outputs[i] = op
 		outputsAsOps[i] = op
 	}
-	router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs)
+	router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs, toClose)
 	for i := range outputs {
 		outputs[i].(*routerOutputOp).input = router
 	}
@@ -476,12 +480,14 @@ func newHashRouterWithOutputs(
 	hashCols []uint32,
 	unblockEventsChan <-chan struct{},
 	outputs []routerOutput,
+	toClose []IdempotentCloser,
 ) *HashRouter {
 	r := &HashRouter{
 		OneInputNode:        NewOneInputNode(input),
 		types:               types,
 		hashCols:            hashCols,
 		outputs:             outputs,
+		closers:             toClose,
 		unblockedEventsChan: unblockEventsChan,
 		tupleDistributor:    newTupleHashDistributor(defaultInitHashValue, len(outputs)),
 	}
@@ -497,6 +503,15 @@ func (r *HashRouter) Run(ctx context.Context) {
 		r.mu.bufferedMeta = append(r.mu.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
 		r.mu.Unlock()
 	}
+	defer func() {
+		for _, closer := range r.closers {
+			if err := closer.IdempotentClose(ctx); err != nil {
+				if log.V(1) {
+					log.Infof(ctx, "error closing IdempotentCloser: %v", err)
+				}
+			}
+		}
+	}()
 	// Since HashRouter runs in a separate goroutine, we want to be safe and
 	// make sure that we catch errors in all code paths, so we wrap the whole
 	// method with a catcher. Note that we also have "internal" catchers as
diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go
index 9a310801ab1a..1759a01c2151 100644
--- a/pkg/sql/colexec/routers_test.go
+++ b/pkg/sql/colexec/routers_test.go
@@ -592,7 +592,7 @@ func TestHashRouterComputesDestination(t *testing.T) {
 		}
 	}
 
-	r := newHashRouterWithOutputs(in, typs, []uint32{0}, nil /* ch */, outputs)
+	r := newHashRouterWithOutputs(in, typs, []uint32{0}, nil /* ch */, outputs, nil /* toClose */)
 
 	for r.processNextBatch(ctx) {
 	}
@@ -631,7 +631,7 @@ func TestHashRouterCancellation(t *testing.T) {
 	in := colexecbase.NewRepeatableBatchSource(testAllocator, batch, typs)
 
 	unbufferedCh := make(chan struct{})
-	r := newHashRouterWithOutputs(in, typs, []uint32{0}, unbufferedCh, outputs)
+	r := newHashRouterWithOutputs(in, typs, []uint32{0}, unbufferedCh, outputs, nil /* toClose */)
 
 	t.Run("BeforeRun", func(t *testing.T) {
 		ctx, cancel := context.WithCancel(context.Background())
@@ -737,7 +737,7 @@ func TestHashRouterOneOutput(t *testing.T) {
 			r, routerOutputs := NewHashRouter(
 				[]*colmem.Allocator{testAllocator}, newOpFixedSelTestInput(sel, len(sel), data, typs),
 				typs, []uint32{0}, mtc.bytes, queueCfg, colexecbase.NewTestingSemaphore(2),
-				[]*mon.BoundAccount{&diskAcc},
+				[]*mon.BoundAccount{&diskAcc}, nil, /* toClose */
 			)
 
 			if len(routerOutputs) != 1 {
@@ -853,7 +853,7 @@ func TestHashRouterRandom(t *testing.T) {
 			}
 
 			r := newHashRouterWithOutputs(
-				inputs[0], typs, hashCols, unblockEventsChan, outputs,
+				inputs[0], typs, hashCols, unblockEventsChan, outputs, nil, /* toClose */
 			)
 
 			var (
@@ -953,7 +953,10 @@ func BenchmarkHashRouter(b *testing.B) {
 		diskAccounts[i] = &diskAcc
 		defer diskAcc.Close(ctx)
 	}
-	r, outputs := NewHashRouter(allocators, input, typs, []uint32{0}, 64<<20, queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts)
+	r, outputs := NewHashRouter(
+		allocators, input, typs, []uint32{0}, 64<<20,
+		queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts, nil, /* toClose */
+	)
 	b.SetBytes(8 * int64(coldata.BatchSize()) * int64(numInputBatches))
 	// We expect distribution to not change. This is a sanity check that
 	// we're resetting properly.
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index c98842e99738..8fbd326eda2d 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -646,7 +646,8 @@ func (s *vectorizedFlowCreator) setupRouter(
 	}
 	diskMon, diskAccounts := s.createDiskAccounts(ctx, flowCtx, mmName, len(output.Streams))
 	router, outputs := colexec.NewHashRouter(
-		allocators, input, outputTyps, output.HashColumns, limit, s.diskQueueCfg, s.fdSemaphore, diskAccounts,
+		allocators, input, outputTyps, output.HashColumns, limit,
+		s.diskQueueCfg, s.fdSemaphore, diskAccounts, toClose,
 	)
 	runRouter := func(ctx context.Context, _ context.CancelFunc) {
 		logtags.AddTag(ctx, "hashRouterID", mmName)
@@ -664,8 +665,10 @@ func (s *vectorizedFlowCreator) setupRouter(
 		case execinfrapb.StreamEndpointSpec_SYNC_RESPONSE:
 			return errors.Errorf("unexpected sync response output when setting up router")
 		case execinfrapb.StreamEndpointSpec_REMOTE:
+			// Note that here we pass in nil 'toClose' slice because hash
+			// router is responsible for closing all of the idempotent closers.
 			if _, err := s.setupRemoteOutputStream(
-				ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, toClose, factory,
+				ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, nil /* toClose */, factory,
 			); err != nil {
 				return err
 			}
diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go
index 00dbab6ff8e4..e7fb8989b54c 100644
--- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go
+++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go
@@ -180,7 +180,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
 	}
 	hashRouter, hashRouterOutputs := colexec.NewHashRouter(
 		allocators, hashRouterInput, typs, []uint32{0}, 64<<20, /* 64 MiB */
-		queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts,
+		queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts, nil, /* toClose */
 	)
 	for i := 0; i < numInboxes; i++ {
 		inboxMemAccount := testMemMonitor.MakeBoundAccount()
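
As an aside for readers of this patch: below is a minimal, self-contained Go
sketch of the ownership pattern the fix adopts. The router, idempotentCloser,
and channel-based outputs are simplified stand-ins invented for illustration,
not the actual colexec types. The point it demonstrates is that the shared
closers are closed exactly once, in a defer that fires only when the router's
Run method exits (input fully consumed, or an error/cancellation occurred),
so an output stream that finishes early can no longer tear down resources the
router still needs while feeding its sibling streams.

package main

import (
	"context"
	"fmt"
	"sync"
)

// idempotentCloser mimics colexec.IdempotentCloser: IdempotentClose may be
// called any number of times, but only the first call takes effect.
type idempotentCloser struct {
	once sync.Once
	name string
}

func (c *idempotentCloser) IdempotentClose(ctx context.Context) error {
	c.once.Do(func() { fmt.Printf("%s closed\n", c.name) })
	return nil
}

// router stands in for HashRouter: it owns the closers guarding its
// input-side resources and fans tuples out to several output streams.
type router struct {
	closers []*idempotentCloser
	outputs []chan int
}

func (r *router) Run(ctx context.Context) {
	// The pattern adopted by the patch: the router, not any single output,
	// closes the shared closers, and only on the way out of Run.
	defer func() {
		for _, c := range r.closers {
			if err := c.IdempotentClose(ctx); err != nil {
				fmt.Printf("error closing %s: %v\n", c.name, err)
			}
		}
	}()
	for i := 0; i < 8; i++ {
		r.outputs[i%len(r.outputs)] <- i // stand-in for hash partitioning
	}
	for _, out := range r.outputs {
		close(out)
	}
}

func main() {
	ctx := context.Background()
	r := &router{
		closers: []*idempotentCloser{{name: "external sorter"}, {name: "disk queue"}},
		outputs: []chan int{make(chan int, 8), make(chan int, 8)},
	}
	var wg sync.WaitGroup
	for i, out := range r.outputs {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				fmt.Printf("output %d got %d\n", i, v)
			}
			// The bug fixed above: if this consumer closed the shared
			// closers here, a faster stream would tear down resources
			// still needed while the router fed the slower streams.
		}(i, out)
	}
	r.Run(ctx)
	wg.Wait()
}

Because the close happens in a defer, it also fires on a panic or an early
return, which mirrors how the patch covers the error paths of Run.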