Skip to content

Commit

Permalink
Basic Xrootd remote fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed May 17, 2024
1 parent 5499b23 commit 06911b6
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 30 deletions.
4 changes: 2 additions & 2 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void BP5Reader::PerformGets()
if (getenv("DoXRootD"))
{
m_Remote = std::unique_ptr<XrootdRemote>(new XrootdRemote());
m_Remote->Open("localhost", 1049, m_Name, m_OpenMode, RowMajorOrdering);
m_Remote->Open("localhost", 1094, m_Name, m_OpenMode, RowMajorOrdering);
}
else
#endif
Expand Down Expand Up @@ -535,7 +535,7 @@ void BP5Reader::Init()
// Don't try to open the remote file when we open local metadata. Do that on demand.
if (!m_Parameters.RemoteDataPath.empty())
m_dataIsRemote = true;
if (getenv("DoRemote"))
if (getenv("DoRemote") || getenv("DoXRootD"))
m_dataIsRemote = true;
}

Expand Down
6 changes: 3 additions & 3 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t Blo
GetMsg.Dest = dest;
CMwrite(m_conn, ev_state.GetRequestFormat, &GetMsg);
CMCondition_wait(ev_state.cm, GetMsg.GetResponseCondition);
return GetMsg.GetResponseCondition;
return (Remote::GetHandle)(intptr_t)GetMsg.GetResponseCondition;
}

EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
Expand All @@ -186,12 +186,12 @@ EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest
ReadMsg.Dest = Dest;
CMwrite(m_conn, ev_state.ReadRequestFormat, &ReadMsg);
CMCondition_wait(ev_state.cm, ReadMsg.ReadResponseCondition);
return ReadMsg.ReadResponseCondition;
return (Remote::GetHandle)(intptr_t)ReadMsg.ReadResponseCondition;
}

bool EVPathRemote::WaitForGet(GetHandle handle)
{
return CMCondition_wait(ev_state.cm, (int)handle);
return CMCondition_wait(ev_state.cm, (int)(intptr_t)handle);
}
#else

Expand Down
2 changes: 0 additions & 2 deletions source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ class EVPathRemote : public Remote

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

typedef int GetHandle;

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);

bool WaitForGet(GetHandle handle);
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Remote::GetHandle Remote::Get(char *VarName, size_t Step, size_t BlockID, Dims &
void *dest)
{
ThrowUp("RemoteGet");
return 0;
return (intptr_t)0;
};

bool Remote::WaitForGet(GetHandle handle)
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Remote
virtual void OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename);

typedef int GetHandle;
typedef void *GetHandle;

virtual GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
void *dest);
Expand Down
45 changes: 31 additions & 14 deletions source/adios2/toolkit/remote/XrootdRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* accompanying file Copyright.txt for details.
* [email protected]
*/
#include <future>

#include "XrootdRemote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/helper/adiosLog.h"
Expand Down Expand Up @@ -100,6 +102,8 @@ class myRequest : public XrdSsiRequest
static myRequest *currentRequest;
char *responseBuffer;
int responseBufferLen;
char *dest;
std::promise<void> *promise;

private:
XrdSsiResource rSpec;
Expand Down Expand Up @@ -130,8 +134,9 @@ void myRequest::Alert(XrdSsiRespInfoMsg &aMsg)

// Print what we received
//
fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %d bytes alert: '%s'\n", rName, GetEndPoint().c_str(),
theMsz, theMsg);
// fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %d bytes alert: '%s'\n", rName,
// GetEndPoint().c_str(),
// theMsz, theMsg);

// Recycle the message
//
Expand Down Expand Up @@ -199,15 +204,15 @@ bool myRequest::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo
// theMD = GetMetadata(theML);
if (rInfo.mdlen)
{
fprintf(XrdSsiCl::outFile,
"%s@%s: Rcvd %s response "
"with %d metabytes '%s'\n",
rName, GetEndPoint().c_str(), rInfo.State(), rInfo.mdlen, rInfo.mdata);
// fprintf(XrdSsiCl::outFile,
// "%s@%s: Rcvd %s response "
// "with %d metabytes '%s'\n",
// rName, GetEndPoint().c_str(), rInfo.State(), rInfo.mdlen, rInfo.mdata);
}
else
{
fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %s response\n", rName, GetEndPoint().c_str(),
rInfo.State());
// fprintf(XrdSsiCl::outFile, "%s@%s: Rcvd %s response\n", rName, GetEndPoint().c_str(),
// rInfo.State());
}

// While a response can have one of several forms a good response can only be
Expand All @@ -232,6 +237,7 @@ void myRequest::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int
"%s@%s; %s\n",
rName, GetEndPoint().c_str(), eInfo.Get().c_str());
Finished();
promise->set_value();
delete this;
return;
}
Expand All @@ -256,12 +262,12 @@ void myRequest::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, int

// End with new line character
//
fprintf(XrdSsiCl::outFile, "\nReceived %d bytes from %s@%s\n", totbytes, rName,
GetEndPoint().c_str());
memcpy(dest, responseBuffer, responseBufferLen);

// We are done with our request. We avoid calling Finished if we got here
// because we were cancelled.
//
promise->set_value();
Finished();
// delete this;
}
Expand Down Expand Up @@ -325,11 +331,19 @@ void XrootdRemote::Open(const std::string hostname, const int32_t port, const st
return;
}

bool XrootdRemote::WaitForGet(GetHandle handle)
{
std::promise<void> *p = (std::promise<void> *)handle;
p->get_future().wait();
delete p;
return true;
}

Remote::GetHandle XrootdRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
{
#ifdef ADIOS2_HAVE_XROOTD
char rName[512] = "/adios";
char rName[512] = "/home/eisen/xroot/data";
XrdSsiResource rSpec((std::string)rName);
myRequest *reqP;
std::string reqData = "get " + fileName + " " + std::string(VarName);
Expand Down Expand Up @@ -366,15 +380,18 @@ Remote::GetHandle XrootdRemote::Get(char *VarName, size_t Step, size_t BlockID,
char *reqDataStr = strdup(reqData.c_str());
reqP = new myRequest(clUI, rName, GetReqID(), reqDataStr, reqLen);
reqP->SetResource(rSpec);
reqP->dest = (char *)dest;
reqP->promise = new std::promise<void>();
// We simply hand off the request to the service to deal with it. When a
// response is ready or an error occured our callback is invoked.
//
clUI.ssiService->ProcessRequest(*reqP, rSpec);
// thread synchronization
sleep(1);
memcpy(dest, reqP->responseBuffer, reqP->responseBufferLen);
WaitForGet((void *)(reqP->promise));
return (intptr_t)0;
#else
return (intptr_t)0;
#endif
return 0;
}

XrootdRemote::GetHandle XrootdRemote::Read(size_t Start, size_t Size, void *Dest)
Expand Down
3 changes: 1 addition & 2 deletions source/adios2/toolkit/remote/XrootdRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,10 @@ class XrootdRemote : public Remote
void Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering);

typedef int GetHandle;

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);

GetHandle Read(size_t Start, size_t Size, void *Dest);
bool WaitForGet(GetHandle handle);
};

} // end namespace adios2
Expand Down
9 changes: 4 additions & 5 deletions source/utils/xrootd-plugin/XrdSsiSvService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
m_engine = m_io.Open(reqData, adios2::Mode::ReadRandomAccess);
std::string VarName = requestParams[0];
auto var = m_io.InquireVariable(VarName);

adios2::DataType TypeOfVar = m_io.InquireVariableType(VarName);
try
{
Expand All @@ -469,7 +468,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
adios2::Variable<T> var = m_io.InquireVariable<T>(VarName); \
std::vector<T> resBuffer; \
size_t step = std::stoi(requestParams[1]); \
var.SetStepSelection({step, step + 1}); \
var.SetStepSelection({step, 1}); \
size_t paramLength = (requestParams.size() - 3) / 2; \
adios2::Dims s(paramLength); \
adios2::Dims c(paramLength); \
Expand All @@ -482,9 +481,9 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
var.SetSelection(varSel); \
m_engine.Get(var, resBuffer, adios2::Mode::Sync); \
size_t responseSize = resBuffer.size(); \
responseBuffer = new char[responseSize * sizeof(float)]; \
responseBufferSize = responseSize * sizeof(float); \
memcpy(responseBuffer, resBuffer.data(), responseSize * sizeof(float)); \
responseBuffer = new char[responseSize * sizeof(T)]; \
responseBufferSize = responseSize * sizeof(T); \
memcpy(responseBuffer, resBuffer.data(), responseSize * sizeof(T)); \
XrdSysThread::Run(&tid, SvAdiosGet, (void *)this, 0, "get"); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(GET)
Expand Down

0 comments on commit 06911b6

Please sign in to comment.