Skip to content

Commit

Permalink
Merge pull request #49624 from yuzefovich/backport20.1-49333
Browse files Browse the repository at this point in the history
release-20.1: colexec: fix premature calls to IdempotentClose in hash router outputs
  • Loading branch information
yuzefovich authored May 28, 2020
2 parents 83098c0 + 96124b8 commit c3c5087
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
17 changes: 16 additions & 1 deletion pkg/sql/colexec/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ type HashRouter struct {

// One output for each stream.
outputs []routerOutput
// closers is a slice of IdempotentClosers that need to be closed when the
// hash router terminates.
closers []IdempotentCloser

// unblockedEventsChan is a channel shared between the HashRouter and its
// outputs. outputs send events on this channel when they are unblocked by a
Expand Down Expand Up @@ -442,6 +445,7 @@ func NewHashRouter(
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
diskAccounts []*mon.BoundAccount,
toClose []IdempotentCloser,
) (*HashRouter, []Operator) {
if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault {
execerror.VectorizedInternalPanic(errors.Errorf("hash router instantiated with incompatible disk queue cache mode: %d", diskQueueCfg.CacheMode))
Expand All @@ -462,7 +466,7 @@ func NewHashRouter(
outputs[i] = op
outputsAsOps[i] = op
}
router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs)
router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs, toClose)
for i := range outputs {
outputs[i].(*routerOutputOp).input = router
}
Expand All @@ -475,12 +479,14 @@ func newHashRouterWithOutputs(
hashCols []uint32,
unblockEventsChan <-chan struct{},
outputs []routerOutput,
toClose []IdempotentCloser,
) *HashRouter {
r := &HashRouter{
OneInputNode: NewOneInputNode(input),
types: types,
hashCols: hashCols,
outputs: outputs,
closers: toClose,
unblockedEventsChan: unblockEventsChan,
tupleDistributor: newTupleHashDistributor(defaultInitHashValue, len(outputs)),
}
Expand All @@ -496,6 +502,15 @@ func (r *HashRouter) Run(ctx context.Context) {
r.mu.bufferedMeta = append(r.mu.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
r.mu.Unlock()
}
defer func() {
for _, closer := range r.closers {
if err := closer.IdempotentClose(ctx); err != nil {
if log.V(1) {
log.Infof(ctx, "error closing IdempotentCloser: %v", err)
}
}
}
}()
// Since HashRouter runs in a separate goroutine, we want to be safe and
// make sure that we catch errors in all code paths, so we wrap the whole
// method with a catcher. Note that we also have "internal" catchers as
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func TestHashRouterComputesDestination(t *testing.T) {
}
}

r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, nil /* ch */, outputs)
r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, nil /* ch */, outputs, nil /* toClose */)
for r.processNextBatch(ctx) {
}

Expand Down Expand Up @@ -628,7 +628,7 @@ func TestHashRouterCancellation(t *testing.T) {
in := NewRepeatableBatchSource(testAllocator, batch)

unbufferedCh := make(chan struct{})
r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, unbufferedCh, outputs)
r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, unbufferedCh, outputs, nil /* toClose */)

t.Run("BeforeRun", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -735,7 +735,7 @@ func TestHashRouterOneOutput(t *testing.T) {
r, routerOutputs := NewHashRouter(
[]*Allocator{testAllocator}, newOpFixedSelTestInput(sel, len(sel), data),
typs, []uint32{0}, mtc.bytes, queueCfg, NewTestingSemaphore(2),
[]*mon.BoundAccount{&diskAcc},
[]*mon.BoundAccount{&diskAcc}, nil, /* toClose */
)

if len(routerOutputs) != 1 {
Expand Down Expand Up @@ -851,7 +851,7 @@ func TestHashRouterRandom(t *testing.T) {
}

r := newHashRouterWithOutputs(
inputs[0], typs, hashCols, unblockEventsChan, outputs,
inputs[0], typs, hashCols, unblockEventsChan, outputs, nil, /* toClose */
)

var (
Expand Down Expand Up @@ -952,7 +952,7 @@ func BenchmarkHashRouter(b *testing.B) {
diskAccounts[i] = &diskAcc
defer diskAcc.Close(ctx)
}
r, outputs := NewHashRouter(allocators, input, types, []uint32{0}, 64<<20, queueCfg, &TestingSemaphore{}, diskAccounts)
r, outputs := NewHashRouter(allocators, input, types, []uint32{0}, 64<<20, queueCfg, &TestingSemaphore{}, diskAccounts, nil /* toClose */)
b.SetBytes(8 * int64(coldata.BatchSize()) * int64(numInputBatches))
// We expect distribution to not change. This is a sanity check that
// we're resetting properly.
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,8 @@ func (s *vectorizedFlowCreator) setupRouter(
}
diskMon, diskAccounts := s.createDiskAccounts(ctx, flowCtx, mmName, len(output.Streams))
router, outputs := colexec.NewHashRouter(
allocators, input, outputTyps, output.HashColumns, limit, s.diskQueueCfg, s.fdSemaphore, diskAccounts,
allocators, input, outputTyps, output.HashColumns, limit,
s.diskQueueCfg, s.fdSemaphore, diskAccounts, toClose,
)
runRouter := func(ctx context.Context, _ context.CancelFunc) {
logtags.AddTag(ctx, "hashRouterID", mmName)
Expand All @@ -686,8 +687,10 @@ func (s *vectorizedFlowCreator) setupRouter(
case execinfrapb.StreamEndpointSpec_SYNC_RESPONSE:
return errors.Errorf("unexpected sync response output when setting up router")
case execinfrapb.StreamEndpointSpec_REMOTE:
// Note that here we pass in nil 'toClose' slice because hash
// router is responsible for closing all of the idempotent closers.
if _, err := s.setupRemoteOutputStream(
ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, toClose,
ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, nil, /* toClose */
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
}
hashRouter, hashRouterOutputs := colexec.NewHashRouter(
allocators, hashRouterInput, typs, []uint32{0}, 64<<20, /* 64 MiB */
queueCfg, &colexec.TestingSemaphore{}, diskAccounts,
queueCfg, &colexec.TestingSemaphore{}, diskAccounts, nil, /* toClose */
)
for i := 0; i < numInboxes; i++ {
inboxMemAccount := testMemMonitor.MakeBoundAccount()
Expand Down

0 comments on commit c3c5087

Please sign in to comment.