Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pagestorage v3 ddl problem. #4691

Merged
merged 12 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dbms/src/Storages/Page/PageDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ struct ByteBuffer
{
using Pos = char *;

ByteBuffer() = default;
ByteBuffer()
: begin_pos(nullptr)
, end_pos(nullptr)
{}

ByteBuffer(Pos begin_pos_, Pos end_pos_)
: begin_pos(begin_pos_)
, end_pos(end_pos_)
Expand Down
59 changes: 56 additions & 3 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,21 @@ void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler,
for (const auto & p : entries)
buf_size = std::max(buf_size, p.second.size);

// When we read `WriteBatch` which is `WriteType::PUT_EXTERNAL`.
// The `buf_size` will be 0, and we need to avoid calling malloc/free with size 0.
if (buf_size == 0)
{
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
for (const auto & [page_id_v3, entry] : entries)
{
(void)entry;
LOG_FMT_DEBUG(log, "Read entry [page_id={}] without entry size.", page_id_v3);
Page page;
page.page_id = page_id_v3.low;
handler(page_id_v3.low, page);
}
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
return;
}

char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) {
free(p, buf_size);
Expand Down Expand Up @@ -418,11 +433,23 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
[](const FieldReadInfo & a, const FieldReadInfo & b) { return a.entry.offset < b.entry.offset; });

// allocate data_buf that can hold all pages with specify fields

size_t buf_size = 0;
for (auto & [page_id, entry, fields] : to_read)
{
(void)page_id;
buf_size += entry.size;
// Sort fields to get better read on disk
std::sort(fields.begin(), fields.end());
for (const auto field_index : fields)
{
buf_size += entry.getFieldSize(field_index);
}
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
}

// Read with `FieldReadInfos`, buf_size must not be 0.
if (buf_size == 0)
{
throw Exception("Reading with fields but entry size is 0.", ErrorCodes::LOGICAL_ERROR);
}

char * data_buf = static_cast<char *>(alloc(buf_size));
Expand Down Expand Up @@ -476,13 +503,13 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li

Page page;
page.page_id = page_id_v3.low;
page.data = ByteBuffer(pos, pos + entry.size);
page.data = ByteBuffer(pos, write_offset);
page.mem_holder = mem_holder;
page.field_offsets.swap(fields_offset_in_page);
fields_offset_in_page.clear();
page_map.emplace(page_id_v3.low, std::move(page));

pos += entry.size;
pos = write_offset;
}

if (unlikely(pos != data_buf + buf_size))
Expand All @@ -509,6 +536,22 @@ PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & rea
buf_size += p.second.size;
}

// When we read `WriteBatch` which is `WriteType::PUT_EXTERNAL`.
// The `buf_size` will be 0, and we need to avoid calling malloc/free with size 0.
if (buf_size == 0)
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
PageMap page_map;
for (const auto & [page_id_v3, entry] : entries)
{
(void)entry;
LOG_FMT_DEBUG(log, "Read entry [page_id={}] without entry size.", page_id_v3);
Page page;
page.page_id = page_id_v3.low;
page_map.emplace(page_id_v3.low, page);
}
return page_map;
}

char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) {
free(p, buf_size);
Expand Down Expand Up @@ -561,6 +604,16 @@ Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & r
const auto & [page_id_v3, entry] = id_entry;
const size_t buf_size = entry.size;

// When we read `WriteBatch` which is `WriteType::PUT_EXTERNAL`.
// The `buf_size` will be 0, and we need to avoid calling malloc/free with size 0.
if (buf_size == 0)
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
LOG_FMT_DEBUG(log, "Read entry [page_id={}] without entry size.", page_id_v3);
Page page;
page.page_id = page_id_v3.low;
return page;
}
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved

char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) {
free(p, buf_size);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload
static_assert(Format::RECYCLABLE_HEADER_SIZE > Format::CHECKSUM_FIELD_SIZE, "Header size must be greater than the checksum size");
static_assert(Format::RECYCLABLE_HEADER_SIZE > Format::HEADER_SIZE, "Ensure the min buffer size for physical record");
constexpr static size_t HEADER_BUFF_SIZE = Format::RECYCLABLE_HEADER_SIZE - Format::CHECKSUM_FIELD_SIZE;
char buf[HEADER_BUFF_SIZE];
char buf[HEADER_BUFF_SIZE] = {0};
WriteBuffer header_buff(buf, HEADER_BUFF_SIZE);

// Format the header
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,15 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 *
else if (type == EditRecordType::VAR_EXTERNAL)
{
// We may add reference to an external id even if it is logically deleted.
bool ok = check_prev ? true : (!is_deleted || (is_deleted && seq < delete_ver.sequence));
bool ok = check_prev ? true : (!is_deleted || seq < delete_ver.sequence);
if (create_ver.sequence <= seq && ok)
{
return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersionType(0)};
}
}
else if (type == EditRecordType::VAR_REF)
{
if (create_ver.sequence <= seq && (!is_deleted || (is_deleted && seq < delete_ver.sequence)))
if (create_ver.sequence <= seq && (!is_deleted || seq < delete_ver.sequence))
{
return {RESOLVE_TO_REF, ori_page_id, create_ver};
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageEntriesEdit.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ inline const char * typeToString(EditRecordType t)
return "VAR_EXT";
case EditRecordType::VAR_DELETE:
return "VAR_DEL";
default:
return "INVALID";
}
}

Expand Down Expand Up @@ -220,6 +222,7 @@ class PageEntriesEdit
EditRecord()
: page_id(0)
, ori_page_id(0)
, version(0, 0)
, being_ref_count(1)
{}
};
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/Page/V3/PageEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ struct PageEntryV3
PageFieldOffsetChecksums field_offsets{};

public:
// Return the size (in bytes) of field `index` of this page entry.
// Field sizes are not stored directly: they are derived from the
// offsets in `field_offsets` plus the total entry `size` (the last
// field runs to the end of the entry data).
// Throws Exception(LOGICAL_ERROR) when `index` is out of range.
size_t getFieldSize(size_t index) const
{
    if (unlikely(index >= field_offsets.size()))
        // Fixed message: this is getFieldSize, not getFieldData (copy-paste slip).
        throw Exception(fmt::format("Try to getFieldSize of PageEntry [blob_id={}] with invalid [index={}] [fields size={}]",
                                    file_id,
                                    index,
                                    field_offsets.size()),
                        ErrorCodes::LOGICAL_ERROR);
    // Last field: extends from its offset to the end of the entry.
    if (index == field_offsets.size() - 1)
        return size - field_offsets.back().first;
    // Otherwise: distance between this field's offset and the next one's.
    return field_offsets[index + 1].first - field_offsets[index].first;
}

// Return field{index} offsets: [begin, end) of page data.
std::pair<size_t, size_t> getFieldOffsets(size_t index) const
{
Expand Down
74 changes: 71 additions & 3 deletions dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,74 @@ TEST_F(BlobStoreTest, testWriteRead)
ASSERT_EQ(index, buff_nums);
}

// Write three pages with per-field offsets, then read back selected fields
// and verify each returned buffer holds exactly the requested byte ranges.
// NOTE(review): "Filed" is a typo for "Field"; the name is kept unchanged so
// the registered gtest case name stays stable for CI filters.
TEST_F(BlobStoreTest, testWriteReadWithFiled)
try
{
    const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider();

    PageId page_id1 = 50;
    PageId page_id2 = 51;
    PageId page_id3 = 53;

    // constexpr: ISO C++ forbids variable-length arrays, so the bound of
    // `c_buff` below must be a compile-time constant.
    constexpr size_t buff_size = 120;
    WriteBatch wb;

    auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config);
    char c_buff[buff_size];

    for (size_t j = 0; j < buff_size; ++j)
    {
        c_buff[j] = static_cast<char>(j & 0xff);
    }

    // `c_buff` is already a non-const `char` array; no const_cast is needed.
    ReadBufferPtr buff1 = std::make_shared<ReadBufferFromMemory>(c_buff, buff_size);
    ReadBufferPtr buff2 = std::make_shared<ReadBufferFromMemory>(c_buff, buff_size);
    ReadBufferPtr buff3 = std::make_shared<ReadBufferFromMemory>(c_buff, buff_size);
    wb.putPage(page_id1, /* tag */ 0, buff1, buff_size, {20, 40, 40, 20});
    wb.putPage(page_id2, /* tag */ 0, buff2, buff_size, {10, 50, 20, 20, 20});
    wb.putPage(page_id3, /* tag */ 0, buff3, buff_size, {10, 5, 20, 20, 15, 5, 15, 30});
    PageEntriesEdit edit = blob_store.write(wb, nullptr);
    ASSERT_EQ(edit.size(), 3);

    // Request a subset of fields from each written page.
    BlobStore::FieldReadInfo read_info1(buildV3Id(TEST_NAMESPACE_ID, page_id1), edit.getRecords()[0].entry, {0, 1, 2, 3});
    BlobStore::FieldReadInfo read_info2(buildV3Id(TEST_NAMESPACE_ID, page_id2), edit.getRecords()[1].entry, {2, 4});
    BlobStore::FieldReadInfo read_info3(buildV3Id(TEST_NAMESPACE_ID, page_id3), edit.getRecords()[2].entry, {1, 3});

    BlobStore::FieldReadInfos read_infos = {read_info1, read_info2, read_info3};

    const auto & page_map = blob_store.read(read_infos, nullptr);
    ASSERT_EQ(page_map.size(), 3);

    for (const auto & [pageid, page] : page_map)
    {
        if (pageid == page_id1)
        {
            // All four fields requested: the whole page comes back.
            ASSERT_EQ(page.page_id, page_id1);
            ASSERT_EQ(page.data.size(), buff_size);
            ASSERT_EQ(strncmp(page.data.begin(), c_buff, buff_size), 0);
        }
        else if (pageid == page_id2)
        {
            ASSERT_EQ(page.page_id, page_id2);
            // the buffer size read is equal to the fields size we read
            // fields {2, 4}: sizes 20 + 20, at offsets 60 and 100 in the page
            ASSERT_EQ(page.data.size(), 40);
            ASSERT_EQ(strncmp(page.data.begin(), &c_buff[60], 20), 0);
            ASSERT_EQ(strncmp(&page.data.begin()[20], &c_buff[100], 20), 0);
        }
        else if (pageid == page_id3)
        {
            ASSERT_EQ(page.page_id, page_id3);
            // the buffer size read is equal to the fields size we read
            // fields {1, 3}: sizes 5 + 20, at offsets 10 and 35 in the page
            ASSERT_EQ(page.data.size(), 25);
            ASSERT_EQ(strncmp(page.data.begin(), &c_buff[10], 5), 0);
            ASSERT_EQ(strncmp(&page.data.begin()[5], &c_buff[35], 20), 0);
        }
    }
}
CATCH

TEST_F(BlobStoreTest, testFeildOffsetWriteRead)
{
const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider();
Expand Down Expand Up @@ -651,8 +719,8 @@ try
const size_t buff_size = 1024;
WriteBatch wb;
{
char c_buff1[buff_size];
char c_buff2[buff_size];
char c_buff1[buff_size] = {0};
char c_buff2[buff_size] = {0};

for (size_t i = 0; i < buff_size; ++i)
{
Expand Down Expand Up @@ -777,7 +845,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize)
config.file_limit_size = buff_size;

size_t buffer_sizes[] = {buff_size, buff_size - 1, buff_size / 2 + 1};
for (auto & buf_size : buffer_sizes)
for (const auto & buf_size : buffer_sizes)
{
auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, config);

Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ try
}
CATCH

// Reading back a page written with putExternal must succeed and return a
// Page with an empty (null) data buffer — an external page stores no
// payload inside page storage, so there is nothing to read.
TEST_F(PageStorageTest, ReadNULL)
try
{
    {
        WriteBatch wb;
        wb.putExternal(0, 0);
        page_storage->write(std::move(wb));
    }
    const auto & page = page_storage->read(0);
    ASSERT_EQ(page.data.begin(), nullptr);
}
CATCH

TEST_F(PageStorageTest, WriteMultipleBatchRead1)
try
{
Expand Down
44 changes: 29 additions & 15 deletions dbms/src/Storages/Page/WriteBatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,35 @@ class WriteBatch : private boost::noncopyable

// Render this WriteBatch as a short, comma-separated debug string.
// Each write is shown as "<namespace_id>.<page_id>" with a type prefix:
// no prefix = PUT, "a > b" = REF, "X" = DEL, "U" = UPSERT, "E" = PUT_EXTERNAL.
// (The old string-concatenation implementation left behind as unreachable
// code after `return` has been removed.)
String toString() const
{
    FmtBuffer fmt_buffer;
    fmt_buffer.joinStr(
        writes.begin(),
        writes.end(),
        // Take `w` by const reference: `const auto w` copied a whole write
        // entry for every element, a needless per-element cost.
        [this](const auto & w, FmtBuffer & fb) {
            switch (w.type)
            {
            case WriteType::PUT:
                fb.fmtAppend("{}.{}", namespace_id, w.page_id);
                break;
            case WriteType::REF:
                fb.fmtAppend("{}.{} > {}.{}", namespace_id, w.page_id, namespace_id, w.ori_page_id);
                break;
            case WriteType::DEL:
                fb.fmtAppend("X{}.{}", namespace_id, w.page_id);
                break;
            case WriteType::UPSERT:
                fb.fmtAppend("U{}.{}", namespace_id, w.page_id);
                break;
            case WriteType::PUT_EXTERNAL:
                fb.fmtAppend("E{}.{}", namespace_id, w.page_id);
                break;
            default:
                // "Unknown" — fixed typo ("Unknow") in the emitted debug text.
                fb.fmtAppend("Unknown {}.{}", namespace_id, w.page_id);
                break;
            }
        },
        ",");
    return fmt_buffer.toString();
}

private:
Expand Down