Skip to content

Commit

Permalink
Java Parquet reads via multiple host buffers (#17673)
Browse files Browse the repository at this point in the history
Adds a custom cuio datasource that can provide file data via multiple host memory buffers.  This allows data that arrives from multiple threads in multiple buffers to be read directly rather than requiring the buffers to be concatenated into a single host memory buffer before reading.

Authors:
  - Jason Lowe (https://github.com/jlowe)

Approvers:
  - Alessandro Bellina (https://github.com/abellina)
  - Robert (Bobby) Evans (https://github.com/revans2)

URL: #17673
  • Loading branch information
jlowe authored Jan 7, 2025
1 parent a0487be commit f308122
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 48 deletions.
59 changes: 48 additions & 11 deletions java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,12 +62,13 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File f
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());

long[] handles = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), null, opts.timeUnit().typeId.getNativeId());
handle = handles[0];
if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
multiHostBufferSourceHandle = handles[1];
}

/**
Expand Down Expand Up @@ -100,12 +101,41 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMe
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit,
ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());
long[] addrsSizes = new long[]{ buffer.getAddress() + offset, len };
long[] handles = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
addrsSizes, opts.timeUnit().typeId.getNativeId());
handle = handles[0];
if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
multiHostBufferSourceHandle = handles[1];
}

/**
* Construct the reader instance from a read limit and data in host memory buffers.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used for reading and decompressing data or
* 0 if there is no limit
* @param opts The options for Parquet reading.
* @param buffers Array of buffers containing the file data. The buffers are logically
* concatenated to construct the file being read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit,
ParquetOptions opts, HostMemoryBuffer... buffers) {
long[] addrsSizes = new long[buffers.length * 2];
for (int i = 0; i < buffers.length; i++) {
addrsSizes[i * 2] = buffers[i].getAddress();
addrsSizes[(i * 2) + 1] = buffers[i].getLength();
}
long[] handles = create(chunkSizeByteLimit,passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
addrsSizes, opts.timeUnit().typeId.getNativeId());
handle = handles[0];
if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
multiHostBufferSourceHandle = handles[1];
}

/**
Expand Down Expand Up @@ -181,6 +211,10 @@ public void close() {
DataSourceHelper.destroyWrapperDataSource(dataSourceHandle);
dataSourceHandle = 0;
}
if (multiHostBufferSourceHandle != 0) {
destroyMultiHostBufferSource(multiHostBufferSourceHandle);
multiHostBufferSourceHandle = 0;
}
}


Expand All @@ -196,6 +230,8 @@ public void close() {

private long dataSourceHandle = 0;

private long multiHostBufferSourceHandle = 0;

/**
* Create a native chunked Parquet reader object on heap and return its memory address.
*
Expand All @@ -206,13 +242,12 @@ public void close() {
* @param filterColumnNames Name of the columns to read, or an empty array if we want to read all.
* @param binaryToString Whether to convert the corresponding column to String if it is binary.
* @param filePath Full path of the file to read, or given as null if reading from a buffer.
* @param bufferAddrs The address of a buffer to read from, or 0 if we are not using that buffer.
* @param length The length of the buffer to read from.
* @param bufferAddrsSizes The address and size pairs of buffers to read from, or null if we are not using buffers.
* @param timeUnit Return type of time unit for timestamps.
*/
private static native long create(long chunkSizeByteLimit, long passReadLimit,
String[] filterColumnNames, boolean[] binaryToString,
String filePath, long bufferAddrs, long length, int timeUnit);
private static native long[] create(long chunkSizeByteLimit, long passReadLimit,
String[] filterColumnNames, boolean[] binaryToString,
String filePath, long[] bufferAddrsSizes, int timeUnit);

private static native long createWithDataSource(long chunkedSizeByteLimit,
String[] filterColumnNames, boolean[] binaryToString, int timeUnit, long dataSourceHandle);
Expand All @@ -222,4 +257,6 @@ private static native long createWithDataSource(long chunkedSizeByteLimit,
private static native long[] readChunk(long handle);

private static native void close(long handle);

private static native void destroyMultiHostBufferSource(long handle);
}
44 changes: 38 additions & 6 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -313,12 +313,11 @@ private static native long readAndInferJSON(long address, long length,
* all of them
* @param binaryToString whether to convert this column to String if binary
* @param filePath the path of the file to read, or null if no path should be read.
* @param address the address of the buffer to read from or 0 if we should not.
* @param length the length of the buffer to read from.
* @param addrsAndSizes the address and size pairs for every buffer or null for no buffers.
* @param timeUnit return type of TimeStamp in units
*/
private static native long[] readParquet(String[] filterColumnNames, boolean[] binaryToString, String filePath,
long address, long length, int timeUnit) throws CudfException;
long[] addrsAndSizes, int timeUnit) throws CudfException;

private static native long[] readParquetFromDataSource(String[] filterColumnNames,
boolean[] binaryToString, int timeUnit,
Expand Down Expand Up @@ -1357,7 +1356,7 @@ public static Table readParquet(File path) {
*/
public static Table readParquet(ParquetOptions opts, File path) {
return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
path.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId()));
path.getAbsolutePath(), null, opts.timeUnit().typeId.getNativeId()));
}

/**
Expand Down Expand Up @@ -1402,6 +1401,14 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset,
}
}

/**
* Read parquet formatted data.
* @param opts various parquet parsing options.
* @param buffer raw parquet formatted bytes.
* @param offset the starting offset into buffer.
* @param len the number of bytes to parse.
* @return the data parsed as a table on the GPU.
*/
public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, long len) {
return readParquet(opts, buffer, offset, len, DefaultHostMemoryAllocator.get());
}
Expand All @@ -1422,10 +1429,35 @@ public static Table readParquet(ParquetOptions opts, HostMemoryBuffer buffer,
assert len > 0;
assert len <= buffer.getLength() - offset;
assert offset >= 0 && offset < buffer.length;
long[] addrsSizes = new long[]{ buffer.getAddress() + offset, len };
return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
null, addrsSizes, opts.timeUnit().typeId.getNativeId()));
}

/**
* Read parquet formatted data.
* @param opts various parquet parsing options.
* @param buffers Buffers containing the Parquet data. The buffers are logically concatenated
* in order to construct the file being read.
* @return the data parsed as a table on the GPU.
*/
public static Table readParquet(ParquetOptions opts, HostMemoryBuffer... buffers) {
assert buffers.length > 0;
long[] addrsSizes = new long[buffers.length * 2];
for (int i = 0; i < buffers.length; i++) {
addrsSizes[i * 2] = buffers[i].getAddress();
addrsSizes[(i * 2) + 1] = buffers[i].getLength();
}
return new Table(readParquet(opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
null, buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId()));
null, addrsSizes, opts.timeUnit().typeId.getNativeId()));
}

/**
* Read parquet formatted data.
* @param opts various parquet parsing options.
* @param ds custom datasource to provide the Parquet file data
* @return the data parsed as a table on the GPU.
*/
public static Table readParquet(ParquetOptions opts, DataSource ds) {
long dataSourceHandle = DataSourceHelper.createWrapperDataSource(ds);
try {
Expand Down
5 changes: 3 additions & 2 deletions java/src/main/native/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
# Copyright (c) 2019-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -156,8 +156,9 @@ add_library(
src/ScalarJni.cpp
src/TableJni.cpp
src/aggregation128_utils.cu
src/maps_column_view.cu
src/check_nvcomp_output_sizes.cu
src/maps_column_view.cu
src/multi_host_buffer_source.cpp
)

# Disable NVTX if necessary
Expand Down
57 changes: 57 additions & 0 deletions java/src/main/native/include/multi_host_buffer_source.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "jni_utils.hpp"

#include <cudf/io/datasource.hpp>

#include <vector>

namespace cudf {
namespace jni {

/**
* @brief A custom datasource providing data from an array of host memory buffers.
*/
class multi_host_buffer_source : public cudf::io::datasource {
std::vector<uint8_t const*> addrs_;
std::vector<size_t> offsets_;

size_t locate_offset_index(size_t offset);

public:
explicit multi_host_buffer_source(native_jlongArray const& addrs_sizes);
std::unique_ptr<buffer> host_read(size_t offset, size_t size) override;
size_t host_read(size_t offset, size_t size, uint8_t* dst) override;
bool supports_device_read() const override { return true; }
bool is_device_read_preferred(size_t size) const override { return true; }
std::unique_ptr<buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override;
size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override;
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override;
size_t size() const override { return offsets_.back(); }
};

} // namespace jni
} // namespace cudf
58 changes: 40 additions & 18 deletions java/src/main/native/src/ChunkedReaderJni.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

#include "cudf_jni_apis.hpp"
#include "jni_utils.hpp"
#include "multi_host_buffer_source.hpp"

#include <cudf/column/column.hpp>
#include <cudf/io/orc.hpp>
Expand All @@ -36,35 +37,34 @@ extern "C" {

// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL
JNIEXPORT jlongArray JNICALL
Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
jclass,
jlong chunk_read_limit,
jlong pass_read_limit,
jobjectArray filter_col_names,
jbooleanArray j_col_binary_read,
jstring inp_file_path,
jlong buffer,
jlong buffer_length,
jlongArray addrs_sizes,
jint unit)
{
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", nullptr);
bool read_buffer = true;
if (buffer == 0) {
JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
if (addrs_sizes == nullptr) {
JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", nullptr);
read_buffer = false;
} else if (inp_file_path != nullptr) {
JNI_THROW_NEW(
env, cudf::jni::ILLEGAL_ARG_CLASS, "Cannot pass in both a buffer and an inp_file_path", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0);
JNI_THROW_NEW(env,
cudf::jni::ILLEGAL_ARG_CLASS,
"Cannot pass in both buffers and an inp_file_path",
nullptr);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstring filename(env, inp_file_path);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", 0);
JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", nullptr);
}

cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
Expand All @@ -75,9 +75,15 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
(void)n_col_binary_read;

auto const source = read_buffer ? cudf::io::source_info(reinterpret_cast<char*>(buffer),
static_cast<std::size_t>(buffer_length))
: cudf::io::source_info(filename.get());
cudf::jni::native_jlongArray n_addrs_sizes(env, addrs_sizes);
std::unique_ptr<cudf::io::datasource> multi_buffer_source;
cudf::io::source_info source;
if (read_buffer) {
multi_buffer_source.reset(new cudf::jni::multi_host_buffer_source(n_addrs_sizes));
source = cudf::io::source_info(multi_buffer_source.get());
} else {
source = cudf::io::source_info(filename.get());
}

auto opts_builder = cudf::io::parquet_reader_options::builder(source);
if (n_filter_col_names.size() > 0) {
Expand All @@ -86,13 +92,18 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
auto const read_opts = opts_builder.convert_strings_to_categories(false)
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();

return reinterpret_cast<jlong>(
n_addrs_sizes.cancel();
n_col_binary_read.cancel();
auto reader_handle = reinterpret_cast<jlong>(
new cudf::io::chunked_parquet_reader(static_cast<std::size_t>(chunk_read_limit),
static_cast<std::size_t>(pass_read_limit),
read_opts));
cudf::jni::native_jlongArray result(env, 2);
result[0] = reader_handle;
result[1] = cudf::jni::release_as_jlong(multi_buffer_source);
return result.get_jArray();
}
CATCH_STD(env, 0);
CATCH_STD(env, nullptr);
}

JNIEXPORT jlong JNICALL
Expand Down Expand Up @@ -177,6 +188,17 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* en
CATCH_STD(env, );
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_destroyMultiHostBufferSource(
JNIEnv* env, jclass, jlong handle)
{
JNI_NULL_CHECK(env, handle, "handle is null", );

try {
delete reinterpret_cast<cudf::jni::multi_host_buffer_source*>(handle);
}
CATCH_STD(env, );
}

//
// Chunked ORC reader JNI
//
Expand Down
Loading

0 comments on commit f308122

Please sign in to comment.