Skip to content

Commit

Permalink
fix(tianmu):assert failed on ptr == buff.get() + data_.sum_len at pac…
Browse files Browse the repository at this point in the history
…k_str.cpp:584(#1620)
  • Loading branch information
konghaiya authored and mergify[bot] committed Apr 27, 2023
1 parent cc8a96b commit f1a7259
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions storage/tianmu/data/pack_str.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) {
dpn_->max_i = -1;
}

// ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack));
UnsetNull(locationInPack);
dpn_->numOfNulls--;
auto &str = v.GetString();
Expand All @@ -227,10 +226,13 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) {
SetPtrSize(locationInPack, nullptr, 0);
SetNull(locationInPack);
dpn_->numOfNulls++;
if(dpn_->NullOnly()){
data_.sum_len = 0;
}
} else {
// update non-null to another nonull
auto vsize = GetValueBinary(locationInPack).size();
ASSERT(data_.sum_len >= vsize);
ASSERT(data_.sum_len >= vsize, col_share_->DataFile() + ", data_.sum_len:" + std::to_string(data_.sum_len) + ", vsize:"+std::to_string(vsize));
data_.sum_len -= vsize;
auto &str = v.GetString();
if (str.size() <= vsize) {
Expand Down Expand Up @@ -260,6 +262,9 @@ void PackStr::DeleteByRow(size_t locationInPack) {
SetPtrSize(locationInPack, nullptr, 0);
SetNull(locationInPack);
dpn_->numOfNulls++;
if(dpn_->NullOnly()){
data_.sum_len = 0;
}
}
SetDeleted(locationInPack);
dpn_->numOfDeleted++;
Expand Down Expand Up @@ -381,7 +386,7 @@ std::pair<PackStr::UniquePtr, size_t> PackStr::Compress() {
reinterpret_cast<uint *>(alloc(comp_len_buf_size / 4 * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY)), *this);
uint tmp_comp_len_buf_size = comp_len_buf_size - 8;
compress::NumCompressor<uint> nc;
CprsErr res = nc.Compress(reinterpret_cast<char *>(comp_len_buf.get() + sizeof(ushort)), tmp_comp_len_buf_size,
CprsErr res = nc.Compress(reinterpret_cast<char *>(comp_len_buf.get() + 2), tmp_comp_len_buf_size,
nc_buffer.get(), onn, maxv);
if (res != CprsErr::CPRS_SUCCESS) {
throw common::InternalException("Compression of lengths of values failed for column " +
Expand Down Expand Up @@ -409,7 +414,7 @@ std::pair<PackStr::UniquePtr, size_t> PackStr::Compress() {

mm::MMGuard<char> comp_buf(reinterpret_cast<char *>(alloc(dlen, mm::BLOCK_TYPE::BLOCK_TEMPORARY)), *this);

if (data_.sum_len) {
if (data_.sum_len > 0) {
int objs = (dpn_->numOfRecords - dpn_->numOfNulls) - zlo;

mm::MMGuard<char *> tmp_index(
Expand Down Expand Up @@ -559,7 +564,7 @@ void PackStr::Save() {
SaveUncompressed(&f);
}

ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength),
ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), col_share_->DataFile() + ", " +
std::to_string(dpn_->dataAddress) + ":" + std::to_string(dpn_->dataLength) + "/" + std::to_string(f.Tell()));
dpn_->synced = true;
}
Expand All @@ -568,8 +573,10 @@ void PackStr::SaveUncompressed(system::Stream *f) {
f->WriteExact(nulls_ptr_.get(), bitmap_size_);
f->WriteExact(deletes_ptr_.get(), bitmap_size_);
f->WriteExact(data_.lens, (data_.len_mode * (1 << col_share_->pss)));
if (data_.v.empty())
if (data_.v.empty() || dpn_->NullOnly()){
return;
}


std::unique_ptr<char[]> buff(new char[data_.sum_len]);
char *ptr = buff.get();
Expand All @@ -586,7 +593,7 @@ void PackStr::SaveUncompressed(system::Stream *f) {
}

void PackStr::LoadCompressed(system::Stream *f) {
ASSERT(IsModeCompressionApplied());
ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile());

auto compressed_buf = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED);
f->ReadExact(compressed_buf.get(), dpn_->dataLength);
Expand Down Expand Up @@ -619,7 +626,7 @@ void PackStr::LoadCompressed(system::Stream *f) {
}

if (dpn_->numOfDeleted > 0) {
uint delete_buf_size = 0;
ushort delete_buf_size = 0;
delete_buf_size = (*reinterpret_cast<ushort *>(cur_buf));
if (delete_buf_size > bitmap_size_)
throw common::DatabaseException("Unexpected bytes found in data pack.");
Expand All @@ -628,7 +635,7 @@ void PackStr::LoadCompressed(system::Stream *f) {
else {
compress::BitstreamCompressor bsc;
CprsErr res = bsc.Decompress(reinterpret_cast<char *>(deletes_ptr_.get()), delete_buf_size,
reinterpret_cast<char *>(cur_buf) + 2, dpn_->numOfRecords, dpn_->numOfDeleted);
reinterpret_cast<char *>(cur_buf) + sizeof(ushort), dpn_->numOfRecords, dpn_->numOfDeleted);
if (res != CprsErr::CPRS_SUCCESS) {
throw common::DatabaseException("Decompression of deletes failed for column " +
std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " +
Expand All @@ -646,7 +653,7 @@ void PackStr::LoadCompressed(system::Stream *f) {
compress::NumCompressor<uint> nc;
mm::MMGuard<uint> cn_ptr((uint *)alloc((1 << col_share_->pss) * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY),
*this);
CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast<char *>(cur_buf + 8), comp_len_buf_size - 8,
CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast<char *>(cur_buf + sizeof(uint32_t)*2), comp_len_buf_size - 8,
dpn_->numOfRecords - dpn_->numOfNulls, maxv);
if (res != CprsErr::CPRS_SUCCESS) {
std::stringstream msg_buf;
Expand Down Expand Up @@ -714,7 +721,7 @@ void PackStr::LoadCompressed(system::Stream *f) {
}

void PackStr::LoadCompressedTrie(system::Stream *f) {
ASSERT(IsModeCompressionApplied());
ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile());
compressed_data_.reset(nullptr);
compressed_data_ = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED);
f->ReadExact(compressed_data_.get(), dpn_->dataLength);
Expand Down

0 comments on commit f1a7259

Please sign in to comment.