Skip to content

Commit

Permalink
Merge pull request #3682 from eisenhauer/BP5
Browse files Browse the repository at this point in the history
Tweaks for BP5 on windows.
  • Loading branch information
eisenhauer authored Jul 3, 2023
2 parents 9ee15a3 + e08dba0 commit 9785b9a
Show file tree
Hide file tree
Showing 223 changed files with 15,419 additions and 14,974 deletions.
2 changes: 1 addition & 1 deletion cmake/DetectOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ if(DAOS_FOUND)
endif()

# BP5
if(ADIOS2_USE_BP5 AND NOT WIN32)
if(ADIOS2_USE_BP5)
set(ADIOS2_HAVE_BP5 TRUE)
endif()

Expand Down
4 changes: 4 additions & 0 deletions source/adios2/core/Variable.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ std::pair<T, T> Variable<T>::DoMinMax(const size_t step) const
MinMaxStruct MM;
if (m_Engine->VariableMinMax(*this, step, MM))
{
if (std::is_same<T, std::string>::value) {
return minMax;
} else {
minMax.first = *(T *)&MM.MinUnion;
minMax.second = *(T *)&MM.MaxUnion;
return minMax;
}
}
}
if (m_Engine != nullptr && !m_FirstStreamingStep)
Expand Down
12 changes: 6 additions & 6 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ void BP5Reader::PerformGets()
// then main thread process the last subset
for (size_t tid = 0; tid < nThreads - 1; ++tid)
{
futures[tid] = std::async(std::launch::async, lf_Reader, tid + 1,
futures[tid] = std::async(std::launch::async, lf_Reader, (int)(tid + 1),
maxOpenFiles);
}
// main thread runs last subset of reads
Expand Down Expand Up @@ -513,9 +513,9 @@ void BP5Reader::InitParameters()
}

size_t limit = helper::RaiseLimitNoFile();
if (m_Parameters.MaxOpenFilesAtOnce > limit - 8)
if (m_Parameters.MaxOpenFilesAtOnce > (unsigned int) limit - 8)
{
m_Parameters.MaxOpenFilesAtOnce = limit - 8;
m_Parameters.MaxOpenFilesAtOnce = (unsigned int) limit - 8;
}
}

Expand Down Expand Up @@ -986,11 +986,11 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
{
auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct());
auto &s = p.first->second;
s.WriterCount = helper::ReadValue<uint64_t>(
s.WriterCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.AggregatorCount = helper::ReadValue<uint64_t>(
s.AggregatorCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.SubfileCount = helper::ReadValue<uint64_t>(
s.SubfileCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
// Get the process -> subfile map
s.RankToSubfile.reserve(s.WriterCount);
Expand Down
54 changes: 31 additions & 23 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ void BP5Writer::WriteMetaMetadata(
m_FileMetaMetadataManager.WriteFiles((char *)b.MetaMetaInfo,
b.MetaMetaInfoLen);
}
m_FileMetaMetadataManager.FlushFiles();
}

uint64_t
Expand Down Expand Up @@ -206,6 +207,8 @@ BP5Writer::WriteMetadata(const std::vector<core::iovec> &MetaDataBlocks,
MetaDataSize += b.iov_len;
}

m_FileMetadataManager.FlushFiles();

m_MetaDataPos += MetaDataSize;
return MetaDataSize;
}
Expand Down Expand Up @@ -272,6 +275,7 @@ void BP5Writer::WriteData(format::BufferV *Data)
std::to_string(m_Parameters.AggregationType) +
"is not supported in BP5");
}
m_FileDataManager.FlushFiles();
delete Data;
}
}
Expand Down Expand Up @@ -337,8 +341,6 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data,
void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,
uint64_t MetaDataSize)
{
m_FileMetadataManager.FlushFiles();

// bufsize: Step record
size_t bufsize =
1 + (4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) *
Expand Down Expand Up @@ -407,7 +409,6 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,
}

m_FileMetadataIndexManager.WriteFiles((char *)buf.data(), buf.size());

#ifdef DUMPDATALOCINFO
std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl;
std::cout << "Write Index positions = {" << std::endl;
Expand All @@ -427,6 +428,8 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,
}
std::cout << "}" << std::endl;
#endif
m_FileMetadataIndexManager.FlushFiles();

/* reset for next timestep */
FlushPosSizeInfo.clear();
}
Expand Down Expand Up @@ -472,7 +475,7 @@ void BP5Writer::MarshalAttributes()

if (!attributePair.second->m_IsSingleValue)
{
element_count = (*baseAttr)->m_Elements;
element_count = (int)(*baseAttr)->m_Elements;
}

if (type == DataType::None)
Expand Down Expand Up @@ -511,7 +514,7 @@ void BP5Writer::MarshalAttributes()
void *data_addr = &attribute.m_DataSingleValue; \
if (!attribute.m_IsSingleValue) \
{ \
element_count = attribute.m_Elements; \
element_count = (int)attribute.m_Elements; \
data_addr = attribute.m_DataArray.data(); \
} \
m_BP5Serializer.MarshalAttribute(attribute.m_Name.c_str(), type, \
Expand All @@ -536,7 +539,7 @@ void BP5Writer::EndStep()

// true: advances step
auto TSInfo = m_BP5Serializer.CloseTimestep(
m_WriterStep, m_Parameters.AsyncWrite || m_Parameters.DirectIO);
(int)m_WriterStep, m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */
Expand Down Expand Up @@ -684,6 +687,10 @@ void BP5Writer::EndStep()
m_AsyncWriteLock.unlock();
}
}
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataManager.FlushFiles();
m_FileMetaMetadataManager.FlushFiles();
m_FileDataManager.FlushFiles();

m_Profiler.Stop("ES");
m_WriterStep++;
Expand Down Expand Up @@ -752,7 +759,8 @@ void BP5Writer::InitParameters()
{
size_t k =
m_Parameters.StripeSize / m_Parameters.DirectIOAlignOffset + 1;
m_Parameters.StripeSize = k * m_Parameters.DirectIOAlignOffset;
m_Parameters.StripeSize =
(unsigned int)(k * m_Parameters.DirectIOAlignOffset);
}
if (m_Parameters.BufferChunkSize % m_Parameters.DirectIOAlignOffset)
{
Expand Down Expand Up @@ -852,12 +860,12 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
{
case IndexRecord::WriterMapRecord:
{
m_AppendWriterCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendAggregatorCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendSubfileCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendWriterCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendAggregatorCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendSubfileCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
if (m_AppendSubfileCount > nDataFiles)
{
nDataFiles = m_AppendSubfileCount;
Expand Down Expand Up @@ -939,12 +947,12 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
{
case IndexRecord::WriterMapRecord:
{
m_AppendWriterCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendAggregatorCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendSubfileCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendWriterCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendAggregatorCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendSubfileCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);

// Get the process -> subfile map
writerToFileMap.clear();
Expand Down Expand Up @@ -1796,8 +1804,8 @@ void BP5Writer::PutCommon(VariableBase &variable, const void *values, bool sync)
helper::DimsArray MemoryCount(variable.m_MemoryCount);
helper::DimsArray varCount(variable.m_Count);

int DimCount = variable.m_Count.size();
std::vector<size_t> ZeroDims(DimCount);
int DimCount = (int)variable.m_Count.size();
helper::DimsArray ZeroDims(DimCount, (size_t)0);
// get a temporary span then fill with memselection now
format::BufferV::BufferPos bp5span(0, 0, 0);

Expand All @@ -1816,8 +1824,8 @@ void BP5Writer::PutCommon(VariableBase &variable, const void *values, bool sync)
}
helper::NdCopy((const char *)values, helper::CoreDims(ZeroDims),
MemoryCount, sourceRowMajor, false, (char *)ptr,
MemoryStart, varCount, sourceRowMajor, false, ObjSize,
helper::CoreDims(), helper::CoreDims(),
MemoryStart, varCount, sourceRowMajor, false,
(int)ObjSize, helper::CoreDims(), helper::CoreDims(),
helper::CoreDims(), helper::CoreDims(),
false /* safemode */, variable.m_MemSpace);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void BP5Writer::AsyncWriteOwnData(AsyncWriteInfo *info,
size_t wrote = 0;
size_t block = 0;
size_t temp_offset = 0;
size_t max_size = std::max(1024 * 1024UL, totalsize / 100UL);
size_t max_size = std::max((size_t)1024 * 1024UL, totalsize / 100UL);

bool firstWrite = seekOnFirstWrite;
while (block < nBlocks)
Expand Down
Loading

0 comments on commit 9785b9a

Please sign in to comment.