Skip to content

Commit

Permalink
[ISSUE-435][PART-A]Support receiving traceOn signal. (#434)
Browse files Browse the repository at this point in the history
* add traceOn to sendRedult

* add getExtField to RemoteCommand; use unordered_map to store extField

* complete extFields

* complete traceOn
  • Loading branch information
BeautyyuYanli authored Aug 12, 2022
1 parent c64df6a commit 5cdb268
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 5 deletions.
6 changes: 5 additions & 1 deletion include/SendResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class ROCKETMQCLIENT_API SendResult {
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset,
const std::string& regionId);
const std::string& regionId,
const bool traceOn);

virtual ~SendResult();
SendResult(const SendResult& other);
Expand All @@ -55,6 +56,8 @@ class ROCKETMQCLIENT_API SendResult {
SendStatus getSendStatus() const;
MQMessageQueue getMessageQueue() const;
int64 getQueueOffset() const;
bool getTraceOn() const;

std::string toString() const;

private:
Expand All @@ -65,6 +68,7 @@ class ROCKETMQCLIENT_API SendResult {
int64 m_queueOffset;
std::string m_transactionId;
std::string m_regionId;
bool m_traceOn;
};

} // namespace rocketmq
Expand Down
4 changes: 3 additions & 1 deletion src/MQClientAPIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,12 @@ SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
}
if (res == 0) {
SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandHeader();
auto extFields = pResponse->getExtFields();
bool traceOn = (extFields->count("TRACE_ON") && extFields->at("TRACE_ON") == "true");
MQMessageQueue messageQueue(msg.getTopic(), brokerName, responseHeader->queueId);
string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset,
responseHeader->regionId);
responseHeader->regionId, traceOn);
}
LOG_ERROR("processSendResponse error remark:%s, error code:%d", (pResponse->getRemark()).c_str(),
pResponse->getCode());
Expand Down
14 changes: 12 additions & 2 deletions src/producer/SendResult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ SendResult::SendResult(const SendStatus& sendStatus,
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset,
const string& regionId)
const string& regionId,
const bool traceOn)
: m_sendStatus(sendStatus),
m_msgId(msgId),
m_offsetMsgId(offsetMsgId),
m_messageQueue(messageQueue),
m_queueOffset(queueOffset),
m_regionId(regionId) {}
m_regionId(regionId),
m_traceOn(traceOn) {}

SendResult::SendResult(const SendResult& other) {
m_sendStatus = other.m_sendStatus;
Expand All @@ -54,6 +56,7 @@ SendResult::SendResult(const SendResult& other) {
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
m_regionId = other.m_regionId;
m_traceOn = other.m_traceOn;
}

SendResult& SendResult::operator=(const SendResult& other) {
Expand All @@ -64,6 +67,7 @@ SendResult& SendResult::operator=(const SendResult& other) {
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
m_regionId = other.m_regionId;
m_traceOn = other.m_traceOn;
}
return *this;
}
Expand Down Expand Up @@ -96,6 +100,10 @@ int64 SendResult::getQueueOffset() const {
return m_queueOffset;
}

bool SendResult::getTraceOn() const {
return m_traceOn;
}

std::string SendResult::toString() const {
stringstream ss;
ss << "SendResult: ";
Expand All @@ -105,6 +113,8 @@ std::string SendResult::toString() const {
ss << ",queueOffset:" << m_queueOffset;
ss << ",transactionId:" << m_transactionId;
ss << ",messageQueue:" << m_messageQueue.toString();
ss << ",regionId:" << m_regionId;
ss << ",traceOn:" << m_traceOn;
return ss.str();
}

Expand Down
10 changes: 10 additions & 0 deletions src/protocol/RemotingCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) {
if (bodyLen > 0) {
cmd->SetBody(pData + 4 + headLen, bodyLen);
}
if (object.isMember("extFields")) {
Json::Value& extFields = object["extFields"];
for (auto& it : extFields.getMemberNames()) {
cmd->m_extFields[it] = extFields[it].asString();
}
}
return cmd;
}

Expand Down Expand Up @@ -304,6 +310,10 @@ void RemotingCommand::addExtField(const string& key, const string& value) {
m_extFields[key] = value;
}

const unordered_map<string, string>* RemotingCommand::getExtFields() const{
return &m_extFields;
}

std::string RemotingCommand::ToString() const {
std::stringstream ss;
ss << "code:" << m_code << ",opaque:" << m_opaque << ",flag:" << m_flag << ",body.size:" << m_body.getSize()
Expand Down
4 changes: 3 additions & 1 deletion src/protocol/RemotingCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <boost/thread/thread.hpp>
#include <memory>
#include <sstream>
#include <unordered_map>
#include "CommandHeader.h"
#include "dataBlock.h"

Expand Down Expand Up @@ -62,6 +63,7 @@ class RemotingCommand {
const int getFlag() const;
const int getVersion() const;
void addExtField(const string& key, const string& value);
const unordered_map<string, string>* getExtFields() const;
string getMsgBody() const;
void setMsgBody(const string& body);

Expand All @@ -81,7 +83,7 @@ class RemotingCommand {
int m_flag;
string m_remark;
string m_msgBody;
map<string, string> m_extFields;
unordered_map<string, string> m_extFields;

MemoryBlock m_head;
MemoryBlock m_body;
Expand Down

0 comments on commit 5cdb268

Please sign in to comment.