[object_store] add zstd compression to object_store push payload (#5) (…#9)

We use ray.data on remote workers to process training data before
feeding it into the trainer. Our training data compresses well when
stored on disk, yet a ray.data pipeline can only transfer raw,
uncompressed data between workers.

We would like to introduce zstd compression for the object_manager's
Push payloads. In our production environments, we see a significant
(35%-40%) improvement in training throughput (examples/sec) for one of
our models.

Test plan:

https://docs.google.com/document/d/13XSiAdHbH6qbkSkFHqOKOcdCJG92XldOYgmBbijhtNY/edit?usp=sharing

Signed-off-by: cyang <[email protected]>
Co-authored-by: Chen Yang <[email protected]>
votrou and iamyangchen authored Jul 10, 2024
1 parent ee94228 commit f12e5b4
Showing 9 changed files with 133 additions and 2 deletions.
14 changes: 14 additions & 0 deletions bazel/BUILD.zstd
@@ -0,0 +1,14 @@
+load("@rules_foreign_cc//foreign_cc:defs.bzl", "make")
+load("@com_github_ray_project_ray//bazel:ray.bzl", "filter_files_with_suffix")
+
+filegroup(
+    name = "all",
+    srcs = glob(["**"]),
+)
+
+make(
+    name = "libzstd",
+    lib_source = ":all",
+    args = ["ZSTD_NO_ASM=1"],
+    visibility = ["//visibility:public"],
+)
8 changes: 8 additions & 0 deletions bazel/ray_deps_setup.bzl
@@ -372,3 +372,11 @@ def ray_deps_setup():
         sha256 = "2db82d1e7119df3e71b7640219b6dfe84789bc0537983c3b7ac4f7189aecfeaa",
         strip_prefix = "jemalloc-5.3.0",
     )
+
+    http_archive(
+        name = "zstd",
+        urls = ["https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz"],
+        build_file = "@com_github_ray_project_ray//bazel:BUILD.zstd",
+        sha256 = "8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1",
+        strip_prefix = "zstd-1.5.6",
+    )
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -357,6 +357,11 @@ RAY_CONFIG(uint64_t,
            object_manager_max_bytes_in_flight,
            ((uint64_t)2) * 1024 * 1024 * 1024)

+/// Data compression to apply to Push request payload
+/// 0: no compression
+/// 1: ZSTD compression
+RAY_CONFIG(int, object_manager_push_compression_algorithm, 0)
+
 /// Maximum number of ids in one batch to send to GCS to delete keys.
 RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000)

36 changes: 34 additions & 2 deletions src/ray/object_manager/object_manager.cc
@@ -533,7 +533,11 @@ void ObjectManager::SendObjectChunk(const UniqueID &push_id,
     on_complete(Status::IOError("Failed to read spilled object"));
     return;
   }
-  push_request.set_data(std::move(optional_chunk.value()));
+  if (config_.push_payload_compression_algorithm == CompressionAlgorithm::none) {
+    push_request.set_data(std::move(optional_chunk.value()));
+  } else {
+    push_request.set_data(CompressData(optional_chunk.value()));
+  }
   if (from_disk) {
     num_bytes_pushed_from_disk_ += push_request.data().length();
   } else {
@@ -573,7 +577,15 @@ void ObjectManager::HandlePush(rpc::PushRequest request,
   const std::string &data = request.data();

   bool success = ReceiveObjectChunk(
-      node_id, object_id, owner_address, data_size, metadata_size, chunk_index, data);
+      node_id,
+      object_id,
+      owner_address,
+      data_size,
+      metadata_size,
+      chunk_index,
+      config_.push_payload_compression_algorithm == CompressionAlgorithm::none
+          ? data
+          : DecompressData(data));
   num_chunks_received_total_++;
   if (!success) {
     num_chunks_received_total_failed_++;
@@ -809,4 +821,24 @@ void ObjectManager::Tick(const boost::system::error_code &e) {
   pull_retry_timer_.async_wait([this](const boost::system::error_code &e) { Tick(e); });
 }

+std::string ObjectManager::CompressData(const std::string &data) const {
+  switch (config_.push_payload_compression_algorithm) {
+    case CompressionAlgorithm::zstd:
+      return CompressZstd(data);
+    default:
+      RAY_LOG(FATAL) << "Unknown compression algorithm";
+  }
+  return data;
+}
+
+std::string ObjectManager::DecompressData(const std::string &data) const {
+  switch (config_.push_payload_compression_algorithm) {
+    case CompressionAlgorithm::zstd:
+      return DecompressZstd(data);
+    default:
+      RAY_LOG(FATAL) << "Unknown compression algorithm";
+  }
+  return data;
+}
+
} // namespace ray
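Note on the hunks above: judging from this diff, the receiver in HandlePush decides whether to decompress from its own local config, not from anything carried in the PushRequest, so the sender and the receiver presumably need the same object_manager_push_compression_algorithm setting. The sketch below illustrates that symmetry in isolation. EncodeChunk, DecodeChunk, ToyCompress and ToyDecompress are hypothetical names; the toy run-length codec is only a stand-in for CompressZstd/DecompressZstd so that the example builds without linking zstd.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Mirrors the enum added to object_manager.h in this commit.
enum class CompressionAlgorithm : int { none = 0, zstd = 1 };

// Hypothetical stand-in for CompressZstd: a trivial run-length encoder
// that emits (run length, byte) pairs.
std::string ToyCompress(const std::string &data) {
  std::string out;
  for (std::size_t i = 0; i < data.size();) {
    std::size_t run = 1;
    while (i + run < data.size() && data[i + run] == data[i] && run < 255) {
      ++run;
    }
    out.push_back(static_cast<char>(static_cast<unsigned char>(run)));
    out.push_back(data[i]);
    i += run;
  }
  return out;
}

// Hypothetical stand-in for DecompressZstd. Rejects payloads that cannot
// have been produced by ToyCompress (odd length).
std::string ToyDecompress(const std::string &payload) {
  if (payload.size() % 2 != 0) {
    throw std::runtime_error("corrupt payload");
  }
  std::string out;
  for (std::size_t i = 0; i < payload.size(); i += 2) {
    const auto run = static_cast<std::size_t>(static_cast<unsigned char>(payload[i]));
    out.append(run, payload[i + 1]);
  }
  return out;
}

// Sender side: the branch SendObjectChunk takes before set_data().
std::string EncodeChunk(CompressionAlgorithm algorithm, const std::string &chunk) {
  return algorithm == CompressionAlgorithm::none ? chunk : ToyCompress(chunk);
}

// Receiver side: the branch HandlePush takes before ReceiveObjectChunk().
std::string DecodeChunk(CompressionAlgorithm algorithm, const std::string &payload) {
  return algorithm == CompressionAlgorithm::none ? payload : ToyDecompress(payload);
}
```

With matching settings, DecodeChunk(a, EncodeChunk(a, chunk)) returns the original chunk. With mismatched settings the receiver either fails to decode or hands a still-compressed payload to the object store, which is why a cluster-wide consistent value seems necessary here.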
33 changes: 33 additions & 0 deletions src/ray/object_manager/object_manager.h
@@ -48,6 +48,11 @@

 namespace ray {

+enum class CompressionAlgorithm : int {
+  none = 0,
+  zstd = 1,
+};
+
 struct ObjectManagerConfig {
   /// The IP address this object manager is running on.
   std::string object_manager_address;
@@ -82,6 +87,22 @@ struct ObjectManagerConfig {
   std::string fallback_directory;
   /// Enable huge pages.
   bool huge_pages;
+  /// Compression algorithm for push payload.
+  /// 0: no compression
+  /// 1: zstd
+  CompressionAlgorithm push_payload_compression_algorithm;
+  /// Helper function to get the compression algorithm.
+  static CompressionAlgorithm GetCompressionAlgorithm(int algorithm) {
+    switch (algorithm) {
+      case 0:
+        return CompressionAlgorithm::none;
+      case 1:
+        return CompressionAlgorithm::zstd;
+      default:
+        RAY_LOG(FATAL) << "Unknown compression algorithm " << algorithm;
+    }
+    return CompressionAlgorithm::none;
+  }
 };

 struct LocalObjectInfo {
@@ -392,6 +413,18 @@ class ObjectManager : public ObjectManagerInterface,
   /// \param node_id Remote node id, will send rpc request to it
   std::shared_ptr<rpc::ObjectManagerClient> GetRpcClient(const NodeID &node_id);

+  /// Compress the input data.
+  ///
+  /// \param data The data to be compressed
+  /// \return The compressed data
+  std::string CompressData(const std::string &data) const;
+
+  /// Decompress the input data.
+  ///
+  /// \param data The data to be decompressed
+  /// \return The decompressed data
+  std::string DecompressData(const std::string &data) const;
+
   /// Weak reference to main service. We ensure this object is destroyed before
   /// main_service_ is stopped.
   instrumented_io_context *main_service_;
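Note on GetCompressionAlgorithm above: it maps the integer config value to the enum and aborts through RAY_LOG(FATAL) on anything else. As a minimal sketch of the same mapping in a form that can be exercised without Ray's logging, the hypothetical helper below (ParseCompressionAlgorithm is not part of this commit) reports an unknown value as an empty optional and leaves the reaction to the caller. It assumes C++17 for std::optional.

```cpp
#include <cassert>
#include <optional>

// Mirrors the enum added to object_manager.h in this commit.
enum class CompressionAlgorithm : int { none = 0, zstd = 1 };

// Hypothetical helper: same integer-to-enum mapping as
// ObjectManagerConfig::GetCompressionAlgorithm, but an unknown value yields
// std::nullopt instead of terminating the process.
std::optional<CompressionAlgorithm> ParseCompressionAlgorithm(int algorithm) {
  switch (algorithm) {
    case 0:
      return CompressionAlgorithm::none;
    case 1:
      return CompressionAlgorithm::zstd;
    default:
      return std::nullopt;
  }
}
```

The commit's choice to fail fast at raylet startup is reasonable for a misconfigured node; the optional-returning variant is mainly convenient for unit tests of the mapping itself.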
3 changes: 3 additions & 0 deletions src/ray/raylet/main.cc
@@ -350,6 +350,9 @@ int main(int argc, char *argv[]) {
   }
   object_manager_config.object_chunk_size =
       RayConfig::instance().object_manager_default_chunk_size();
+  object_manager_config.push_payload_compression_algorithm =
+      ray::ObjectManagerConfig::GetCompressionAlgorithm(
+          RayConfig::instance().object_manager_push_compression_algorithm());

   RAY_LOG(DEBUG) << "Starting object manager with configuration: \n"
                  << "rpc_service_threads_number = "
1 change: 1 addition & 0 deletions src/ray/util/BUILD
@@ -40,5 +40,6 @@ cc_library(
         "@com_google_absl//absl/time",
         "@com_google_googletest//:gtest_main",
         "@nlohmann_json",
+        "@zstd//:libzstd",
     ],
 )
7 changes: 7 additions & 0 deletions src/ray/util/tests/util_test.cc
@@ -241,6 +241,13 @@ TEST(UtilTest, GetAllProcsWithPpid) {
 #endif
 }

+TEST(UtilTest, ZstdTest) {
+  std::string data = "Zstd test input data";
+  std::string compressed = CompressZstd(data);
+  std::string decompressed = DecompressZstd(compressed);
+  ASSERT_EQ(data, decompressed);
+}
+
 } // namespace ray

 int main(int argc, char **argv) {
28 changes: 28 additions & 0 deletions src/ray/util/util.h
@@ -14,6 +14,8 @@

 #pragma once

+#include <zstd.h>
+
 #include <chrono>
 #include <iterator>
 #include <memory>
@@ -155,6 +157,32 @@ inline std::string GenerateUUIDV4() {
   return ss.str();
 }

+/// Compress the input data using zstd compression
+inline std::string CompressZstd(const std::string &data) {
+  // Compress the data.
+  const size_t compressed_size = ZSTD_compressBound(data.size());
+  std::string compressed_data(compressed_size, '\0');
+  const size_t actual_compressed_size =
+      ZSTD_compress(compressed_data.data(), compressed_size, data.data(), data.size(), 1);
+  compressed_data.resize(actual_compressed_size);
+  return compressed_data;
+}
+
+/// Decompress the input data compressed by zstd compression
+inline std::string DecompressZstd(const std::string &data) {
+  // Get the decompressed size.
+  const size_t decompressed_size = ZSTD_getFrameContentSize(data.data(), data.size());
+  // Decompress the data.
+  std::string decompressed_data(decompressed_size, '\0');
+  const size_t actual_decompressed_size = ZSTD_decompress(
+      decompressed_data.data(), decompressed_size, data.data(), data.size());
+  if (actual_decompressed_size != decompressed_size) {
+    RAY_LOG(ERROR) << "ZSTD_decompress failed";
+    return "";
+  }
+  return decompressed_data;
+}
+
 /// A helper function to parse command-line arguments in a platform-compatible manner.
 ///
/// \param cmdline The command-line to split.
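
Note on DecompressZstd above: it sizes its output buffer directly from ZSTD_getFrameContentSize. That function can return the sentinels ZSTD_CONTENTSIZE_UNKNOWN or ZSTD_CONTENTSIZE_ERROR instead of a size, and both are very large unsigned values, so a malformed payload could trigger an enormous allocation before ZSTD_decompress is ever called. The sketch below shows one possible guard. ValidatedContentSize and its max_allowed cap are hypothetical and not part of this commit; the two constants are redefined locally with the values zstd.h uses so that the example compiles without the zstd headers.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Same values as ZSTD_CONTENTSIZE_UNKNOWN and ZSTD_CONTENTSIZE_ERROR in zstd.h,
// redefined here (assumption: kept in sync by hand) to avoid the dependency.
constexpr unsigned long long kContentSizeUnknown = 0ULL - 1;
constexpr unsigned long long kContentSizeError = 0ULL - 2;

// Hypothetical guard: turns the value reported by ZSTD_getFrameContentSize
// into a usable buffer size, or std::nullopt if it is a sentinel or exceeds
// a caller-chosen upper bound (for example, the object chunk size).
std::optional<std::size_t> ValidatedContentSize(unsigned long long reported,
                                                std::size_t max_allowed) {
  if (reported == kContentSizeUnknown || reported == kContentSizeError) {
    return std::nullopt;
  }
  if (reported > max_allowed) {
    return std::nullopt;
  }
  return static_cast<std::size_t>(reported);
}
```

A caller would allocate the output string only when the optional holds a value and treat an empty optional as a failed decompression. A similar check with ZSTD_isError on the return value of ZSTD_compress would cover the compression path.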
