From 6d17dbc27a3d311bca283def8224492d16f73f91 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 22 Sep 2021 21:19:17 -0400 Subject: [PATCH] Free resources in LocalExchangeSourceOperator::close --- velox/exec/LocalPartition.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/velox/exec/LocalPartition.h b/velox/exec/LocalPartition.h index 7a0ab64b270cd..88029672af917 100644 --- a/velox/exec/LocalPartition.h +++ b/velox/exec/LocalPartition.h @@ -85,6 +85,14 @@ class LocalExchangeSource { /// copied into the consumers memory pool. BlockingReason isFinished(ContinueFuture* future); + void close() { + queue_.withWLock([](auto& queue) { + while (!queue.empty()) { + queue.pop(); + } + }); + } + private: LocalExchangeMemoryManager* memoryManager_; const int partition_; @@ -120,6 +128,11 @@ class LocalExchangeSourceOperator : public SourceOperator { RowVectorPtr getOutput() override; + void close() override { + Operator::close(); + source_->close(); + } + private: const int partition_; const std::shared_ptr source_{nullptr};