Skip to content

Commit

Permalink
[ISSUE #928] [CPP] Fix some cpp client bug and make logs cleaner (#929)
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhimins authored Jan 21, 2025
1 parent a60bec6 commit eeeb643
Show file tree
Hide file tree
Showing 22 changed files with 62 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cpp_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
# Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
# Remove macos-11 since there is no such runner available
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
steps:
- uses: actions/checkout@v2
- name: Compile On Linux
Expand Down
1 change: 1 addition & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp
/compile_commands.json
/.cache/
.clangd
build
5 changes: 2 additions & 3 deletions cpp/examples/ExampleFifoProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "rocketmq/FifoProducer.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
#include "rocketmq/SendReceipt.h"

using namespace ROCKETMQ_NAMESPACE;
Expand Down Expand Up @@ -93,8 +92,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducerWithAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_uint32(concurrency, 128, "Concurrency of async send");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducerWithFifoMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
5 changes: 2 additions & 3 deletions cpp/examples/ExampleProducerWithTimedMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <random>
#include <string>
Expand Down Expand Up @@ -50,8 +49,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "TimerTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/ExampleProducerWithTransactionalMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
return result;
}

DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(topic, "TransTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
Expand Down
6 changes: 3 additions & 3 deletions cpp/examples/ExamplePushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

using namespace ROCKETMQ_NAMESPACE;

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "PushConsumer", "GroupId, created through your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
Expand Down
6 changes: 3 additions & 3 deletions cpp/examples/ExampleSimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

using namespace ROCKETMQ_NAMESPACE;

DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "SimpleConsumer", "GroupId, created through your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
Expand Down
4 changes: 2 additions & 2 deletions cpp/source/base/include/InvocationContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ struct InvocationContext : public BaseInvocationContext {

if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code()) {
auto diff =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - context.deadline())
.count();
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - context.deadline()).count();
SPDLOG_WARN("Asynchronous RPC[{}.{}] timed out, elapsing {}ms, deadline-over-due: {}ms",
absl::FormatTime(created_time, absl::UTCTimeZone()), elapsed, diff);
}
Expand Down
11 changes: 6 additions & 5 deletions cpp/source/client/ClientManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
state_(State::CREATED),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
with_ssl_(with_ssl) {

certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
Expand Down Expand Up @@ -78,7 +79,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
*/
channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);

channel_arguments_.SetSslTargetNameOverride("localhost");
// channel_arguments_.SetSslTargetNameOverride("localhost");

SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", resource_namespace_);
}
Expand Down Expand Up @@ -282,7 +283,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
SendMessageRequest& request,
SendResultCallback cb) {
assert(cb);
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
// Invocation context will be deleted in its onComplete() method.
auto invocation_context = new InvocationContext<SendMessageResponse>();
Expand Down Expand Up @@ -440,7 +441,7 @@ bool ClientManagerImpl::send(const std::string& target_host,

case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address,
invocation_context->response.DebugString());
invocation_context->response.ShortDebugString());
send_result.ec = ErrorCode::MessagePropertyConflictWithType;
break;
}
Expand Down Expand Up @@ -482,7 +483,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
auto search = rpc_clients_.find(target_host);
if (search == rpc_clients_.end() || !search->second->ok()) {
if (search == rpc_clients_.end()) {
SPDLOG_INFO("Create a RPC client to {}", target_host.data());
SPDLOG_INFO("Create a RPC client to [{}]", target_host.data());
} else if (!search->second->ok()) {
SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
}
Expand Down Expand Up @@ -549,7 +550,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {
SPDLOG_DEBUG("Name server connection URL: {}", target_host);
SPDLOG_DEBUG("Query route request: {}", request.DebugString());
SPDLOG_DEBUG("Query route request: {}", request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host, false);
if (!client) {
SPDLOG_WARN("Failed to create RPC client for name server[host={}]", target_host);
Expand Down
14 changes: 7 additions & 7 deletions cpp/source/client/LogInterceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
if (methods->QueryInterceptionHookPoint(grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
std::multimap<std::string, std::string>* metadata = methods->GetSendInitialMetadata();
if (metadata) {
SPDLOG_DEBUG("[Outbound]Headers of {}: \n{}", client_rpc_info_->method(),
absl::StrJoin(*metadata, "\n", absl::PairFormatter(" --> ")));
SPDLOG_DEBUG("[Outbound]Headers of {}: {}", client_rpc_info_->method(),
absl::StrJoin(*metadata, " ", absl::PairFormatter(" --> ")));
}
}

Expand All @@ -73,8 +73,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
absl::string_view(it.second.data(), it.second.length())});
}
if (!response_headers.empty()) {
SPDLOG_DEBUG("[Inbound]Response Headers of {}:\n{}", client_rpc_info_->method(),
absl::StrJoin(response_headers, "\n", absl::PairFormatter(" --> ")));
SPDLOG_DEBUG("[Inbound]Response Headers of {}: {}", client_rpc_info_->method(),
absl::StrJoin(response_headers, " ", absl::PairFormatter(" --> ")));
} else {
SPDLOG_DEBUG("[Inbound]Response metadata of {} is empty", client_rpc_info_->method());
}
Expand All @@ -85,12 +85,12 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
void* message = methods->GetRecvMessage();
if (message) {
auto* response = reinterpret_cast<google::protobuf::Message*>(message);
std::string&& response_text = response->DebugString();
std::string&& response_text = response->ShortDebugString();
std::size_t limit = 1024;
if (response_text.size() <= limit) {
SPDLOG_DEBUG("[Inbound] {}\n{}", client_rpc_info_->method(), response_text);
SPDLOG_DEBUG("[Inbound] {} {}", client_rpc_info_->method(), response_text);
} else {
SPDLOG_DEBUG("[Inbound] {}\n{}...", client_rpc_info_->method(), response_text.substr(0, limit));
SPDLOG_DEBUG("[Inbound] {} {}...", client_rpc_info_->method(), response_text.substr(0, limit));
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions cpp/source/client/RpcClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
#include "RpcClientImpl.h"

#include <chrono>
#include <functional>
#include <sstream>
#include <thread>
Expand All @@ -26,7 +25,6 @@
#include "RpcClient.h"
#include "TelemetryBidiReactor.h"
#include "TlsHelper.h"
#include "absl/time/time.h"

ROCKETMQ_NAMESPACE_BEGIN

Expand Down
2 changes: 1 addition & 1 deletion cpp/source/client/SessionImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool SessionImpl::await() {

void SessionImpl::syncSettings() {
auto ptr = client_.lock();
SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
SPDLOG_INFO("Request client settings to {}", rpc_client_->remoteAddress());
TelemetryCommand command;
command.mutable_settings()->CopyFrom(ptr->clientSettings());
telemetry_->write(command);
Expand Down
22 changes: 10 additions & 12 deletions cpp/source/client/TelemetryBidiReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
*/
#include "TelemetryBidiReactor.h"

#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>

#include "ClientManager.h"
#include "MessageExt.h"
#include "Metadata.h"
#include "RpcClient.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"
#include "rocketmq/Logger.h"
Expand Down Expand Up @@ -70,7 +67,7 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
RemoveHold();

if (!ok) {
SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_);
SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().ShortDebugString(), peer_address_);
signalClose();
return;
}
Expand All @@ -91,7 +88,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
if (!ok) {
// for read stream
RemoveHold();
SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
// SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
signalClose();
return;
}
Expand All @@ -103,7 +100,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
}

SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString());
auto client = client_.lock();
if (!client) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
Expand All @@ -114,19 +111,20 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
switch (read_.command_case()) {
case rmq::TelemetryCommand::kSettings: {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString());
applySettings(settings);
sync_settings_promise_.set_value(true);
break;
}

case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString());
auto message = client->manager()->wrapMessage(
read_.recover_orphaned_transaction_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
client->recoverOrphanedTransaction(message);

break;
}

Expand Down Expand Up @@ -156,7 +154,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}

default: {
SPDLOG_WARN("Unsupported command");
SPDLOG_WARN("Telemetry command receive unsupported command");
break;
}
}
Expand Down Expand Up @@ -291,7 +289,7 @@ void TelemetryBidiReactor::tryWriteNext() {
}

if (!writes_.empty()) {
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString());
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString());
AddHold();
StartWrite(&(writes_.front()));
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/client/include/TopicRouteData.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TopicRouteData {

std::string debugString() const {
return absl::StrJoin(message_queues_.begin(), message_queues_.end(), ",",
[](std::string* out, const rmq::MessageQueue& m) { out->append(m.DebugString()); });
[](std::string* out, const rmq::MessageQueue& m) { out->append(m.ShortDebugString()); });
};

private:
Expand Down
3 changes: 2 additions & 1 deletion cpp/source/log/LoggerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Logger& getLogger() {
const std::size_t LoggerImpl::DEFAULT_MAX_LOG_FILE_QUANTITY = 16;
const std::size_t LoggerImpl::DEFAULT_FILE_SIZE = 1048576 * 256;
const char* LoggerImpl::USER_HOME_ENV = "HOME";
const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";
const char* LoggerImpl::DEFAULT_PATTERN = "%Y-%m-%d %H:%M:%S.%e [%^--%L--%$] [%7t] %v %@";
// const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";

ROCKETMQ_NAMESPACE_END
13 changes: 5 additions & 8 deletions cpp/source/rocketmq/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iterator>
#include <memory>
#include <string>
#include <system_error>
Expand All @@ -43,9 +41,6 @@
#include "absl/strings/str_split.h"
#include "fmt/format.h"
#include "opencensus/stats/stats.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/MessageListener.h"
#include "spdlog/spdlog.h"

ROCKETMQ_NAMESPACE_BEGIN
Expand Down Expand Up @@ -175,12 +170,14 @@ void ClientImpl::start() {
auto telemetry_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
if (base) {
SPDLOG_INFO("Sync client settings to servers");
SPDLOG_DEBUG("Sync client settings to servers");
base->syncClientSettings();
}
};
telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));

telemetry_handle_ = client_manager_->getScheduler()->schedule(
telemetry_functor, TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));

auto&& metric_service_endpoint = metricServiceEndpoint();
if (!metric_service_endpoint.empty()) {
Expand Down
Loading

0 comments on commit eeeb643

Please sign in to comment.