Skip to content

Commit

Permalink
[CELEBORN-1799][CIP-14] Add celebornConf to cppClient
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add CelebornConf to cppClient.

### Why are the changes needed?
The CelebornConf will be used as configuration module in cppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3027 from HolyLow/issue/celeborn-1799-add-celeborn-conf-to-cppClient.

Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
HolyLow authored and FMX committed Dec 25, 2024
1 parent fde6365 commit 7f030d4
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cpp/celeborn/conf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_library(conf BaseConf.cpp)
add_library(conf BaseConf.cpp CelebornConf.cpp)

target_link_libraries(
conf
Expand Down
198 changes: 198 additions & 0 deletions cpp/celeborn/conf/CelebornConf.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <re2/re2.h>

#include "celeborn/conf/CelebornConf.h"

namespace celeborn {
namespace {

// folly::to<> does not generate 'true' and 'false', so we do it ourselves.
std::string bool2String(bool value) {
return value ? "true" : "false";
}

#define STR_PROP(_key_, _val_) {std::string(_key_), std::string(_val_)}
#define NUM_PROP(_key_, _val_) \
{std::string(_key_), folly::to<std::string>(_val_)}
#define BOOL_PROP(_key_, _val_) {std::string(_key_), bool2String(_val_)}
#define NONE_PROP(_key_) {std::string(_key_), folly::none}

enum class CapacityUnit {
BYTE,
KILOBYTE,
MEGABYTE,
GIGABYTE,
TERABYTE,
PETABYTE
};

double toBytesPerCapacityUnit(CapacityUnit unit) {
switch (unit) {
case CapacityUnit::BYTE:
return 1;
case CapacityUnit::KILOBYTE:
return exp2(10);
case CapacityUnit::MEGABYTE:
return exp2(20);
case CapacityUnit::GIGABYTE:
return exp2(30);
case CapacityUnit::TERABYTE:
return exp2(40);
case CapacityUnit::PETABYTE:
return exp2(50);
default:
CELEBORN_USER_FAIL("Invalid capacity unit '{}'", (int)unit);
}
}

CapacityUnit valueOfCapacityUnit(const std::string& unitStr) {
if (unitStr == "B") {
return CapacityUnit::BYTE;
}
if (unitStr == "kB") {
return CapacityUnit::KILOBYTE;
}
if (unitStr == "MB") {
return CapacityUnit::MEGABYTE;
}
if (unitStr == "GB") {
return CapacityUnit::GIGABYTE;
}
if (unitStr == "TB") {
return CapacityUnit::TERABYTE;
}
if (unitStr == "PB") {
return CapacityUnit::PETABYTE;
}
CELEBORN_USER_FAIL("Invalid capacity unit '{}'", unitStr);
}

// Convert capacity string with unit to the capacity number in the specified
// units
uint64_t toCapacity(const std::string& from, CapacityUnit to) {
static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$)");
double value;
std::string unit;
if (!RE2::FullMatch(from, kPattern, &value, &unit)) {
CELEBORN_USER_FAIL("Invalid capacity string '{}'", from);
}

return value *
(toBytesPerCapacityUnit(valueOfCapacityUnit(unit)) /
toBytesPerCapacityUnit(to));
}

Duration toDuration(const std::string& str) {
static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*)");

double value;
std::string unit;
if (!RE2::FullMatch(str, kPattern, &value, &unit)) {
CELEBORN_USER_FAIL("Invalid duration {}", str);
}
if (unit == "ns") {
return std::chrono::duration<double, std::nano>(value);
} else if (unit == "us") {
return std::chrono::duration<double, std::micro>(value);
} else if (unit == "ms") {
return std::chrono::duration<double, std::milli>(value);
} else if (unit == "s") {
return Duration(value);
} else if (unit == "m") {
return std::chrono::duration<double, std::ratio<60>>(value);
} else if (unit == "h") {
return std::chrono::duration<double, std::ratio<60 * 60>>(value);
} else if (unit == "d") {
return std::chrono::duration<double, std::ratio<60 * 60 * 24>>(value);
}
CELEBORN_USER_FAIL("Invalid duration {}", str);
}

} // namespace

const std::unordered_map<std::string, folly::Optional<std::string>>
CelebornConf::kDefaultProperties = {
STR_PROP(kRpcLookupTimeout, "30s"),
STR_PROP(kClientRpcGetReducerFileGroupRpcAskTimeout, "60s"),
STR_PROP(kNetworkConnectTimeout, "10s"),
STR_PROP(kClientFetchTimeout, "600s"),
NUM_PROP(kNetworkIoNumConnectionsPerPeer, "1"),
NUM_PROP(kNetworkIoClientThreads, 0),
NUM_PROP(kClientFetchMaxReqsInFlight, 3),
// NUM_PROP(kNumExample, 50'000),
// BOOL_PROP(kBoolExample, false),
};

CelebornConf::CelebornConf() {
registeredProps_ = kDefaultProperties;
}

CelebornConf::CelebornConf(const std::string& filename) {
initialize(filename);
registeredProps_ = kDefaultProperties;
}

CelebornConf::CelebornConf(const CelebornConf& other) {
if (auto* memConfig =
dynamic_cast<core::MemConfigMutable*>(other.config_.get())) {
config_ =
std::make_unique<core::MemConfigMutable>(other.config_->valuesCopy());
} else {
config_ = std::make_unique<core::MemConfig>(other.config_->valuesCopy());
}
registeredProps_ = other.registeredProps_;
filePath_ = other.filePath_;
}

void CelebornConf::registerProperty(
const std::string_view& key,
const std::string& value) {
setValue(static_cast<std::string>(key), value);
}

Timeout CelebornConf::rpcLookupTimeout() const {
return toTimeout(toDuration(optionalProperty(kRpcLookupTimeout).value()));
}

Timeout CelebornConf::clientRpcGetReducerFileGroupRpcAskTimeout() const {
return toTimeout(toDuration(
optionalProperty(kClientRpcGetReducerFileGroupRpcAskTimeout).value()));
}

Timeout CelebornConf::networkConnectTimeout() const {
return toTimeout(
toDuration(optionalProperty(kNetworkConnectTimeout).value()));
}

Timeout CelebornConf::clientFetchTimeout() const {
return toTimeout(toDuration(optionalProperty(kClientFetchTimeout).value()));
}

int CelebornConf::networkIoNumConnectionsPerPeer() const {
return std::stoi(optionalProperty(kNetworkIoNumConnectionsPerPeer).value());
}

int CelebornConf::networkIoClientThreads() const {
return std::stoi(optionalProperty(kNetworkIoClientThreads).value());
}

int CelebornConf::clientFetchMaxReqsInFlight() const {
return std::stoi(optionalProperty(kClientFetchMaxReqsInFlight).value());
}
} // namespace celeborn
85 changes: 85 additions & 0 deletions cpp/celeborn/conf/CelebornConf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "celeborn/conf/BaseConf.h"
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
/***
* steps to add a new config:
* === in CelebornConf.h:
* 1. define the configName with "static constexpr std::string_view";
* 2. declare the getter method within class;
* === in CelebornConf.cpp:
* 3. register the configName in CelebornConf's constructor, with proper
* data type and proper default value;
* 4. implement the getter method.
*/

class CelebornConf : public BaseConf {
public:
static const std::unordered_map<std::string, folly::Optional<std::string>>
kDefaultProperties;

static constexpr std::string_view kRpcLookupTimeout{
"celeborn.rpc.lookupTimeout"};

static constexpr std::string_view kClientRpcGetReducerFileGroupRpcAskTimeout{
"celeborn.client.rpc.getReducerFileGroup.askTimeout"};

static constexpr std::string_view kNetworkConnectTimeout{
"celeborn.network.connect.timeout"};

static constexpr std::string_view kClientFetchTimeout{
"celeborn.client.fetch.timeout"};

static constexpr std::string_view kNetworkIoNumConnectionsPerPeer{
"celeborn.data.io.numConnectionsPerPeer"};

static constexpr std::string_view kNetworkIoClientThreads{
"celeborn.data.io.clientThreads"};

static constexpr std::string_view kClientFetchMaxReqsInFlight{
"celeborn.client.fetch.maxReqsInFlight"};

CelebornConf();

CelebornConf(const std::string& filename);

CelebornConf(const CelebornConf& other);

CelebornConf(CelebornConf&& other) = delete;

void registerProperty(const std::string_view& key, const std::string& value);

Timeout rpcLookupTimeout() const;

Timeout clientRpcGetReducerFileGroupRpcAskTimeout() const;

Timeout networkConnectTimeout() const;

Timeout clientFetchTimeout() const;

int networkIoNumConnectionsPerPeer() const;

int networkIoClientThreads() const;

int clientFetchMaxReqsInFlight() const;
};
} // namespace celeborn
4 changes: 2 additions & 2 deletions cpp/celeborn/conf/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(celeborn_conf_test BaseConfTest.cpp)
add_executable(celeborn_conf_test BaseConfTest.cpp CelebornConfTest.cpp)

add_test(NAME celeborn_conf_test COMMAND celeborn_conf_test)

Expand All @@ -26,4 +26,4 @@ target_link_libraries(
${GFLAGS_LIBRARIES}
GTest::gtest
GTest::gmock
GTest::gtest_main)
GTest::gtest_main)
Loading

0 comments on commit 7f030d4

Please sign in to comment.