Skip to content

Commit

Permalink
Merge pull request #2888 from JasonRuonanWang/operator
Browse files Browse the repository at this point in the history
Add version info of thirdparty compression libraries into self-contained operator buffer
  • Loading branch information
JasonRuonanWang authored Sep 29, 2021
2 parents 8bbd685 + 62b94a7 commit 192ba48
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 59 deletions.
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ size_t CompressBZIP2::Compress(const char *dataIn, const Dims &blockStart,
// Universal operator metadata
PutParameter(bufferOut, destOffset, OperatorType::BZIP2);
PutParameter(bufferOut, destOffset, bufferVersion);
destOffset += 2;
PutParameter(bufferOut, destOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t sizeIn =
Expand Down
50 changes: 32 additions & 18 deletions source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,26 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,
const Dims &blockCount, const DataType type,
char *bufferOut, const Params &parameters)
{
size_t currentOutputSize = 0;
size_t bufferOutOffset = 0;
const uint8_t bufferVersion = 1;

// Universal operator metadata
PutParameter(bufferOut, currentOutputSize, OperatorType::BLOSC);
PutParameter(bufferOut, currentOutputSize, bufferVersion);
currentOutputSize += 2;
PutParameter(bufferOut, bufferOutOffset, OperatorType::BLOSC);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t sizeIn =
helper::GetTotalSize(blockCount, helper::GetDataTypeSize(type));

// blosc V1 metadata
PutParameter(bufferOut, currentOutputSize, sizeIn);
PutParameter(bufferOut, bufferOutOffset, sizeIn);
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(BLOSC_VERSION_MAJOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(BLOSC_VERSION_MINOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(BLOSC_VERSION_RELEASE));
// blosc V1 metadata end

bool useMemcpy = false;
Expand Down Expand Up @@ -145,11 +151,11 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,

// write header to detect new compression format (set first 8 bytes to zero)
DataHeader *headerPtr =
reinterpret_cast<DataHeader *>(bufferOut + currentOutputSize);
reinterpret_cast<DataHeader *>(bufferOut + bufferOutOffset);

// set default header
*headerPtr = DataHeader{};
currentOutputSize += sizeof(DataHeader);
bufferOutOffset += sizeof(DataHeader);

int32_t typesize = helper::GetDataTypeSize(type);
if (typesize > BLOSC_MAX_TYPESIZE)
Expand Down Expand Up @@ -190,10 +196,10 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,
bloscSize_t compressedChunkSize =
blosc_compress(compressionLevel, doShuffle, typesize,
maxIntputSize, dataIn + inputOffset,
bufferOut + currentOutputSize, maxChunkSize);
bufferOut + bufferOutOffset, maxChunkSize);

if (compressedChunkSize > 0)
currentOutputSize += static_cast<size_t>(compressedChunkSize);
bufferOutOffset += static_cast<size_t>(compressedChunkSize);
else
{
// something went wrong with the compression; switch to memcpy
Expand All @@ -214,14 +220,13 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,

if (useMemcpy)
{
std::memcpy(bufferOut + currentOutputSize, dataIn + inputOffset,
sizeIn);
currentOutputSize += sizeIn;
std::memcpy(bufferOut + bufferOutOffset, dataIn + inputOffset, sizeIn);
bufferOutOffset += sizeIn;
headerPtr->SetNumChunks(0u);
}

blosc_destroy();
return currentOutputSize;
return bufferOutOffset;
}

size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
Expand All @@ -234,9 +239,17 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);

m_VersionInfo =
" Data is compressed using BLOSC Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

if (sizeIn - bufferInOffset < sizeof(DataHeader))
{
throw("corrupted blosc buffer header");
throw("corrupted blosc buffer header." + m_VersionInfo + "\n");
}
const bool isChunked =
reinterpret_cast<const DataHeader *>(bufferIn + bufferInOffset)
Expand All @@ -257,7 +270,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
}
if (decompressedSize != sizeOut)
{
throw("corrupted blosc buffer");
throw("corrupted blosc buffer." + m_VersionInfo + "\n");
}
return sizeOut;
}
Expand Down Expand Up @@ -332,8 +345,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
*
* we need only the compressed size (source address + 12 bytes)
*/
bloscSize_t max_inputDataSize =
*reinterpret_cast<const bloscSize_t *>(in_ptr + 12u);
bloscSize_t max_inputDataSize;
std::memcpy(&max_inputDataSize, in_ptr + 12, sizeof(bloscSize_t));

char *out_ptr = dataOut + currentOutputSize;

Expand All @@ -352,7 +365,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
{
throw std::runtime_error(
"ERROR: ADIOS2 Blosc Decompress failed. Decompressed chunk "
"results in zero decompressed bytes.\n");
"results in zero decompressed bytes." +
m_VersionInfo + "\n");
}
inputOffset += static_cast<size_t>(max_inputDataSize);
}
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressBlosc.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class CompressBlosc : public Operator

static const std::map<std::string, uint32_t> m_Shuffles;
static const std::set<std::string> m_Compressors;

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
26 changes: 19 additions & 7 deletions source/adios2/operator/compress/CompressLibPressio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ size_t CompressLibPressio::Compress(const char *dataIn, const Dims &blockStart,
size_t bufferOutOffset = 0;

// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::Sz);
PutParameter(bufferOut, bufferOutOffset, OperatorType::LIBPRESSIO);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t ndims = blockCount.size();
Expand All @@ -307,6 +307,12 @@ size_t CompressLibPressio::Compress(const char *dataIn, const Dims &blockStart,
PutParameter(bufferOut, bufferOutOffset, d);
}
PutParameter(bufferOut, bufferOutOffset, type);
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(pressio_major_version()));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(pressio_minor_version()));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(pressio_patch_version()));
PutParameters(bufferOut, bufferOutOffset, parameters);
// LibPressio V1 metadata end

Expand Down Expand Up @@ -364,6 +370,12 @@ size_t CompressLibPressio::DecompressV1(const char *bufferIn,
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);
m_VersionInfo =
" Data is compressed using LibPressio Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";
const Params parameters = GetParameters(bufferIn, bufferInOffset);

std::vector<size_t> dims = adios_to_libpressio_dims(blockCount);
Expand All @@ -384,7 +396,7 @@ size_t CompressLibPressio::DecompressV1(const char *bufferIn,
{
pressio_data_free(input_buf);
pressio_data_free(output_buf);
throw;
throw std::runtime_error(m_VersionInfo + "\n");
}

if (pressio_compressor_decompress(compressor, input_buf, output_buf) != 0)
Expand All @@ -393,7 +405,7 @@ size_t CompressLibPressio::DecompressV1(const char *bufferIn,
pressio_data_free(output_buf);
throw std::runtime_error(
std::string("pressio_compressor_decompress: ") +
pressio_compressor_error_msg(compressor));
pressio_compressor_error_msg(compressor) + m_VersionInfo + "\n");
}

size_t size_in_bytes = 0;
Expand All @@ -420,12 +432,12 @@ size_t CompressLibPressio::Decompress(const char *bufferIn, const size_t sizeIn,
}
else if (bufferVersion == 2)
{
// TODO: if a Version 2 zfp buffer is being implemented, put it here
// and keep the DecompressV1 routine for backward compatibility
// TODO: if a Version 2 LibPressio buffer is being implemented, put it
// here and keep the DecompressV1 routine for backward compatibility
}
else
{
throw("unknown zfp buffer version");
throw("unknown LibPressio buffer version");
}

return 0;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressLibPressio.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class CompressLibPressio : public Operator
*/
size_t DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut);

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
22 changes: 16 additions & 6 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
*/

#include "CompressMGARD.h"

#include <cstring> //std::memcpy

#include <mgard_api.h>

#include "adios2/helper/adiosFunctions.h"
#include <MGARDConfig.h>
#include <cstring>
#include <mgard_api.h>

namespace adios2
{
Expand All @@ -38,7 +36,7 @@ size_t CompressMGARD::Compress(const char *dataIn, const Dims &blockStart,
// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::MGARD);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t ndims = blockCount.size();
Expand All @@ -50,6 +48,12 @@ size_t CompressMGARD::Compress(const char *dataIn, const Dims &blockStart,
PutParameter(bufferOut, bufferOutOffset, d);
}
PutParameter(bufferOut, bufferOutOffset, type);
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(MGARD_VERSION_MAJOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(MGARD_VERSION_MINOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(MGARD_VERSION_PATCH));
// mgard V1 metadata end

if (ndims > 3)
Expand Down Expand Up @@ -139,6 +143,12 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);
m_VersionInfo =
" Data is compressed using MGARD Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

int mgardType = -1;

Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressMGARD.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class CompressMGARD : public Operator
*/
size_t DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut);

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
25 changes: 20 additions & 5 deletions source/adios2/operator/compress/CompressPNG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ size_t CompressPNG::Compress(const char *dataIn, const Dims &blockStart,
const uint8_t bufferVersion = 1;

// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::BLOSC);
PutParameter(bufferOut, bufferOutOffset, OperatorType::PNG);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

size_t paramOffset = bufferOutOffset;
bufferOutOffset += sizeof(size_t);
bufferOutOffset += sizeof(size_t) + 3;

auto lf_Write = [](png_structp png_ptr, png_bytep data, png_size_t length) {
DestInfo *pDestInfo =
Expand Down Expand Up @@ -181,6 +181,12 @@ size_t CompressPNG::Compress(const char *dataIn, const Dims &blockStart,
png_destroy_write_struct(&pngWrite, &pngInfo);

PutParameter(bufferOut, paramOffset, destInfo.Offset);
PutParameter(bufferOut, paramOffset,
static_cast<uint8_t>(PNG_LIBPNG_VER_MAJOR));
PutParameter(bufferOut, paramOffset,
static_cast<uint8_t>(PNG_LIBPNG_VER_MINOR));
PutParameter(bufferOut, paramOffset,
static_cast<uint8_t>(PNG_LIBPNG_VER_RELEASE));

return destInfo.Offset;
}
Expand All @@ -196,6 +202,13 @@ size_t CompressPNG::DecompressV1(const char *bufferIn, const size_t sizeIn,
size_t bufferInOffset = 0;
const size_t outSize = GetParameter<size_t>(bufferIn, bufferInOffset);

m_VersionInfo =
" Data is compressed using PNG Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

png_image image;
std::memset(&image, 0, sizeof(image));
image.version = PNG_IMAGE_VERSION;
Expand All @@ -207,7 +220,8 @@ size_t CompressPNG::DecompressV1(const char *bufferIn, const size_t sizeIn,
{
throw std::runtime_error(
"ERROR: png_image_begin_read_from_memory failed in call "
"to ADIOS2 PNG Decompress\n");
"to ADIOS2 PNG Decompress." +
m_VersionInfo + "\n");
}

// TODO might be needed from parameters?
Expand All @@ -216,7 +230,8 @@ size_t CompressPNG::DecompressV1(const char *bufferIn, const size_t sizeIn,
{
throw std::runtime_error(
"ERROR: png_image_finish_read_from_memory failed in call "
"to ADIOS2 PNG Decompress\n");
"to ADIOS2 PNG Decompress." +
m_VersionInfo + "\n");
}
return outSize;
}
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressPNG.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class CompressPNG : public Operator
char *BufferOut = nullptr;
size_t Offset = 0;
};

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
Loading

0 comments on commit 192ba48

Please sign in to comment.