Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multithreaded reading of compressed buffers in JSON reader #17670

Open
wants to merge 24 commits into
base: branch-25.02
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cc98870
Detect mismatches in begin and end tokens returned by JSON tokenizer …
shrshi Dec 19, 2024
fd1b78b
partial work
shrshi Dec 29, 2024
6965e9f
Merge branch 'branch-25.02' into json-multithreaded-compio
shrshi Dec 29, 2024
2e59978
partial commit
shrshi Jan 2, 2025
2e747d5
Merge branch 'branch-25.02' into json-multithreaded-compio
shrshi Jan 2, 2025
528e750
device_read_async comments
shrshi Jan 2, 2025
8b9daa9
cleanup
shrshi Jan 2, 2025
fe82b66
more cleanup
shrshi Jan 2, 2025
47a6c8b
formatting
shrshi Jan 2, 2025
e0e86a3
more formatting
shrshi Jan 2, 2025
8006dce
added device read async
shrshi Jan 2, 2025
483eed4
big bug fix; device read async works now
shrshi Jan 3, 2025
a655c7b
discard changes
shrshi Jan 3, 2025
3c557b2
Merge branch 'branch-25.02' into json-multithreaded-compio
shrshi Jan 3, 2025
b23d334
function returning static so that cuda apis are called after main() i…
shrshi Jan 4, 2025
4aa7d59
Merge branch 'json-multithreaded-compio' of github.com:shrshi/cudf in…
shrshi Jan 4, 2025
fc9cc67
Merge branch 'branch-25.02' into json-multithreaded-compio
shrshi Jan 4, 2025
d768471
pr reviews
shrshi Jan 7, 2025
16921c4
Merge branch 'json-multithreaded-compio' of github.com:shrshi/cudf in…
shrshi Jan 7, 2025
48f862a
Merge branch 'branch-25.02' into json-multithreaded-compio
shrshi Jan 7, 2025
2b93515
using the global stream pool to get rid of static object destruction …
shrshi Jan 8, 2025
7033902
cleanup; formatting
shrshi Jan 8, 2025
7482d51
Merge branch 'json-multithreaded-compio' of github.com:shrshi/cudf in…
shrshi Jan 8, 2025
d974fda
Merge branch 'branch-25.02' into json-multithreaded-compio
shrshi Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 61 additions & 10 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,19 +30,33 @@
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_pool.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/distance.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/scatter.h>

#include <BS_thread_pool.hpp>
#include <BS_thread_pool_utils.hpp>

#include <numeric>

namespace cudf::io::json::detail {

namespace {

namespace pools {

BS::thread_pool& tpool()
{
static BS::thread_pool _tpool(std::thread::hardware_concurrency());
return _tpool;
}

} // namespace pools

class compressed_host_buffer_source final : public datasource {
public:
explicit compressed_host_buffer_source(std::unique_ptr<datasource> const& src,
Expand All @@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
{
auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
_dbuf_ptr->size());
if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
comptype == compression_type::SNAPPY) {
if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
_comptype == compression_type::SNAPPY) {
_decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer);
} else {
_decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer);
Expand Down Expand Up @@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
return std::make_unique<non_owning_buffer>(_decompressed_buffer.data() + offset, count);
}

[[nodiscard]] bool supports_device_read() const override { return false; }
std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
auto& thread_pool = pools::tpool();
return thread_pool.submit_task([this, offset, size, dst, stream] {
Comment on lines +113 to +119
Copy link
Contributor

@ttnghia ttnghia Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be called by multiple threads? If so, we may have a race condition issue.

Copy link
Contributor Author

@shrshi shrshi Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, device_read_async is only called by the primary thread. Each of the worker threads executes the code in thread_pool.submit_task(..)

auto hbuf = host_read(offset, size);
CUDF_CUDA_TRY(
cudaMemcpyAsync(dst, hbuf->data(), hbuf->size(), cudaMemcpyHostToDevice, stream.value()));
stream.synchronize();
return hbuf->size();
});
}

[[nodiscard]] bool supports_device_read() const override { return true; }

[[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }

Expand Down Expand Up @@ -431,6 +460,8 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
// delimiter.
auto constexpr num_delimiter_chars = 1;
std::vector<std::future<size_t>> thread_tasks;
auto stream_pool = cudf::detail::fork_streams(stream, pools::tpool().get_thread_count());

auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
std::vector<std::size_t> prefsum_source_sizes(sources.size());
Expand All @@ -447,13 +478,17 @@ device_span<char> ingest_raw_input(device_span<char> buffer,

auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
for (std::size_t i = start_source, cur_stream = 0;
i < sources.size() && bytes_read < total_bytes_to_read;
i++) {
if (sources[i]->is_empty()) continue;
auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
(num_delimiter_chars * delimiter_map.size());
if (sources[i]->is_device_read_preferred(data_size)) {
bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
if (sources[i]->supports_device_read()) {
thread_tasks.emplace_back(sources[i]->device_read_async(
range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size()]));
bytes_read += data_size;
} else {
h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
auto const& h_buffer = h_buffers.back();
Expand Down Expand Up @@ -481,6 +516,15 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
buffer.data());
}
stream.synchronize();

if (thread_tasks.size()) {
auto const bytes_read = std::accumulate(
thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) {
return sum + task.get();
});
CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy");
}

return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
}

Expand All @@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
return read_json_impl(sources, reader_opts, stream, mr);

std::vector<std::unique_ptr<datasource>> compressed_sources;
for (size_t i = 0; i < sources.size(); i++) {
compressed_sources.emplace_back(
std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression()));
std::vector<std::future<std::unique_ptr<compressed_host_buffer_source>>> thread_tasks;
auto& thread_pool = pools::tpool();
for (auto& src : sources) {
thread_tasks.emplace_back(thread_pool.submit_task([&reader_opts, &src] {
return std::make_unique<compressed_host_buffer_source>(src, reader_opts.get_compression());
}));
}
std::transform(thread_tasks.begin(),
thread_tasks.end(),
std::back_inserter(compressed_sources),
[](auto& task) { return task.get(); });
// in read_json_impl, we need the compressed source size to actually be the
// uncompressed source size for correct batching
return read_json_impl(compressed_sources, reader_opts, stream, mr);
Expand Down
Loading