Skip to content

Commit

Permalink
Common library: implement MD5 file content computation routine.
Browse files Browse the repository at this point in the history
IO library: implement QFS network protocol state machine's node id set method, and use node id to set lookup and authenticate RPCs node id fields.
Meta server: maintain node id in client state machine by assigning it from lookup and authenticate RPCs. Implement node id parsing in lookup, authenticate, and chunk server hello RPCs. Maintain node id in chunk server state machine by assigning it from client hello RPC.
Chunk server: implement node id setting from chunk server configuration, and add it to the chunk server hello. Set rack id and node id for the meta server state machines used by the chunk recovery code.
Client library: implement node id setting from client configuration option. Set meta server state machine node id.
Annotated configuration files: add node id options descriptions to chunk server and QFS client configuration files.
  • Loading branch information
mikeov committed Nov 28, 2020
1 parent 3c2ed11 commit ac79fc4
Show file tree
Hide file tree
Showing 22 changed files with 258 additions and 49 deletions.
19 changes: 19 additions & 0 deletions conf/ChunkServer.prp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,25 @@ chunkServer.clientPort = 22000
# Default is no rack assigned.
# chunkServer.rackId = -1

# Chunk server's node id. If set to non empty string, used as network node /
# host identifier to determine if chunk server and client are co-located on the
# same network node. Takes precedence over client and chunk server IP addresses.
# Intended to be used in the cases where NAT and / or docker containers make IP
# address comparison meaningless for this purpose.
#
# Values with the FILE: prefix are handled by setting the node id to the MD5 of
# the file content.
# For example the following would set node id to
# d41d8cd98f00b204e9800998ecf8427e
# chunkServer.nodeId = FILE:/dev/null
#
# The client.nodeId parameter must also be set in the QFS client configuration
# to the same value as the chunk server's nodeId in order to indicate that both
# chunk server and client are co-located on the same node.
#
# Default is empty string.
# chunkServer.nodeId =

# Space separated list of directories to store chunks (blocks).
# Usually one directory per physical disk. More than one directory can
# be used in the cases where the host file system has problems / limitations
Expand Down
19 changes: 19 additions & 0 deletions conf/QfsClient.prp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,25 @@
# Default is -1, no rack Id specified.
# client.rackId = -1

# QFS client's node id. If set to non empty string, used as network node /
# host identifier to determine if chunk server and client are co-located on the
# same network node. Takes precedence over client and chunk server IP addresses.
# Intended to be used in the cases where NAT and / or docker containers make IP
# address comparison meaningless for this purpose.
#
# Values with the FILE: prefix are handled by setting the node id to the MD5 of
# the file content.
# For example the following would set node id to
# d41d8cd98f00b204e9800998ecf8427e
# client.nodeId = FILE:/dev/null
#
# The chunkServer.nodeId parameter must also be set in the chunk server
# configuration to the same value as client.nodeId in order to indicate that
# both chunk server and client are co-located on the same node.
#
# Default is empty string.
# client.nodeId =

#-------------------------------------------------------------------------------
# The following two parameters only have an effect with no authentication configured.

Expand Down
8 changes: 7 additions & 1 deletion src/cc/chunk/KfsOps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4099,7 +4099,13 @@ HelloMetaOp::Request(ReqOstream& os, IOBuffer& buf)
(shortRpcFormatFlag ? "CK:" : "Cluster-key: ") <<
clusterKey << "\r\n" <<
(shortRpcFormatFlag ? "5:" : "MD5Sum: ") <<
md5sum << "\r\n" <<
md5sum << "\r\n";
if (! nodeId.empty()) {
os <<
(shortRpcFormatFlag ? "ND:" : "Node-id: ") <<
nodeId << "\r\n";
}
os <<
(shortRpcFormatFlag ? "RI:" : "Rack-id: ") <<
rackId << "\r\n" <<
(shortRpcFormatFlag ? "T:" : "Total-space: ") <<
Expand Down
3 changes: 3 additions & 0 deletions src/cc/chunk/KfsOps.h
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ struct HelloMetaOp : public KfsOp {
ServerLocation const myLocation;
string clusterKey;
string md5sum;
string nodeId;
int rackId;
int64_t totalSpace;
int64_t totalFsSpace;
Expand Down Expand Up @@ -2263,6 +2264,7 @@ struct HelloMetaOp : public KfsOp {
myLocation(l),
clusterKey(k),
md5sum(m),
nodeId(),
rackId(r),
totalSpace(0),
totalFsSpace(0),
Expand Down Expand Up @@ -2302,6 +2304,7 @@ struct HelloMetaOp : public KfsOp {
" mylocation: " << myLocation <<
" cluster-key: " << clusterKey <<
" md5sum: " << md5sum <<
" nodeId: " << nodeId <<
" rackId: " << rackId <<
" space: " << totalSpace <<
" used: " << usedSpace <<
Expand Down
16 changes: 12 additions & 4 deletions src/cc/chunk/MetaServerSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class MetaServerSM::Impl : public KfsCallbackObj, private ITimeout
/// doesn't know about and shouldn't be included in the system.
int SetMetaInfo(const ServerLocation& metaLoc, const string& clusterKey,
int inactivityTimeout, int rackId, const string& md5sum,
const Properties& prop);
const string& nodeId, const Properties& prop);

void EnqueueOp(KfsOp* op);

Expand Down Expand Up @@ -182,6 +182,7 @@ class MetaServerSM::Impl : public KfsCallbackObj, private ITimeout

/// An MD5 sum computed over the binaries that we send to the metaserver.
string mMD5Sum;
string mNodeId;

/// the port that the metaserver tells the clients to connect to us at.
int mChunkServerPort;
Expand Down Expand Up @@ -375,6 +376,7 @@ MetaServerSM::Impl::Impl(
mRackId(-1),
mClusterKey(),
mMD5Sum(),
mNodeId(),
mChunkServerPort(-1),
mChunkServerHostname(),
mSentHello(false),
Expand Down Expand Up @@ -463,6 +465,7 @@ MetaServerSM::Impl::SetMetaInfo(
int inactivityTimeout,
int rackId,
const string& md5sum,
const string& nodeId,
const Properties& prop)
{
if (! metaLoc.IsValid()) {
Expand All @@ -475,6 +478,7 @@ MetaServerSM::Impl::SetMetaInfo(
mClusterKey = clusterKey;
mRackId = rackId;
mMD5Sum = md5sum;
mNodeId = nodeId;
return SetParameters(prop, inactivityTimeout);
}

Expand Down Expand Up @@ -1795,6 +1799,7 @@ MetaServerSM::Impl::SubmitHello()
}
mHelloOp = new HelloMetaOp(
mMyLocation, mClusterKey, mMD5Sum, mRackId, mChannelId);
mHelloOp->nodeId = mNodeId;
mHelloOp->seq = nextSeq();
mHelloOp->sendCurrentKeyFlag = true;
mHelloOp->noFidsFlag = mNoFidsFlag;
Expand Down Expand Up @@ -1934,6 +1939,7 @@ MetaServerSM::MetaServerSM()
mParameters(),
mClusterKey(),
mMd5sum(),
mNodeId(),
mRackId(-1)
{
Impl::List::Init(mImpls);
Expand Down Expand Up @@ -2163,7 +2169,7 @@ MetaServerSM::Resolved(MetaServerSM::ResolverReq& req)
Impl& impl = *(new Impl(mCounters, mPendingOps,
mPrimary, mUpdateServerIpFlag, mChanId++, this));
const int res = impl.SetMetaInfo(loc, mClusterKey,
mInactivityTimeout, mRackId, mMd5sum, mParameters);
mInactivityTimeout, mRackId, mMd5sum, mNodeId, mParameters);
if (0 != res) {
KFS_LOG_STREAM_ERROR <<
*it << ": " << QCUtils::SysError(-res) <<
Expand Down Expand Up @@ -2195,6 +2201,7 @@ MetaServerSM::SetMetaInfo(
const string& clusterKey,
int rackId,
const string& md5sum,
const string& nodeId,
const Properties& prop)
{
if (! Impl::List::IsEmpty(mImpls) || ! mLocations.empty() || mRunningFlag) {
Expand All @@ -2206,6 +2213,7 @@ MetaServerSM::SetMetaInfo(
mClusterKey = clusterKey;
mRackId = rackId;
mMd5sum = md5sum;
mNodeId = nodeId;
int res = SetParameters(prop);
if (0 != res) {
return res;
Expand Down Expand Up @@ -2267,8 +2275,8 @@ MetaServerSM::SetMetaInfo(
Impl& impl = *(new Impl(mCounters, mPendingOps, mPrimary,
mUpdateServerIpFlag, mChanId++, 0));
Impl::List::PushBack(mImpls, impl);
if ((res = impl.SetMetaInfo(*it, mClusterKey,
mInactivityTimeout, mRackId, mMd5sum, mParameters)) != 0) {
if ((res = impl.SetMetaInfo(*it, mClusterKey, mInactivityTimeout,
mRackId, mMd5sum, mNodeId, mParameters)) != 0) {
break;
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/cc/chunk/MetaServerSM.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ class MetaServerSM : public ITimeout
const string& clusterKey,
int rackId,
const string& md5sum,
const string& nodeId,
const Properties& prop
);
virtual void Timeout();
/// Return the rack id currently stored in mRackId.
int GetRackId() const {
return mRackId;
}
/// Return a reference to the node id string currently stored in mNodeId.
const string& GetNodeId() const {
return mNodeId;
}
private:
typedef SingleLinkedQueue<KfsOp, KfsOp::GetNext> OpsQueue;
class Impl;
Expand All @@ -128,6 +135,7 @@ class MetaServerSM : public ITimeout
Properties mParameters;
string mClusterKey;
string mMd5sum;
string mNodeId;
int mRackId;
Impl* mImpls[1];
ResolverReq* mResolverReqs[1];
Expand Down
2 changes: 2 additions & 0 deletions src/cc/chunk/Replicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,8 @@ class RSReplicatorImpl :
if (thread && sDebugSetThreadFlag) {
ret[i].mMeta->SetThread(&thread->GetThread());
}
ret[i].mMeta->SetRackId(gMetaServerSM.GetRackId());
ret[i].mMeta->SetNodeId(gMetaServerSM.GetNodeId().c_str());
ret[i].mDebugSetThreadUpdateCount = sDebugSetThreadUpdateCount;
}
return ret;
Expand Down
58 changes: 28 additions & 30 deletions src/cc/chunk/chunkserver_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "common/Properties.h"
#include "common/MdStream.h"
#include "common/MemLock.h"
#include "common/computemd5.h"

#include "kfsio/NetManager.h"
#include "kfsio/Globals.h"
Expand Down Expand Up @@ -151,6 +152,7 @@ class ChunkServerMain
int mFirstCpuIndex;
string mChunkServerHostname;
string mClusterKey;
string mNodeId;
int mChunkServerRackId;
int64_t mMaxLockedMemorySize;

Expand All @@ -164,46 +166,19 @@ class ChunkServerMain
mFirstCpuIndex(-1),
mChunkServerHostname(),
mClusterKey(),
mNodeId(),
mChunkServerRackId(-1),
mMaxLockedMemorySize(0)
{}
~ChunkServerMain()
{}
void ComputeMD5(const char *pathname);
bool LoadParams(const char *fileName);
friend class ChunkServerGlobals;
private:
ChunkServerMain(const ChunkServerMain&);
ChunkServerMain& operator=(const ChunkServerMain&);
};

void
ChunkServerMain::ComputeMD5(const char* pathname)
{
const size_t kBufSize = size_t(1) << 20;
char* const buf = new char[kBufSize];
fstream is(pathname, fstream::in | fstream::binary);
MdStream mds(0, false, string(), 0);

while (is && mds) {
is.read(buf, kBufSize);
mds.write(buf, is.gcount());
}
delete [] buf;

if (! is.eof() || ! mds) {
KFS_LOG_STREAM_ERROR <<
"md5sum " << QCUtils::SysError(errno, pathname) <<
KFS_LOG_EOM;
} else {
mMD5Sum = mds.GetMd();
KFS_LOG_STREAM_INFO <<
"md5sum " << pathname << ": " << mMD5Sum <<
KFS_LOG_EOM;
}
is.close();
}

///
/// Read and validate the configuration settings for the chunk
/// server. The configuration file is assumed to contain lines of the
Expand Down Expand Up @@ -279,6 +254,28 @@ ChunkServerMain::LoadParams(const char* fileName)
mChunkDirs.push_back(dir);
}

mNodeId = mProp.getValue("chunkServer.nodeId", mNodeId);
const char* const kFilePrefix = "FILE:";
size_t const kFilePrefixLen = strlen(kFilePrefix);
if (0 == mNodeId.compare(0, kFilePrefixLen, kFilePrefix)) {
const char* const fileName = mNodeId.c_str() + kFilePrefixLen;
string const md5 = ComputeMD5(fileName);
if (md5.empty()) {
KFS_LOG_STREAM_FATAL <<
mNodeId << " " << fileName << ": read failure" <<
KFS_LOG_EOM;
return false;
}
mNodeId = md5;
} else if ((size_t)MAX_RPC_HEADER_LEN / 8 < mNodeId.size()) {
KFS_LOG_STREAM_FATAL <<
mNodeId << ": exceeds size limit of " <<
MAX_RPC_HEADER_LEN / 8 <<
KFS_LOG_EOM;
return false;
}
KFS_LOG_STREAM_INFO << "nodeId: " << mNodeId <<
KFS_LOG_EOM;
mChunkServerRackId = mProp.getValue("chunkServer.rackId", mChunkServerRackId);
KFS_LOG_STREAM_INFO << "rack: " << mChunkServerRackId <<
KFS_LOG_EOM;
Expand Down Expand Up @@ -391,10 +388,10 @@ ChunkServerMain::Run(int argc, char **argv)

// compute the MD5 of the binary
#ifdef KFS_OS_NAME_LINUX
ComputeMD5("/proc/self/exe");
mMD5Sum = ComputeMD5("/proc/self/exe");
#endif
if (mMD5Sum.empty()) {
ComputeMD5(argv[0]);
mMD5Sum = ComputeMD5(argv[0]);
}
if (! LoadParams(argv[1])) {
return 1;
Expand All @@ -413,6 +410,7 @@ ChunkServerMain::Run(int argc, char **argv)
mClusterKey,
mChunkServerRackId,
mMD5Sum,
mNodeId,
mProp) == 0 &&
gChunkServer.Init(
mClientListener,
Expand Down
Loading

0 comments on commit ac79fc4

Please sign in to comment.