Skip to content

Commit

Permalink
add UniversalPageStorage (#6723)
Browse files Browse the repository at this point in the history
ref #6728
  • Loading branch information
lidezhu authored Feb 7, 2023
1 parent 2830317 commit 584c992
Show file tree
Hide file tree
Showing 114 changed files with 2,794 additions and 1,186 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ add_headers_and_sources(dbms src/Storages/Page/V3/WAL)
add_headers_and_sources(dbms src/Storages/Page/V3/spacemap)
add_headers_and_sources(dbms src/Storages/Page/V3/PageDirectory)
add_headers_and_sources(dbms src/Storages/Page/V3/Blob)
add_headers_and_sources(dbms src/Storages/Page/V3/Universal)
add_headers_and_sources(dbms src/Storages/Page/)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/IO/ReadBufferFromString.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <IO/ReadBufferFromMemory.h>
#include <common/types.h>


namespace DB
Expand All @@ -31,4 +32,14 @@ class ReadBufferFromString : public ReadBufferFromMemory
{}
};

class ReadBufferFromOwnString : public String
, public ReadBufferFromString
{
public:
explicit ReadBufferFromOwnString(std::string_view s_)
: String(s_)
, ReadBufferFromString(*this)
{}
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ColumnFileBig : public ColumnFilePersisted

auto getFile() const { return file; }

PageId getDataPageId() { return file->pageId(); }
PageIdU64 getDataPageId() { return file->pageId(); }

size_t getRows() const override { return valid_rows; }
size_t getBytes() const override { return valid_bytes; };
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Columns ColumnFileTiny::readFromDisk(const PageReader & page_reader, //

// Read the columns from disk and apply DDL cast if need
auto page_map = page_reader.read({fields});
Page page = page_map[data_page_id];
Page page = page_map.at(data_page_id);
for (size_t index = col_start; index < col_end; ++index)
{
const size_t index_in_read_columns = index - col_start;
Expand Down Expand Up @@ -160,7 +160,7 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(const DMContext & con
if (unlikely(!schema))
throw Exception("Cannot deserialize DeltaPackBlock's schema", ErrorCodes::LOGICAL_ERROR);

PageId data_page_id;
PageIdU64 data_page_id;
size_t rows, bytes;

readIntBinary(data_page_id, buf);
Expand Down Expand Up @@ -213,7 +213,7 @@ ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(const DMContext & context, con
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, cache);
}

PageId ColumnFileTiny::writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
PageIdU64 ColumnFileTiny::writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
{
auto page_id = context.storage_pool.newLogPageId();

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ColumnFileTiny : public ColumnFilePersisted
UInt64 bytes = 0;

/// The id of data page which stores the data of this pack.
PageId data_page_id;
PageIdU64 data_page_id;

/// The members below are not serialized.

Expand All @@ -62,7 +62,7 @@ class ColumnFileTiny : public ColumnFilePersisted
}

public:
ColumnFileTiny(const ColumnFileSchemaPtr & schema_, UInt64 rows_, UInt64 bytes_, PageId data_page_id_, const CachePtr & cache_ = nullptr)
ColumnFileTiny(const ColumnFileSchemaPtr & schema_, UInt64 rows_, UInt64 bytes_, PageIdU64 data_page_id_, const CachePtr & cache_ = nullptr)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
Expand All @@ -81,7 +81,7 @@ class ColumnFileTiny : public ColumnFilePersisted
/// The schema of this pack. Could be empty, i.e. a DeleteRange does not have a schema.
ColumnFileSchemaPtr getSchema() const { return schema; }

ColumnFileTinyPtr cloneWith(PageId new_data_page_id)
ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id)
{
auto new_tiny_file = std::make_shared<ColumnFileTiny>(*this);
new_tiny_file->data_page_id = new_data_page_id;
Expand All @@ -98,13 +98,13 @@ class ColumnFileTiny : public ColumnFilePersisted

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;

PageId getDataPageId() const { return data_page_id; }
PageIdU64 getDataPageId() const { return data_page_id; }

Block readBlockForMinorCompaction(const PageReader & page_reader) const;

static ColumnFileTinyPtr writeColumnFile(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const CachePtr & cache = nullptr);

static PageId writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs);
static PageIdU64 writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs);

static ColumnFilePersistedPtr deserializeMetadata(const DMContext & context, ReadBuffer & buf, ColumnFileSchemaPtr & last_schema);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct ColumnFileV2
UInt64 bytes = 0;
BlockPtr schema;
RowKeyRange delete_range;
PageId data_page_id = 0;
PageIdU64 data_page_id = 0;

bool isDeleteRange() const { return !delete_range.none(); }
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>
#include <common/logger_useful.h>

namespace DB
Expand All @@ -47,7 +47,7 @@ class ColumnFileFlushTask
ColumnFilePtr column_file;

Block block_data;
PageId data_page = 0;
PageIdU64 data_page = 0;

bool sorted = false;
size_t rows_offset = 0;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DB
{
namespace DM
{
inline void serializeColumnFilePersisteds(WriteBatches & wbs, PageId id, const ColumnFilePersisteds & persisted_files)
inline void serializeColumnFilePersisteds(WriteBatches & wbs, PageIdU64 id, const ColumnFilePersisteds & persisted_files)
{
MemoryWriteBuffer buf(0, COLUMN_FILE_SERIALIZE_BUFFER_SIZE);
serializeSavedColumnFiles(buf, persisted_files);
Expand Down Expand Up @@ -76,7 +76,7 @@ void ColumnFilePersistedSet::checkColumnFiles(const ColumnFilePersisteds & new_c
}

ColumnFilePersistedSet::ColumnFilePersistedSet( //
PageId metadata_id_,
PageIdU64 metadata_id_,
const ColumnFilePersisteds & persisted_column_files)
: metadata_id(metadata_id_)
, persisted_files(persisted_column_files)
Expand All @@ -88,7 +88,7 @@ ColumnFilePersistedSet::ColumnFilePersistedSet( //
ColumnFilePersistedSetPtr ColumnFilePersistedSet::restore( //
DMContext & context,
const RowKeyRange & segment_range,
PageId id)
PageIdU64 id)
{
Page page = context.storage_pool.metaReader()->read(id);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>
#include <fmt/format.h>

namespace DB
Expand All @@ -49,7 +49,7 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
, private boost::noncopyable
{
private:
PageId metadata_id;
PageIdU64 metadata_id;
ColumnFilePersisteds persisted_files;
// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> persisted_files_count = 0;
Expand All @@ -70,11 +70,11 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
void checkColumnFiles(const ColumnFilePersisteds & new_column_files);

public:
explicit ColumnFilePersistedSet(PageId metadata_id_, const ColumnFilePersisteds & persisted_column_files = {});
explicit ColumnFilePersistedSet(PageIdU64 metadata_id_, const ColumnFilePersisteds & persisted_column_files = {});

/// Restore the metadata of this instance.
/// Only called after reboot.
static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageId id);
static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id);

/**
* Resets the logger by using the one from the segment.
Expand Down Expand Up @@ -123,7 +123,7 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
ColumnFilePersisteds diffColumnFiles(const ColumnFiles & previous_column_files) const;

/// Thread safe part start
PageId getId() const { return metadata_id; }
PageIdU64 getId() const { return metadata_id; }

size_t getColumnFileCount() const { return persisted_files_count.load(); }
size_t getRows() const { return rows.load(); }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace DM
// ================================================
// Public methods
// ================================================
DeltaValueSpace::DeltaValueSpace(PageId id_, const ColumnFilePersisteds & persisted_files, const ColumnFiles & in_memory_files)
DeltaValueSpace::DeltaValueSpace(PageIdU64 id_, const ColumnFilePersisteds & persisted_files, const ColumnFiles & in_memory_files)
: persisted_file_set(std::make_shared<ColumnFilePersistedSet>(id_, persisted_files))
, mem_table_set(std::make_shared<MemTableSet>(in_memory_files))
, delta_index(std::make_shared<DeltaIndex>())
Expand All @@ -55,7 +55,7 @@ void DeltaValueSpace::abandon(DMContext & context)
manager->deleteRef(delta_index);
}

DeltaValueSpacePtr DeltaValueSpace::restore(DMContext & context, const RowKeyRange & segment_range, PageId id)
DeltaValueSpacePtr DeltaValueSpace::restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id)
{
auto persisted_file_set = ColumnFilePersistedSet::restore(context, segment_range, id);
return std::make_shared<DeltaValueSpace>(std::move(persisted_file_set));
Expand Down Expand Up @@ -114,7 +114,7 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
else if (auto * t = column_file->tryToTinyFile(); t)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.storage_pool.newLogPageId();
PageIdU64 new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
auto new_column_file = t->cloneWith(new_data_page_id);
cloned.push_back(new_column_file);
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
namespace DM
{
using GenPageId = std::function<PageId()>;
using GenPageId = std::function<PageIdU64()>;
class DeltaValueSpace;
class DeltaValueSnapshot;

Expand Down Expand Up @@ -100,13 +100,13 @@ class DeltaValueSpace
LoggerPtr log;

public:
explicit DeltaValueSpace(PageId id_, const ColumnFilePersisteds & persisted_files = {}, const ColumnFiles & in_memory_files = {});
explicit DeltaValueSpace(PageIdU64 id_, const ColumnFilePersisteds & persisted_files = {}, const ColumnFiles & in_memory_files = {});

explicit DeltaValueSpace(ColumnFilePersistedSetPtr && persisted_file_set_);

/// Restore the metadata of this instance.
/// Only called after reboot.
static DeltaValueSpacePtr restore(DMContext & context, const RowKeyRange & segment_range, PageId id);
static DeltaValueSpacePtr restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id);

/**
* Resets the logger by using the one from the segment.
Expand Down Expand Up @@ -167,7 +167,7 @@ class DeltaValueSpace
const RowKeyRange & target_range,
WriteBatches & wbs) const;

PageId getId() const { return persisted_file_set->getId(); }
PageIdU64 getId() const { return persisted_file_set->getId(); }

size_t getColumnFileCount() const { return persisted_file_set->getColumnFileCount() + mem_table_set->getColumnFileCount(); }
size_t getRows(bool use_unsaved = true) const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
{
std::unique_lock lock(read_write_mutex);
auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
std::stack<PageId> segment_ids;
std::stack<PageIdU64> segment_ids;
while (segment_id != 0)
{
segment_ids.push(segment_id);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace tests
class DeltaMergeStoreTest;
}

inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1;
inline static const PageIdU64 DELTA_MERGE_FIRST_SEGMENT_ID = 1;

struct SegmentStats
{
Expand Down Expand Up @@ -168,7 +168,7 @@ class DeltaMergeStore : private boost::noncopyable
static Settings EMPTY_SETTINGS;

using SegmentSortedMap = std::map<RowKeyValueRef, SegmentPtr, std::less<>>;
using SegmentMap = std::unordered_map<PageId, SegmentPtr>;
using SegmentMap = std::unordered_map<PageIdU64, SegmentPtr>;

enum ThreadType
{
Expand Down Expand Up @@ -269,9 +269,9 @@ class DeltaMergeStore : private boost::noncopyable

void deleteRange(const Context & db_context, const DB::Settings & db_settings, const RowKeyRange & delete_range);

std::tuple<String, PageId> preAllocateIngestFile();
std::tuple<String, PageIdU64> preAllocateIngestFile();

void preIngestFile(const String & parent_path, PageId file_id, size_t file_size);
void preIngestFile(const String & parent_path, PageIdU64 file_id, size_t file_size);

/// You must ensure external files are ordered and do not overlap. Otherwise exceptions will be thrown.
/// You must ensure all of the external files are contained by the range. Otherwise exceptions will be thrown.
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ extern const char force_ingest_via_replace[];

namespace DM
{

std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()
std::tuple<String, PageIdU64> DeltaMergeStore::preAllocateIngestFile()
{
if (shutdown_called.load(std::memory_order_relaxed))
return {};
Expand All @@ -59,7 +58,7 @@ std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()
return {parent_path, new_id};
}

void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId file_id, size_t file_size)
void DeltaMergeStore::preIngestFile(const String & parent_path, const PageIdU64 file_id, size_t file_size)
{
if (shutdown_called.load(std::memory_order_relaxed))
return;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class LocalDMFileGcScanner final
options.only_list_can_gc = true;
for (auto & root_path : delegate.listPaths())
{
std::set<PageId> ids_under_path;
std::set<PageIdU64> ids_under_path;
auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options);
path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path));
}
Expand All @@ -104,7 +104,7 @@ class LocalDMFileGcRemover final
, logger(std::move(log))
{}

void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids)
void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageIdU64> & valid_ids)
{
// If the StoragePathPool is invalid or shutdown flag is set,
// meaning we call `remover` after shutdowning or dropping the table,
Expand Down Expand Up @@ -177,7 +177,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
// that callbacks is called after the `DeltaMergeStore` shutdown or dropped,
// we must make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.prefix = storage_pool->getNamespaceId();
callbacks.scanner = LocalDMFileGcScanner(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider());
callbacks.remover = LocalDMFileGcRemover(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider(), log);
// remember to unregister it when shutdown
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/ExternalDTFileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>
#include <fmt/core.h>

namespace DB::DM
Expand All @@ -26,7 +26,7 @@ struct ExternalDTFileInfo
/**
* The allocated PageId of the file.
*/
PageId id;
PageIdU64 id;

/**
* The handle range of contained data.
Expand Down
Loading

0 comments on commit 584c992

Please sign in to comment.