Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Feb 21, 2025
1 parent 12c9ff6 commit 89cb8b0
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 69 deletions.
161 changes: 98 additions & 63 deletions cpp-ch/local-engine/Resource/JVMClassReference.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "JVMClassReference.h"
#include <algorithm>
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>

Expand All @@ -26,12 +27,12 @@ extern const int UNRECOGNIZED_ARGUMENTS;
namespace local_engine
{

// Constructs a class reference from a JVMClassDescription. The description is
// copied into the `description` member; no JNI work happens here.
// NOTE(review): this span shows two constructor heads back to back; the
// (class_name_, methods_) overload looks like the pre-change version of the
// same constructor — confirm against the repository.
JVMClassReference::JVMClassReference(const std::string & class_name_, const std::vector<std::pair<std::string, std::string>> & methods_)
: class_name(class_name_)
, methods(methods_)
JVMClassReference::JVMClassReference(const JVMClassDescription & description_)
: description(description_)
{
}


jmethodID JVMClassReference::getJMethod(const std::string methodId) const
{
auto it = method_ids.find(methodId);
Expand All @@ -42,113 +43,147 @@ jmethodID JVMClassReference::getJMethod(const std::string methodId) const

// Resolves the JVM class named by the description and caches a jmethodID for
// every listed instance and static method.
//
// Throws DB::Exception with UNRECOGNIZED_ARGUMENTS when the class reference or
// any method id comes back as nullptr.
//
// NOTE(review): the statements that use `class_name` appear to be the
// pre-change versions of the `class_signature` statements directly below
// them — confirm against the repository.
void JVMClassReference::initialize(JNIEnv * env)
{
jvm_class = local_engine::CreateGlobalClassReference(env, class_name.c_str());
const auto & class_signature = description.class_signature;
const auto & methods = description.methods;
const auto & static_methods = description.static_methods;
jvm_class = local_engine::CreateGlobalClassReference(env, class_signature.c_str());
if (jvm_class == nullptr)
throw DB::Exception(DB::ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized class: {}", class_name);
throw DB::Exception(DB::ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized class: {}", class_signature);
// Instance methods (constructors are listed under the name "<init>").
for (const auto & [method_name, method_sig] : methods)
{
jmethodID method_id = local_engine::GetMethodID(env, jvm_class, method_name.c_str(), method_sig.c_str());
if (method_id == nullptr)
throw DB::Exception(DB::ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized method: {}", method_name);
// Keyed by name only: a second entry with the same name overwrites the first.
method_ids[method_name] = method_id;
}
// Static methods are stored in the same map as instance methods, so a static
// method sharing a name with an instance method replaces its entry.
for (const auto & [method_name, method_sig] : static_methods)
{
jmethodID method_id = local_engine::GetStaticMethodID(env, jvm_class, method_name.c_str(), method_sig.c_str());
if (method_id == nullptr)
throw DB::Exception(DB::ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized static method: {}", method_name);
method_ids[method_name] = method_id;
}
}

// Releases the global class reference held in `jvm_class`.
//
// The handle is reset to nullptr after deletion so that a repeated destroy()
// call is a no-op instead of passing an already-deleted global reference to
// the JVM a second time.
//
// NOTE(review): `jvm_class` is declared without an initializer in the header,
// so the null check is only reliable once initialize() or destroy() has run;
// consider `jclass jvm_class = nullptr;` in the class declaration.
void JVMClassReference::destroy(JNIEnv * env)
{
    if (jvm_class == nullptr)
        return;

    env->DeleteGlobalRef(jvm_class);
    jvm_class = nullptr;
}

// Helper whose constructor registers one JVMClassReference resource with
// JNIEnvResourceManager. The resource is registered under `id_` in the
// "jvm_class_reference" group; its creator lambda captures its inputs by copy
// and builds the reference with std::make_shared.
//
// NOTE(review): this span shows two class heads and two constructor heads; the
// JVMClassReferenceCreator lines and the `sig_methods` pairing loop look like
// the pre-change version of JVMClassReferenceRegister — confirm against the
// repository.
class JVMClassReferenceCreator
class JVMClassReferenceRegister
{
public:
JVMClassReferenceCreator(const std::string id_, const std::string & class_name_, const std::vector<std::string> & methods_)
JVMClassReferenceRegister(const std::string & id_, const JVMClassDescription & description)
{
// Folds the flat [name, signature, name, signature, ...] list into pairs; a
// trailing unpaired element is ignored because of the `i + 1 < size` bound.
std::vector<std::pair<std::string, std::string>> sig_methods;
for (size_t i = 0; i + 1 < methods_.size(); i += 2)
sig_methods.emplace_back(methods_[i], methods_[i + 1]);
DependencyResourceMetadata metadata{
.id = id_,
.creator = [class_name_, sig_methods]() { return std::make_shared<JVMClassReference>(class_name_, sig_methods); },
.creator = [description]() { return std::make_shared<JVMClassReference>(description); },
.group_id = "jvm_class_reference",
.description = "JVM class reference: " + class_name_};
.description = "JVM class reference: " + description.class_signature};
JNIEnvResourceManager::instance().registerResource(metadata);
}
};

void buildVector(std::vector<std::string> & vec)
{
}
#define BEGINE_CLASS_DECLARE(id) \
static JVMClassDescription id##_description{

template <typename T, typename... Args>
void buildVector(std::vector<std::string> & vec, T && first, Args &&... args)
{
vec.push_back(std::forward<T>(first));
buildVector(vec, std::forward<Args>(args)...);
}
#define END_CLASS_DECLARE(id) \
};\
static JVMClassReferenceRegister id##_register(#id, id##_description);

template <typename... Args>
std::vector<std::string> createVector(Args &&... args)
{
std::vector<std::string> vec;
buildVector(vec, std::forward<Args>(args)...);
return vec;
}
BEGINE_CLASS_DECLARE(block_stripes_class)
.class_signature = "Lorg/apache/spark/sql/execution/datasources/BlockStripes;",
.methods = {{"<init>", "(J[J[II)V"}}
END_CLASS_DECLARE(block_stripes_class)

#define STRING_VECTOR(...) createVector(__VA_ARGS__)
BEGINE_CLASS_DECLARE(split_result_class)
.class_signature = "Lorg/apache/gluten/vectorized/CHSplitResult;",
.methods = {{"<init>", "(JJJJJJ[J[JJJJJJJ)V"}}
END_CLASS_DECLARE(split_result_class)

// Third part arguments are pairs of method name and method signature
#define REGISTER_JVM_CLASS_REFERENCE(id, class_name, ...) \
static JVMClassReferenceCreator id##_creator(#id, class_name, createVector(__VA_ARGS__));
BEGINE_CLASS_DECLARE(block_stats_class)
.class_signature = "Lorg/apache/gluten/vectorized/BlockStats;",
.methods = {{"<init>", "(JZ)V"}}
END_CLASS_DECLARE(block_stats_class)

REGISTER_JVM_CLASS_REFERENCE(block_stripes_class, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;", "<init>", "(J[J[II)V")
REGISTER_JVM_CLASS_REFERENCE(split_result_class, "Lorg/apache/gluten/vectorized/CHSplitResult;", "<init>", "(JJJJJJ[J[JJJJJJJ)V")
REGISTER_JVM_CLASS_REFERENCE(block_stats_class, "Lorg/apache/gluten/vectorized/BlockStats;", "<init>", "(JZ)V")
REGISTER_JVM_CLASS_REFERENCE(shuffle_input_stream, "Lorg/apache/gluten/vectorized/ShuffleInputStream;", "read", "(JJ)J")
BEGINE_CLASS_DECLARE(shuffle_input_stream)
.class_signature = "Lorg/apache/gluten/vectorized/ShuffleInputStream;",
.methods = {{"read", "(JJ)J"}}
END_CLASS_DECLARE(shuffle_input_stream)

// Used in NativeSplitter
REGISTER_JVM_CLASS_REFERENCE(splitter_iterator_class, "Lorg/apache/gluten/vectorized/IteratorWrapper;", "hasNext", "()Z", "next", "()J")
BEGINE_CLASS_DECLARE(splitter_iterator_class)
.class_signature = "Lorg/apache/gluten/vectorized/IteratorWrapper;",
.methods = {{"hasNext", "()Z"}, {"next", "()J"}}
END_CLASS_DECLARE(splitter_iterator_class)

// Used in WriteBufferFromJavaOutputStream
REGISTER_JVM_CLASS_REFERENCE(write_buffer_from_java_output_stream_class, "Ljava/io/OutputStream;", "write", "([BII)V", "flush", "()V")
BEGINE_CLASS_DECLARE(write_buffer_from_java_output_stream_class)
.class_signature = "Ljava/io/OutputStream;",
.methods = {{"write", "([BII)V"}, {"flush", "()V"}}
END_CLASS_DECLARE(write_buffer_from_java_output_stream_class)

// Used in SourceFromJavaIter
REGISTER_JVM_CLASS_REFERENCE(
source_from_java_iterator_class, "Lorg/apache/gluten/execution/ColumnarNativeIterator;", "hasNext", "()Z", "next", "()[B")
BEGINE_CLASS_DECLARE(source_from_java_iterator_class)
.class_signature = "Lorg/apache/gluten/execution/ColumnarNativeIterator;",
.methods = {{"hasNext", "()Z"}, {"next", "()[B"}}
END_CLASS_DECLARE(source_from_java_iterator_class)

// Used in SparkRowToCHColumn
REGISTER_JVM_CLASS_REFERENCE(
row_to_column_iterator_class,
"Lorg/apache/gluten/execution/SparkRowIterator;",
"hasNext",
"()Z",
"next",
"()[B",
"nextBatch",
"()Ljava/nio/ByteBuffer;")

// Reference to exception classes
REGISTER_JVM_CLASS_REFERENCE(io_exception_class, "Ljava/io/IOException;")
REGISTER_JVM_CLASS_REFERENCE(runtime_exception_class, "Lorg/apache/gluten/exception/GlutenException;")
REGISTER_JVM_CLASS_REFERENCE(unsupportedoperation_exception_class, "Ljava/lang/UnsupportedOperationException;")
REGISTER_JVM_CLASS_REFERENCE(illegal_access_exception_class, "Ljava/lang/IllegalAccessException;")
REGISTER_JVM_CLASS_REFERENCE(illegal_argument_exception_class, "Ljava/lang/IllegalArgumentException;")

// Used in BroadCastJoinBuilder
BEGINE_CLASS_DECLARE(row_to_column_iterator_class)
.class_signature = "Lorg/apache/gluten/execution/SparkRowIterator;",
.methods = {{"hasNext", "()Z"}, {"next", "()[B"}, {"nextBatch", "()Ljava/nio/ByteBuffer;"}}
END_CLASS_DECLARE(row_to_column_iterator_class)

BEGINE_CLASS_DECLARE(io_exception_class)
.class_signature = "Ljava/io/IOException;"
END_CLASS_DECLARE(io_exception_class)

BEGINE_CLASS_DECLARE(runtime_exception_class)
.class_signature = "Lorg/apache/gluten/exception/GlutenException;"
END_CLASS_DECLARE(runtime_exception_class)

BEGINE_CLASS_DECLARE(unsupportedoperation_exception_class)
.class_signature = "Ljava/lang/UnsupportedOperationException;"
END_CLASS_DECLARE(unsupportedoperation_exception_class)

BEGINE_CLASS_DECLARE(illegal_access_exception_class)
.class_signature = "Ljava/lang/IllegalAccessException;"
END_CLASS_DECLARE(illegal_access_exception_class)

BEGINE_CLASS_DECLARE(illegal_argument_exception_class)
.class_signature = "Ljava/lang/IllegalArgumentException;"
END_CLASS_DECLARE(illegal_argument_exception_class)

/**
* Used in BroadCastJoinBuilder.
* A Scala object is compiled into two classes: one with a '$' suffix, which is a regular class,
* and a utility class that contains only static methods.
*
* Here we use the utility class.
*/
REGISTER_JVM_CLASS_REFERENCE(
broadcast_join_builder_side_cache_class, "Lorg/apache/gluten/execution/CHBroadcastBuildSideCache;", "get", "(Ljava/lang/String;)J")
BEGINE_CLASS_DECLARE(broadcast_join_builder_side_cache_class)
.class_signature = "Lorg/apache/gluten/execution/CHBroadcastBuildSideCache;",
.static_methods = {{"get", "(Ljava/lang/String;)J"}}
END_CLASS_DECLARE(broadcast_join_builder_side_cache_class)

// Used in CacheManager
REGISTER_JVM_CLASS_REFERENCE(cache_manager_result_class, "Lorg/apache/gluten/execution/CacheResult;", "<init>", "(ILjava/lang/String;)V")
BEGINE_CLASS_DECLARE(cache_manager_result_class)
.class_signature = "Lorg/apache/gluten/execution/CacheResult;",
.methods = {{"<init>", "(ILjava/lang/String;)V"}}
END_CLASS_DECLARE(cache_manager_result_class)

// Used in SparkMergeTreeWriterJNI
REGISTER_JVM_CLASS_REFERENCE(merge_tree_committer_helper, "Lorg/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeCommiterHelper;", "setCurrentTaskWriteInfo", "(Ljava/lang/String;Ljava/lang/String;)V")
BEGINE_CLASS_DECLARE(merge_tree_committer_helper)
.class_signature = "Lorg/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeCommiterHelper;",
.static_methods = {{"setCurrentTaskWriteInfo", "(Ljava/lang/String;Ljava/lang/String;)V"}}
END_CLASS_DECLARE(merge_tree_committer_helper)

BEGINE_CLASS_DECLARE(spark_row_info_class)
.class_signature = "Lorg/apache/gluten/row/SparkRowInfo;",
.methods = {{"<init>", "([J[JJJJ)V"}}
END_CLASS_DECLARE(spark_row_info_class)

REGISTER_JVM_CLASS_REFERENCE(spark_row_info_class, "Lorg/apache/gluten/row/SparkRowInfo;", "<init>", "([J[JJJJ)V")
}


15 changes: 11 additions & 4 deletions cpp-ch/local-engine/Resource/JVMClassReference.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,18 @@

namespace local_engine
{

// Plain aggregate describing one JVM class and the methods to look up on it.
// Member order is significant: the registrations initialise this struct with
// designated initializers, which must follow declaration order.
struct JVMClassDescription
{
    // List of (method name, JNI method signature) pairs.
    using MethodList = std::vector<std::pair<std::string, std::string>>;

    // Class descriptor string handed to CreateGlobalClassReference.
    std::string class_signature;
    // Instance methods, resolved with GetMethodID.
    MethodList methods;
    // Static methods, resolved with GetStaticMethodID.
    MethodList static_methods;
};

class JVMClassReference : public JNIEnvResource, public boost::noncopyable
{
public:
JVMClassReference(const std::string & class_name_, const std::vector<std::pair<std::string, std::string>> & methods_);
JVMClassReference(const JVMClassDescription & description_);

~JVMClassReference() override = default;

Expand All @@ -41,12 +49,11 @@ class JVMClassReference : public JNIEnvResource, public boost::noncopyable
void destroy(JNIEnv * env) override;

private:
std::string class_name;
std::vector<std::pair<std::string, std::string>> methods;
JVMClassDescription description;
jclass jvm_class;
std::unordered_map<std::string, jmethodID> method_ids;
};

#define JVM_CLASS_REFERENCE(id) \
(*(dynamic_cast<local_engine::JVMClassReference *>(local_engine::JNIEnvResourceManager::instance().getResource(#id).get())))
}
}
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Resource/ResourceManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,4 @@ void JNIEnvResourceManager::destroy(JNIEnv * env)
dynamic_cast<JNIEnvResource *>(resource.get())->destroy(env);

}
} // namespace local_engine
} // namespace local_engine
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Resource/ResourceManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,4 @@ class JNIEnvResourceManager : public IDependencyResourceManager

};

}
}

0 comments on commit 89cb8b0

Please sign in to comment.