Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OPPRO-174] Velox memory pool backed by Gluten's context allocator #264

Merged
merged 6 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/src/compute/kernels_ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class LazyReadIterator {
arrow::Status Next(std::shared_ptr<ArrowArray>* out);

private:
arrow::MemoryPool* pool_ = gluten::memory::GetDefaultWrappedArrowMemoryPool();
arrow::MemoryPool* pool_ =
gluten::memory::GetDefaultWrappedArrowMemoryPool().get();
std::shared_ptr<ArrowArrayIterator> array_iter_;
bool need_process_ = false;
bool no_next_ = false;
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/compute/substrait_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ class SubstraitParser::FirstStageResultIterator {
}

private:
arrow::MemoryPool* pool_ = gluten::memory::GetDefaultWrappedArrowMemoryPool();
arrow::MemoryPool* pool_ =
gluten::memory::GetDefaultWrappedArrowMemoryPool().get();
std::unique_ptr<arrow::DoubleBuilder> builder_;
bool has_next_ = true;
// std::vector<std::shared_ptr<arrow::Array>> res_arrays;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/jni/jni_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ jbyteArray ToSchemaByteArray(
// std::shared_ptr<arrow::Buffer> buffer;
arrow::Result<std::shared_ptr<arrow::Buffer>> maybe_buffer;
maybe_buffer = arrow::ipc::SerializeSchema(
*schema.get(), gluten::memory::GetDefaultWrappedArrowMemoryPool());
*schema.get(), gluten::memory::GetDefaultWrappedArrowMemoryPool().get());
if (!status.ok()) {
std::string error_message =
"Unable to convert schema to byte array, err is " + status.message();
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/jni/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env, java_reservation_listener_class, "unreserve", "(J)V");

default_memory_allocator_id =
reinterpret_cast<jlong>(gluten::memory::DefaultMemoryAllocator());
reinterpret_cast<jlong>(gluten::memory::DefaultMemoryAllocator().get());

return JNI_VERSION;
}
Expand Down Expand Up @@ -818,7 +818,8 @@ Java_io_glutenproject_vectorized_ShuffleDecompressionJniWrapper_decompress(

// decompress buffers
auto options = arrow::ipc::IpcReadOptions::Defaults();
options.memory_pool = gluten::memory::GetDefaultWrappedArrowMemoryPool();
options.memory_pool =
gluten::memory::GetDefaultWrappedArrowMemoryPool().get();
options.use_threads = false;
gluten::JniAssertOkOrThrow(
DecompressBuffers(
Expand Down Expand Up @@ -884,7 +885,7 @@ Java_io_glutenproject_memory_NativeMemoryAllocator_createListenableAllocator(
unreserve_memory_method,
8L << 10 << 10);
auto allocator = new gluten::memory::ListenableMemoryAllocator(
gluten::memory::DefaultMemoryAllocator(), listener);
gluten::memory::DefaultMemoryAllocator().get(), listener);
return reinterpret_cast<jlong>(allocator);
JNI_METHOD_END(-1L)
}
Expand Down
33 changes: 19 additions & 14 deletions cpp/src/memory/allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,7 @@ int64_t gluten::memory::StdMemoryAllocator::GetBytes() {
return bytes_;
}

gluten::memory::MemoryAllocator* gluten::memory::DefaultMemoryAllocator() {
static MemoryAllocator* alloc = new StdMemoryAllocator();
return alloc;
}

arrow::Status gluten::memory::WrappedMemoryPool::Allocate(
arrow::Status gluten::memory::WrappedArrowMemoryPool::Allocate(
int64_t size,
uint8_t** out) {
if (!allocator_->Allocate(size, reinterpret_cast<void**>(out))) {
Expand All @@ -191,7 +186,7 @@ arrow::Status gluten::memory::WrappedMemoryPool::Allocate(
return arrow::Status::OK();
}

arrow::Status gluten::memory::WrappedMemoryPool::Reallocate(
arrow::Status gluten::memory::WrappedArrowMemoryPool::Reallocate(
int64_t old_size,
int64_t new_size,
uint8_t** ptr) {
Expand All @@ -204,26 +199,36 @@ arrow::Status gluten::memory::WrappedMemoryPool::Reallocate(
return arrow::Status::OK();
}

void gluten::memory::WrappedMemoryPool::Free(uint8_t* buffer, int64_t size) {
// Releases `buffer` by forwarding it, together with `size`, to the wrapped
// allocator_. No bookkeeping is done in this class itself.
void gluten::memory::WrappedArrowMemoryPool::Free(
uint8_t* buffer,
int64_t size) {
allocator_->Free(buffer, size);
}

int64_t gluten::memory::WrappedMemoryPool::bytes_allocated() const {
// Reports the byte count returned by the wrapped allocator_'s GetBytes().
// The value is whatever the allocator tracks, not a figure kept by this pool.
int64_t gluten::memory::WrappedArrowMemoryPool::bytes_allocated() const {
// fixme use self accountant
// NOTE(review): presumably this means the pool should keep its own counter
// instead of delegating to the allocator - confirm the intent with the author.
return allocator_->GetBytes();
}

std::string gluten::memory::WrappedMemoryPool::backend_name() const {
// Identifies this pool's backend with the fixed name "gluten allocator".
std::string gluten::memory::WrappedArrowMemoryPool::backend_name() const {
return "gluten allocator";
}

// Returns the default MemoryAllocator.
//
// The allocator is a single StdMemoryAllocator stored in a function-local
// static, so it is created on the first call and every call hands back a
// shared_ptr to that same instance.
std::shared_ptr<gluten::memory::MemoryAllocator>
gluten::memory::DefaultMemoryAllocator() {
  static std::shared_ptr<MemoryAllocator> default_allocator{
      std::make_shared<StdMemoryAllocator>()};
  return default_allocator;
}

std::shared_ptr<arrow::MemoryPool> gluten::memory::AsWrappedArrowMemoryPool(
gluten::memory::MemoryAllocator* allocator) {
return std::make_shared<WrappedMemoryPool>(allocator);
return std::make_shared<WrappedArrowMemoryPool>(allocator);
}

arrow::MemoryPool* gluten::memory::GetDefaultWrappedArrowMemoryPool() {
std::shared_ptr<arrow::MemoryPool>
gluten::memory::GetDefaultWrappedArrowMemoryPool() {
static auto static_pool =
AsWrappedArrowMemoryPool(gluten::memory::DefaultMemoryAllocator());
return static_pool.get();
AsWrappedArrowMemoryPool(gluten::memory::DefaultMemoryAllocator().get());
return static_pool;
}
10 changes: 5 additions & 5 deletions cpp/src/memory/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@ class StdMemoryAllocator : public MemoryAllocator {
std::atomic_int64_t bytes_{0};
};

MemoryAllocator* DefaultMemoryAllocator();

// TODO aligned allocation
class WrappedMemoryPool : public arrow::MemoryPool {
class WrappedArrowMemoryPool : public arrow::MemoryPool {
public:
explicit WrappedMemoryPool(MemoryAllocator* allocator)
explicit WrappedArrowMemoryPool(MemoryAllocator* allocator)
: allocator_(allocator) {}

arrow::Status Allocate(int64_t size, uint8_t** out) override;
Expand All @@ -140,10 +138,12 @@ class WrappedMemoryPool : public arrow::MemoryPool {
MemoryAllocator* allocator_;
};

std::shared_ptr<MemoryAllocator> DefaultMemoryAllocator();

std::shared_ptr<arrow::MemoryPool> AsWrappedArrowMemoryPool(
MemoryAllocator* allocator);

arrow::MemoryPool* GetDefaultWrappedArrowMemoryPool();
std::shared_ptr<arrow::MemoryPool> GetDefaultWrappedArrowMemoryPool();

} // namespace memory
} // namespace gluten
3 changes: 1 addition & 2 deletions cpp/src/operators/shuffle/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ struct SplitOptions {
int64_t task_attempt_id = -1;

std::shared_ptr<arrow::MemoryPool> memory_pool =
gluten::memory::AsWrappedArrowMemoryPool(
gluten::memory::DefaultMemoryAllocator());
gluten::memory::GetDefaultWrappedArrowMemoryPool();

arrow::ipc::IpcWriteOptions ipc_write_options =
arrow::ipc::IpcWriteOptions::Defaults();
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ set(VELOX_SRCS
compute/VeloxToRowConverter.cc
compute/DwrfDatasource.cc
compute/bridge.cc
memory/velox_allocator.cc
)
add_library(velox SHARED ${VELOX_SRCS})

Expand Down
25 changes: 21 additions & 4 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,19 @@ namespace {
const std::string kHiveConnectorId = "test-hive";
std::atomic<int32_t> taskSerial;
} // namespace
std::shared_ptr<core::QueryCtx> createNewVeloxQueryCtx() {
return std::make_shared<core::QueryCtx>();

// Builds a new Velox QueryCtx whose memory pool is a scoped child, named
// "ctx_root", of the given `memoryPool`.
//
// `memoryPool` is dereferenced unconditionally, so callers must pass a
// non-null pool.
std::shared_ptr<core::QueryCtx> createNewVeloxQueryCtx(
    memory::MemoryPool* memoryPool) {
  // Create the child pool before any other constructor argument is evaluated.
  std::unique_ptr<memory::MemoryPool> rootPool =
      memoryPool->addScopedChild("ctx_root");

  // NOTE(review): the meaning of the two positional nullptr arguments is
  // defined by QueryCtx's constructor, which is not visible here - confirm
  // against the Velox header before changing them.
  return std::make_shared<core::QueryCtx>(
      nullptr,
      std::make_shared<facebook::velox::core::MemConfig>(),
      std::unordered_map<std::string, std::shared_ptr<Config>>{},
      memory::MappedMemory::getInstance(),
      std::move(rootPool),
      nullptr);
}

// The Init will be called per executor.
Expand Down Expand Up @@ -350,6 +361,10 @@ arrow::Result<std::shared_ptr<ArrowArray>> WholeStageResIter::Next() {
return std::make_shared<ArrowArray>(out);
}

// Returns the raw pool_ pointer stored in this iterator. The first-stage and
// middle-stage iterators pass it to createNewVeloxQueryCtx() when they set up
// their tasks.
memory::MemoryPool* WholeStageResIter::getPool() const {
return pool_;
}

class VeloxPlanConverter::WholeStageResIterFirstStage
: public WholeStageResIter {
public:
Expand Down Expand Up @@ -399,7 +414,8 @@ class VeloxPlanConverter::WholeStageResIterFirstStage
// Set task parameters.
core::PlanFragment planFragment{
planNode, core::ExecutionStrategy::kUngrouped, 1};
std::shared_ptr<core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
std::shared_ptr<core::QueryCtx> queryCtx =
createNewVeloxQueryCtx(getPool());
task_ = std::make_shared<exec::Task>(
fmt::format("gluten task {}", ++taskSerial),
std::move(planFragment),
Expand Down Expand Up @@ -446,7 +462,8 @@ class VeloxPlanConverter::WholeStageResIterMiddleStage
: WholeStageResIter(pool, planNode), streamIds_(streamIds) {
core::PlanFragment planFragment{
planNode, core::ExecutionStrategy::kUngrouped, 1};
std::shared_ptr<core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
std::shared_ptr<core::QueryCtx> queryCtx =
createNewVeloxQueryCtx(getPool());
task_ = std::make_shared<exec::Task>(
fmt::format("gluten task {}", ++taskSerial),
std::move(planFragment),
Expand Down
5 changes: 4 additions & 1 deletion cpp/velox/compute/VeloxPlanConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ using namespace facebook::velox::exec;
namespace velox {
namespace compute {

std::shared_ptr<core::QueryCtx> createNewVeloxQueryCtx();
std::shared_ptr<core::QueryCtx> createNewVeloxQueryCtx(
memory::MemoryPool* memoryPool);

class VeloxInitializer {
public:
Expand All @@ -90,6 +91,8 @@ class WholeStageResIter {
std::shared_ptr<exec::Task> task_;
std::function<void(exec::Task*)> addSplits_;

memory::MemoryPool* getPool() const;

private:
/// This method converts Velox RowVector into Arrow Array based on Velox's
/// Arrow conversion implementation, in which memcopy is not needed for
Expand Down
8 changes: 4 additions & 4 deletions cpp/velox/jni/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "compute/DwrfDatasource.h"
#include "compute/VeloxPlanConverter.h"
#include "jni/jni_errors.h"
#include "memory/velox_allocator.h"
#include "velox/substrait/SubstraitToVeloxPlanValidator.h"

// #include "jni/jni_common.h"
Expand All @@ -31,7 +32,7 @@

#include <iostream>

static std::unique_ptr<memory::MemoryPool> veloxPool_;
static std::shared_ptr<memory::MemoryPool> veloxPool_;
static std::unordered_map<std::string, std::string> sparkConfs_;

#ifdef __cplusplus
Expand All @@ -44,7 +45,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
return JNI_ERR;
}
gluten::GetJniErrorsState()->Initialize(env);
veloxPool_ = memory::getDefaultScopedMemoryPool();
veloxPool_ = gluten::memory::GetDefaultWrappedVeloxMemoryPool();
#ifdef DEBUG
std::cout << "Loaded Velox backend." << std::endl;
#endif
Expand Down Expand Up @@ -123,8 +124,7 @@ Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeDoValidate(
// A query context used for function validation.
std::shared_ptr<core::QueryCtx> queryCtx_{core::QueryCtx::createForTest()};
// A memory pool used for function validation.
std::unique_ptr<memory::MemoryPool> pool_ =
memory::getDefaultScopedMemoryPool();
std::shared_ptr<memory::MemoryPool> pool_ = veloxPool_;
// An execution context used for function validation.
std::unique_ptr<core::ExecCtx> execCtx_ =
std::make_unique<core::ExecCtx>(pool_.get(), queryCtx_.get());
Expand Down
Loading