Skip to content

Commit

Permalink
[CELEBORN-1819][CIP-14] Refactor cppClient with nested namespace
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR refactors cppClient to use nested namespace.

### Why are the changes needed?
The nested namespace would improve symbol isolation and make the celebornCpp project more extensible.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3050 from HolyLow/issue/celeborn-1819-refactor-cppClient-with-nested-namespace.

Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
HolyLow authored and FMX committed Jan 4, 2025
1 parent f886751 commit 6853b23
Show file tree
Hide file tree
Showing 27 changed files with 226 additions and 176 deletions.
2 changes: 2 additions & 0 deletions cpp/celeborn/conf/BaseConf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace fs = std::experimental::filesystem;
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
namespace conf {
namespace core {

folly::Optional<std::string> MemConfig::get(const std::string& key) const {
Expand Down Expand Up @@ -225,4 +226,5 @@ void BaseConf::checkRegisteredProperties(
<< str;
}
}
} // namespace conf
} // namespace celeborn
2 changes: 2 additions & 0 deletions cpp/celeborn/conf/BaseConf.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "celeborn/utils/Exceptions.h"

namespace celeborn {
namespace conf {

class Config {
public:
Expand Down Expand Up @@ -260,4 +261,5 @@ class BaseConf {
registeredProps_;
};

} // namespace conf
} // namespace celeborn
6 changes: 5 additions & 1 deletion cpp/celeborn/conf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_library(conf BaseConf.cpp CelebornConf.cpp)
add_library(
conf
STATIC
BaseConf.cpp
CelebornConf.cpp)

target_link_libraries(
conf
Expand Down
11 changes: 7 additions & 4 deletions cpp/celeborn/conf/CelebornConf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "celeborn/conf/CelebornConf.h"

namespace celeborn {
namespace conf {
using Duration = utils::Duration;
namespace {

// folly::to<> does not generate 'true' and 'false', so we do it ourselves.
Expand Down Expand Up @@ -167,21 +169,21 @@ void CelebornConf::registerProperty(
}

Timeout CelebornConf::rpcLookupTimeout() const {
return toTimeout(toDuration(optionalProperty(kRpcLookupTimeout).value()));
return utils::toTimeout(toDuration(optionalProperty(kRpcLookupTimeout).value()));
}

Timeout CelebornConf::clientRpcGetReducerFileGroupRpcAskTimeout() const {
return toTimeout(toDuration(
return utils::toTimeout(toDuration(
optionalProperty(kClientRpcGetReducerFileGroupRpcAskTimeout).value()));
}

Timeout CelebornConf::networkConnectTimeout() const {
return toTimeout(
return utils::toTimeout(
toDuration(optionalProperty(kNetworkConnectTimeout).value()));
}

Timeout CelebornConf::clientFetchTimeout() const {
return toTimeout(toDuration(optionalProperty(kClientFetchTimeout).value()));
return utils::toTimeout(toDuration(optionalProperty(kClientFetchTimeout).value()));
}

int CelebornConf::networkIoNumConnectionsPerPeer() const {
Expand All @@ -195,4 +197,5 @@ int CelebornConf::networkIoClientThreads() const {
int CelebornConf::clientFetchMaxReqsInFlight() const {
return std::stoi(optionalProperty(kClientFetchMaxReqsInFlight).value());
}
} // namespace conf
} // namespace celeborn
3 changes: 3 additions & 0 deletions cpp/celeborn/conf/CelebornConf.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
using Timeout = utils::Timeout;
namespace conf {
/***
* steps to add a new config:
* === in CelebornConf.h:
Expand Down Expand Up @@ -82,4 +84,5 @@ class CelebornConf : public BaseConf {

int clientFetchMaxReqsInFlight() const;
};
} // namespace conf
} // namespace celeborn
20 changes: 11 additions & 9 deletions cpp/celeborn/conf/tests/BaseConfTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

#include "celeborn/conf/BaseConf.h"

using namespace celeborn;
using namespace celeborn::conf;

using CelebornUserError = celeborn::utils::CelebornUserError;

// As the BaseConf's construction is protected, we expose it explicitly.
class TestedBaseConf : public BaseConf {};
Expand All @@ -30,24 +32,24 @@ void testInsertToConf(TestedBaseConf* conf, bool isMutable = true) {
const std::string key1 = "key1";
const std::string val1 = "val1";
// Before insert.
EXPECT_THROW(conf->requiredProperty(key1), celeborn::CelebornUserError);
EXPECT_THROW(conf->requiredProperty(key1), CelebornUserError);
EXPECT_EQ(conf->optionalProperty(key1), ValueType());

// Insert.
EXPECT_THROW(conf->setValue(key1, val1), celeborn::CelebornUserError);
EXPECT_THROW(conf->setValue(key1, val1), CelebornUserError);
EXPECT_TRUE(conf->registerProperty(key1, ValueType(val1)));

// After insert.
EXPECT_FALSE(conf->registerProperty(key1, ValueType(val1)));
EXPECT_THROW(conf->requiredProperty(key1), celeborn::CelebornUserError);
EXPECT_THROW(conf->requiredProperty(key1), CelebornUserError);
EXPECT_EQ(conf->optionalProperty(key1), ValueType(val1));

// Update.
const std::string val1_1 = "val1_1";
if (isMutable) {
EXPECT_EQ(conf->setValue(key1, val1_1), ValueType(val1));
} else {
EXPECT_THROW(conf->setValue(key1, val1_1), celeborn::CelebornUserError);
EXPECT_THROW(conf->setValue(key1, val1_1), CelebornUserError);
}
}

Expand All @@ -62,7 +64,7 @@ void testInitedConf(

// Test insert to init config.
const std::string val_1 = "val_1_random";
EXPECT_THROW(conf->setValue(key, val_1), celeborn::CelebornUserError);
EXPECT_THROW(conf->setValue(key, val_1), CelebornUserError);
EXPECT_TRUE(conf->registerProperty(key, val_1));
// The init config has highest priority.
EXPECT_EQ(conf->optionalProperty(key), val);
Expand All @@ -75,7 +77,7 @@ void testInitedConf(
EXPECT_EQ(conf->optionalProperty(key), ValueType(val_1));
} else {
// Insert with setValue() would not work!
EXPECT_THROW(conf->setValue(key, val_1), celeborn::CelebornUserError);
EXPECT_THROW(conf->setValue(key, val_1), CelebornUserError);
}
}

Expand Down Expand Up @@ -154,7 +156,7 @@ TEST(BaseConfTest, registerToConfInitedWithMutableConfigFile) {

// The annotated kv would not be recorded.
EXPECT_THROW(
conf->requiredProperty(annotatedKey), celeborn::CelebornUserError);
conf->requiredProperty(annotatedKey), CelebornUserError);
// Test init.
testInitedConf(conf.get(), true, key0, val0);

Expand Down Expand Up @@ -191,7 +193,7 @@ TEST(BaseConfTest, registerToConfInitedWithImmutableConfigFile) {

// The annotated kv would not be recorded.
EXPECT_THROW(
conf->requiredProperty(annotatedKey), celeborn::CelebornUserError);
conf->requiredProperty(annotatedKey), CelebornUserError);
// Test init.
testInitedConf(conf.get(), false, key0, val0);

Expand Down
4 changes: 3 additions & 1 deletion cpp/celeborn/conf/tests/CelebornConfTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

#include "celeborn/conf/CelebornConf.h"

using namespace celeborn;
using namespace celeborn::conf;

using CelebornUserError = celeborn::utils::CelebornUserError;
using SECOND = std::chrono::seconds;
using MILLISENCOND = std::chrono::milliseconds;

Expand Down
4 changes: 3 additions & 1 deletion cpp/celeborn/memory/ByteBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "celeborn/memory/ByteBuffer.h"

namespace celeborn {
namespace memory{
std::unique_ptr<WriteOnlyByteBuffer> ByteBuffer::createWriteOnly(
size_t initialCapacity,
bool isBigEndian) {
Expand Down Expand Up @@ -73,4 +74,5 @@ std::unique_ptr<folly::IOBuf> ByteBuffer::trimBuffer(
}
return std::move(data);
}
} // namespace celeborn
} // namespace memory
} // namespace celeborn
2 changes: 2 additions & 0 deletions cpp/celeborn/memory/ByteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <folly/io/IOBuf.h>

namespace celeborn {
namespace memory {
class ReadOnlyByteBuffer;
class WriteOnlyByteBuffer;

Expand Down Expand Up @@ -177,4 +178,5 @@ class WriteOnlyByteBuffer : public ByteBuffer {
private:
std::unique_ptr<folly::io::Appender> appender_;
};
} // namespace memory
} // namespace celeborn
1 change: 1 addition & 0 deletions cpp/celeborn/memory/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
add_library(
memory
STATIC
ByteBuffer.cpp)


Expand Down
2 changes: 1 addition & 1 deletion cpp/celeborn/memory/tests/ByteBufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include "celeborn/memory/ByteBuffer.h"

using namespace celeborn;
using namespace celeborn::memory;

namespace {
template <typename T>
Expand Down
2 changes: 2 additions & 0 deletions cpp/celeborn/protocol/PartitionLocation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "celeborn/utils/Exceptions.h"

namespace celeborn {
namespace protocol {
std::unique_ptr<StorageInfo> StorageInfo::fromPb(const PbStorageInfo& pb) {
auto result = std::make_unique<StorageInfo>();
result->type = static_cast<Type>(pb.type());
Expand Down Expand Up @@ -88,4 +89,5 @@ StatusCode toStatusCode(int32_t code) {
CELEBORN_CHECK(code <= StatusCode::TAIL);
return static_cast<StatusCode>(code);
}
} // namespace protocol
} // namespace celeborn
2 changes: 2 additions & 0 deletions cpp/celeborn/protocol/PartitionLocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "celeborn/utils/Exceptions.h"

namespace celeborn {
namespace protocol {
struct StorageInfo {
static std::unique_ptr<StorageInfo> fromPb(const PbStorageInfo& pb);

Expand Down Expand Up @@ -92,4 +93,5 @@ struct PartitionLocation {
static std::unique_ptr<PartitionLocation> fromPbWithoutPeer(
const PbPartitionLocation& pb);
};
} // namespace protocol
} // namespace celeborn
4 changes: 2 additions & 2 deletions cpp/celeborn/protocol/StatusCode.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

namespace celeborn {
namespace celeborn::protocol {
enum StatusCode {
// 1/0 Status
SUCCESS = 0,
Expand Down Expand Up @@ -92,4 +92,4 @@ enum StatusCode {
};

StatusCode toStatusCode(int32_t code);
} // namespace celeborn
} // namespace celeborn::protocol
11 changes: 6 additions & 5 deletions cpp/celeborn/protocol/TransportMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ TransportMessage::TransportMessage(MessageType type, std::string&& payload)
messageTypeValue_ = type;
}

TransportMessage::TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf) {
TransportMessage::TransportMessage(
std::unique_ptr<memory::ReadOnlyByteBuffer> buf) {
int messageTypeValue = buf->read<int32_t>();
int payloadLen = buf->read<int32_t>();
CELEBORN_CHECK_EQ(buf->remainingSize(), payloadLen);
Expand All @@ -35,15 +36,15 @@ TransportMessage::TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf) {
payload_ = buf->readToString(payloadLen);
}

std::unique_ptr<ReadOnlyByteBuffer> TransportMessage::toReadOnlyByteBuffer()
const {
std::unique_ptr<memory::ReadOnlyByteBuffer>
TransportMessage::toReadOnlyByteBuffer() const {
int bufSize = payload_.size() + 4 + 4;
auto buffer = ByteBuffer::createWriteOnly(bufSize);
auto buffer = memory::ByteBuffer::createWriteOnly(bufSize);
buffer->write<int>(messageTypeValue_);
buffer->write<int>(payload_.size());
buffer->writeFromString(payload_);
CELEBORN_CHECK_EQ(buffer->size(), bufSize);
return ByteBuffer::toReadOnly(std::move(buffer));
return memory::ByteBuffer::toReadOnly(std::move(buffer));
}
} // namespace protocol
} // namespace celeborn
4 changes: 2 additions & 2 deletions cpp/celeborn/protocol/TransportMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class TransportMessage {
public:
TransportMessage(MessageType type, std::string&& payload);

TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf);
TransportMessage(std::unique_ptr<memory::ReadOnlyByteBuffer> buf);

std::unique_ptr<ReadOnlyByteBuffer> toReadOnlyByteBuffer() const;
std::unique_ptr<memory::ReadOnlyByteBuffer> toReadOnlyByteBuffer() const;

MessageType type() const {
return type_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/celeborn/protocol/tests/PartitionLocationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "celeborn/proto/TransportMessagesCpp.pb.h"
#include "celeborn/protocol/PartitionLocation.h"

using namespace celeborn;
using namespace celeborn::protocol;

std::unique_ptr<PbStorageInfo> generateStorageInfoPb() {
auto pbStorageInfo = std::make_unique<PbStorageInfo>();
Expand Down
1 change: 0 additions & 1 deletion cpp/celeborn/protocol/tests/TransportMessageTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "celeborn/proto/TransportMessagesCpp.pb.h"
#include "celeborn/protocol/TransportMessage.h"

using namespace celeborn;
using namespace celeborn::protocol;

TEST(TransportMessageTest, constructFromPayload) {
Expand Down
10 changes: 6 additions & 4 deletions cpp/celeborn/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# limitations under the License.
add_library(
utils
ProcessBase.cpp StackTrace.cpp CelebornException.cpp Exceptions.cpp flags.cpp)
STATIC
ProcessBase.cpp
StackTrace.cpp
CelebornException.cpp
Exceptions.cpp
flags.cpp)

target_link_libraries(
utils
${WANGLE}
${FIZZ}
${LIBSODIUM_LIBRARY}
${FOLLY_WITH_DEPENDENCIES}
${GLOG}
${GFLAGS_LIBRARIES}
Expand Down
2 changes: 2 additions & 0 deletions cpp/celeborn/utils/CelebornException.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <exception>

namespace celeborn {
namespace utils {

std::exception_ptr toCelebornException(const std::exception_ptr& exceptionPtr) {
try {
Expand Down Expand Up @@ -278,4 +279,5 @@ const char* CelebornException::State::what() const noexcept {
}
}

} // namespace utils
} // namespace celeborn
2 changes: 2 additions & 0 deletions cpp/celeborn/utils/CelebornException.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ DECLARE_int32(celeborn_exception_user_stacktrace_rate_limit_ms);
DECLARE_int32(celeborn_exception_system_stacktrace_rate_limit_ms);

namespace celeborn {
namespace utils {

namespace error_source {
using namespace folly::string_literals;
Expand Down Expand Up @@ -412,4 +413,5 @@ class ExceptionContextSetter {
private:
ExceptionContext prev_;
};
} // namespace utils
} // namespace celeborn
3 changes: 2 additions & 1 deletion cpp/celeborn/utils/CelebornUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "celeborn/utils/Exceptions.h"

namespace celeborn {
namespace utils {
#define CELEBORN_STARTUP_LOG_PREFIX "[CELEBORN_STARTUP] "
#define CELEBORN_STARTUP_LOG(severity) \
LOG(severity) << CELEBORN_STARTUP_LOG_PREFIX
Expand All @@ -37,5 +38,5 @@ inline Timeout toTimeout(Duration duration) {
return std::chrono::duration_cast<Timeout>(duration);

}
} // namespace utils
} // namespace celeborn

Loading

0 comments on commit 6853b23

Please sign in to comment.