Skip to content

Commit

Permalink
Fix wal may dump write diff type in same page_id (#4850)
Browse files Browse the repository at this point in the history
close #4849
  • Loading branch information
jiaqizho authored May 11, 2022
1 parent cf0dd86 commit ec4d058
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 64 deletions.
55 changes: 46 additions & 9 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa

void GlobalStoragePool::restore()
{
log_storage->restore();
data_storage->restore();
meta_storage->restore();
log_max_ids = log_storage->restore();
data_max_ids = data_storage->restore();
meta_max_ids = meta_storage->restore();

gc_handle = global_context.getBackgroundPool().addTask(
[this] {
Expand Down Expand Up @@ -187,21 +187,58 @@ StoragePool::StoragePool(NamespaceId ns_id_, const GlobalStoragePool & global_st
, data_storage_reader(ns_id, data_storage, nullptr)
, meta_storage_reader(ns_id, meta_storage, nullptr)
, global_context(global_ctx)
, v3_log_max_ids(global_storage_pool.getLogMaxIds())
, v3_data_max_ids(global_storage_pool.getDataMaxIds())
, v3_meta_max_ids(global_storage_pool.getMetaMaxIds())
{}

void StoragePool::restore()
{
// If the storage instances is not global, we need to initialize it by ourselves and add a gc task.
if (owned_storage)
{
log_storage->restore();
data_storage->restore();
meta_storage->restore();
auto log_max_ids = log_storage->restore();
auto data_max_ids = data_storage->restore();
auto meta_max_ids = meta_storage->restore();

assert(log_max_ids.size() == 1);
assert(data_max_ids.size() == 1);
assert(meta_max_ids.size() == 1);

max_log_page_id = log_max_ids[0];
max_data_page_id = data_max_ids[0];
max_meta_page_id = meta_max_ids[0];
}
else
{
if (const auto & it = v3_log_max_ids.find(ns_id); it == v3_log_max_ids.end())
{
max_log_page_id = 0;
}
else
{
max_log_page_id = it->second;
}

max_log_page_id = log_storage->getMaxId(ns_id);
max_data_page_id = data_storage->getMaxId(ns_id);
max_meta_page_id = meta_storage->getMaxId(ns_id);
if (const auto & it = v3_data_max_ids.find(ns_id); it == v3_data_max_ids.end())
{
max_data_page_id = 0;
}
else
{
max_data_page_id = it->second;
}

if (const auto & it = v3_meta_max_ids.find(ns_id); it == v3_meta_max_ids.end())
{
max_meta_page_id = 0;
}
else
{
max_meta_page_id = it->second;
}
}
// TODO: add a log to show max_*_page_id after mix mode pr ready.
}

StoragePool::~StoragePool()
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ class GlobalStoragePool : private boost::noncopyable
PageStoragePtr data() const { return data_storage; }
PageStoragePtr meta() const { return meta_storage; }

// Returns a copy of `log_max_ids`, the map[ns_id, max_page_id] kept for the log storage.
std::map<NamespaceId, PageId> getLogMaxIds() const
{
return log_max_ids;
}

// Returns a copy of `data_max_ids`, the map[ns_id, max_page_id] kept for the data storage.
std::map<NamespaceId, PageId> getDataMaxIds() const
{
return data_max_ids;
}

// Returns a copy of `meta_max_ids`, the map[ns_id, max_page_id] kept for the meta storage.
std::map<NamespaceId, PageId> getMetaMaxIds() const
{
return meta_max_ids;
}

private:
// TODO: maybe more frequent gc for GlobalStoragePool?
bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);
Expand All @@ -62,6 +77,10 @@ class GlobalStoragePool : private boost::noncopyable
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

std::map<NamespaceId, PageId> log_max_ids;
std::map<NamespaceId, PageId> data_max_ids;
std::map<NamespaceId, PageId> meta_max_ids;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;
Expand Down Expand Up @@ -143,6 +162,11 @@ class StoragePool : private boost::noncopyable

Context & global_context;

// TBD: Will be replaced by GlobalPathPoolPtr once the mix mode PR is ready
std::map<NamespaceId, PageId> v3_log_max_ids;
std::map<NamespaceId, PageId> v3_data_max_ids;
std::map<NamespaceId, PageId> v3_meta_max_ids;

std::atomic<PageId> max_log_page_id = 0;
std::atomic<PageId> max_data_page_id = 0;
std::atomic<PageId> max_meta_page_id = 0;
Expand Down
15 changes: 7 additions & 8 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,16 @@ class PageStorage : private boost::noncopyable

virtual ~PageStorage() = default;

virtual void restore() = 0;
// Return the map[ns_id, max_page_id].
// The caller should ensure that it only allocates new ids that are larger than `max_page_id`. Reusing the
// same ID for different kinds of writes (put/ref/put_external) would make PageStorage run into unexpected errors.
//
// Note that for V2, we always return a map with only one element: <ns_id=0, max_id>, because V2 has no
// idea about ns_id.
virtual std::map<NamespaceId, PageId> restore() = 0;

virtual void drop() = 0;

virtual PageId getMaxId(NamespaceId ns_id) = 0;

virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

// Get some statistics of all living snapshots and the oldest living snapshot.
Expand Down Expand Up @@ -362,11 +366,6 @@ class PageReader : private boost::noncopyable
return storage->read(ns_id, page_fields, read_limiter, snap);
}

PageId getMaxId() const
{
return storage->getMaxId(ns_id);
}

PageId getNormalPageId(PageId page_id) const
{
return storage->getNormalPageId(ns_id, page_id, snap);
Expand Down
18 changes: 7 additions & 11 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr)
return assert_cast<PageStorage::ConcreteSnapshotRawPtr>(ptr.get());
}

void PageStorage::restore()
std::map<NamespaceId, PageId> PageStorage::restore()
{
LOG_FMT_INFO(log, "{} begin to restore data from disk. [path={}] [num_writers={}]", storage_name, delegator->defaultPath(), write_files.size());

Expand Down Expand Up @@ -349,17 +349,13 @@ void PageStorage::restore()
#endif

statistics = restore_info;
{
auto snapshot = getConcreteSnapshot();
size_t num_pages = snapshot->version()->numPages();
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());
}
}

PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
{
std::lock_guard write_lock(write_mutex);
return versioned_page_entries.getSnapshot("")->version()->maxId();
auto snapshot = getConcreteSnapshot();
size_t num_pages = snapshot->version()->numPages();
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());

// Fixed namespace id 0
return {{0, snapshot->version()->maxId()}};
}

PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,10 @@ class PageStorage : public DB::PageStorage
const FileProviderPtr & file_provider_);
~PageStorage() = default;

void restore() override;
std::map<NamespaceId, PageId> restore() override;

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

DB::PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) override;
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ class PageStorage_test : public DB::base::TiFlashStorageTestBasic
return storage;
}

// Creates a fresh PageStorage on the "log" delegator, restores it, and hands back
// the storage together with the map[ns_id, max_page_id] reported by restore().
std::pair<std::shared_ptr<PageStorage>, std::map<NamespaceId, PageId>> reopen()
{
    auto disk_delegator = path_pool->getPSDiskDelegatorSingle("log");
    auto reopened = std::make_shared<PageStorage>("test.t", disk_delegator, config, file_provider);
    auto restored_max_ids = reopened->restore();
    return std::make_pair(reopened, restored_max_ids);
}


protected:
PageStorage::Config config;
std::shared_ptr<PageStorage> storage;
Expand Down Expand Up @@ -727,6 +736,25 @@ try
}
CATCH

// restore() must report the largest page id ever written, even when the pages
// carrying that id have been deleted before the storage is reopened.
TEST_F(PageStorage_test, getMaxIdsFromRestore)
try
{
    {
        // Create external pages 1 and 2, then delete both in the same batch.
        WriteBatch wb;
        wb.putExternal(1, 0);
        wb.putExternal(2, 0);
        wb.delPage(1);
        wb.delPage(2);
        storage->write(std::move(wb));
    }

    // Release the fixture's storage before opening a new instance.
    storage = nullptr;
    auto reopened = reopen();
    auto & restored_max_ids = reopened.second;
    // V2 reports a single entry under namespace id 0.
    ASSERT_EQ(restored_max_ids.size(), 1);
    ASSERT_EQ(restored_max_ids[0], 2);
}
CATCH

TEST_F(PageStorage_test, IgnoreIncompleteWriteBatch1)
try
{
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr

for (const auto & r : edit.getRecords())
{
if (auto it = max_apply_page_ids.find(r.page_id.high); it == max_apply_page_ids.end())
{
max_apply_page_ids[r.page_id.high] = r.page_id.low;
}
else
{
it->second = std::max(it->second, r.page_id.low);
}

if (max_applied_ver < r.version)
max_applied_ver = r.version;
max_applied_page_id = std::max(r.page_id, max_applied_page_id);
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ class PageDirectoryFactory
return *this;
}

// Returns a copy of `max_apply_page_ids`: for each namespace id, the largest page id
// seen among the edit records processed by loadEdit().
std::map<NamespaceId, PageId> getMaxApplyPageIds() const
{
return max_apply_page_ids;
}

private:
void loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader);
void loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit);

BlobStore::BlobStats * blob_stats = nullptr;
std::map<NamespaceId, PageId> max_apply_page_ids;
};

} // namespace PS::V3
Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ PageStorageImpl::PageStorageImpl(

PageStorageImpl::~PageStorageImpl() = default;

void PageStorageImpl::restore()
std::map<NamespaceId, PageId> PageStorageImpl::restore()
{
// TODO: clean up blobstore.
// TODO: Speedup restoring
Expand All @@ -53,19 +53,14 @@ void PageStorageImpl::restore()
page_directory = factory
.setBlobStore(blob_store)
.create(storage_name, file_provider, delegator, parseWALConfig(config));
// factory.max_applied_page_id // TODO: return it to outer function
return factory.getMaxApplyPageIds();
}

// Dropping is not supported by this implementation: always throws an Exception
// carrying ErrorCodes::NOT_IMPLEMENTED.
void PageStorageImpl::drop()
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

PageId PageStorageImpl::getMaxId(NamespaceId ns_id)
{
return page_directory->getMaxId(ns_id);
}

PageId PageStorageImpl::getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
{
if (!snapshot)
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ class PageStorageImpl : public DB::PageStorage
return wal_config;
}

void restore() override;
std::map<NamespaceId, PageId> restore() override;

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

Expand Down
46 changes: 46 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ class PageStorageTest : public DB::base::TiFlashStorageTestBasic
return storage;
}


// Creates a fresh PageStorageImpl on the temporary test path, restores it, and hands
// back the storage together with the map[ns_id, max_page_id] reported by restore().
std::pair<std::shared_ptr<PageStorageImpl>, std::map<NamespaceId, PageId>> reopen()
{
    auto tmp_path = getTemporaryPath();
    auto disk_delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(tmp_path);
    auto reopened = std::make_shared<PageStorageImpl>("test.t", disk_delegator, config, file_provider);
    auto restored_max_ids = reopened->restore();
    return std::make_pair(reopened, restored_max_ids);
}

protected:
FileProviderPtr file_provider;
std::unique_ptr<StoragePathPool> path_pool;
Expand Down Expand Up @@ -1300,5 +1310,41 @@ try
}
CATCH

// restore() must report, per namespace, the largest page id ever written, including
// ids whose pages were deleted before the storage is reopened.
TEST_F(PageStorageTest, getMaxIdsFromRestore)
try
{
    {
        // Default-constructed batch: external pages 1 and 2, both deleted again.
        WriteBatch wb_default;
        wb_default.putExternal(1, 0);
        wb_default.putExternal(2, 0);
        wb_default.delPage(1);
        wb_default.delPage(2);
        page_storage->write(std::move(wb_default));

        // Second namespace: the ref page 100 is the largest id.
        WriteBatch wb_ns1{TEST_NAMESPACE_ID + 1};
        wb_ns1.putExternal(1, 0);
        wb_ns1.putExternal(2, 0);
        wb_ns1.putRefPage(3, 1);
        wb_ns1.putRefPage(100, 2);
        page_storage->write(std::move(wb_ns1));

        // Third namespace: the ref page 10 is the largest id and is deleted afterwards.
        WriteBatch wb_ns2{TEST_NAMESPACE_ID + 2};
        wb_ns2.putExternal(1, 0);
        wb_ns2.putExternal(2, 0);
        wb_ns2.putRefPage(3, 1);
        wb_ns2.putRefPage(10, 2);
        wb_ns2.delPage(10);
        page_storage->write(std::move(wb_ns2));
    }

    // Release the fixture's storage before opening a new instance. The reopened
    // storage is kept in a separately named local so it does not hide the member.
    page_storage = nullptr;
    auto reopened = reopen();
    auto & restored_max_ids = reopened.second;
    ASSERT_EQ(restored_max_ids.size(), 3);
    ASSERT_EQ(restored_max_ids[TEST_NAMESPACE_ID], 2); // external page 2 is marked as deleted, but its id is still restored.
    ASSERT_EQ(restored_max_ids[TEST_NAMESPACE_ID + 1], 100);
    ASSERT_EQ(restored_max_ids[TEST_NAMESPACE_ID + 2], 10); // page 10 is marked as deleted, but its id is still restored.
}
CATCH

} // namespace PS::V3::tests
} // namespace DB
Loading

0 comments on commit ec4d058

Please sign in to comment.