diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index 867ebd4bcd94..5ae3d32b9f82 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -407,6 +407,9 @@ type HashRouter struct { // One output for each stream. outputs []routerOutput + // closers is a slice of IdempotentClosers that need to be closed when the + // hash router terminates. + closers []IdempotentCloser // unblockedEventsChan is a channel shared between the HashRouter and its // outputs. outputs send events on this channel when they are unblocked by a @@ -443,6 +446,7 @@ func NewHashRouter( diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, diskAccounts []*mon.BoundAccount, + toClose []IdempotentCloser, ) (*HashRouter, []colexecbase.Operator) { if diskQueueCfg.CacheMode != colcontainer.DiskQueueCacheModeDefault { colexecerror.InternalError(errors.Errorf("hash router instantiated with incompatible disk queue cache mode: %d", diskQueueCfg.CacheMode)) @@ -463,7 +467,7 @@ func NewHashRouter( outputs[i] = op outputsAsOps[i] = op } - router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs) + router := newHashRouterWithOutputs(input, types, hashCols, unblockEventsChan, outputs, toClose) for i := range outputs { outputs[i].(*routerOutputOp).input = router } @@ -476,12 +480,14 @@ func newHashRouterWithOutputs( hashCols []uint32, unblockEventsChan <-chan struct{}, outputs []routerOutput, + toClose []IdempotentCloser, ) *HashRouter { r := &HashRouter{ OneInputNode: NewOneInputNode(input), types: types, hashCols: hashCols, outputs: outputs, + closers: toClose, unblockedEventsChan: unblockEventsChan, tupleDistributor: newTupleHashDistributor(defaultInitHashValue, len(outputs)), } @@ -497,6 +503,15 @@ func (r *HashRouter) Run(ctx context.Context) { r.mu.bufferedMeta = append(r.mu.bufferedMeta, execinfrapb.ProducerMetadata{Err: err}) r.mu.Unlock() } + defer func() { + for _, closer := range r.closers { + if err := closer.IdempotentClose(ctx); err != nil { + if log.V(1) { + log.Infof(ctx, "error closing IdempotentCloser: %v", err) + } + } + } + }() // Since HashRouter runs in a separate goroutine, we want to be safe and // make sure that we catch errors in all code paths, so we wrap the whole // method with a catcher. Note that we also have "internal" catchers as diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index 9a310801ab1a..1759a01c2151 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -592,7 +592,7 @@ func TestHashRouterComputesDestination(t *testing.T) { } } - r := newHashRouterWithOutputs(in, typs, []uint32{0}, nil /* ch */, outputs) + r := newHashRouterWithOutputs(in, typs, []uint32{0}, nil /* ch */, outputs, nil /* toClose */) for r.processNextBatch(ctx) { } @@ -631,7 +631,7 @@ func TestHashRouterCancellation(t *testing.T) { in := colexecbase.NewRepeatableBatchSource(testAllocator, batch, typs) unbufferedCh := make(chan struct{}) - r := newHashRouterWithOutputs(in, typs, []uint32{0}, unbufferedCh, outputs) + r := newHashRouterWithOutputs(in, typs, []uint32{0}, unbufferedCh, outputs, nil /* toClose */) t.Run("BeforeRun", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -737,7 +737,7 @@ func TestHashRouterOneOutput(t *testing.T) { r, routerOutputs := NewHashRouter( []*colmem.Allocator{testAllocator}, newOpFixedSelTestInput(sel, len(sel), data, typs), typs, []uint32{0}, mtc.bytes, queueCfg, colexecbase.NewTestingSemaphore(2), - []*mon.BoundAccount{&diskAcc}, + []*mon.BoundAccount{&diskAcc}, nil, /* toClose */ ) if len(routerOutputs) != 1 { @@ -853,7 +853,7 @@ func TestHashRouterRandom(t *testing.T) { } r := newHashRouterWithOutputs( - inputs[0], typs, hashCols, unblockEventsChan, outputs, + inputs[0], typs, hashCols, unblockEventsChan, outputs, nil, /* toClose */ ) var ( @@ -953,7 +953,10 @@ func BenchmarkHashRouter(b *testing.B) { diskAccounts[i] = &diskAcc defer diskAcc.Close(ctx) } - r, outputs := NewHashRouter(allocators, input, typs, []uint32{0}, 64<<20, queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts) + r, outputs := NewHashRouter( + allocators, input, typs, []uint32{0}, 64<<20, + queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts, nil, /* toClose */ + ) b.SetBytes(8 * int64(coldata.BatchSize()) * int64(numInputBatches)) // We expect distribution to not change. This is a sanity check that // we're resetting properly. diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index c98842e99738..8fbd326eda2d 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -646,7 +646,8 @@ func (s *vectorizedFlowCreator) setupRouter( } diskMon, diskAccounts := s.createDiskAccounts(ctx, flowCtx, mmName, len(output.Streams)) router, outputs := colexec.NewHashRouter( - allocators, input, outputTyps, output.HashColumns, limit, s.diskQueueCfg, s.fdSemaphore, diskAccounts, + allocators, input, outputTyps, output.HashColumns, limit, + s.diskQueueCfg, s.fdSemaphore, diskAccounts, toClose, ) runRouter := func(ctx context.Context, _ context.CancelFunc) { logtags.AddTag(ctx, "hashRouterID", mmName) @@ -664,8 +665,10 @@ func (s *vectorizedFlowCreator) setupRouter( case execinfrapb.StreamEndpointSpec_SYNC_RESPONSE: return errors.Errorf("unexpected sync response output when setting up router") case execinfrapb.StreamEndpointSpec_REMOTE: + // Note that here we pass in nil 'toClose' slice because hash + // router is responsible for closing all of the idempotent closers. if _, err := s.setupRemoteOutputStream( - ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, toClose, factory, + ctx, flowCtx, op, outputTyps, stream, metadataSourcesQueue, nil /* toClose */, factory, ); err != nil { return err } diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 00dbab6ff8e4..e7fb8989b54c 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -180,7 +180,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { } hashRouter, hashRouterOutputs := colexec.NewHashRouter( allocators, hashRouterInput, typs, []uint32{0}, 64<<20, /* 64 MiB */ - queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts, + queueCfg, &colexecbase.TestingSemaphore{}, diskAccounts, nil, /* toClose */ ) for i := 0; i < numInboxes; i++ { inboxMemAccount := testMemMonitor.MakeBoundAccount()