Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add version info of thirdparty compression libraries into self-contained operator buffer #2888

Merged
merged 9 commits into from
Sep 29, 2021
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ size_t CompressBZIP2::Compress(const char *dataIn, const Dims &blockStart,
// Universal operator metadata
PutParameter(bufferOut, destOffset, OperatorType::BZIP2);
PutParameter(bufferOut, destOffset, bufferVersion);
destOffset += 2;
PutParameter(bufferOut, destOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t sizeIn =
Expand Down
50 changes: 32 additions & 18 deletions source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,26 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,
const Dims &blockCount, const DataType type,
char *bufferOut, const Params &parameters)
{
size_t currentOutputSize = 0;
size_t bufferOutOffset = 0;
const uint8_t bufferVersion = 1;

// Universal operator metadata
PutParameter(bufferOut, currentOutputSize, OperatorType::BLOSC);
PutParameter(bufferOut, currentOutputSize, bufferVersion);
currentOutputSize += 2;
PutParameter(bufferOut, bufferOutOffset, OperatorType::BLOSC);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t sizeIn =
helper::GetTotalSize(blockCount, helper::GetDataTypeSize(type));

// blosc V1 metadata
PutParameter(bufferOut, currentOutputSize, sizeIn);
PutParameter(bufferOut, bufferOutOffset, sizeIn);
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(BLOSC_VERSION_MAJOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(BLOSC_VERSION_MINOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(BLOSC_VERSION_RELEASE));
// blosc V1 metadata end

bool useMemcpy = false;
Expand Down Expand Up @@ -145,11 +151,11 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,

// write header to detect new compression format (set first 8 byte to zero)
DataHeader *headerPtr =
reinterpret_cast<DataHeader *>(bufferOut + currentOutputSize);
reinterpret_cast<DataHeader *>(bufferOut + bufferOutOffset);

// set default header
*headerPtr = DataHeader{};
currentOutputSize += sizeof(DataHeader);
bufferOutOffset += sizeof(DataHeader);

int32_t typesize = helper::GetDataTypeSize(type);
if (typesize > BLOSC_MAX_TYPESIZE)
Expand Down Expand Up @@ -190,10 +196,10 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,
bloscSize_t compressedChunkSize =
blosc_compress(compressionLevel, doShuffle, typesize,
maxIntputSize, dataIn + inputOffset,
bufferOut + currentOutputSize, maxChunkSize);
bufferOut + bufferOutOffset, maxChunkSize);

if (compressedChunkSize > 0)
currentOutputSize += static_cast<size_t>(compressedChunkSize);
bufferOutOffset += static_cast<size_t>(compressedChunkSize);
else
{
// something went wrong with the compression switch to memcopy
Expand All @@ -214,14 +220,13 @@ size_t CompressBlosc::Compress(const char *dataIn, const Dims &blockStart,

if (useMemcpy)
{
std::memcpy(bufferOut + currentOutputSize, dataIn + inputOffset,
sizeIn);
currentOutputSize += sizeIn;
std::memcpy(bufferOut + bufferOutOffset, dataIn + inputOffset, sizeIn);
bufferOutOffset += sizeIn;
headerPtr->SetNumChunks(0u);
}

blosc_destroy();
return currentOutputSize;
return bufferOutOffset;
}

size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
Expand All @@ -234,9 +239,17 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);

m_VersionInfo =
" Data is compressed using BLOSC Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

if (sizeIn - bufferInOffset < sizeof(DataHeader))
{
throw("corrupted blosc buffer header");
throw("corrupted blosc buffer header." + m_VersionInfo + "\n");
}
const bool isChunked =
reinterpret_cast<const DataHeader *>(bufferIn + bufferInOffset)
Expand All @@ -257,7 +270,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
}
if (decompressedSize != sizeOut)
{
throw("corrupted blosc buffer");
throw("corrupted blosc buffer." + m_VersionInfo + "\n");
}
return sizeOut;
}
Expand Down Expand Up @@ -332,8 +345,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
*
* we need only the compressed size ( source address + 12 byte)
*/
bloscSize_t max_inputDataSize =
*reinterpret_cast<const bloscSize_t *>(in_ptr + 12u);
bloscSize_t max_inputDataSize;
std::memcpy(&max_inputDataSize, in_ptr + 12, sizeof(bloscSize_t));

char *out_ptr = dataOut + currentOutputSize;

Expand All @@ -352,7 +365,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
{
throw std::runtime_error(
"ERROR: ADIOS2 Blosc Decompress failed. Decompressed chunk "
"results in zero decompressed bytes.\n");
"results in zero decompressed bytes." +
m_VersionInfo + "\n");
}
inputOffset += static_cast<size_t>(max_inputDataSize);
}
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressBlosc.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class CompressBlosc : public Operator

static const std::map<std::string, uint32_t> m_Shuffles;
static const std::set<std::string> m_Compressors;

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
26 changes: 19 additions & 7 deletions source/adios2/operator/compress/CompressLibPressio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ size_t CompressLibPressio::Compress(const char *dataIn, const Dims &blockStart,
size_t bufferOutOffset = 0;

// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::Sz);
PutParameter(bufferOut, bufferOutOffset, OperatorType::LIBPRESSIO);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t ndims = blockCount.size();
Expand All @@ -307,6 +307,12 @@ size_t CompressLibPressio::Compress(const char *dataIn, const Dims &blockStart,
PutParameter(bufferOut, bufferOutOffset, d);
}
PutParameter(bufferOut, bufferOutOffset, type);
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(pressio_major_version()));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(pressio_minor_version()));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(pressio_patch_version()));
PutParameters(bufferOut, bufferOutOffset, parameters);
// zfp V1 metadata end

Expand Down Expand Up @@ -364,6 +370,12 @@ size_t CompressLibPressio::DecompressV1(const char *bufferIn,
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);
m_VersionInfo =
" Data is compressed using LibPressio Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";
const Params parameters = GetParameters(bufferIn, bufferInOffset);

std::vector<size_t> dims = adios_to_libpressio_dims(blockCount);
Expand All @@ -384,7 +396,7 @@ size_t CompressLibPressio::DecompressV1(const char *bufferIn,
{
pressio_data_free(input_buf);
pressio_data_free(output_buf);
throw;
throw std::runtime_error(m_VersionInfo + "\n");
}

if (pressio_compressor_decompress(compressor, input_buf, output_buf) != 0)
Expand All @@ -393,7 +405,7 @@ size_t CompressLibPressio::DecompressV1(const char *bufferIn,
pressio_data_free(output_buf);
throw std::runtime_error(
std::string("pressio_compressor_decompress: ") +
pressio_compressor_error_msg(compressor));
pressio_compressor_error_msg(compressor) + m_VersionInfo + "\n");
}

size_t size_in_bytes = 0;
Expand All @@ -420,12 +432,12 @@ size_t CompressLibPressio::Decompress(const char *bufferIn, const size_t sizeIn,
}
else if (bufferVersion == 2)
{
// TODO: if a Version 2 zfp buffer is being implemented, put it here
// and keep the DecompressV1 routine for backward compatibility
// TODO: if a Version 2 LibPressio buffer is being implemented, put it
// here and keep the DecompressV1 routine for backward compatibility
}
else
{
throw("unknown zfp buffer version");
throw("unknown LibPressio buffer version");
}

return 0;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressLibPressio.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class CompressLibPressio : public Operator
*/
size_t DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut);

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
22 changes: 16 additions & 6 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
*/

#include "CompressMGARD.h"

#include <cstring> //std::memcpy

#include <mgard_api.h>

#include "adios2/helper/adiosFunctions.h"
#include <MGARDConfig.h>
#include <cstring>
#include <mgard_api.h>

namespace adios2
{
Expand All @@ -38,7 +36,7 @@ size_t CompressMGARD::Compress(const char *dataIn, const Dims &blockStart,
// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::MGARD);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

const size_t ndims = blockCount.size();
Expand All @@ -50,6 +48,12 @@ size_t CompressMGARD::Compress(const char *dataIn, const Dims &blockStart,
PutParameter(bufferOut, bufferOutOffset, d);
}
PutParameter(bufferOut, bufferOutOffset, type);
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(MGARD_VERSION_MAJOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(MGARD_VERSION_MINOR));
PutParameter(bufferOut, bufferOutOffset,
static_cast<uint8_t>(MGARD_VERSION_PATCH));
// mgard V1 metadata end

if (ndims > 3)
Expand Down Expand Up @@ -139,6 +143,12 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);
m_VersionInfo =
" Data is compressed using MGARD Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

int mgardType = -1;

Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressMGARD.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class CompressMGARD : public Operator
*/
size_t DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut);

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
25 changes: 20 additions & 5 deletions source/adios2/operator/compress/CompressPNG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ size_t CompressPNG::Compress(const char *dataIn, const Dims &blockStart,
const uint8_t bufferVersion = 1;

// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::BLOSC);
PutParameter(bufferOut, bufferOutOffset, OperatorType::PNG);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
PutParameter(bufferOut, bufferOutOffset, static_cast<uint16_t>(0));
// Universal operator metadata end

size_t paramOffset = bufferOutOffset;
bufferOutOffset += sizeof(size_t);
bufferOutOffset += sizeof(size_t) + 3;

auto lf_Write = [](png_structp png_ptr, png_bytep data, png_size_t length) {
DestInfo *pDestInfo =
Expand Down Expand Up @@ -181,6 +181,12 @@ size_t CompressPNG::Compress(const char *dataIn, const Dims &blockStart,
png_destroy_write_struct(&pngWrite, &pngInfo);

PutParameter(bufferOut, paramOffset, destInfo.Offset);
PutParameter(bufferOut, paramOffset,
static_cast<uint8_t>(PNG_LIBPNG_VER_MAJOR));
PutParameter(bufferOut, paramOffset,
static_cast<uint8_t>(PNG_LIBPNG_VER_MINOR));
PutParameter(bufferOut, paramOffset,
static_cast<uint8_t>(PNG_LIBPNG_VER_RELEASE));

return destInfo.Offset;
}
Expand All @@ -196,6 +202,13 @@ size_t CompressPNG::DecompressV1(const char *bufferIn, const size_t sizeIn,
size_t bufferInOffset = 0;
const size_t outSize = GetParameter<size_t>(bufferIn, bufferInOffset);

m_VersionInfo =
" Data is compressed using PNG Version " +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) + "." +
std::to_string(GetParameter<uint8_t>(bufferIn, bufferInOffset)) +
". Please make sure a compatible version is used for decompression.";

png_image image;
std::memset(&image, 0, sizeof(image));
image.version = PNG_IMAGE_VERSION;
Expand All @@ -207,7 +220,8 @@ size_t CompressPNG::DecompressV1(const char *bufferIn, const size_t sizeIn,
{
throw std::runtime_error(
"ERROR: png_image_begin_read_from_memory failed in call "
"to ADIOS2 PNG Decompress\n");
"to ADIOS2 PNG Decompress." +
m_VersionInfo + "\n");
}

// TODO might be needed from parameters?
Expand All @@ -216,7 +230,8 @@ size_t CompressPNG::DecompressV1(const char *bufferIn, const size_t sizeIn,
{
throw std::runtime_error(
"ERROR: png_image_finish_read_from_memory failed in call "
"to ADIOS2 PNG Decompress\n");
"to ADIOS2 PNG Decompress." +
m_VersionInfo + "\n");
}
return outSize;
}
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/operator/compress/CompressPNG.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class CompressPNG : public Operator
char *BufferOut = nullptr;
size_t Offset = 0;
};

std::string m_VersionInfo;
};

} // end namespace compress
Expand Down
Loading