Skip to content

Commit

Permalink
Merge pull request #2821 from eisenhauer/DeferAddToVec
Browse files Browse the repository at this point in the history
Delay adding large Deferred Puts to output vector until last minute
  • Loading branch information
eisenhauer authored Aug 11, 2021
2 parents e842db7 + 7a68362 commit e263974
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 46 deletions.
2 changes: 0 additions & 2 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ uint64_t BP5Writer::WriteMetadata(

void BP5Writer::WriteData(format::BufferV *Data)
{
format::BufferV::BufferV_iovec DataVec = Data->DataVec();
(void)DataVec;
switch (m_Parameters.AggregationType)
{
case (int)AggregationType::EveryoneWrites:
Expand Down
107 changes: 65 additions & 42 deletions source/adios2/toolkit/format/bp5/BP5Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,6 @@ BP5Serializer::CreateWriterRec(void *Variable, const char *Name, DataType Type,
free(LocationsName);
RecalcMarshalStorageSize();

#ifdef NDEF
if ((ConfigParams->CompressionMethod == SstCompressZFP) &&
ZFPcompressionPossible(Type, DimCount))
{
Type = Int8;
ElemSize = 1;
}
#endif
// To Data, add FMFields for ElemCount and Array matching _ArrayRec
char *ElemCountName = ConcatName(Name, "ElemCount");
AddField(&Info.DataFields, &Info.DataFieldCount, ElemCountName,
Expand Down Expand Up @@ -431,25 +423,61 @@ size_t BP5Serializer::CalcSize(const size_t Count, const size_t *Vals)
return Elems;
}

void BP5Serializer::PerformPuts() { CurDataBuffer->CopyExternalToInternal(); }
void BP5Serializer::PerformPuts()
{
// Dump data for externs into iovec
DumpDeferredBlocks();

CurDataBuffer->CopyExternalToInternal();
}

void BP5Serializer::DumpDeferredBlocks()
{
for (auto &Def : DeferredExterns)
{
MetaArrayRec *MetaEntry =
(MetaArrayRec *)((char *)(MetadataBuf) + Def.MetaOffset);
size_t DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(Def.DataSize, Def.Data,
Def.AlignReq, false);
MetaEntry->DataLocation[Def.BlockID] = DataOffset;
}
DeferredExterns.clear();
}

void BP5Serializer::Marshal(void *Variable, const char *Name,
const DataType Type, size_t ElemSize,
size_t DimCount, const size_t *Shape,
const size_t *Count, const size_t *Offsets,
const void *Data, bool Sync,
BufferV::BufferPos *span)
BufferV::BufferPos *Span)
{

FFSMetadataInfoStruct *MBase;

BP5WriterRec Rec = LookupWriterRec(Variable);

bool DeferAddToVec;

if (!Rec)
{
Rec = CreateWriterRec(Variable, Name, Type, ElemSize, DimCount);
}

if (!Sync && (Rec->DimCount != 0) && !Span)
{
/*
* If this is a big external block, we'll do everything except add it to
* the BufferV now, saving enough information to add it and patch back
* the DataLocation in the metadata in DumpDeferredBlocks()
*/
DeferAddToVec = true;
}
else
{
DeferAddToVec = false;
}

MBase = (struct FFSMetadataInfoStruct *)MetadataBuf;
int AlreadyWritten = FFSBitfieldTest(MBase, Rec->FieldID);
FFSBitfieldSet(MBase, Rec->FieldID);
Expand All @@ -471,7 +499,7 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
MetaArrayRec *MetaEntry =
(MetaArrayRec *)((char *)(MetadataBuf) + Rec->MetaOffset);
size_t ElemCount = CalcSize(DimCount, Count);
size_t DataOffset;
size_t DataOffset = 0;

/* handle metadata */
MetaEntry->Dims = DimCount;
Expand All @@ -481,21 +509,19 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
"BP5Serializer:: Marshall without Prior Init");
}

if (span == nullptr)
if (Span == nullptr)
{
DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
ElemSize, Sync);
if (AlreadyWritten)
if (!DeferAddToVec)
{
printf("Marshalling %g at offset %ld\n", *(float *)Data,
DataOffset);
DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
ElemSize, Sync);
}
}
else
{
*span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize);
DataOffset = m_PriorDataBufferSizeTotal + span->globalPos;
*Span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize);
DataOffset = m_PriorDataBufferSizeTotal + Span->globalPos;
}

if (!AlreadyWritten)
Expand All @@ -513,6 +539,12 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
MetaEntry->Offsets = CopyDims(DimCount, Offsets);
else
MetaEntry->Offsets = NULL;
if (DeferAddToVec)
{
DeferredExtern rec = {Rec->MetaOffset, 0, Data,
ElemCount * ElemSize, ElemSize};
DeferredExterns.push_back(rec);
}
}
else
{
Expand All @@ -530,26 +562,16 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,
(size_t *)realloc(MetaEntry->DataLocation,
MetaEntry->BlockCount * sizeof(size_t));
MetaEntry->DataLocation[MetaEntry->BlockCount - 1] = DataOffset;
if (DeferAddToVec)
{
DeferredExterns.push_back({Rec->MetaOffset,
MetaEntry->BlockCount - 1, Data,
ElemCount * ElemSize, ElemSize});
}
if (Offsets)
MetaEntry->Offsets = AppendDims(
MetaEntry->Offsets, PreviousDBCount, DimCount, Offsets);
}

// if ((Stream->ConfigParams->CompressionMethod ==
// SstCompressZFP) &&
// ZFPcompressionPossible(Type, DimCount))
// {
#ifdef ADIOS2_HAVE_ZFP
// /* this should never be true if ZFP is not available
// */ size_t ByteCount; char *Output =
// FFS_ZFPCompress(Stream, Rec->DimCount, Rec->Type,
// (void *)Data, Count,
// &ByteCount);
// DataEntry->ElemCount = ByteCount;
// DataEntry->Array = Output;
#endif
// }
// else
}
}

Expand Down Expand Up @@ -610,12 +632,6 @@ void BP5Serializer::MarshalAttribute(const char *Name, const DataType Type,
/* free(OffsetsName); */
/* RecalcMarshalStorageSize(Stream); */

/* if ((Stream->ConfigParams->CompressionMethod == SstCompressZFP) && */
/* ZFPcompressionPossible(Type, DimCount)) */
/* { */
/* Type = "char"; */
/* ElemSize = 1; */
/* } */
/* // To Data, add FMFields for ElemCount and Array matching _ArrayRec
*/
/* char *ElemCountName = ConcatName(Name, "ElemCount"); */
Expand Down Expand Up @@ -652,6 +668,9 @@ BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer)
{
throw std::logic_error("BP5Serializer:: ReinitStep without prior Init");
}
// Dump data for externs into iovec
DumpDeferredBlocks();

m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

Expand Down Expand Up @@ -715,6 +734,10 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
throw std::logic_error(
"BP5Serializer:: CloseTimestep without Prior Init");
}

// Dump data for externs into iovec
DumpDeferredBlocks();

MBase->DataBlockSize = CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

Expand Down
14 changes: 12 additions & 2 deletions source/adios2/toolkit/format/bp5/BP5Serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,18 @@ class BP5Serializer : virtual public BP5Base
FMFormat AttributeFormat = NULL;
void *AttributeData = NULL;
int AttributeSize = 0;
int CompressZFP = 0;
attr_list ZFPParams = NULL;
};

// Record of a Put whose payload has not yet been added to the data BufferV.
// Marshal() appends one of these (by positional brace-initialization, so the
// field order below must not change) when it defers a block, and
// DumpDeferredBlocks() later uses it to call AddToVec and patch the block's
// DataLocation in the metadata.
struct DeferredExtern
{
// Byte offset from MetadataBuf to the variable's MetaArrayRec.
size_t MetaOffset;
// Index into that record's DataLocation array to be patched.
size_t BlockID;
// Pointer handed to Marshal(); only the pointer is stored here, the data
// itself is not copied at this point.
const void *Data;
// Payload size in bytes (ElemCount * ElemSize at the recording sites).
size_t DataSize;
// Third argument forwarded to AddToVec (ElemSize at the recording sites);
// presumably the alignment requirement -- confirm against BufferV.
size_t AlignReq;
};
std::vector<DeferredExtern> DeferredExterns;

FFSWriterMarshalBase Info;
void *MetadataBuf = NULL;
bool NewAttribute = false;
Expand Down Expand Up @@ -176,6 +184,8 @@ class BP5Serializer : virtual public BP5Base
size_t *AppendDims(size_t *OldDims, const size_t OldCount,
const size_t Count, const size_t *Vals);

void DumpDeferredBlocks();

typedef struct _ArrayRec
{
size_t ElemCount;
Expand Down

0 comments on commit e263974

Please sign in to comment.