From 3f607fa74a5ffa8ecb9df6f1eefbc2118d2e7178 Mon Sep 17 00:00:00 2001
From: lidezhu <lidezhu@pingcap.com>
Date: Mon, 8 Aug 2022 16:03:18 +0800
Subject: [PATCH] Support configuring the blob file limit size

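Make the maximum size of a single BlobFile configurable through the new
setting `dt_storage_blob_file_limit_size` (default 268435456, i.e. 256MiB).
A write larger than the limit now simply creates a BlobStat whose capacity
equals the write size, which lets us drop the dedicated BIG_BLOB stat type
and the SMAP64_BIG space map. A blob file whose capacity exceeds the limit
is treated like a read-only one: it is never reused for later writes and
its stat is removed once it holds no valid data.

As a rough illustration (a hypothetical deployment snippet, assuming the
usual mapping of Settings.h entries into the `[profiles.default]` section
of the TiFlash config file), the limit could be lowered like this:

    [profiles.default]
    # Cap newly created blob files at 128MiB; existing blob files and
    # single writes larger than the limit are unaffected.
    dt_storage_blob_file_limit_size = 134217728
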
---
 dbms/src/Interpreters/Settings.h              |   7 +-
 dbms/src/Storages/Page/ConfigSettings.cpp     |   1 +
 dbms/src/Storages/Page/V3/BlobStore.cpp       | 161 ++++++----------
 dbms/src/Storages/Page/V3/BlobStore.h         |  48 ++---
 .../Storages/Page/V3/spacemap/SpaceMap.cpp    |   9 -
 dbms/src/Storages/Page/V3/spacemap/SpaceMap.h |   3 -
 .../Storages/Page/V3/spacemap/SpaceMapBig.h   | 151 ---------------
 .../Page/V3/tests/gtest_blob_store.cpp        | 180 ++++++++++++++++--
 .../Page/V3/tests/gtest_page_directory.cpp    |   4 +-
 dbms/src/TestUtils/MockDiskDelegator.h        |   6 +-
 10 files changed, 248 insertions(+), 322 deletions(-)
 delete mode 100644 dbms/src/Storages/Page/V3/spacemap/SpaceMapBig.h

diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index 904a88e9a6b..eb0e5730c44 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -316,10 +316,11 @@ struct Settings
     M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage")                                                                                                                      \
                                                                                                                                                                                                                                         \
     M(SettingDouble, dt_storage_blob_heavy_gc_valid_rate, 0.2, "Max valid rate of deciding a blob can be compact")                                                                                                                      \
+    M(SettingUInt64, dt_storage_blob_file_limit_size, 268435456, "Max size of a single blob file. Some blob files may exceed this limit due to large writes, and changing this config only affects newly created blob files.")          \
     M(SettingDouble, dt_storage_blob_block_alignment_bytes, 0, "Blob IO alignment size")                                                                                                                                                \
-    M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not")                                                                                                                                                          \
-    M(SettingDouble, dt_block_slots_scale, 1.0, "Block slots limit of a read request")                                                                                                                                                          \
-    M(SettingDouble, dt_active_segments_scale, 1.0, "Acitve segments limit of a read request")                                                                                                                                                          \
+    M(SettingBool, dt_enable_read_thread, false, "Enable storage read thread or not")                                                                                                                                                   \
+    M(SettingDouble, dt_block_slots_scale, 1.0, "Block slots limit of a read request")                                                                                                                                                  \
+    M(SettingDouble, dt_active_segments_scale, 1.0, "Active segments limit of a read request")                                                                                                          \
                                                                                                                                                                                                                                         \
     M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage")                                                                                                          \
     M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.")                                                                                                          \
diff --git a/dbms/src/Storages/Page/ConfigSettings.cpp b/dbms/src/Storages/Page/ConfigSettings.cpp
index 6962dc08baf..997bf2d4277 100644
--- a/dbms/src/Storages/Page/ConfigSettings.cpp
+++ b/dbms/src/Storages/Page/ConfigSettings.cpp
@@ -36,6 +36,7 @@ void mergeConfigFromSettings(const DB::Settings & settings, PageStorage::Config
 
     // V3 setting which export to global setting
     config.blob_heavy_gc_valid_rate = settings.dt_storage_blob_heavy_gc_valid_rate;
+    config.blob_file_limit_size = settings.dt_storage_blob_file_limit_size;
     config.blob_block_alignment_bytes = settings.dt_storage_blob_block_alignment_bytes;
 }
 
diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp
index a45f3a6e760..52cbe87954e 100644
--- a/dbms/src/Storages/Page/V3/BlobStore.cpp
+++ b/dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -100,14 +100,9 @@ void BlobStore::registerPaths()
                 Poco::File blob(fmt::format("{}/{}", path, blob_name));
                 auto blob_size = blob.getSize();
                 delegator->addPageFileUsedSize({blob_id, 0}, blob_size, path, true);
-                if (blob_size > config.file_limit_size)
-                {
-                    blob_stats.createBigPageStatNotChecking(blob_id, lock_stats);
-                }
-                else
-                {
-                    blob_stats.createStatNotChecking(blob_id, lock_stats);
-                }
+                blob_stats.createStatNotChecking(blob_id,
+                                                 std::max(blob_size, config.file_limit_size.get()),
+                                                 lock_stats);
             }
             else
             {
@@ -130,7 +125,7 @@ FileUsageStatistics BlobStore::getFileUsageStatistics() const
         for (const auto & stat : stats)
         {
             // We can access to these type without any locking.
-            if (stat->isReadOnly() || stat->isBigBlob())
+            if (stat->isReadOnly())
             {
                 usage.total_disk_size += stat->sm_total_size;
                 usage.total_valid_size += stat->sm_valid_size;
@@ -236,6 +231,8 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
 
     const size_t all_page_data_size = wb.getTotalDataSize();
 
+    // If the WriteBatch is too big, we split its writes across multiple `BlobFile`s.
+    // This avoids allocating one big buffer for the whole batch and smooths memory usage.
     if (all_page_data_size > config.file_limit_size)
     {
         return handleLargeWrite(wb, write_limiter);
@@ -412,7 +409,7 @@ void BlobStore::remove(const PageEntriesV3 & del_entries)
         }
     }
 
-    // After we remove postion of blob, we need recalculate the blob.
+    // After we remove positions from a blob, we need to recalculate the blob's stats.
     for (const auto & blob_id : blob_updated)
     {
         const auto & stat = blob_stats.blobIdToStat(blob_id,
@@ -443,29 +440,21 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)
 
     auto lock_stat = [size, this, &stat]() {
         auto lock_stats = blob_stats.lock();
-        if (size > config.file_limit_size)
+        BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
+        std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
+        if (stat == nullptr)
         {
-            auto blob_file_id = blob_stats.chooseBigStat(lock_stats);
-            stat = blob_stats.createBigStat(blob_file_id, lock_stats);
-
-            return stat->lock();
+            // No valid stat for putting data with `size`, create a new one
+            stat = blob_stats.createStat(blob_file_id,
+                                         std::max(size, config.file_limit_size.get()),
+                                         lock_stats);
         }
-        else
-        {
-            BlobFileId blob_file_id = INVALID_BLOBFILE_ID;
-            std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats);
-            if (stat == nullptr)
-            {
-                // No valid stat for puting data with `size`, create a new one
-                stat = blob_stats.createStat(blob_file_id, lock_stats);
-            }
 
-            // We must get the lock from BlobStat under the BlobStats lock
-            // to ensure that BlobStat updates are serialized.
-            // Otherwise it may cause stat to fail to get the span for writing
-            // and throwing exception.
-            return stat->lock();
-        }
+        // We must get the lock from BlobStat under the BlobStats lock
+        // to ensure that BlobStat updates are serialized.
+        // Otherwise the stat may fail to get a span for writing
+        // and throw an exception.
+        return stat->lock();
     }();
 
     // We need to assume that this insert will reduce max_cap.
@@ -473,10 +462,11 @@ std::pair<BlobFileId, BlobFileOffset> BlobStore::getPosFromStats(size_t size)
     // If max_cap is not reduced, it may cause the same BlobStat to accept multiple buffers and exceed its max_cap.
     // After the BlobStore records the buffer size, max_caps will also get an accurate update.
     // So there won't get problem in reducing max_caps here.
+    auto old_max_cap = stat->sm_max_caps;
+    assert(stat->sm_max_caps >= size);
     stat->sm_max_caps -= size;
 
-    // Get Postion from single stat
-    auto old_max_cap = stat->sm_max_caps;
+    // Get Position from single stat
     BlobFileOffset offset = stat->getPosFromStat(size, lock_stat);
 
     // Can't insert into this spacemap
@@ -500,7 +490,10 @@ void BlobStore::removePosFromStats(BlobFileId blob_id, BlobFileOffset offset, si
     const auto & stat = blob_stats.blobIdToStat(blob_id);
     {
         auto lock = stat->lock();
-        need_remove_stat = stat->removePosFromStat(offset, size, lock);
+        auto remaining_valid_size = stat->removePosFromStat(offset, size, lock);
+        // A BlobFile that is read-only or whose total capacity exceeds config.file_limit_size won't be reused for later writes,
+        // so it's safe and necessary to remove it here once it holds no valid data.
+        need_remove_stat = ((stat->isReadOnly() || stat->file_total_caps > config.file_limit_size) && remaining_valid_size == 0);
     }
 
     // We don't need hold the BlobStat lock(Also can't do that).
@@ -849,11 +842,10 @@ struct BlobStoreGCInfo
 {
     String toString() const
     {
-        return fmt::format("{}. {}. {}. {}. {}.",
+        return fmt::format("{}. {}. {}. {}.",
                            toTypeString("Read-Only Blob", 0),
                            toTypeString("No GC Blob", 1),
                            toTypeString("Full GC Blob", 2),
-                           toTypeString("Big Blob", 3),
                            toTypeTruncateString("Truncated Blob"));
     }
 
@@ -872,11 +864,6 @@ struct BlobStoreGCInfo
         blob_gc_info[2].emplace_back(std::make_pair(blob_id, valid_rate));
     }
 
-    void appendToBigBlob(const BlobFileId blob_id, double valid_rate)
-    {
-        blob_gc_info[3].emplace_back(std::make_pair(blob_id, valid_rate));
-    }
-
     void appendToTruncatedBlob(const BlobFileId blob_id, UInt64 origin_size, UInt64 truncated_size, double valid_rate)
     {
         blob_gc_truncate_info.emplace_back(std::make_tuple(blob_id, origin_size, truncated_size, valid_rate));
@@ -886,8 +873,7 @@ struct BlobStoreGCInfo
     // 1. read only blob
     // 2. no need gc blob
     // 3. full gc blob
-    // 4. big blob
-    std::vector<std::pair<BlobFileId, double>> blob_gc_info[4];
+    std::vector<std::pair<BlobFileId, double>> blob_gc_info[3];
 
     std::vector<std::tuple<BlobFileId, UInt64, UInt64, double>> blob_gc_truncate_info;
 
@@ -974,13 +960,6 @@ std::vector<BlobFileId> BlobStore::getGCStats()
                 continue;
             }
 
-            if (stat->isBigBlob())
-            {
-                blobstore_gc_info.appendToBigBlob(stat->id, stat->sm_valid_rate);
-                LOG_FMT_TRACE(log, "Current [blob_id={}] is big-blob", stat->id);
-                continue;
-            }
-
             auto lock = stat->lock();
             auto right_margin = stat->smap->getUsedBoundary();
 
@@ -1106,10 +1085,25 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
         }
     };
 
-    const auto config_file_limit = config.file_limit_size.get();
-    // If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s to
+    auto alloc_size = config.file_limit_size.get();
+    // If `total_page_size` is greater than `config.file_limit_size`, we will try to write the page data into multiple `BlobFile`s to
     // make the memory consumption smooth during GC.
-    auto alloc_size = total_page_size > config_file_limit ? config_file_limit : total_page_size;
+    if (total_page_size > alloc_size)
+    {
+        size_t biggest_page_size = 0;
+        for (const auto & [file_id, versioned_pageid_entry_list] : entries_need_gc)
+        {
+            (void)file_id;
+            for (const auto & [page_id, version, entry] : versioned_pageid_entry_list)
+            {
+                (void)page_id;
+                (void)version;
+                biggest_page_size = std::max(biggest_page_size, entry.size);
+            }
+        }
+        alloc_size = std::max(alloc_size, biggest_page_size);
+    }
+
     BlobFileOffset remaining_page_size = total_page_size - alloc_size;
 
     char * data_buf = static_cast<char *>(alloc(alloc_size));
@@ -1130,7 +1124,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
     // ...
     for (const auto & [file_id, versioned_pageid_entry_list] : entries_need_gc)
     {
-        for (const auto & [page_id, versioned, entry] : versioned_pageid_entry_list)
+        for (const auto & [page_id, version, entry] : versioned_pageid_entry_list)
         {
             /// If `total_page_size` is greater than `config_file_limit`, we need to write the page data into multiple `BlobFile`s.
             /// So there may be some page entry that cannot be fit into the current blob file, and we need to write it into the next one.
@@ -1140,7 +1134,6 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
             /// After writing data into the current blob file, we reuse the original buffer for future write.
             if (offset_in_data + entry.size > alloc_size)
             {
-                assert(alloc_size == config_file_limit);
                 assert(file_offset_begin == 0);
                 // Remove the span that is not actually used
                 if (offset_in_data != alloc_size)
@@ -1157,10 +1150,11 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
                 offset_in_data = 0;
 
                 // Acquire a span from stats for remaining data
-                auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size);
+                auto next_alloc_size = (remaining_page_size > alloc_size ? alloc_size : remaining_page_size);
                 remaining_page_size -= next_alloc_size;
                 std::tie(blobfile_id, file_offset_begin) = getPosFromStats(next_alloc_size);
             }
+            assert(offset_in_data + entry.size <= alloc_size);
 
             // Read the data into buffer by old entry
             read(page_id, file_id, entry.offset, data_pos, entry.size, read_limiter, /*background*/ true);
@@ -1175,7 +1169,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
             offset_in_data += new_entry.size;
             data_pos += new_entry.size;
 
-            edit.upsertPage(page_id, versioned, new_entry);
+            edit.upsertPage(page_id, version, new_entry);
         }
     }
 
@@ -1280,7 +1274,7 @@ std::lock_guard<std::mutex> BlobStore::BlobStats::lock() const
     return std::lock_guard(lock_stats);
 }
 
-BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
+BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, UInt64 max_caps, const std::lock_guard<std::mutex> & guard)
 {
     // New blob file id won't bigger than roll_id
     if (blob_file_id > roll_id)
@@ -1306,7 +1300,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
     }
 
     // Create a stat without checking the file_id exist or not
-    auto stat = createStatNotChecking(blob_file_id, guard);
+    auto stat = createStatNotChecking(blob_file_id, max_caps, guard);
 
     // Roll to the next new blob id
     if (blob_file_id == roll_id)
@@ -1317,43 +1311,13 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std:
     return stat;
 }
 
-BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
+BlobStatPtr BlobStore::BlobStats::createStatNotChecking(BlobFileId blob_file_id, UInt64 max_caps, const std::lock_guard<std::mutex> &)
 {
-    LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}]", blob_file_id);
+    LOG_FMT_INFO(log, "Created a new BlobStat [blob_id={}] with capacity {}", blob_file_id, max_caps);
     BlobStatPtr stat = std::make_shared<BlobStat>(
         blob_file_id,
         static_cast<SpaceMap::SpaceMapType>(config.spacemap_type.get()),
-        config.file_limit_size);
-
-    PageFileIdAndLevel id_lvl{blob_file_id, 0};
-    auto path = delegator->choosePath(id_lvl);
-    /// This function may be called when restoring an old BlobFile at restart or creating a new BlobFile.
-    /// If restoring an old BlobFile, the BlobFile path maybe already added to delegator, but an another call to `addPageFileUsedSize` should do no harm.
-    /// If creating a new BlobFile, we need to register the BlobFile's path to delegator, so it's necessary to call `addPageFileUsedSize` here.
-    delegator->addPageFileUsedSize({blob_file_id, 0}, 0, path, true);
-    stats_map[path].emplace_back(stat);
-    return stat;
-}
-
-BlobStatPtr BlobStore::BlobStats::createBigStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard)
-{
-    auto stat = createBigPageStatNotChecking(blob_file_id, guard);
-    // Roll to the next new blob id
-    if (blob_file_id == roll_id)
-    {
-        roll_id++;
-    }
-
-    return stat;
-}
-
-BlobStatPtr BlobStore::BlobStats::createBigPageStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &)
-{
-    LOG_FMT_INFO(log, "Created a new big BlobStat [blob_id={}]", blob_file_id);
-    BlobStatPtr stat = std::make_shared<BlobStat>(
-        blob_file_id,
-        SpaceMap::SpaceMapType::SMAP64_BIG,
-        config.file_limit_size);
+        max_caps);
 
     PageFileIdAndLevel id_lvl{blob_file_id, 0};
     auto path = delegator->choosePath(id_lvl);
@@ -1424,9 +1388,11 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
         for (const auto & stat : stats_iter->second)
         {
             auto lock = stat->lock(); // TODO: will it bring performance regression?
+            // Only a BlobFile whose total capacity is no larger than config.file_limit_size can be reused for later writes
             if (stat->isNormal()
                 && stat->sm_max_caps >= buf_size
-                && stat->sm_valid_rate < smallest_valid_rate)
+                && stat->sm_valid_rate < smallest_valid_rate
+                && stat->file_total_caps <= config.file_limit_size)
             {
                 smallest_valid_rate = stat->sm_valid_rate;
                 stat_ptr = stat;
@@ -1459,11 +1425,6 @@ std::pair<BlobStatPtr, BlobFileId> BlobStore::BlobStats::chooseStat(size_t buf_s
     return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID);
 }
 
-BlobFileId BlobStore::BlobStats::chooseBigStat(const std::lock_guard<std::mutex> &) const
-{
-    return roll_id;
-}
-
 BlobStatPtr BlobStore::BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_not_exist)
 {
     auto guard = lock();
@@ -1531,12 +1492,12 @@ BlobFileOffset BlobStore::BlobStats::BlobStat::getPosFromStat(size_t buf_size, c
     return offset;
 }
 
-bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &)
+size_t BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &)
 {
     if (!smap->markFree(offset, buf_size))
     {
         smap->logDebugString();
-        throw Exception(fmt::format("Remove postion from BlobStat failed, invalid position [offset={}] [buf_size={}] [blob_id={}]",
+        throw Exception(fmt::format("Remove position from BlobStat failed, invalid position [offset={}] [buf_size={}] [blob_id={}]",
                                     offset,
                                     buf_size,
                                     id),
@@ -1545,7 +1506,7 @@ bool BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si
 
     sm_valid_size -= buf_size;
     sm_valid_rate = sm_valid_size * 1.0 / sm_total_size;
-    return ((isReadOnly() || isBigBlob()) && sm_valid_size == 0);
+    return sm_valid_size;
 }
 
 void BlobStore::BlobStats::BlobStat::restoreSpaceMap(BlobFileOffset offset, size_t buf_size)
diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h
index 9735794a765..5f21fcb73c2 100644
--- a/dbms/src/Storages/Page/V3/BlobStore.h
+++ b/dbms/src/Storages/Page/V3/BlobStore.h
@@ -72,11 +72,7 @@ class BlobStore : private Allocator<false>
             // Read Only.
             // Only after heavy GC, BlobFile will change to READ_ONLY type.
             // After GC remove, empty files will be removed.
-            READ_ONLY = 2,
-
-            // Big Blob file
-            // Only used to page size > config.file_limit_size
-            BIG_BLOB = 3
+            READ_ONLY = 2
         };
 
         static String blobTypeToString(BlobStatType type)
@@ -87,8 +83,6 @@ class BlobStore : private Allocator<false>
                 return "normal";
             case BlobStatType::READ_ONLY:
                 return "read only";
-            case BlobStatType::BIG_BLOB:
-                return "big blob";
             }
             return "Invalid";
         }
@@ -101,13 +95,19 @@ class BlobStore : private Allocator<false>
             std::mutex sm_lock;
             const SpaceMapPtr smap;
             /**
-             * If no any data inside. It shoule be same as space map `biggest_cap`,
+             * If there is no data inside, it should be the same as space map `biggest_cap`.
              * It is a hint for choosing quickly, should use `recalculateCapacity`
              * to update it after some space are free in the spacemap.
              */
+            // The total capacity of the whole BlobFile
+            UInt64 file_total_caps = 0;
+            // The max capacity of all available slots in SpaceMap
             UInt64 sm_max_caps = 0;
+            // The current file size of the BlobFile
             UInt64 sm_total_size = 0;
+            // The sum of the size of all valid data in the BlobFile
             UInt64 sm_valid_size = 0;
+            // sm_valid_size / sm_total_size
             double sm_valid_rate = 0.0;
 
         public:
@@ -115,16 +115,9 @@ class BlobStore : private Allocator<false>
                 : id(id_)
                 , type(BlobStatType::NORMAL)
                 , smap(SpaceMap::createSpaceMap(sm_type, 0, sm_max_caps_))
+                , file_total_caps(sm_max_caps_)
                 , sm_max_caps(sm_max_caps_)
-            {
-                if (sm_type == SpaceMap::SpaceMapType::SMAP64_BIG)
-                {
-                    type = BlobStatType::BIG_BLOB;
-                }
-
-                // Won't create read-only blob by default.
-                assert(type != BlobStatType::READ_ONLY);
-            }
+            {}
 
             [[nodiscard]] std::lock_guard<std::mutex> lock()
             {
@@ -146,14 +139,9 @@ class BlobStore : private Allocator<false>
                 type.store(BlobStatType::READ_ONLY);
             }
 
-            bool isBigBlob() const
-            {
-                return type.load() == BlobStatType::BIG_BLOB;
-            }
-
             BlobFileOffset getPosFromStat(size_t buf_size, const std::lock_guard<std::mutex> &);
 
-            bool removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &);
+            size_t removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &);
 
             /**
              * This method is only used when blobstore restore
@@ -191,13 +179,9 @@ class BlobStore : private Allocator<false>
         //
         [[nodiscard]] std::lock_guard<std::mutex> lock() const;
 
-        BlobStatPtr createStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &);
+        BlobStatPtr createStatNotChecking(BlobFileId blob_file_id, UInt64 max_caps, const std::lock_guard<std::mutex> &);
 
-        BlobStatPtr createStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard);
-
-        BlobStatPtr createBigPageStatNotChecking(BlobFileId blob_file_id, const std::lock_guard<std::mutex> &);
-
-        BlobStatPtr createBigStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & guard);
+        BlobStatPtr createStat(BlobFileId blob_file_id, UInt64 max_caps, const std::lock_guard<std::mutex> & guard);
 
         void eraseStat(const BlobStatPtr && stat, const std::lock_guard<std::mutex> &);
 
@@ -219,8 +203,6 @@ class BlobStore : private Allocator<false>
          */
         std::pair<BlobStatPtr, BlobFileId> chooseStat(size_t buf_size, const std::lock_guard<std::mutex> &);
 
-        BlobFileId chooseBigStat(const std::lock_guard<std::mutex> &) const;
-
         BlobStatPtr blobIdToStat(BlobFileId file_id, bool ignore_not_exist = false);
 
         using StatsMap = std::map<String, std::list<BlobStatPtr>>;
@@ -266,6 +248,8 @@ class BlobStore : private Allocator<false>
                        const WriteLimiterPtr & write_limiter = nullptr,
                        const ReadLimiterPtr & read_limiter = nullptr);
 
+    PageEntriesEdit handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);
+
     PageEntriesEdit write(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);
 
     void remove(const PageEntriesV3 & del_entries);
@@ -295,8 +279,6 @@ class BlobStore : private Allocator<false>
 private:
 #endif
 
-    PageEntriesEdit handleLargeWrite(DB::WriteBatch & wb, const WriteLimiterPtr & write_limiter = nullptr);
-
     BlobFilePtr read(const PageIdV3Internal & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter = nullptr, bool background = false);
 
     /**
diff --git a/dbms/src/Storages/Page/V3/spacemap/SpaceMap.cpp b/dbms/src/Storages/Page/V3/spacemap/SpaceMap.cpp
index 7ec89d174e1..0543ac82742 100644
--- a/dbms/src/Storages/Page/V3/spacemap/SpaceMap.cpp
+++ b/dbms/src/Storages/Page/V3/spacemap/SpaceMap.cpp
@@ -15,7 +15,6 @@
 #include <Core/Types.h>
 #include <IO/WriteHelpers.h>
 #include <Storages/Page/V3/spacemap/SpaceMap.h>
-#include <Storages/Page/V3/spacemap/SpaceMapBig.h>
 #include <Storages/Page/V3/spacemap/SpaceMapRBTree.h>
 #include <Storages/Page/V3/spacemap/SpaceMapSTDMap.h>
 #include <common/likely.h>
@@ -43,9 +42,6 @@ SpaceMapPtr SpaceMap::createSpaceMap(SpaceMapType type, UInt64 start, UInt64 end
     case SMAP64_STD_MAP:
         smap = STDMapSpaceMap::create(start, end);
         break;
-    case SMAP64_BIG:
-        smap = BigSpaceMap::create(start, end);
-        break;
     default:
         throw Exception(fmt::format("Invalid [type={}] to create spaceMap", static_cast<UInt8>(type)), ErrorCodes::LOGICAL_ERROR);
     }
@@ -60,11 +56,6 @@ SpaceMapPtr SpaceMap::createSpaceMap(SpaceMapType type, UInt64 start, UInt64 end
 
 bool SpaceMap::checkSpace(UInt64 offset, size_t size) const
 {
-    // If we used `SMAP64_BIG`, we won't check the space.
-    if (type == SMAP64_BIG)
-    {
-        return false;
-    }
     return (offset < start) || (offset > end) || (offset + size - 1 > end);
 }
 
diff --git a/dbms/src/Storages/Page/V3/spacemap/SpaceMap.h b/dbms/src/Storages/Page/V3/spacemap/SpaceMap.h
index d230b2f3e35..b219018a7ca 100644
--- a/dbms/src/Storages/Page/V3/spacemap/SpaceMap.h
+++ b/dbms/src/Storages/Page/V3/spacemap/SpaceMap.h
@@ -39,7 +39,6 @@ class SpaceMap
         SMAP64_INVALID = 0,
         SMAP64_RBTREE = 1,
         SMAP64_STD_MAP = 2,
-        SMAP64_BIG = 3 // support for writebatch bigger than blobstore.config.file_limit_size
     };
 
     /**
@@ -143,8 +142,6 @@ class SpaceMap
             return "RB-Tree";
         case SMAP64_STD_MAP:
             return "STD Map";
-        case SMAP64_BIG:
-            return "STD Big";
         default:
             return "Invalid";
         }
diff --git a/dbms/src/Storages/Page/V3/spacemap/SpaceMapBig.h b/dbms/src/Storages/Page/V3/spacemap/SpaceMapBig.h
deleted file mode 100644
index 81c2a5cb786..00000000000
--- a/dbms/src/Storages/Page/V3/spacemap/SpaceMapBig.h
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright 2022 PingCAP, Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-#include <Common/Exception.h>
-#include <Storages/Page/V3/MapUtils.h>
-#include <Storages/Page/V3/spacemap/SpaceMap.h>
-#include <fmt/format.h>
-
-#include <ext/shared_ptr_helper.h>
-#include <map>
-
-namespace DB
-{
-namespace ErrorCodes
-{
-extern const int LOGICAL_ERROR;
-}
-
-namespace PS::V3
-{
-// A space map that is designed for holding only one large page data (size > blobstore.config.file_limit_size)
-class BigSpaceMap
-    : public SpaceMap
-    , public ext::SharedPtrHelper<BigSpaceMap>
-{
-public:
-    ~BigSpaceMap() override = default;
-
-    bool check(std::function<bool(size_t idx, UInt64 start, UInt64 end)> /*checker*/, size_t /*size*/) override
-    {
-        // Can't do check
-        return true;
-    }
-
-protected:
-    BigSpaceMap(UInt64 start, UInt64 end)
-        : SpaceMap(start, end, SMAP64_BIG)
-    {
-        if (start != 0)
-        {
-            throw Exception(fmt::format("[start={}] is not zero. We should not use [type=SMAP64_BIG] to do that.", start), //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-    }
-
-    String toDebugString() override
-    {
-        FmtBuffer fmt_buffer;
-        fmt_buffer.append("    BIG-Map entries status: \n");
-        fmt_buffer.fmtAppend("      Single Space start: 0 size : {}\n", size_in_used);
-
-        return fmt_buffer.toString();
-    }
-
-    std::pair<UInt64, UInt64> getSizes() const override
-    {
-        if (size_in_used == 0)
-        {
-            throw Exception("size_in_used is zero. it should not happend", //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-        return std::make_pair(size_in_used, size_in_used);
-    }
-
-    UInt64 getUsedBoundary() override
-    {
-        return end;
-    }
-
-    bool isMarkUnused(UInt64 /*offset*/, size_t /*length*/) override
-    {
-        return true;
-    }
-
-    bool markUsedImpl(UInt64 offset, size_t length) override
-    {
-        if (offset != 0)
-        {
-            throw Exception(fmt::format("[offset={}] is not zero. We should not use [type=SMAP64_BIG] to do that.", offset), //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-
-        if (length < end)
-        {
-            throw Exception(fmt::format("[length={}] less than [end={}]. We should not use [type=SMAP64_BIG] to do that.", length, end), //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-
-        size_in_used = length;
-
-        return true;
-    }
-
-    std::tuple<UInt64, UInt64, bool> searchInsertOffset(size_t size) override
-    {
-        if (size < end)
-        {
-            throw Exception(fmt::format("[size={}] less than [end={}]. We should not use [type=SMAP64_BIG] to do that.", size, end), //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-        size_in_used = size;
-
-        // offset : from start 0
-        // max_cap : will be 0
-        // is expansion : true(or false, both ok)
-        return std::make_tuple(0, 0, true);
-    }
-
-    UInt64 updateAccurateMaxCapacity() override
-    {
-        // Won't update max_cap.
-        return 0;
-    }
-
-    bool markFreeImpl(UInt64 offset, size_t length) override
-    {
-        if (length != size_in_used)
-        {
-            throw Exception(fmt::format("[length={}] less than [end={}]. We should not use [type=SMAP64_BIG] to do that.", length, end), //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-
-        if (offset != 0)
-        {
-            throw Exception(fmt::format("[offset={}] is not zero. We should not use [type=SMAP64_BIG] to do that.", offset), //
-                            ErrorCodes::LOGICAL_ERROR);
-        }
-
-        return true;
-    }
-
-private:
-    UInt64 size_in_used = 0;
-};
-
-using BigSpaceMapPtr = std::shared_ptr<BigSpaceMap>;
-
-}; // namespace PS::V3
-}; // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
index a377461d13e..061d8966095 100644
--- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
+++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
@@ -78,7 +78,7 @@ TEST_F(BlobStoreStatsTest, RestoreEmpty)
     ASSERT_TRUE(stats_copy.empty());
 
     EXPECT_EQ(stats.roll_id, 1);
-    EXPECT_NO_THROW(stats.createStat(stats.roll_id, stats.lock()));
+    EXPECT_NO_THROW(stats.createStat(stats.roll_id, config.file_limit_size, stats.lock()));
 }
 
 TEST_F(BlobStoreStatsTest, Restore)
@@ -91,8 +91,8 @@ try
 
     {
         const auto & lock = stats.lock();
-        stats.createStatNotChecking(file_id1, lock);
-        stats.createStatNotChecking(file_id2, lock);
+        stats.createStatNotChecking(file_id1, config.file_limit_size, lock);
+        stats.createStatNotChecking(file_id2, config.file_limit_size, lock);
     }
 
     {
@@ -138,11 +138,11 @@ try
 
     // This will throw exception since we try to create
     // a new file bigger than restored `roll_id`
-    EXPECT_ANY_THROW({ stats.createStat(14, stats.lock()); });
+    EXPECT_ANY_THROW({ stats.createStat(14, config.file_limit_size, stats.lock()); });
 
-    EXPECT_ANY_THROW({ stats.createStat(file_id1, stats.lock()); });
-    EXPECT_ANY_THROW({ stats.createStat(file_id2, stats.lock()); });
-    EXPECT_ANY_THROW({ stats.createStat(stats.roll_id + 1, stats.lock()); });
+    EXPECT_ANY_THROW({ stats.createStat(file_id1, config.file_limit_size, stats.lock()); });
+    EXPECT_ANY_THROW({ stats.createStat(file_id2, config.file_limit_size, stats.lock()); });
+    EXPECT_ANY_THROW({ stats.createStat(stats.roll_id + 1, config.file_limit_size, stats.lock()); });
 }
 CATCH
 
@@ -150,12 +150,12 @@ TEST_F(BlobStoreStatsTest, testStats)
 {
     BlobStats stats(logger, delegator, config);
 
-    auto stat = stats.createStat(0, stats.lock());
+    auto stat = stats.createStat(0, config.file_limit_size, stats.lock());
 
     ASSERT_TRUE(stat);
     ASSERT_TRUE(stat->smap);
-    stats.createStat(1, stats.lock());
-    stats.createStat(2, stats.lock());
+    stats.createStat(1, config.file_limit_size, stats.lock());
+    stats.createStat(2, config.file_limit_size, stats.lock());
 
     auto stats_copy = stats.getStats();
 
@@ -186,7 +186,7 @@ TEST_F(BlobStoreStatsTest, testStat)
     ASSERT_EQ(blob_file_id, 1);
     ASSERT_FALSE(stat);
 
-    stats.createStat(0, stats.lock());
+    stats.createStat(0, config.file_limit_size, stats.lock());
     std::tie(stat, blob_file_id) = stats.chooseStat(10, stats.lock());
     ASSERT_EQ(blob_file_id, INVALID_BLOBFILE_ID);
     ASSERT_TRUE(stat);
@@ -250,7 +250,7 @@ TEST_F(BlobStoreStatsTest, testFullStats)
 
     BlobStats stats(logger, delegator, config);
 
-    stat = stats.createStat(1, stats.lock());
+    stat = stats.createStat(1, config.file_limit_size, stats.lock());
     offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 1, stats.lock());
     ASSERT_EQ(offset, 0);
 
@@ -269,7 +269,7 @@ TEST_F(BlobStoreStatsTest, testFullStats)
     ASSERT_FALSE(stat);
 
     // A new stat can use
-    stat = stats.createStat(blob_file_id, stats.lock());
+    stat = stats.createStat(blob_file_id, config.file_limit_size, stats.lock());
     offset = stat->getPosFromStat(100, stats.lock());
     ASSERT_EQ(offset, 0);
 
@@ -1413,7 +1413,7 @@ try
         wb.clear();
     }
 
-    // PUT page_id 51 into blob 2 range [0,500] , stat will be a BIG_BLOB in mem
+    // PUT page_id 51 into blob 2 range [0,500]
     {
         size_t size_500 = 500;
         char c_buff[size_500];
@@ -1431,7 +1431,6 @@ try
 
         // verify blobstat
         const auto & stat = blob_store.blob_stats.blobIdToStat(2);
-        ASSERT_TRUE(stat->isBigBlob());
         ASSERT_EQ(stat->sm_max_caps, 0);
         ASSERT_DOUBLE_EQ(stat->sm_valid_rate, 1.0);
         ASSERT_EQ(stat->sm_valid_size, 500);
@@ -1542,7 +1541,6 @@ try
 }
 CATCH
 
-
 TEST_F(BlobStoreTest, TestBigBlobRemove)
 try
 {
@@ -1566,14 +1564,15 @@ try
         const auto & gc_info = blob_store.getGCStats();
         ASSERT_TRUE(gc_info.empty());
 
+        ASSERT_EQ(getTotalStatsNum(blob_store.blob_stats.getStats()), 1);
         const auto & records = edit.getRecords();
         ASSERT_EQ(records.size(), 1);
         blob_store.remove({records[0].entry});
+        ASSERT_EQ(getTotalStatsNum(blob_store.blob_stats.getStats()), 0);
     }
 }
 CATCH
 
-
 TEST_F(BlobStoreTest, TestBigBlobRegisterPath)
 try
 {
@@ -1603,11 +1602,9 @@ try
         auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config_with_small_file_limit_size);
         blob_store.registerPaths();
 
-        const auto & stat = blob_store.blob_stats.blobIdToStat(1);
-        ASSERT_TRUE(stat->isBigBlob());
-
         blob_store.blob_stats.restoreByEntry(entry_from_write);
         blob_store.blob_stats.restore();
+        const auto & stat = blob_store.blob_stats.blobIdToStat(1);
         ASSERT_EQ(stat->sm_max_caps, 0);
         ASSERT_DOUBLE_EQ(stat->sm_valid_rate, 1.0);
         ASSERT_EQ(stat->sm_valid_size, 500);
@@ -1616,4 +1613,147 @@ try
 }
 CATCH
 
+TEST_F(BlobStoreTest, TestRestartWithSmallerFileLimitSize)
+try
+{
+    const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider();
+    PageId page_id = 50;
+
+    BlobStore::Config config_with_small_file_limit_size;
+    config_with_small_file_limit_size.file_limit_size = 800;
+
+    PageEntryV3 entry_from_write1;
+    PageEntryV3 entry_from_write2;
+    {
+        auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config_with_small_file_limit_size);
+        size_t size_500 = 500;
+        size_t size_200 = 200;
+        char c_buff1[size_500];
+        char c_buff2[size_200];
+
+        WriteBatch wb;
+        ReadBufferPtr buff1 = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff1), size_500);
+        ReadBufferPtr buff2 = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff2), size_200);
+        wb.putPage(page_id, /* tag */ 0, buff1, size_500);
+        wb.putPage(page_id + 1, /* tag */ 0, buff2, size_200);
+        auto edit = blob_store.write(wb, nullptr);
+        const auto & records = edit.getRecords();
+        ASSERT_EQ(records.size(), 2);
+        entry_from_write1 = records[0].entry;
+        entry_from_write2 = records[1].entry;
+        ASSERT_EQ(entry_from_write1.size, 500);
+        ASSERT_EQ(entry_from_write2.size, 200);
+
+        ASSERT_TRUE(blob_store.blob_stats.blobIdToStat(1)->isNormal());
+    }
+
+    config_with_small_file_limit_size.file_limit_size = 400;
+    {
+        auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config_with_small_file_limit_size);
+        blob_store.registerPaths();
+        blob_store.blob_stats.restoreByEntry(entry_from_write1);
+        blob_store.blob_stats.restoreByEntry(entry_from_write2);
+        blob_store.blob_stats.restore();
+        const auto & stat = blob_store.blob_stats.blobIdToStat(1);
+        ASSERT_EQ(stat->sm_max_caps, 0);
+        ASSERT_DOUBLE_EQ(stat->sm_valid_rate, 1.0);
+        ASSERT_EQ(stat->sm_valid_size, 700);
+        ASSERT_EQ(stat->sm_total_size, 700);
+
+        blob_store.remove({entry_from_write1});
+
+        // A new write will create a new blob file
+        size_t size_100 = 100;
+        char c_buff[size_100];
+
+        WriteBatch wb;
+        ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff), size_100);
+        wb.putPage(page_id, /* tag */ 0, buff, size_100);
+        auto edit = blob_store.write(wb, nullptr);
+        const auto & records = edit.getRecords();
+        ASSERT_EQ(records.size(), 1);
+        ASSERT_EQ(records[0].entry.file_id, 2);
+        ASSERT_EQ(getTotalStatsNum(blob_store.blob_stats.getStats()), 2);
+
+        // remove the one-shot blob file and check that its stat is erased
+        blob_store.remove({entry_from_write2});
+        ASSERT_EQ(getTotalStatsNum(blob_store.blob_stats.getStats()), 1);
+    }
+}
+CATCH
+
+TEST_F(BlobStoreTest, TestBigBlobGC)
+try
+{
+    const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider();
+    PageId page_id1 = 50;
+    PageId page_id2 = 51;
+    PageId page_id3 = 52;
+
+    BlobStore::Config config_with_small_file_limit_size;
+    config_with_small_file_limit_size.file_limit_size = 800;
+
+    PageEntryV3 entry_from_write1;
+    PageEntryV3 entry_from_write2;
+    PageEntryV3 entry_from_write3;
+    {
+        auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config_with_small_file_limit_size);
+        size_t size_100 = 100;
+        size_t size_500 = 500;
+        size_t size_200 = 200;
+        char c_buff1[size_100];
+        char c_buff2[size_500];
+        char c_buff3[size_200];
+
+        WriteBatch wb;
+        ReadBufferPtr buff1 = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff1), size_100);
+        ReadBufferPtr buff2 = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff2), size_500);
+        ReadBufferPtr buff3 = std::make_shared<ReadBufferFromMemory>(const_cast<char *>(c_buff3), size_200);
+        wb.putPage(page_id1, /* tag */ 0, buff1, size_100);
+        wb.putPage(page_id2, /* tag */ 0, buff2, size_500);
+        wb.putPage(page_id3, /* tag */ 0, buff3, size_200);
+        auto edit = blob_store.write(wb, nullptr);
+        const auto & records = edit.getRecords();
+        ASSERT_EQ(records.size(), 3);
+        entry_from_write1 = records[0].entry;
+        entry_from_write2 = records[1].entry;
+        entry_from_write3 = records[2].entry;
+
+        ASSERT_TRUE(blob_store.blob_stats.blobIdToStat(1)->isNormal());
+    }
+
+    config_with_small_file_limit_size.file_limit_size = 400;
+    config_with_small_file_limit_size.heavy_gc_valid_rate = 0.99;
+    {
+        auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config_with_small_file_limit_size);
+        blob_store.registerPaths();
+
+        blob_store.blob_stats.restoreByEntry(entry_from_write1);
+        blob_store.blob_stats.restoreByEntry(entry_from_write2);
+        blob_store.blob_stats.restoreByEntry(entry_from_write3);
+        blob_store.blob_stats.restore();
+
+        Poco::File file1(blob_store.getBlobFile(1)->getPath());
+        ASSERT_EQ(file1.getSize(), 800);
+        blob_store.remove({entry_from_write3});
+        auto blob_need_gc = blob_store.getGCStats();
+        ASSERT_EQ(blob_need_gc.size(), 0);
+        ASSERT_EQ(file1.getSize(), 600);
+
+        blob_store.remove({entry_from_write1});
+
+        const auto & blob_need_gc2 = blob_store.getGCStats();
+        ASSERT_EQ(blob_need_gc2.size(), 1);
+        std::map<BlobFileId, PageIdAndVersionedEntries> gc_context;
+        PageIdAndVersionedEntries versioned_pageid_entries;
+        versioned_pageid_entries.emplace_back(page_id2, 1, entry_from_write2);
+        gc_context[1] = versioned_pageid_entries;
+        PageEntriesEdit gc_edit = blob_store.gc(gc_context, 500);
+        const auto & records = gc_edit.getRecords();
+        ASSERT_EQ(records.size(), 1);
+        ASSERT_EQ(records[0].entry.file_id, 2);
+        ASSERT_EQ(records[0].entry.size, 500);
+    }
+}
+CATCH
 } // namespace DB::PS::V3::tests
diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
index 6d6ef41630f..6ad55ec3913 100644
--- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
+++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
@@ -2252,8 +2252,8 @@ try
         BlobStore::BlobStats stats(log, delegator, BlobStore::Config{});
         {
             const auto & lock = stats.lock();
-            stats.createStatNotChecking(file_id1, lock);
-            stats.createStatNotChecking(file_id2, lock);
+            stats.createStatNotChecking(file_id1, BLOBFILE_LIMIT_SIZE, lock);
+            stats.createStatNotChecking(file_id2, BLOBFILE_LIMIT_SIZE, lock);
         }
         auto restored_dir = restore_from_edit(edit, stats);
         auto temp_snap = restored_dir->createSnapshot();
diff --git a/dbms/src/TestUtils/MockDiskDelegator.h b/dbms/src/TestUtils/MockDiskDelegator.h
index 1a7386e39c8..dd5447774ff 100644
--- a/dbms/src/TestUtils/MockDiskDelegator.h
+++ b/dbms/src/TestUtils/MockDiskDelegator.h
@@ -132,8 +132,12 @@ class MockDiskDelegatorMulti final : public PSDiskDelegator
         return paths;
     }
 
-    String choosePath(const PageFileIdAndLevel & /*id_lvl*/)
+    String choosePath(const PageFileIdAndLevel & id_lvl)
     {
+        if (page_path_map.find(id_lvl) != page_path_map.end())
+        {
+            return paths[page_path_map[id_lvl]];
+        }
         auto chosen = paths[choose_idx];
         choose_idx = (choose_idx + 1) % paths.size();
         return chosen;