diff --git a/bazel/BUILD.zstd b/bazel/BUILD.zstd new file mode 100644 index 000000000000..24232cb98382 --- /dev/null +++ b/bazel/BUILD.zstd @@ -0,0 +1,14 @@ +load("@rules_foreign_cc//foreign_cc:defs.bzl", "make") +load("@com_github_ray_project_ray//bazel:ray.bzl", "filter_files_with_suffix") + +filegroup( + name = "all", + srcs = glob(["**"]), +) + +make( + name = "libzstd", + lib_source = ":all", + args = ["ZSTD_NO_ASM=1"], + visibility = ["//visibility:public"], +) diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index a2c1ce1f4c65..75213aeacb40 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -372,3 +372,11 @@ def ray_deps_setup(): sha256 = "2db82d1e7119df3e71b7640219b6dfe84789bc0537983c3b7ac4f7189aecfeaa", strip_prefix = "jemalloc-5.3.0", ) + + http_archive( + name = "zstd", + urls = ["https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz"], + build_file = "@com_github_ray_project_ray//bazel:BUILD.zstd", + sha256 = "8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1", + strip_prefix = "zstd-1.5.6", + ) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 3ead71ebc18f..0f6a21f4d715 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -357,6 +357,11 @@ RAY_CONFIG(uint64_t, object_manager_max_bytes_in_flight, ((uint64_t)2) * 1024 * 1024 * 1024) +/// Data compression to apply to Push request payload +/// 0: no compression +/// 1: ZSTD compression +RAY_CONFIG(int, object_manager_push_compression_algorithm, 0) + /// Maximum number of ids in one batch to send to GCS to delete keys. RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index ce9f34c0b246..96a225d4ddcc 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -533,7 +533,11 @@ void ObjectManager::SendObjectChunk(const UniqueID &push_id, on_complete(Status::IOError("Failed to read spilled object")); return; } - push_request.set_data(std::move(optional_chunk.value())); + if (config_.push_payload_compression_algorithm == CompressionAlgorithm::none) { + push_request.set_data(std::move(optional_chunk.value())); + } else { + push_request.set_data(CompressData(optional_chunk.value())); + } if (from_disk) { num_bytes_pushed_from_disk_ += push_request.data().length(); } else { @@ -573,7 +577,15 @@ void ObjectManager::HandlePush(rpc::PushRequest request, const std::string &data = request.data(); bool success = ReceiveObjectChunk( - node_id, object_id, owner_address, data_size, metadata_size, chunk_index, data); + node_id, + object_id, + owner_address, + data_size, + metadata_size, + chunk_index, + config_.push_payload_compression_algorithm == CompressionAlgorithm::none + ? data + : DecompressData(data)); num_chunks_received_total_++; if (!success) { num_chunks_received_total_failed_++; @@ -809,4 +821,24 @@ void ObjectManager::Tick(const boost::system::error_code &e) { pull_retry_timer_.async_wait([this](const boost::system::error_code &e) { Tick(e); }); } +std::string ObjectManager::CompressData(const std::string &data) const { + switch (config_.push_payload_compression_algorithm) { + case CompressionAlgorithm::zstd: + return CompressZstd(data); + default: + RAY_LOG(FATAL) << "Unknown compression algorithm"; + } + return data; +} + +std::string ObjectManager::DecompressData(const std::string &data) const { + switch (config_.push_payload_compression_algorithm) { + case CompressionAlgorithm::zstd: + return DecompressZstd(data); + default: + RAY_LOG(FATAL) << "Unknown compression algorithm"; + } + return data; +} + } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 305cfc6f50be..370bcc7cf1f7 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -48,6 +48,11 @@ namespace ray { +enum class CompressionAlgorithm : int { + none = 0, + zstd = 1, +}; + struct ObjectManagerConfig { /// The IP address this object manager is running on. std::string object_manager_address; @@ -82,6 +87,22 @@ struct ObjectManagerConfig { std::string fallback_directory; /// Enable huge pages. bool huge_pages; + /// Compression algorithm for push payload. + /// 0: no compression + /// 1: zstd + CompressionAlgorithm push_payload_compression_algorithm; + /// Helper function to get the compression algorithm. + static CompressionAlgorithm GetCompressionAlgorithm(int algorithm) { + switch (algorithm) { + case 0: + return CompressionAlgorithm::none; + case 1: + return CompressionAlgorithm::zstd; + default: + RAY_LOG(FATAL) << "Unknown compression algorithm " << algorithm; + } + return CompressionAlgorithm::none; + } }; struct LocalObjectInfo { @@ -392,6 +413,18 @@ class ObjectManager : public ObjectManagerInterface, /// \param node_id Remote node id, will send rpc request to it std::shared_ptr GetRpcClient(const NodeID &node_id); + /// Compress the input data. + /// + /// \param data The data to be compressed + /// \return The compressed data + std::string CompressData(const std::string &data) const; + + /// Decompress the input data. + /// + /// \param data The data to be decompressed + /// \return The decompressed data + std::string DecompressData(const std::string &data) const; + /// Weak reference to main service. We ensure this object is destroyed before /// main_service_ is stopped. instrumented_io_context *main_service_; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 1ceece83dbba..1c11c19f1548 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -350,6 +350,9 @@ int main(int argc, char *argv[]) { } object_manager_config.object_chunk_size = RayConfig::instance().object_manager_default_chunk_size(); + object_manager_config.push_payload_compression_algorithm = + ray::ObjectManagerConfig::GetCompressionAlgorithm( + RayConfig::instance().object_manager_push_compression_algorithm()); RAY_LOG(DEBUG) << "Starting object manager with configuration: \n" << "rpc_service_threads_number = " diff --git a/src/ray/util/BUILD b/src/ray/util/BUILD index ce3ddd114bb3..9342a4b5d1be 100644 --- a/src/ray/util/BUILD +++ b/src/ray/util/BUILD @@ -40,5 +40,6 @@ cc_library( "@com_google_absl//absl/time", "@com_google_googletest//:gtest_main", "@nlohmann_json", + "@zstd//:libzstd", ], ) diff --git a/src/ray/util/tests/util_test.cc b/src/ray/util/tests/util_test.cc index 9be4bf110c95..3fce8ca726be 100644 --- a/src/ray/util/tests/util_test.cc +++ b/src/ray/util/tests/util_test.cc @@ -241,6 +241,13 @@ TEST(UtilTest, GetAllProcsWithPpid) { #endif } +TEST(UtilTest, ZstdTest) { + std::string data = "Zstd test input data"; + std::string compressed = CompressZstd(data); + std::string decompressed = DecompressZstd(compressed); + ASSERT_EQ(data, decompressed); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/util/util.h b/src/ray/util/util.h index e79fe9505c9a..db0b252f0447 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -14,6 +14,8 @@ #pragma once +#include + #include #include #include @@ -155,6 +157,32 @@ inline std::string GenerateUUIDV4() { return ss.str(); } +/// Compress the input data using zstd compression +inline std::string CompressZstd(const std::string &data) { + // Compress the data. + const size_t compressed_size = ZSTD_compressBound(data.size()); + std::string compressed_data(compressed_size, '\0'); + const size_t actual_compressed_size = + ZSTD_compress(compressed_data.data(), compressed_size, data.data(), data.size(), 1); + compressed_data.resize(actual_compressed_size); + return compressed_data; +} + +/// Decompress the input data compressed by zstd compression +inline std::string DecompressZstd(const std::string &data) { + // Get the decompressed size. + const size_t decompressed_size = ZSTD_getFrameContentSize(data.data(), data.size()); + // Decompress the data. + std::string decompressed_data(decompressed_size, '\0'); + const size_t actual_decompressed_size = ZSTD_decompress( + decompressed_data.data(), decompressed_size, data.data(), data.size()); + if (actual_decompressed_size != decompressed_size) { + RAY_LOG(ERROR) << "ZSTD_decompress failed"; + return ""; + } + return decompressed_data; +} + /// A helper function to parse command-line arguments in a platform-compatible manner. /// /// \param cmdline The command-line to split.