Skip to content

Commit

Permalink
type
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Dec 19, 2024
1 parent 1acd315 commit bc3ae2c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 34 deletions.
8 changes: 6 additions & 2 deletions db/log_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

#pragma once

#include <cstdint>

#include "rocksdb/rocksdb_namespace.h"

namespace ROCKSDB_NAMESPACE {
namespace log {

enum RecordType {
enum RecordType : uint8_t {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
Expand All @@ -40,7 +42,9 @@ enum RecordType {
kUserDefinedTimestampSizeType = 10,
kRecyclableUserDefinedTimestampSizeType = 11,
};
constexpr int kMaxRecordType = kRecyclableUserDefinedTimestampSizeType;
// Unknown type of value with the 8-th bit set will be ignored
constexpr uint8_t kRecordTypeSafeIgnoreMask = 1 << 7;
constexpr uint8_t kMaxRecordType = kRecyclableUserDefinedTimestampSizeType;

constexpr unsigned int kBlockSize = 32768;

Expand Down
54 changes: 29 additions & 25 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
size_t drop_size = 0;
const unsigned int record_type =
const uint8_t record_type =
ReadPhysicalRecord(&fragment, &drop_size, record_checksum);
switch (record_type) {
case kFullType:
Expand Down Expand Up @@ -313,11 +313,13 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
break;

default: {
std::string reason =
"unknown record type " + std::to_string(record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
reason.c_str());
if ((record_type & kRecordTypeSafeIgnoreMask) == 0) {
std::string reason =
"unknown record type " + std::to_string(record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
reason.c_str());
}
in_fragmented_record = false;
scratch->clear();
break;
Expand Down Expand Up @@ -419,7 +421,7 @@ void Reader::ReportOldLogRecord(size_t bytes) {
}
}

bool Reader::ReadMore(size_t* drop_size, int* error) {
bool Reader::ReadMore(size_t* drop_size, uint8_t* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Expand Down Expand Up @@ -460,15 +462,15 @@ bool Reader::ReadMore(size_t* drop_size, int* error) {
}
}

unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
uint64_t* fragment_checksum) {
uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
uint64_t* fragment_checksum) {
while (true) {
// We need at least the minimum header size
if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
// the default value of r is meaningless because ReadMore will overwrite
// it if it returns false; in case it returns true, the return value will
// not be used anyway
int r = kEof;
uint8_t r = kEof;
if (!ReadMore(drop_size, &r)) {
return r;
}
Expand All @@ -479,7 +481,7 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint8_t type = static_cast<uint8_t>(header[6]);
const uint32_t length = a | (b << 8);
int header_size = kHeaderSize;
const bool is_recyclable_type =
Expand All @@ -494,7 +496,7 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
recycled_ = true;
// We need enough for the larger header
if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
int r = kEof;
uint8_t r = kEof;
if (!ReadMore(drop_size, &r)) {
return r;
}
Expand Down Expand Up @@ -651,7 +653,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
uint64_t prospective_record_offset = 0;
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
size_t drop_size = 0;
unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy
uint8_t fragment_type_or_err = 0; // Initialize to make compiler happy
Slice fragment;
while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
switch (fragment_type_or_err) {
Expand Down Expand Up @@ -781,11 +783,13 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
break;

default: {
std::string reason =
"unknown record type " + std::to_string(fragment_type_or_err);
ReportCorruption(
fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
reason.c_str());
if ((fragment_type_or_err & kRecordTypeSafeIgnoreMask) == 0) {
std::string reason =
"unknown record type " + std::to_string(fragment_type_or_err);
ReportCorruption(
fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
reason.c_str());
}
in_fragmented_record_ = false;
fragments_.clear();
break;
Expand All @@ -803,7 +807,7 @@ void FragmentBufferedReader::UnmarkEOF() {
UnmarkEOFInternal();
}

bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, uint8_t* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Expand Down Expand Up @@ -844,15 +848,15 @@ bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
}

// return true if the caller should process the fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(
Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size,
uint8_t* fragment_type_or_err) {
assert(fragment != nullptr);
assert(drop_size != nullptr);
assert(fragment_type_or_err != nullptr);

while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
size_t old_size = buffer_.size();
int error = kEof;
uint8_t error = kEof;
if (!TryReadMore(drop_size, &error)) {
*fragment_type_or_err = error;
return false;
Expand All @@ -863,7 +867,7 @@ bool FragmentBufferedReader::TryReadFragment(
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint8_t type = static_cast<uint8_t>(header[6]);
const uint32_t length = a | (b << 8);
int header_size = kHeaderSize;
if ((type >= kRecyclableFullType && type <= kRecyclableLastType) ||
Expand All @@ -877,7 +881,7 @@ bool FragmentBufferedReader::TryReadFragment(
header_size = kRecyclableHeaderSize;
while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
size_t old_size = buffer_.size();
int error = kEof;
uint8_t error = kEof;
if (!TryReadMore(drop_size, &error)) {
*fragment_type_or_err = error;
return false;
Expand All @@ -894,7 +898,7 @@ bool FragmentBufferedReader::TryReadFragment(

while (header_size + length > buffer_.size()) {
size_t old_size = buffer_.size();
int error = kEof;
uint8_t error = kEof;
if (!TryReadMore(drop_size, &error)) {
*fragment_type_or_err = error;
return false;
Expand Down
13 changes: 7 additions & 6 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once
#include <stdint.h>

#include <cstdint>
#include <memory>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -171,7 +172,7 @@ class Reader {
UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;

// Extend record types with the following special values
enum {
enum : uint8_t {
kEof = kMaxRecordType + 1,
// Returned whenever we find an invalid physical record.
// Currently there are three situations in which this happens:
Expand All @@ -192,11 +193,11 @@ class Reader {
// If WAL compressioned is enabled, fragment_checksum is the checksum of the
// fragment computed from the orginal buffer containinng uncompressed
// fragment.
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size,
uint64_t* fragment_checksum = nullptr);
uint8_t ReadPhysicalRecord(Slice* result, size_t* drop_size,
uint64_t* fragment_checksum = nullptr);

// Read some more
bool ReadMore(size_t* drop_size, int* error);
bool ReadMore(size_t* drop_size, uint8_t* error);

void UnmarkEOFInternal();

Expand Down Expand Up @@ -232,9 +233,9 @@ class FragmentBufferedReader : public Reader {
bool in_fragmented_record_;

bool TryReadFragment(Slice* result, size_t* drop_size,
unsigned int* fragment_type_or_err);
uint8_t* fragment_type_or_err);

bool TryReadMore(size_t* drop_size, int* error);
bool TryReadMore(size_t* drop_size, uint8_t* error);

// No copy allowed
FragmentBufferedReader(const FragmentBufferedReader&);
Expand Down
11 changes: 11 additions & 0 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,17 @@ TEST_P(LogTest, BadRecordType) {
ASSERT_EQ("OK", MatchError("unknown record type"));
}

TEST_P(LogTest, IgnorableRecordType) {
Write("foo");
// Type is stored in header[6]
SetByte(6, static_cast<char>(kRecordTypeSafeIgnoreMask + 1));
FixChecksum(0, 3, false);
ASSERT_EQ("EOF", Read());
// The new type has value 129 and masked to be ignorable if unknown
ASSERT_EQ(0U, DroppedBytes());
ASSERT_EQ("", ReportMessage());
}

TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
Expand Down
2 changes: 1 addition & 1 deletion db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
manual_flush_(manual_flush),
compression_type_(compression_type),
compress_(nullptr) {
for (int i = 0; i <= kMaxRecordType; i++) {
for (uint8_t i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
Expand Down

0 comments on commit bc3ae2c

Please sign in to comment.