diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index 498b4c1a0208..04258f7a8774 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -406,6 +406,9 @@ type HashRouter struct { // One output for each stream. outputs []routerOutput + // closers is a slice of IdempotentClosers that need to be closed when the + // hash router terminates. + closers []IdempotentCloser // unblockedEventsChan is a channel shared between the HashRouter and its // outputs. outputs send events on this channel when they are unblocked by a @@ -442,6 +445,7 @@ func NewHashRouter( diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, diskAccounts []*mon.BoundAccount, + toClose []IdempotentCloser, ) (*HashRouter, []Operator) { if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault { execerror.VectorizedInternalPanic(errors.Errorf("hash router instantiated with incompatible disk queue cache mode: %d", diskQueueCfg.CacheMode)) @@ -462,7 +466,7 @@ func NewHashRouter( outputs[i] = op outputsAsOps[i] = op } - router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs) + router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs, toClose) for i := range outputs { outputs[i].(*routerOutputOp).input = router } @@ -475,12 +479,14 @@ func newHashRouterWithOutputs( hashCols []uint32, unblockEventsChan <-chan struct{}, outputs []routerOutput, + toClose []IdempotentCloser, ) *HashRouter { r := &HashRouter{ OneInputNode: NewOneInputNode(input), types: types, hashCols: hashCols, outputs: outputs, + closers: toClose, unblockedEventsChan: unblockEventsChan, tupleDistributor: newTupleHashDistributor(defaultInitHashValue, len(outputs)), } @@ -496,6 +502,15 @@ func (r *HashRouter) Run(ctx context.Context) { r.mu.bufferedMeta = append(r.mu.bufferedMeta, execinfrapb.ProducerMetadata{Err: err}) r.mu.Unlock() } + defer func() { + for _, closer := range r.closers { + if err := closer.IdempotentClose(ctx); err != nil { + if log.V(1) { + log.Infof(ctx, "error closing IdempotentCloser: %v", err) + } + } + } + }() // Since HashRouter runs in a separate goroutine, we want to be safe and // make sure that we catch errors in all code paths, so we wrap the whole // method with a catcher. Note that we also have "internal" catchers as diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index fc2ea784708b..5cde022c0efb 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -590,7 +590,7 @@ func TestHashRouterComputesDestination(t *testing.T) { } } - r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, nil /* ch */, outputs) + r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, nil /* ch */, outputs, nil /* toClose */) for r.processNextBatch(ctx) { } @@ -628,7 +628,7 @@ func TestHashRouterCancellation(t *testing.T) { in := NewRepeatableBatchSource(testAllocator, batch) unbufferedCh := make(chan struct{}) - r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, unbufferedCh, outputs) + r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, unbufferedCh, outputs, nil /* toClose */) t.Run("BeforeRun", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -735,7 +735,7 @@ func TestHashRouterOneOutput(t *testing.T) { r, routerOutputs := NewHashRouter( []*Allocator{testAllocator}, newOpFixedSelTestInput(sel, len(sel), data), typs, []uint32{0}, mtc.bytes, queueCfg, NewTestingSemaphore(2), - []*mon.BoundAccount{&diskAcc}, + []*mon.BoundAccount{&diskAcc}, nil, /* toClose */ ) if len(routerOutputs) != 1 { @@ -851,7 +851,7 @@ func TestHashRouterRandom(t *testing.T) { } r := newHashRouterWithOutputs( - inputs[0], typs, hashCols, unblockEventsChan, outputs, + inputs[0], typs, hashCols, unblockEventsChan, outputs, nil, /* toClose */ ) var ( @@ -952,7 +952,7 @@ func BenchmarkHashRouter(b *testing.B) { diskAccounts[i] = &diskAcc defer diskAcc.Close(ctx) } - r, outputs := NewHashRouter(allocators, input, types, []uint32{0}, 64<<20, queueCfg, &TestingSemaphore{}, diskAccounts) + r, outputs := NewHashRouter(allocators, input, types, []uint32{0}, 64<<20, queueCfg, &TestingSemaphore{}, diskAccounts, nil /* toClose */) b.SetBytes(8 * int64(coldata.BatchSize()) * int64(numInputBatches)) // We expect distribution to not change. This is a sanity check that // we're resetting properly. diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 1ac1b248fa2e..98e883de2000 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -668,7 +668,8 @@ func (s *vectorizedFlowCreator) setupRouter( } diskMon, diskAccounts := s.createDiskAccounts(ctx, flowCtx, mmName, len(output.Streams)) router, outputs := colexec.NewHashRouter( - allocators, input, outputTyps, output.HashColumns, limit, s.diskQueueCfg, s.fdSemaphore, diskAccounts, + allocators, input, outputTyps, output.HashColumns, limit, + s.diskQueueCfg, s.fdSemaphore, diskAccounts, toClose, ) runRouter := func(ctx context.Context, _ context.CancelFunc) { logtags.AddTag(ctx, "hashRouterID", mmName) @@ -686,8 +687,10 @@ func (s *vectorizedFlowCreator) setupRouter( case execinfrapb.StreamEndpointSpec_SYNC_RESPONSE: return errors.Errorf("unexpected sync response output when setting up router") case execinfrapb.StreamEndpointSpec_REMOTE: + // Note that here we pass in nil 'toClose' slice because hash + // router is responsible for closing all of the idempotent closers. if _, err := s.setupRemoteOutputStream( - ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, toClose, + ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, nil, /* toClose */ ); err != nil { return err } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index c4c1814898ad..f1c54641e00b 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -179,7 +179,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { } hashRouter, hashRouterOutputs := colexec.NewHashRouter( allocators, hashRouterInput, typs, []uint32{0}, 64<<20, /* 64 MiB */ - queueCfg, &colexec.TestingSemaphore{}, diskAccounts, + queueCfg, &colexec.TestingSemaphore{}, diskAccounts, nil, /* toClose */ ) for i := 0; i < numInboxes; i++ { inboxMemAccount := testMemMonitor.MakeBoundAccount()