From 8550885fe2e3e959393b8c1b848165e47411449c Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 15 Jul 2022 15:30:03 +0800 Subject: [PATCH 1/6] [OPPRO-174] Velox memory pool backed by Gluten's context allocator --- cpp/src/compute/kernels_ext.h | 3 +- cpp/src/compute/substrait_utils.cc | 3 +- cpp/src/jni/jni_common.h | 2 +- cpp/src/jni/jni_wrapper.cc | 7 +- cpp/src/memory/allocator.cc | 33 ++- cpp/src/memory/allocator.h | 10 +- cpp/src/operators/shuffle/type.h | 3 +- cpp/velox/CMakeLists.txt | 1 + cpp/velox/memory/velox_allocator.cc | 421 ++++++++++++++++++++++++++++ cpp/velox/memory/velox_allocator.h | 36 +++ 10 files changed, 492 insertions(+), 27 deletions(-) create mode 100644 cpp/velox/memory/velox_allocator.cc create mode 100644 cpp/velox/memory/velox_allocator.h diff --git a/cpp/src/compute/kernels_ext.h b/cpp/src/compute/kernels_ext.h index 9334bda4659f..15e6eff59f85 100644 --- a/cpp/src/compute/kernels_ext.h +++ b/cpp/src/compute/kernels_ext.h @@ -50,7 +50,8 @@ class LazyReadIterator { arrow::Status Next(std::shared_ptr* out); private: - arrow::MemoryPool* pool_ = gluten::memory::GetDefaultWrappedArrowMemoryPool(); + arrow::MemoryPool* pool_ = + gluten::memory::GetDefaultWrappedArrowMemoryPool().get(); std::shared_ptr array_iter_; bool need_process_ = false; bool no_next_ = false; diff --git a/cpp/src/compute/substrait_utils.cc b/cpp/src/compute/substrait_utils.cc index 503aaaf62aa1..fea48cbe9f85 100644 --- a/cpp/src/compute/substrait_utils.cc +++ b/cpp/src/compute/substrait_utils.cc @@ -332,7 +332,8 @@ class SubstraitParser::FirstStageResultIterator { } private: - arrow::MemoryPool* pool_ = gluten::memory::GetDefaultWrappedArrowMemoryPool(); + arrow::MemoryPool* pool_ = + gluten::memory::GetDefaultWrappedArrowMemoryPool().get(); std::unique_ptr builder_; bool has_next_ = true; // std::vector> res_arrays; diff --git a/cpp/src/jni/jni_common.h b/cpp/src/jni/jni_common.h index a511cdd13b53..75217b3807ad 100644 --- a/cpp/src/jni/jni_common.h +++ 
b/cpp/src/jni/jni_common.h @@ -299,7 +299,7 @@ jbyteArray ToSchemaByteArray( // std::shared_ptr buffer; arrow::Result> maybe_buffer; maybe_buffer = arrow::ipc::SerializeSchema( - *schema.get(), gluten::memory::GetDefaultWrappedArrowMemoryPool()); + *schema.get(), gluten::memory::GetDefaultWrappedArrowMemoryPool().get()); if (!status.ok()) { std::string error_message = "Unable to convert schema to byte array, err is " + status.message(); diff --git a/cpp/src/jni/jni_wrapper.cc b/cpp/src/jni/jni_wrapper.cc index 658d38e3c03b..e0b3c251b776 100644 --- a/cpp/src/jni/jni_wrapper.cc +++ b/cpp/src/jni/jni_wrapper.cc @@ -292,7 +292,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, java_reservation_listener_class, "unreserve", "(J)V"); default_memory_allocator_id = - reinterpret_cast(gluten::memory::DefaultMemoryAllocator()); + reinterpret_cast(gluten::memory::DefaultMemoryAllocator().get()); return JNI_VERSION; } @@ -818,7 +818,8 @@ Java_io_glutenproject_vectorized_ShuffleDecompressionJniWrapper_decompress( // decompress buffers auto options = arrow::ipc::IpcReadOptions::Defaults(); - options.memory_pool = gluten::memory::GetDefaultWrappedArrowMemoryPool(); + options.memory_pool = + gluten::memory::GetDefaultWrappedArrowMemoryPool().get(); options.use_threads = false; gluten::JniAssertOkOrThrow( DecompressBuffers( @@ -884,7 +885,7 @@ Java_io_glutenproject_memory_NativeMemoryAllocator_createListenableAllocator( unreserve_memory_method, 8L << 10 << 10); auto allocator = new gluten::memory::ListenableMemoryAllocator( - gluten::memory::DefaultMemoryAllocator(), listener); + gluten::memory::DefaultMemoryAllocator().get(), listener); return reinterpret_cast(allocator); JNI_METHOD_END(-1L) } diff --git a/cpp/src/memory/allocator.cc b/cpp/src/memory/allocator.cc index 0cb7ba112463..336a4d589f48 100644 --- a/cpp/src/memory/allocator.cc +++ b/cpp/src/memory/allocator.cc @@ -175,12 +175,7 @@ int64_t gluten::memory::StdMemoryAllocator::GetBytes() { return bytes_; } 
-gluten::memory::MemoryAllocator* gluten::memory::DefaultMemoryAllocator() { - static MemoryAllocator* alloc = new StdMemoryAllocator(); - return alloc; -} - -arrow::Status gluten::memory::WrappedMemoryPool::Allocate( +arrow::Status gluten::memory::WrappedArrowMemoryPool::Allocate( int64_t size, uint8_t** out) { if (!allocator_->Allocate(size, reinterpret_cast(out))) { @@ -191,7 +186,7 @@ arrow::Status gluten::memory::WrappedMemoryPool::Allocate( return arrow::Status::OK(); } -arrow::Status gluten::memory::WrappedMemoryPool::Reallocate( +arrow::Status gluten::memory::WrappedArrowMemoryPool::Reallocate( int64_t old_size, int64_t new_size, uint8_t** ptr) { @@ -204,26 +199,36 @@ arrow::Status gluten::memory::WrappedMemoryPool::Reallocate( return arrow::Status::OK(); } -void gluten::memory::WrappedMemoryPool::Free(uint8_t* buffer, int64_t size) { +void gluten::memory::WrappedArrowMemoryPool::Free( + uint8_t* buffer, + int64_t size) { allocator_->Free(buffer, size); } -int64_t gluten::memory::WrappedMemoryPool::bytes_allocated() const { +int64_t gluten::memory::WrappedArrowMemoryPool::bytes_allocated() const { // fixme use self accountant return allocator_->GetBytes(); } -std::string gluten::memory::WrappedMemoryPool::backend_name() const { +std::string gluten::memory::WrappedArrowMemoryPool::backend_name() const { return "gluten allocator"; } +std::shared_ptr +gluten::memory::DefaultMemoryAllocator() { + static std::shared_ptr alloc = + std::make_shared(); + return alloc; +} + std::shared_ptr gluten::memory::AsWrappedArrowMemoryPool( gluten::memory::MemoryAllocator* allocator) { - return std::make_shared(allocator); + return std::make_shared(allocator); } -arrow::MemoryPool* gluten::memory::GetDefaultWrappedArrowMemoryPool() { +std::shared_ptr +gluten::memory::GetDefaultWrappedArrowMemoryPool() { static auto static_pool = - AsWrappedArrowMemoryPool(gluten::memory::DefaultMemoryAllocator()); - return static_pool.get(); + 
AsWrappedArrowMemoryPool(gluten::memory::DefaultMemoryAllocator().get()); + return static_pool; } diff --git a/cpp/src/memory/allocator.h b/cpp/src/memory/allocator.h index d52c3e782f76..a92c8ad305ff 100644 --- a/cpp/src/memory/allocator.h +++ b/cpp/src/memory/allocator.h @@ -117,12 +117,10 @@ class StdMemoryAllocator : public MemoryAllocator { std::atomic_int64_t bytes_{0}; }; -MemoryAllocator* DefaultMemoryAllocator(); - // TODO aligned allocation -class WrappedMemoryPool : public arrow::MemoryPool { +class WrappedArrowMemoryPool : public arrow::MemoryPool { public: - explicit WrappedMemoryPool(MemoryAllocator* allocator) + explicit WrappedArrowMemoryPool(MemoryAllocator* allocator) : allocator_(allocator) {} arrow::Status Allocate(int64_t size, uint8_t** out) override; @@ -140,10 +138,12 @@ class WrappedMemoryPool : public arrow::MemoryPool { MemoryAllocator* allocator_; }; +std::shared_ptr DefaultMemoryAllocator(); + std::shared_ptr AsWrappedArrowMemoryPool( MemoryAllocator* allocator); -arrow::MemoryPool* GetDefaultWrappedArrowMemoryPool(); +std::shared_ptr GetDefaultWrappedArrowMemoryPool(); } // namespace memory } // namespace gluten diff --git a/cpp/src/operators/shuffle/type.h b/cpp/src/operators/shuffle/type.h index 399d68cad98d..44ff14a46aca 100644 --- a/cpp/src/operators/shuffle/type.h +++ b/cpp/src/operators/shuffle/type.h @@ -54,8 +54,7 @@ struct SplitOptions { int64_t task_attempt_id = -1; std::shared_ptr memory_pool = - gluten::memory::AsWrappedArrowMemoryPool( - gluten::memory::DefaultMemoryAllocator()); + gluten::memory::GetDefaultWrappedArrowMemoryPool(); arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults(); diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 264cba185b41..b86e97ff1c09 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -141,6 +141,7 @@ set(VELOX_SRCS compute/VeloxToRowConverter.cc compute/DwrfDatasource.cc compute/bridge.cc + memory/velox_allocator.cc ) 
add_library(velox SHARED ${VELOX_SRCS}) diff --git a/cpp/velox/memory/velox_allocator.cc b/cpp/velox/memory/velox_allocator.cc new file mode 100644 index 000000000000..1f98a77d65df --- /dev/null +++ b/cpp/velox/memory/velox_allocator.cc @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox_allocator.h" + +namespace gluten { +namespace memory { + +class VeloxMemoryAllocatorVariant { + public: + VeloxMemoryAllocatorVariant(MemoryAllocator* gluten_alloc) + : gluten_alloc_(gluten_alloc), + manager_(std::make_shared>()) {} + + static std::shared_ptr createDefaultAllocator() { + static std::shared_ptr velox_alloc = + std::make_shared( + DefaultMemoryAllocator().get()); + return velox_alloc; + } + + void* alloc(int64_t size) { + void* out; + if (!gluten_alloc_->Allocate(size, &out)) { + VELOX_FAIL( + "VeloxMemoryAllocatorVariant: Failed to allocate " + + std::to_string(size) + " bytes") + } + return out; + } + + void* allocZeroFilled(int64_t numMembers, int64_t sizeEach) { + void* out; + if (!gluten_alloc_->AllocateZeroFilled(numMembers, sizeEach, &out)) { + VELOX_FAIL( + "VeloxMemoryAllocatorVariant: Failed to allocate (zero filled) " + + std::to_string(numMembers) + " members, " + std::to_string(sizeEach) + + " bytes for each") + } + return out; + } + + void* allocAligned(uint16_t alignment, int64_t size) { + void* out; + if (!gluten_alloc_->AllocateAligned(alignment, size, &out)) { + VELOX_FAIL( + "VeloxMemoryAllocatorVariant: Failed to allocate (aligned) " + + std::to_string(size) + " bytes") + } + return out; + } + + void* realloc(void* p, int64_t size, int64_t newSize) { + void* out; + if (!gluten_alloc_->Reallocate(p, size, newSize, &out)) { + VELOX_FAIL( + "VeloxMemoryAllocatorVariant: Failed to reallocate " + + std::to_string(newSize) + " bytes") + } + return out; + } + + void* + reallocAligned(void* p, uint16_t alignment, int64_t size, int64_t newSize) { + void* out; + if (!gluten_alloc_->ReallocateAligned(p, alignment, size, newSize, &out)) { + VELOX_FAIL( + "VeloxMemoryAllocatorVariant: Failed to reallocate (aligned) " + + std::to_string(newSize) + " bytes") + } + return out; + } + + void free(void* p, int64_t size) { + if (!gluten_alloc_->Free(p, size)) { + VELOX_FAIL( + "VeloxMemoryAllocatorVariant: Failed to free " + + 
std::to_string(size) + " bytes") + } + } + + private: + MemoryAllocator* gluten_alloc_; + std::shared_ptr> + manager_; +}; + +// The code was originated from /velox/common/memory/Memory.h +template < + typename Allocator = VeloxMemoryAllocatorVariant, + uint16_t ALIGNMENT = kNoAlignment> +class WrappedVeloxMemoryPoolImpl + : public facebook::velox::memory::MemoryPoolBase { + public: + // Should perhaps make this method private so that we only create node through + // parent. + explicit WrappedVeloxMemoryPoolImpl( + const std::string& name, + std::weak_ptr parent, + std::shared_ptr allocator, + int64_t cap = kMaxMemory) + : MemoryPoolBase{name, parent}, + localMemoryUsage_{}, + cap_{cap}, + allocator_{allocator} { + VELOX_USER_CHECK_GT(cap, 0); + }; + + // Actual memory allocation operations. Can be delegated. + // Access global MemoryManager to check usage of current node and enforce + // memory cap accordingly. Since MemoryManager walks the MemoryPoolImpl + // tree periodically, this is slightly stale and we have to reserve our own + // overhead. + void* FOLLY_NULLABLE allocate(int64_t size) { + if (this->isMemoryCapped()) { + VELOX_MEM_MANUAL_CAP(); + } + auto alignedSize = sizeAlign(ALIGNER{}, size); + reserve(alignedSize); + return allocAligned(ALIGNER{}, alignedSize); + } + + void* FOLLY_NULLABLE + allocateZeroFilled(int64_t numMembers, int64_t sizeEach) { + VELOX_USER_CHECK_EQ(sizeEach, 1); + auto alignedSize = sizeAlign(ALIGNER{}, numMembers); + if (this->isMemoryCapped()) { + VELOX_MEM_MANUAL_CAP(); + } + reserve(alignedSize * sizeEach); + return allocator_->allocZeroFilled(alignedSize, sizeEach); + } + + // No-op for attempts to shrink buffer. 
+ void* FOLLY_NULLABLE + reallocate(void* FOLLY_NULLABLE p, int64_t size, int64_t newSize) { + auto alignedSize = sizeAlign(ALIGNER{}, size); + auto alignedNewSize = sizeAlign(ALIGNER{}, newSize); + int64_t difference = alignedNewSize - alignedSize; + if (UNLIKELY(difference <= 0)) { + // Track and pretend the shrink took place for accounting purposes. + release(-difference); + return p; + } + + reserve(difference); + void* newP = reallocAligned( + ALIGNER{}, p, alignedSize, alignedNewSize); + if (UNLIKELY(!newP)) { + free(p, alignedSize); + VELOX_MEM_CAP_EXCEEDED(cap_); + } + + return newP; + } + + void free(void* FOLLY_NULLABLE p, int64_t size) { + auto alignedSize = sizeAlign(ALIGNER{}, size); + allocator_->free(p, alignedSize); + release(alignedSize); + } + + //////////////////// Memory Management methods ///////////////////// + // Library checks for low memory mode on a push model. The respective root, + // component level or global, would compute for memory pressure. + // This is the signaling mechanism the customer application can use to make + // all subcomponents start trimming memory usage. + // virtual bool shouldTrim() const { + // return trimming_; + // } + // // Set by MemoryManager in periodic refresh threads. Stores the trim + // target + // // state potentially for a more granular/simplified global control. + // virtual void startTrimming(int64_t target) { + // trimming_ = true; + // trimTarget_ = target; + // } + // // Resets the trim flag and trim target. + // virtual void stopTrimming() { + // trimming_ = false; + // trimTarget_ = std::numeric_limits::max(); + // } + + // TODO: Consider putting these in base class also. 
+ int64_t getCurrentBytes() const { + return getAggregateBytes(); + } + + int64_t getMaxBytes() const { + return std::max(getSubtreeMaxBytes(), localMemoryUsage_.getMaxBytes()); + } + + void setMemoryUsageTracker( + const std::shared_ptr& + tracker) { + memoryUsageTracker_ = tracker; + } + const std::shared_ptr& + getMemoryUsageTracker() const { + return memoryUsageTracker_; + } + void setSubtreeMemoryUsage(int64_t size) { + updateSubtreeMemoryUsage( + [size](facebook::velox::memory::MemoryUsage& subtreeUsage) { + subtreeUsage.setCurrentBytes(size); + }); + } + int64_t updateSubtreeMemoryUsage(int64_t size) { + int64_t aggregateBytes; + updateSubtreeMemoryUsage( + [&aggregateBytes, + size](facebook::velox::memory::MemoryUsage& subtreeUsage) { + aggregateBytes = subtreeUsage.getCurrentBytes() + size; + subtreeUsage.setCurrentBytes(aggregateBytes); + }); + return aggregateBytes; + } + // Get the cap for the memory node and its subtree. + int64_t getCap() const { + return cap_; + } + uint16_t getAlignment() const { + return ALIGNMENT; + } + + void capMemoryAllocation() { + capped_.store(true); + for (const auto& child : children_) { + child->capMemoryAllocation(); + } + } + void uncapMemoryAllocation() { + // This means if we try to post-order traverse the tree like we do + // in MemoryManager, only parent has the right to lift the cap. + // This suffices because parent will then recursively lift the cap on the + // entire tree. 
+ if (getAggregateBytes() > getCap()) { + return; + } + if (auto parentPtr = parent_.lock()) { + if (parentPtr->isMemoryCapped()) { + return; + } + } + capped_.store(false); + visitChildren([](MemoryPool* child) { child->uncapMemoryAllocation(); }); + } + bool isMemoryCapped() const { + return capped_.load(); + } + + std::shared_ptr genChild( + std::weak_ptr parent, + const std::string& name, + int64_t cap) { + return std::make_shared>( + name, parent, allocator_, cap); + } + // Gets the memory allocation stats of the MemoryPoolImpl attached to the + // current MemoryPoolImpl. Not to be confused with total memory usage of the + // subtree. + const facebook::velox::memory::MemoryUsage& getLocalMemoryUsage() const { + return localMemoryUsage_; + } + + // Get the total memory consumption of the subtree, self + all recursive + // children. + int64_t getAggregateBytes() const { + int64_t aggregateBytes = localMemoryUsage_.getCurrentBytes(); + accessSubtreeMemoryUsage( + [&aggregateBytes]( + const facebook::velox::memory::MemoryUsage& subtreeUsage) { + aggregateBytes += subtreeUsage.getCurrentBytes(); + }); + return aggregateBytes; + } + int64_t getSubtreeMaxBytes() const { + int64_t maxBytes; + accessSubtreeMemoryUsage( + [&maxBytes](const facebook::velox::memory::MemoryUsage& subtreeUsage) { + maxBytes = subtreeUsage.getMaxBytes(); + }); + return maxBytes; + } + + private: + VELOX_FRIEND_TEST(MemoryPoolTest, Ctor); + + template + struct ALIGNER {}; + + template > + int64_t sizeAlign(ALIGNER /* unused */, int64_t size) { + auto remainder = size % ALIGNMENT; + return (remainder == 0) ? 
size : (size + ALIGNMENT - remainder); + } + + template + int64_t sizeAlign(ALIGNER /* unused */, int64_t size) { + return size; + } + + template > + void* FOLLY_NULLABLE allocAligned(ALIGNER /* unused */, int64_t size) { + return allocator_.allocAligned(A, size); + } + + template + void* FOLLY_NULLABLE + allocAligned(ALIGNER /* unused */, int64_t size) { + return allocator_->alloc(size); + } + + template > + void* FOLLY_NULLABLE reallocAligned( + ALIGNER /* unused */, + void* FOLLY_NULLABLE p, + int64_t size, + int64_t newSize) { + return allocator_.reallocAligned(p, A, size, newSize); + } + + template + void* FOLLY_NULLABLE reallocAligned( + ALIGNER /* unused */, + void* FOLLY_NULLABLE p, + int64_t size, + int64_t newSize) { + return allocator_->realloc(p, size, newSize); + } + + void accessSubtreeMemoryUsage( + std::function visitor) + const { + folly::SharedMutex::ReadHolder readLock{subtreeUsageMutex_}; + visitor(subtreeMemoryUsage_); + } + void updateSubtreeMemoryUsage( + std::function visitor) { + folly::SharedMutex::WriteHolder writeLock{subtreeUsageMutex_}; + visitor(subtreeMemoryUsage_); + } + + // TODO: consider returning bool instead. + void reserve(int64_t size) { + if (memoryUsageTracker_) { + memoryUsageTracker_->update(size); + } + localMemoryUsage_.incrementCurrentBytes(size); + bool manualCap = isMemoryCapped(); + int64_t aggregateBytes = getAggregateBytes(); + if (UNLIKELY(manualCap || aggregateBytes > cap_)) { + // NOTE: If we can make the reserve and release a single transaction we + // would have more accurate aggregates in intermediate states. However, + // this is low-pri because we can only have inflated aggregates, and be on + // the more conservative side. 
+ release(size); + if (manualCap) { + VELOX_MEM_MANUAL_CAP(); + } + VELOX_MEM_CAP_EXCEEDED(cap_); + } + } + + void release(int64_t size) { + localMemoryUsage_.incrementCurrentBytes(-size); + if (memoryUsageTracker_) { + memoryUsageTracker_->update(-size); + } + } + + // Memory allocated attributed to the memory node. + facebook::velox::memory::MemoryUsage localMemoryUsage_; + std::shared_ptr + memoryUsageTracker_; + mutable folly::SharedMutex subtreeUsageMutex_; + facebook::velox::memory::MemoryUsage subtreeMemoryUsage_; + int64_t cap_; + std::atomic_bool capped_{false}; + + std::shared_ptr allocator_; +}; + +std::shared_ptr AsWrappedVeloxMemoryPool( + MemoryAllocator* allocator) { + return std::make_shared< + WrappedVeloxMemoryPoolImpl>( + "wrapped", + std::weak_ptr(), + std::make_shared(allocator)); +} + +std::shared_ptr +GetDefaultWrappedVeloxMemoryPool() { + static auto default_pool = + std::make_shared>( + "root", + std::weak_ptr(), + VeloxMemoryAllocatorVariant::createDefaultAllocator()); + return default_pool; +} + +} // namespace memory +} // namespace gluten \ No newline at end of file diff --git a/cpp/velox/memory/velox_allocator.h b/cpp/velox/memory/velox_allocator.h new file mode 100644 index 000000000000..b3afb007e526 --- /dev/null +++ b/cpp/velox/memory/velox_allocator.h @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "memory/allocator.h" +#include "velox/common/memory/Memory.h" + +namespace gluten { +namespace memory { + +constexpr uint16_t kNoAlignment = facebook::velox::memory::kNoAlignment; +constexpr int64_t kMaxMemory = facebook::velox::memory::kMaxMemory; + +std::shared_ptr AsWrappedVeloxMemoryPool( + gluten::memory::MemoryAllocator* allocator); + +std::shared_ptr +GetDefaultWrappedVeloxMemoryPool(); + +} // namespace memory +} // namespace gluten \ No newline at end of file From 32a3a6a5a15c9ae725655e089c47176c2b631dba Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 15 Jul 2022 15:33:08 +0800 Subject: [PATCH 2/6] style --- cpp/velox/memory/velox_allocator.cc | 2 +- cpp/velox/memory/velox_allocator.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/velox/memory/velox_allocator.cc b/cpp/velox/memory/velox_allocator.cc index 1f98a77d65df..4fcfd8f40dae 100644 --- a/cpp/velox/memory/velox_allocator.cc +++ b/cpp/velox/memory/velox_allocator.cc @@ -418,4 +418,4 @@ GetDefaultWrappedVeloxMemoryPool() { } } // namespace memory -} // namespace gluten \ No newline at end of file +} // namespace gluten diff --git a/cpp/velox/memory/velox_allocator.h b/cpp/velox/memory/velox_allocator.h index b3afb007e526..884318c94d08 100644 --- a/cpp/velox/memory/velox_allocator.h +++ b/cpp/velox/memory/velox_allocator.h @@ -33,4 +33,4 @@ std::shared_ptr GetDefaultWrappedVeloxMemoryPool(); } // namespace memory -} // namespace gluten \ No newline at end of file +} // namespace gluten From 
436b9ccdc0366ee3843c1e840b36f051cc0fa365 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 15 Jul 2022 16:45:24 +0800 Subject: [PATCH 3/6] Fixup --- cpp/velox/compute/VeloxPlanConverter.cc | 25 +++++++++++++++++++++---- cpp/velox/compute/VeloxPlanConverter.h | 5 ++++- cpp/velox/jni/jni_wrapper.cc | 8 ++++---- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 70b14d498c99..3a8411bd7481 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -47,8 +47,19 @@ namespace { const std::string kHiveConnectorId = "test-hive"; std::atomic taskSerial; } // namespace -std::shared_ptr createNewVeloxQueryCtx() { - return std::make_shared(); + +std::shared_ptr createNewVeloxQueryCtx( + memory::MemoryPool* memoryPool) { + std::unique_ptr ctxRoot = + memoryPool->addScopedChild("ctx_root"); + std::shared_ptr ctx = std::make_shared( + nullptr, + std::make_shared(), + std::unordered_map>(), + memory::MappedMemory::getInstance(), + std::move(ctxRoot), + nullptr); + return ctx; } // The Init will be called per executor. @@ -350,6 +361,10 @@ arrow::Result> WholeStageResIter::Next() { return std::make_shared(out); } +memory::MemoryPool* WholeStageResIter::getPool() const { + return pool_; +} + class VeloxPlanConverter::WholeStageResIterFirstStage : public WholeStageResIter { public: @@ -399,7 +414,8 @@ class VeloxPlanConverter::WholeStageResIterFirstStage // Set task parameters. 
core::PlanFragment planFragment{ planNode, core::ExecutionStrategy::kUngrouped, 1}; - std::shared_ptr queryCtx = createNewVeloxQueryCtx(); + std::shared_ptr queryCtx = + createNewVeloxQueryCtx(getPool()); task_ = std::make_shared( fmt::format("gluten task {}", ++taskSerial), std::move(planFragment), @@ -446,7 +462,8 @@ class VeloxPlanConverter::WholeStageResIterMiddleStage : WholeStageResIter(pool, planNode), streamIds_(streamIds) { core::PlanFragment planFragment{ planNode, core::ExecutionStrategy::kUngrouped, 1}; - std::shared_ptr queryCtx = createNewVeloxQueryCtx(); + std::shared_ptr queryCtx = + createNewVeloxQueryCtx(getPool()); task_ = std::make_shared( fmt::format("gluten task {}", ++taskSerial), std::move(planFragment), diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 53501adb6bcf..be46da876362 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -66,7 +66,8 @@ using namespace facebook::velox::exec; namespace velox { namespace compute { -std::shared_ptr createNewVeloxQueryCtx(); +std::shared_ptr createNewVeloxQueryCtx( + memory::MemoryPool* memoryPool); class VeloxInitializer { public: @@ -90,6 +91,8 @@ class WholeStageResIter { std::shared_ptr task_; std::function addSplits_; + memory::MemoryPool* getPool() const; + private: /// This method converts Velox RowVector into Arrow Array based on Velox's /// Arrow conversion implementation, in which memcopy is not needed for diff --git a/cpp/velox/jni/jni_wrapper.cc b/cpp/velox/jni/jni_wrapper.cc index 6e469cc463b9..77a4370d9cbc 100644 --- a/cpp/velox/jni/jni_wrapper.cc +++ b/cpp/velox/jni/jni_wrapper.cc @@ -23,6 +23,7 @@ #include "compute/VeloxPlanConverter.h" #include "jni/jni_errors.h" #include "velox/substrait/SubstraitToVeloxPlanValidator.h" +#include "memory/velox_allocator.h" // #include "jni/jni_common.h" @@ -31,7 +32,7 @@ #include -static std::unique_ptr veloxPool_; +static std::shared_ptr veloxPool_; static 
std::unordered_map sparkConfs_; #ifdef __cplusplus @@ -44,7 +45,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { return JNI_ERR; } gluten::GetJniErrorsState()->Initialize(env); - veloxPool_ = memory::getDefaultScopedMemoryPool(); + veloxPool_ = gluten::memory::GetDefaultWrappedVeloxMemoryPool(); #ifdef DEBUG std::cout << "Loaded Velox backend." << std::endl; #endif @@ -123,8 +124,7 @@ Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeDoValidate( // A query context used for function validation. std::shared_ptr queryCtx_{core::QueryCtx::createForTest()}; // A memory pool used for function validation. - std::unique_ptr pool_ = - memory::getDefaultScopedMemoryPool(); + std::shared_ptr pool_ = veloxPool_; // An execution context used for function validation. std::unique_ptr execCtx_ = std::make_unique(pool_.get(), queryCtx_.get()); From b0f2811da91f697f00c74823a645670f285b7fb1 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 15 Jul 2022 16:46:44 +0800 Subject: [PATCH 4/6] style --- cpp/velox/jni/jni_wrapper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/jni/jni_wrapper.cc b/cpp/velox/jni/jni_wrapper.cc index 77a4370d9cbc..d8a31423c2f1 100644 --- a/cpp/velox/jni/jni_wrapper.cc +++ b/cpp/velox/jni/jni_wrapper.cc @@ -22,8 +22,8 @@ #include "compute/DwrfDatasource.h" #include "compute/VeloxPlanConverter.h" #include "jni/jni_errors.h" -#include "velox/substrait/SubstraitToVeloxPlanValidator.h" #include "memory/velox_allocator.h" +#include "velox/substrait/SubstraitToVeloxPlanValidator.h" // #include "jni/jni_common.h" From 2d175ebf895c26553766397bf91f5bc31f68cccf Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 15 Jul 2022 17:25:50 +0800 Subject: [PATCH 5/6] Fixup --- cpp/velox/memory/velox_allocator.cc | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/cpp/velox/memory/velox_allocator.cc b/cpp/velox/memory/velox_allocator.cc index 4fcfd8f40dae..0bd49e4434e7 100644 --- 
a/cpp/velox/memory/velox_allocator.cc +++ b/cpp/velox/memory/velox_allocator.cc @@ -23,10 +23,7 @@ namespace memory { class VeloxMemoryAllocatorVariant { public: VeloxMemoryAllocatorVariant(MemoryAllocator* gluten_alloc) - : gluten_alloc_(gluten_alloc), - manager_(std::make_shared>()) {} + : gluten_alloc_(gluten_alloc) {} static std::shared_ptr createDefaultAllocator() { static std::shared_ptr velox_alloc = @@ -97,12 +94,10 @@ class VeloxMemoryAllocatorVariant { private: MemoryAllocator* gluten_alloc_; - std::shared_ptr> - manager_; }; // The code was originated from /velox/common/memory/Memory.h +// Removed memory manager. template < typename Allocator = VeloxMemoryAllocatorVariant, uint16_t ALIGNMENT = kNoAlignment From 2c280ab7dbbc392dd72ebc6f8f5d015937c2276d Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 15 Jul 2022 21:34:47 +0800 Subject: [PATCH 6/6] Update velox_allocator.cc --- cpp/velox/memory/velox_allocator.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/memory/velox_allocator.cc b/cpp/velox/memory/velox_allocator.cc index 0bd49e4434e7..3a143225f349 100644 --- a/cpp/velox/memory/velox_allocator.cc +++ b/cpp/velox/memory/velox_allocator.cc @@ -96,7 +96,7 @@ class VeloxMemoryAllocatorVariant { MemoryAllocator* gluten_alloc_; }; -// The code was originated from /velox/common/memory/Memory.h +// The code originated from /velox/common/memory/Memory.h // Removed memory manager. template < typename Allocator = VeloxMemoryAllocatorVariant, uint16_t ALIGNMENT = kNoAlignment