From 5cdb268fafbccafdf0deafae5eb08037017ba7be Mon Sep 17 00:00:00 2001 From: BeautyYuYanli Date: Fri, 12 Aug 2022 15:22:38 +0800 Subject: [PATCH] [ISSUE-435][PART-A]Support receiving traceOn signal. (#434) * add traceOn to sendRedult * add getExtField to RemoteCommand; use unordered_map to store extField * complete extFields * complete traceOn --- include/SendResult.h | 6 +++++- src/MQClientAPIImpl.cpp | 4 +++- src/producer/SendResult.cpp | 14 ++++++++++++-- src/protocol/RemotingCommand.cpp | 10 ++++++++++ src/protocol/RemotingCommand.h | 4 +++- 5 files changed, 33 insertions(+), 5 deletions(-) diff --git a/include/SendResult.h b/include/SendResult.h index cfe83ce42..94ba543e8 100644 --- a/include/SendResult.h +++ b/include/SendResult.h @@ -37,7 +37,8 @@ class ROCKETMQCLIENT_API SendResult { const std::string& offsetMsgId, const MQMessageQueue& messageQueue, int64 queueOffset, - const std::string& regionId); + const std::string& regionId, + const bool traceOn); virtual ~SendResult(); SendResult(const SendResult& other); @@ -55,6 +56,8 @@ class ROCKETMQCLIENT_API SendResult { SendStatus getSendStatus() const; MQMessageQueue getMessageQueue() const; int64 getQueueOffset() const; + bool getTraceOn() const; + std::string toString() const; private: @@ -65,6 +68,7 @@ class ROCKETMQCLIENT_API SendResult { int64 m_queueOffset; std::string m_transactionId; std::string m_regionId; + bool m_traceOn; }; } // namespace rocketmq diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 19cf3f5e5..671f3289b 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -542,10 +542,12 @@ SendResult MQClientAPIImpl::processSendResponse(const string& brokerName, } if (res == 0) { SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandHeader(); + auto extFields = pResponse->getExtFields(); + bool traceOn = (extFields->count("TRACE_ON") && extFields->at("TRACE_ON") == "true"); MQMessageQueue messageQueue(msg.getTopic(), brokerName, responseHeader->queueId); string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset, - responseHeader->regionId); + responseHeader->regionId, traceOn); } LOG_ERROR("processSendResponse error remark:%s, error code:%d", (pResponse->getRemark()).c_str(), pResponse->getCode()); diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp index 3b56ff3f4..f6926d500 100644 --- a/src/producer/SendResult.cpp +++ b/src/producer/SendResult.cpp @@ -39,13 +39,15 @@ SendResult::SendResult(const SendStatus& sendStatus, const std::string& offsetMsgId, const MQMessageQueue& messageQueue, int64 queueOffset, - const string& regionId) + const string& regionId, + const bool traceOn) : m_sendStatus(sendStatus), m_msgId(msgId), m_offsetMsgId(offsetMsgId), m_messageQueue(messageQueue), m_queueOffset(queueOffset), - m_regionId(regionId) {} + m_regionId(regionId), + m_traceOn(traceOn) {} SendResult::SendResult(const SendResult& other) { m_sendStatus = other.m_sendStatus; @@ -54,6 +56,7 @@ SendResult::SendResult(const SendResult& other) { m_messageQueue = other.m_messageQueue; m_queueOffset = other.m_queueOffset; m_regionId = other.m_regionId; + m_traceOn = other.m_traceOn; } SendResult& SendResult::operator=(const SendResult& other) { @@ -64,6 +67,7 @@ SendResult& SendResult::operator=(const SendResult& other) { m_messageQueue = other.m_messageQueue; m_queueOffset = other.m_queueOffset; m_regionId = other.m_regionId; + m_traceOn = other.m_traceOn; } return *this; } @@ -96,6 +100,10 @@ int64 SendResult::getQueueOffset() const { return m_queueOffset; } +bool SendResult::getTraceOn() const { + return m_traceOn; +} + std::string SendResult::toString() const { stringstream ss; ss << "SendResult: "; @@ -105,6 +113,8 @@ std::string SendResult::toString() const { ss << ",queueOffset:" << m_queueOffset; ss << ",transactionId:" << m_transactionId; ss << ",messageQueue:" << m_messageQueue.toString(); + ss << ",regionId:" << m_regionId; + ss << ",traceOn:" << m_traceOn; return ss.str(); } diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp index 91d023e55..bdde864f0 100644 --- a/src/protocol/RemotingCommand.cpp +++ b/src/protocol/RemotingCommand.cpp @@ -180,6 +180,12 @@ RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) { if (bodyLen > 0) { cmd->SetBody(pData + 4 + headLen, bodyLen); } + if (object.isMember("extFields")) { + Json::Value& extFields = object["extFields"]; + for (auto& it : extFields.getMemberNames()) { + cmd->m_extFields[it] = extFields[it].asString(); + } + } return cmd; } @@ -304,6 +310,10 @@ void RemotingCommand::addExtField(const string& key, const string& value) { m_extFields[key] = value; } +const unordered_map* RemotingCommand::getExtFields() const{ + return &m_extFields; +} + std::string RemotingCommand::ToString() const { std::stringstream ss; ss << "code:" << m_code << ",opaque:" << m_opaque << ",flag:" << m_flag << ",body.size:" << m_body.getSize() diff --git a/src/protocol/RemotingCommand.h b/src/protocol/RemotingCommand.h index 1e039c8fe..b0525b72a 100644 --- a/src/protocol/RemotingCommand.h +++ b/src/protocol/RemotingCommand.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "CommandHeader.h" #include "dataBlock.h" @@ -62,6 +63,7 @@ class RemotingCommand { const int getFlag() const; const int getVersion() const; void addExtField(const string& key, const string& value); + const unordered_map* getExtFields() const; string getMsgBody() const; void setMsgBody(const string& body); @@ -81,7 +83,7 @@ class RemotingCommand { int m_flag; string m_remark; string m_msgBody; - map m_extFields; + unordered_map m_extFields; MemoryBlock m_head; MemoryBlock m_body;