Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bp4 stream read #1609

Merged
merged 15 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 221 additions & 50 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "adios2/helper/adiosFunctions.h" // MPI BroadcastVector
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"

#include <limits>

namespace adios2
{
namespace core
Expand All @@ -24,9 +26,10 @@ namespace engine
BP4Reader::BP4Reader(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: Engine("BP4Reader", io, name, mode, mpiComm),
m_BP4Deserializer(mpiComm, m_DebugMode), m_FileManager(mpiComm, m_DebugMode),
m_SubFileManager(mpiComm, m_DebugMode),
m_FileMetadataIndexManager(mpiComm, m_DebugMode)
m_BP4Deserializer(mpiComm, m_DebugMode),
m_MDFileManager(mpiComm, m_DebugMode),
m_DataFileManager(mpiComm, m_DebugMode),
m_MDIndexFileManager(mpiComm, m_DebugMode)
{
TAU_SCOPED_TIMER("BP4Reader::Open");
Init();
Expand Down Expand Up @@ -65,41 +68,23 @@ StepStatus BP4Reader::BeginStep(StepMode mode, const float timeoutSeconds)

// used to inquire for variables in streaming mode
m_IO.m_ReadStreaming = true;
m_IO.m_EngineStep = m_CurrentStep;
StepStatus status = StepStatus::OK;

if (m_CurrentStep >= m_BP4Deserializer.m_MetadataSet.StepsCount)
{
m_IO.m_ReadStreaming = false;
return StepStatus::EndOfStream;
status = CheckForNewSteps(timeoutSeconds);
}

/*
const auto &variablesData = m_IO.GetVariablesDataMap();
// This should be after getting new steps
m_IO.m_EngineStep = m_CurrentStep;

for (const auto &variableData : variablesData)
if (status == StepStatus::OK)
{
const std::string name = variableData.first;
const std::string type = m_IO.InquireVariableType(name);

if (type == "compound")
{
}
#define declare_type(T) \
else if (type == helper::GetType<T>()) \
{ \
Variable<T> *variable = m_IO.InquireVariable<T>(name); \
if (mode == StepMode::NextAvailable) \
{ \
variable->SetStepSelection({m_CurrentStep, 1}); \
} \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type
m_IO.ResetVariablesStepSelection(false,
"in call to BP4 Reader BeginStep");
}
*/
m_IO.ResetVariablesStepSelection(false, "in call to BP4 Reader BeginStep");

return StepStatus::OK;
return status;
}

size_t BP4Reader::CurrentStep() const { return m_CurrentStep; }
Expand Down Expand Up @@ -151,9 +136,9 @@ void BP4Reader::Init()
{
if (m_OpenMode != Mode::Read)
{
throw std::invalid_argument(
"ERROR: BPFileReader only supports OpenMode::Read from" +
m_Name + " " + m_EndMessage);
throw std::invalid_argument("ERROR: BPFileReader only "
"supports OpenMode::Read from" +
m_Name + " " + m_EndMessage);
}
}

Expand All @@ -177,15 +162,14 @@ void BP4Reader::InitTransports()
m_BP4Deserializer.GetBPMetadataFileName(m_Name));

const bool profile = m_BP4Deserializer.m_Profiler.IsActive;
m_FileManager.OpenFiles({metadataFile}, adios2::Mode::Read,
m_IO.m_TransportsParameters, profile);
m_MDFileManager.OpenFiles({metadataFile}, adios2::Mode::Read,
m_IO.m_TransportsParameters, profile);

/* Open file to save the metadata index table */
const std::string metadataIndexFile(
m_BP4Deserializer.GetBPMetadataIndexFileName(m_Name));
m_FileMetadataIndexManager.OpenFiles(
{metadataIndexFile}, adios2::Mode::Read,
m_IO.m_TransportsParameters, profile);
m_MDIndexFileManager.OpenFiles({metadataIndexFile}, adios2::Mode::Read,
m_IO.m_TransportsParameters, profile);
}
}

Expand All @@ -194,21 +178,27 @@ void BP4Reader::InitBuffer()
// Put all metadata in buffer
if (m_BP4Deserializer.m_RankMPI == 0)
{
const size_t fileSize = m_FileManager.GetFileSize(0);
m_BP4Deserializer.m_Metadata.Resize(
fileSize, "allocating metadata buffer, in call to BP4Reader Open");

m_FileManager.ReadFile(m_BP4Deserializer.m_Metadata.m_Buffer.data(),
fileSize);

/* Read metadata index table into memory */
const size_t metadataIndexFileSize =
m_FileMetadataIndexManager.GetFileSize(0);
m_MDIndexFileManager.GetFileSize(0);
m_BP4Deserializer.m_MetadataIndex.Resize(
metadataIndexFileSize,
"allocating metadata index buffer, in call to BPFileReader Open");
m_FileMetadataIndexManager.ReadFile(
metadataIndexFileSize, "allocating metadata index buffer, "
"in call to BPFileReader Open");
m_MDIndexFileManager.ReadFile(
m_BP4Deserializer.m_MetadataIndex.m_Buffer.data(),
metadataIndexFileSize);

m_MDIndexFileProcessedSize = metadataIndexFileSize;

/* Read metadata file into memory */
const size_t fileSize = m_MDFileManager.GetFileSize(0);
m_BP4Deserializer.m_Metadata.Resize(
fileSize, "allocating metadata buffer, in call to BP4Reader Open");

m_MDFileManager.ReadFile(m_BP4Deserializer.m_Metadata.m_Buffer.data(),
fileSize);

m_MDFileProcessedSize = fileSize;
}
// broadcast buffer to all ranks from zero
helper::BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer, m_MPIComm);
Expand All @@ -224,6 +214,187 @@ void BP4Reader::InitBuffer()
m_BP4Deserializer.ParseMetadata(m_BP4Deserializer.m_Metadata, *this);
}

std::pair<size_t, size_t> BP4Reader::UpdateBuffer()
{
std::vector<size_t> sizes(3, 0);
if (m_BP4Deserializer.m_RankMPI == 0)
{
const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0);
if (idxFileSize > m_MDIndexFileProcessedSize)
{
const size_t newIdxSize = idxFileSize - m_MDIndexFileProcessedSize;
if (m_BP4Deserializer.m_MetadataIndex.m_Buffer.size() < newIdxSize)
{
m_BP4Deserializer.m_MetadataIndex.Resize(
newIdxSize, "re-allocating metadata index buffer, in "
"call to BP4Reader::BeginStep/UpdateBuffer");
}
m_BP4Deserializer.m_MetadataIndex.m_Position = 0;
m_MDIndexFileManager.ReadFile(
m_BP4Deserializer.m_MetadataIndex.m_Buffer.data(), newIdxSize,
m_MDIndexFileProcessedSize);

sizes[0] = newIdxSize;

/* Read corresponding new metadata (throwing away the old)
*/
const size_t fileSize = m_MDFileManager.GetFileSize(0);
const size_t newMDSize = fileSize - m_MDFileProcessedSize;
if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize)
{
m_BP4Deserializer.m_Metadata.Resize(
newMDSize, "allocating metadata buffer, in call to "
"BP4Reader Open");
}
m_BP4Deserializer.m_Metadata.m_Position = 0;
m_MDFileManager.ReadFile(
m_BP4Deserializer.m_Metadata.m_Buffer.data(), newMDSize,
m_MDFileProcessedSize);

sizes[1] = newMDSize;
sizes[2] = m_MDFileProcessedSize;
}
}

helper::BroadcastVector(sizes, m_MPIComm, 0);
size_t newIdxSize = sizes[0];
size_t newMDSize = sizes[1];

if (newIdxSize > 0)
{
// broadcast buffer to all ranks from zero
helper::BroadcastVector(m_BP4Deserializer.m_Metadata.m_Buffer,
m_MPIComm);

// broadcast metadata index buffer to all ranks from zero
helper::BroadcastVector(m_BP4Deserializer.m_MetadataIndex.m_Buffer,
m_MPIComm);

if (m_BP4Deserializer.m_RankMPI != 0)
{
m_MDFileProcessedSize = sizes[2];
// we need this pointer in Metadata buffer on all processes
// for parsing it correctly in ProcessMetadataForNewSteps()
}
}
return std::make_pair(newIdxSize, newMDSize);
}
void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize,
const size_t newMDSize)
{
/* Remove all existing variables from previous steps
It seems easier than trying to update them */
m_IO.RemoveAllVariables();

/* Parse metadata index table (without header) */
/* We need to skew the index table pointers with the
size of the already-processed metadata because the memory buffer of
new metadata starts from 0 */
m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex,
m_MDFileProcessedSize);

// fills IO with Variables and Attributes (not first step)
m_BP4Deserializer.ParseMetadata(m_BP4Deserializer.m_Metadata, *this, false);

// remember current end position in metadata and index table for next round
if (m_BP4Deserializer.m_RankMPI == 0)
{
m_MDIndexFileProcessedSize += newIdxSize;
m_MDFileProcessedSize += newMDSize;
}
}

bool BP4Reader::CheckWriterActive()
{
size_t flag = 0;
if (m_BP4Deserializer.m_RankMPI == 0)
{
std::vector<char> header(64, '\0');
m_MDIndexFileManager.ReadFile(header.data(), 64, 0, 0);
bool active = m_BP4Deserializer.ReadActiveFlag(header);
flag = (active ? 1 : 0);
}
flag = helper::BroadcastValue(flag, m_BP4Deserializer.m_MPIComm, 0);
m_BP4Deserializer.m_WriterIsActive = (flag > 0);
return m_BP4Deserializer.m_WriterIsActive;
}

StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds)
{
/* Do a collective wait for a step within timeout.
Make sure every writer comes to the same conclusion */
StepStatus retval = StepStatus::OK;
bool haveNewStep = false;
float TO = timeoutSeconds;
if (TO < 0.0)
{
TO = std::numeric_limits<float>::max() / 10000;
}
uint64_t milliTO = static_cast<uint64_t>(TO * 1000.0);
if (milliTO < 1)
{
milliTO = 1; // avoid 0
}
uint64_t pollTime = milliTO / 100; // TO/100 seconds polling time
if (pollTime < 1000)
{
pollTime = 1000; // min 1 second polling time
}
if (pollTime > 10000)
{
pollTime = 10000; // max 10 seconds polling time
}

/* Poll */
double waited = 0.0;
double startTime, endTime;

// Hack: processing metadata for multiple new steps only works
// when pretending not to be in streaming mode
const bool saveReadStreaming = m_IO.m_ReadStreaming;

m_IO.m_ReadStreaming = false;
while (waited < TO && m_BP4Deserializer.m_WriterIsActive)
{
startTime = MPI_Wtime();
std::pair<size_t, size_t> sizes = UpdateBuffer();
if (sizes.first > 0)
{
haveNewStep = true;
/* we have new metadata in memory. Need to parse it now */
ProcessMetadataForNewSteps(sizes.first, sizes.second);
break;
}
if (!CheckWriterActive())
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(pollTime));
endTime = MPI_Wtime();
waited += endTime - startTime;
}

if (!haveNewStep)
{
m_IO.m_ReadStreaming = false;
if (m_BP4Deserializer.m_WriterIsActive)
{
retval = StepStatus::NotReady;
}
else
{
retval = StepStatus::EndOfStream;
}
}
else
{
retval = StepStatus::OK;
}
m_IO.m_ReadStreaming = saveReadStreaming;

return retval;
}

#define declare_type(T) \
void BP4Reader::DoGetSync(Variable<T> &variable, T *data) \
{ \
Expand All @@ -242,8 +413,8 @@ void BP4Reader::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER("BP4Reader::Close");
PerformGets();
m_SubFileManager.CloseFiles();
m_FileManager.CloseFiles();
m_DataFileManager.CloseFiles();
m_MDFileManager.CloseFiles();
}

#define declare_type(T) \
Expand Down
40 changes: 37 additions & 3 deletions source/adios2/engine/bp4/BP4Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,16 @@ class BP4Reader : public Engine

private:
format::BP4Deserializer m_BP4Deserializer;
transportman::TransportMan m_FileManager;
transportman::TransportMan m_SubFileManager;
/* transport manager for metadata file */
transportman::TransportMan m_MDFileManager;
size_t m_MDFileProcessedSize = 0;

/* transport manager for managing data file(s) */
transportman::TransportMan m_DataFileManager;

/* transport manager for managing the metadata index file */
transportman::TransportMan m_FileMetadataIndexManager;
transportman::TransportMan m_MDIndexFileManager;
size_t m_MDIndexFileProcessedSize = 0;

/** used for per-step reads, TODO: to be moved to BP4Deserializer */
size_t m_CurrentStep = 0;
Expand All @@ -63,6 +69,34 @@ class BP4Reader : public Engine
void InitTransports();
void InitBuffer();

/** Read in more metadata if exist (throwing away old).
* For streaming only.
* @return pair of sizes if new metadata was read in
* sizes.first = size of new content from Index Table
* sizes.second = size of new content from Metadata File
*/
std::pair<size_t, size_t> UpdateBuffer();

/** Process the new metadata coming in (in UpdateBuffer)
* @param newIdxSize: the size of the new content from Index Table
* @param neMDSize: the size of new content from Metadata File
*/
void ProcessMetadataForNewSteps(const size_t newIdxSize,
const size_t newMDSize);

/** Check the active status flag in index file.
* @return true if writer is still active
* it sets BP4Deserialized.m_WriterIsActive
*/
bool CheckWriterActive();

/** Check for new steps withing timeout and only if writer is active.
* @return the status flag
* Used by BeginStep() to get new steps from file when it reaches the
* end of steps in memory.
*/
StepStatus CheckForNewSteps(float timeoutSeconds);

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
void DoGetDeferred(Variable<T> &, T *) final;
Expand Down
Loading