
Commit

wip
lidezhu committed Aug 5, 2022
1 parent ee2f753 commit 651088c
Showing 7 changed files with 70 additions and 367 deletions.
199 changes: 39 additions & 160 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -100,14 +100,11 @@ void BlobStore::registerPaths()
Poco::File blob(fmt::format("{}/{}", path, blob_name));
auto blob_size = blob.getSize();
delegator->addPageFileUsedSize({blob_id, 0}, blob_size, path, true);
if (blob_size > config.file_limit_size)
{
blob_stats.createBigPageStatNotChecking(blob_id, lock_stats);
}
else
{
blob_stats.createStatNotChecking(blob_id, lock_stats);
}
bool is_one_shot = blob_size > config.file_limit_size;
blob_stats.createStatNotChecking(blob_id,
is_one_shot ? blob_size : config.file_limit_size.get(),
is_one_shot,
lock_stats);
}
else
{
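
Where the old restore path branched between a regular stat and a dedicated big-blob stat, the new path makes a single call and passes both the capacity and an is_one_shot flag. A minimal standalone sketch of that decision rule, assuming a 256 MiB file_limit_size purely for illustration (StatParams and chooseStatParams are hypothetical names, not the TiFlash API):

#include <cstdint>
#include <iostream>

// Hypothetical helper mirroring the restore logic above: a blob file that is
// already larger than file_limit_size is treated as one-shot (it will not be
// reused for further writes), and its stat capacity is simply its on-disk size.
struct StatParams
{
    uint64_t max_caps;
    bool is_one_shot;
};

StatParams chooseStatParams(uint64_t blob_size, uint64_t file_limit_size)
{
    const bool is_one_shot = blob_size > file_limit_size;
    return StatParams{is_one_shot ? blob_size : file_limit_size, is_one_shot};
}

int main()
{
    const uint64_t file_limit_size = 256ULL << 20; // assumed 256 MiB limit for the example
    for (uint64_t blob_size : {64ULL << 20, 512ULL << 20})
    {
        const auto params = chooseStatParams(blob_size, file_limit_size);
        std::cout << "blob_size=" << blob_size << " max_caps=" << params.max_caps
                  << " one_shot=" << params.is_one_shot << '\n';
    }
}
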
@@ -130,7 +127,7 @@ FileUsageStatistics BlobStore::getFileUsageStatistics() const
for (const auto & stat : stats)
{
// We can access to these type without any locking.
if (stat->isReadOnly() || stat->isBigBlob())
if (stat->isReadOnly())
{
usage.total_disk_size += stat->sm_total_size;
usage.total_valid_size += stat->sm_valid_size;
@@ -149,98 +146,12 @@ FileUsageStatistics BlobStore::getFileUsageStatistics() const
return usage;
}

PageEntriesEdit BlobStore::handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
{
auto ns_id = wb.getNamespaceId();
PageEntriesEdit edit;
for (auto & write : wb.getWrites())
{
switch (write.type)
{
case WriteBatch::WriteType::PUT:
{
ChecksumClass digest;
PageEntryV3 entry;

auto [blob_id, offset_in_file] = getPosFromStats(write.size);

entry.file_id = blob_id;
entry.size = write.size;
entry.tag = write.tag;
entry.offset = offset_in_file;
// padding size won't work on big write batch
entry.padded_size = 0;

BufferBase::Buffer data_buf = write.read_buffer->buffer();

digest.update(data_buf.begin(), write.size);
entry.checksum = digest.checksum();

UInt64 field_begin, field_end;

for (size_t i = 0; i < write.offsets.size(); ++i)
{
ChecksumClass field_digest;
field_begin = write.offsets[i].first;
field_end = (i == write.offsets.size() - 1) ? write.size : write.offsets[i + 1].first;

field_digest.update(data_buf.begin() + field_begin, field_end - field_begin);
write.offsets[i].second = field_digest.checksum();
}

if (!write.offsets.empty())
{
// we can swap from WriteBatch instead of copying
entry.field_offsets.swap(write.offsets);
}

try
{
auto blob_file = getBlobFile(blob_id);
blob_file->write(data_buf.begin(), offset_in_file, write.size, write_limiter);
}
catch (DB::Exception & e)
{
removePosFromStats(blob_id, offset_in_file, write.size);
LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] write failed.", blob_id, offset_in_file, write.size);
throw e;
}

edit.put(buildV3Id(ns_id, write.page_id), entry);
break;
}
case WriteBatch::WriteType::DEL:
{
edit.del(buildV3Id(ns_id, write.page_id));
break;
}
case WriteBatch::WriteType::REF:
{
edit.ref(buildV3Id(ns_id, write.page_id), buildV3Id(ns_id, write.ori_page_id));
break;
}
case WriteBatch::WriteType::PUT_EXTERNAL:
edit.putExternal(buildV3Id(ns_id, write.page_id));
break;
case WriteBatch::WriteType::UPSERT:
throw Exception(fmt::format("Unknown write type: {}", write.type));
}
}

return edit;
}

PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter)
{
ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount());

const size_t all_page_data_size = wb.getTotalDataSize();

if (all_page_data_size > config.file_limit_size)
{
return handleLargeWrite(wb, write_limiter);
}

PageEntriesEdit edit;

auto ns_id = wb.getNamespaceId();
@@ -412,7 +323,7 @@ void BlobStore::remove(const PageEntriesV3 & del_entries)
}
}

// After we remove postion of blob, we need recalculate the blob.
// After we remove position of blob, we need recalculate the blob.
for (const auto & blob_id : blob_updated)
{
const auto & stat = blob_stats.blobIdToStat(blob_id,
Expand Down Expand Up @@ -443,40 +354,36 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)

auto lock_stat = [size, this, &stat]() {
auto lock_stats = blob_stats.lock();
if (size > config.file_limit_size)
BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
if (stat == nullptr)
{
auto blob_file_id = blob_stats.chooseBigStat(lock_stats);
stat = blob_stats.createBigStat(blob_file_id, lock_stats);

return stat->lock();
// No valid stat for putting data with `size`, create a new one
// one_shot means the blob file will just accept this writing and won't be reused for other writing
bool is_one_shot = size > config.file_limit_size;
stat = blob_stats.createStat(blob_file_id,
is_one_shot ? size : config.file_limit_size.get(),
is_one_shot,
lock_stats);
}
else
{
BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
if (stat == nullptr)
{
// No valid stat for puting data with `size`, create a new one
stat = blob_stats.createStat(blob_file_id, lock_stats);
}

// We must get the lock from BlobStat under the BlobStats lock
// to ensure that BlobStat updates are serialized.
// Otherwise it may cause stat to fail to get the span for writing
// and throwing exception.
return stat->lock();
}
// We must get the lock from BlobStat under the BlobStats lock
// to ensure that BlobStat updates are serialized.
// Otherwise it may cause stat to fail to get the span for writing
// and throwing exception.
return stat->lock();
}();

// We need to assume that this insert will reduce max_cap.
// Because other threads may also be waiting for BlobStats to chooseStat during this time.
// If max_cap is not reduced, it may cause the same BlobStat to accept multiple buffers and exceed its max_cap.
// After the BlobStore records the buffer size, max_caps will also get an accurate update.
// So there won't get problem in reducing max_caps here.
auto old_max_cap = stat->sm_max_caps;
assert(stat->sm_max_caps >= size);
stat->sm_max_caps -= size;

// Get Postion from single stat
auto old_max_cap = stat->sm_max_caps;
// Get Position from single stat
BlobFileOffset offset = stat->getPosFromStat(size, lock_stat);

// Can't insert into this spacemap
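
The comments above capture two rules: the per-BlobStat lock is acquired while the BlobStats lock is still held, and the chosen stat's max_caps is deducted up front so that concurrent writers waiting in chooseStat cannot over-commit the same blob file. A condensed standalone sketch of that reservation idea, with hypothetical names (Stat, Stats, pickAndReserve, release) rather than the real classes, and with capacity handed back explicitly on failure only to keep the toy model consistent:

#include <cstdint>
#include <iostream>
#include <mutex>
#include <optional>
#include <vector>

// Each Stat tracks the remaining writable capacity of one blob file.
struct Stat
{
    uint64_t id = 0;
    uint64_t max_caps = 0;
};

class Stats
{
public:
    explicit Stats(std::vector<Stat> stats)
        : stats_(std::move(stats))
    {}

    // Pick a stat that can hold `size` and deduct the capacity immediately,
    // while the stats lock is held, so another thread reserving space in the
    // same stat already sees the reduced capacity.
    std::optional<uint64_t> pickAndReserve(uint64_t size)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        for (auto & stat : stats_)
        {
            if (stat.max_caps >= size)
            {
                stat.max_caps -= size; // reserve before the slow disk write
                return stat.id;
            }
        }
        return std::nullopt; // the real code would create a new stat here
    }

    // If the write later fails, hand the reserved capacity back.
    void release(uint64_t id, uint64_t size)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        for (auto & stat : stats_)
            if (stat.id == id)
                stat.max_caps += size;
    }

private:
    std::mutex mutex_;
    std::vector<Stat> stats_;
};

int main()
{
    Stats stats({Stat{1, 1 << 20}});
    if (auto id = stats.pickAndReserve(4096))
        std::cout << "reserved 4096 bytes in blob " << *id << '\n';
}
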
Expand Down Expand Up @@ -851,7 +758,7 @@ struct BlobStoreGCInfo
toTypeString("Read-Only Blob", 0),
toTypeString("No GC Blob", 1),
toTypeString("Full GC Blob", 2),
toTypeString("Big Blob", 3),
toTypeString("One Shot Blob", 3),
toTypeTruncateString("Truncated Blob"));
}

@@ -870,7 +777,7 @@ struct BlobStoreGCInfo
blob_gc_info[2].emplace_back(std::make_pair(blob_id, valid_rate));
}

void appendToBigBlob(const BlobFileId blob_id, double valid_rate)
void appendToOneShotBlob(const BlobFileId blob_id, double valid_rate)
{
blob_gc_info[3].emplace_back(std::make_pair(blob_id, valid_rate));
}
@@ -972,10 +879,10 @@ std::vector<BlobFileId> BlobStore::getGCStats()
continue;
}

if (stat->isBigBlob())
if (stat->isOneShot())
{
blobstore_gc_info.appendToBigBlob(stat->id, stat->sm_valid_rate);
LOG_FMT_TRACE(log, "Current [blob_id={}] is big-blob", stat->id);
blobstore_gc_info.appendToOneShotBlob(stat->id, stat->sm_valid_rate);
LOG_FMT_TRACE(log, "Current [blob_id={}] is one-shot", stat->id);
continue;
}
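
In the GC scan above, one-shot blobs join read-only blobs as categories that are reported for statistics but never rewritten by full GC; per removePosFromStat further down, they are simply dropped once their valid size reaches zero. A toy classification loop along those lines (BlobInfo and the 0.5 valid-rate threshold are assumptions for illustration, not values taken from the source):

#include <cstdint>
#include <iostream>
#include <vector>

// Toy model of the GC pass: read-only and one-shot blobs are skipped, other
// blobs become full-GC candidates once too little of their data is still valid.
struct BlobInfo
{
    uint64_t id = 0;
    bool read_only = false;
    bool one_shot = false;
    double valid_rate = 1.0; // valid bytes / total bytes
};

std::vector<uint64_t> pickFullGCCandidates(const std::vector<BlobInfo> & blobs,
                                           double valid_rate_threshold = 0.5)
{
    std::vector<uint64_t> candidates;
    for (const auto & blob : blobs)
    {
        if (blob.read_only || blob.one_shot)
            continue; // never compacted, only removed once empty
        if (blob.valid_rate < valid_rate_threshold)
            candidates.push_back(blob.id);
    }
    return candidates;
}

int main()
{
    std::vector<BlobInfo> blobs = {
        {1, false, false, 0.2}, // mostly invalid data -> candidate
        {2, false, true, 0.2},  // one-shot -> skipped
        {3, true, false, 0.1},  // read-only -> skipped
    };
    for (auto id : pickFullGCCandidates(blobs))
        std::cout << "full GC candidate: blob " << id << '\n';
}
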

Expand Down Expand Up @@ -1278,7 +1185,7 @@ std::lock_guard<std::mutex> BlobStore::BlobStats::lock() const
return std::lock_guard(lock_stats);
}

BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, UInt64 max_caps, bool is_one_shot, const std::lock_guard<std::mutex> & guard)
{
// New blob file id won't bigger than roll_id
if (blob_file_id > roll_id)
@@ -1304,7 +1211,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
}

// Create a stat without checking the file_id exist or not
auto stat = createStatNotChecking(blob_file_id, guard);
auto stat = createStatNotChecking(blob_file_id, max_caps, is_one_shot, guard);

// Roll to the next new blob id
if (blob_file_id == roll_id)
@@ -1315,38 +1222,14 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
return stat;
}

BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, UInt64 max_caps, bool is_one_shot, const std::lock_guard<std::mutex> &)
{
LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}]", blob_file_id);
LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}] with capacity {}", blob_file_id, max_caps);
BlobStatPtr stat = std::make_shared<BlobStat>(
blob_file_id,
static_cast<SpaceMap::SpaceMapType>(config.spacemap_type.get()),
config.file_limit_size);

PageFileIdAndLevel id_lvl{blob_file_id, 0};
stats_map[delegator->choosePath(id_lvl)].emplace_back(stat);
return stat;
}

BlobStatPtr BlobStore::BlobStats::createBigStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
{
auto stat = createBigPageStatNotChecking(blob_file_id, guard);
// Roll to the next new blob id
if (blob_file_id == roll_id)
{
roll_id++;
}

return stat;
}

BlobStatPtr BlobStore::BlobStats::createBigPageStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
{
LOG_FMT_INFO(log, "Created a new big BlobStat [blob_id={}]", blob_file_id);
BlobStatPtr stat = std::make_shared<BlobStat>(
blob_file_id,
SpaceMap::SpaceMapType::SMAP64_BIG,
config.file_limit_size);
max_caps,
is_one_shot);

PageFileIdAndLevel id_lvl{blob_file_id, 0};
stats_map[delegator->choosePath(id_lvl)].emplace_back(stat);
Expand Down Expand Up @@ -1447,11 +1330,6 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID);
}

BlobFileId BlobStore::BlobStats::chooseBigStat(const std::lock_guard<std::mutex> &) const
{
return roll_id;
}

BlobStatPtr BlobStore::BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_not_exist)
{
auto guard = lock();
Expand Down Expand Up @@ -1487,6 +1365,7 @@ BlobFileOffset BlobStore::BlobStats::BlobStat::getPosFromStat(size_t buf_size, c
UInt64 max_cap = 0;
bool expansion = true;

// TODO: check the detail algorithm and what `expansion` means
std::tie(offset, max_cap, expansion) = smap->searchInsertOffset(buf_size);
ProfileEvents::increment(expansion ? ProfileEvents::PSV3MBlobExpansion : ProfileEvents::PSV3MBlobReused);

Expand Down Expand Up @@ -1524,7 +1403,7 @@ bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si
if (!smap->markFree(offset, buf_size))
{
smap->logDebugString();
throw Exception(fmt::format("Remove postion from BlobStat failed, invalid position [offset={}] [buf_size={}] [blob_id={}]",
throw Exception(fmt::format("Remove position from BlobStat failed, invalid position [offset={}] [buf_size={}] [blob_id={}]",
offset,
buf_size,
id),
Expand All @@ -1533,7 +1412,7 @@ bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si

sm_valid_size -= buf_size;
sm_valid_rate = sm_valid_size * 1.0 / sm_total_size;
return ((isReadOnly() || isBigBlob()) && sm_valid_size == 0);
return ((isReadOnly() || isOneShot()) && sm_valid_size == 0);
}

void BlobStore::BlobStats::BlobStat::restoreSpaceMap(BlobFileOffset offset, size_t buf_size)