Skip to content

Commit

Permalink
[DWIO] refactor the reader of dwrf/orc (facebookincubator#261)
Browse files Browse the repository at this point in the history
Co-authored-by: zuochunwei <[email protected]>
  • Loading branch information
2 people authored and zhejiangxiaomai committed Jul 3, 2023
1 parent 8ae1d76 commit 9c331a0
Show file tree
Hide file tree
Showing 28 changed files with 1,073 additions and 369 deletions.
7 changes: 7 additions & 0 deletions velox/dwio/dwrf/common/Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ std::string writerVersionToString(WriterVersion version) {
return folly::to<std::string>("future - ", version);
}

/* unused
std::string streamKindToString(StreamKind kind) {
switch (static_cast<int32_t>(kind)) {
case StreamKind_PRESENT:
Expand Down Expand Up @@ -63,6 +64,7 @@ std::string streamKindToString(StreamKind kind) {
}
return folly::to<std::string>("unknown - ", kind);
}
*/

std::string columnEncodingKindToString(ColumnEncodingKind kind) {
switch (static_cast<int32_t>(kind)) {
Expand All @@ -82,6 +84,11 @@ DwrfStreamIdentifier EncodingKey::forKind(const proto::Stream_Kind kind) const {
return DwrfStreamIdentifier(node, sequence, 0, kind);
}

DwrfStreamIdentifier EncodingKey::forKind(
const proto::orc::Stream_Kind kind) const {
return DwrfStreamIdentifier(node, sequence, 0, kind);
}

namespace {
using dwio::common::CompressionKind;

Expand Down
111 changes: 99 additions & 12 deletions velox/dwio/dwrf/common/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

namespace facebook::velox::dwrf {

enum class DwrfFormat : uint8_t {
kDwrf = 0,
kOrc = 1,
};

// Writer version
constexpr folly::StringPiece WRITER_NAME_KEY{"orc.writer.name"};
constexpr folly::StringPiece WRITER_VERSION_KEY{"orc.writer.version"};
Expand All @@ -54,6 +59,7 @@ constexpr WriterVersion WriterVersion_CURRENT = WriterVersion::DWRF_7_0;
*/
std::string writerVersionToString(WriterVersion kind);

// Stream kind of dwrf.
enum StreamKind {
StreamKind_PRESENT = 0,
StreamKind_DATA = 1,
Expand All @@ -69,15 +75,40 @@ enum StreamKind {
StreamKind_IN_MAP = 11
};

// Stream kind of orc.
enum StreamKindOrc {
StreamKindOrc_PRESENT = 0,
StreamKindOrc_DATA = 1,
StreamKindOrc_LENGTH = 2,
StreamKindOrc_DICTIONARY_DATA = 3,
StreamKindOrc_DICTIONARY_COUNT = 4,
StreamKindOrc_SECONDARY = 5,
StreamKindOrc_ROW_INDEX = 6,
StreamKindOrc_BLOOM_FILTER = 7,
StreamKindOrc_BLOOM_FILTER_UTF8 = 8,
StreamKindOrc_ENCRYPTED_INDEX = 9,
StreamKindOrc_ENCRYPTED_DATA = 10,
StreamKindOrc_STRIPE_STATISTICS = 100,
StreamKindOrc_FILE_STATISTICS = 101,

StreamKindOrc_INVALID = -1
};

inline bool isIndexStream(StreamKind kind) {
return kind == StreamKind::StreamKind_ROW_INDEX ||
kind == StreamKind::StreamKind_BLOOM_FILTER_UTF8;
}

inline bool isIndexStream(StreamKindOrc kind) {
return kind == StreamKindOrc::StreamKindOrc_ROW_INDEX ||
kind == StreamKindOrc::StreamKindOrc_BLOOM_FILTER ||
kind == StreamKindOrc::StreamKindOrc_BLOOM_FILTER_UTF8;
}

/**
* Get the string representation of the StreamKind.
*/
std::string streamKindToString(StreamKind kind);
// std::string streamKindToString(StreamKind kind);

class StreamInformation {
public:
Expand All @@ -90,6 +121,12 @@ class StreamInformation {
virtual uint64_t getLength() const = 0;
virtual bool getUseVInts() const = 0;
virtual bool valid() const = 0;

// providing a default implementation otherwise leading to too much compiling
// errors
virtual StreamKindOrc getKindOrc() const {
return StreamKindOrc_INVALID;
}
};

enum ColumnEncodingKind {
Expand All @@ -100,21 +137,21 @@ enum ColumnEncodingKind {
};

class DwrfStreamIdentifier;

class EncodingKey {
public:
static const EncodingKey& getInvalid() {
static const EncodingKey INVALID;
return INVALID;
}

public:
uint32_t node;
uint32_t sequence;

EncodingKey()
: EncodingKey(dwio::common::MAX_UINT32, dwio::common::MAX_UINT32) {}

/* implicit */ EncodingKey(uint32_t n, uint32_t s = 0)
: node{n}, sequence{s} {}
uint32_t node;
uint32_t sequence;
EncodingKey(uint32_t n, uint32_t s = 0) : node{n}, sequence{s} {}

bool operator==(const EncodingKey& other) const {
return node == other.node && sequence == other.sequence;
Expand All @@ -133,6 +170,8 @@ class EncodingKey {
}

DwrfStreamIdentifier forKind(const proto::Stream_Kind kind) const;

DwrfStreamIdentifier forKind(const proto::orc::Stream_Kind kind) const;
};

struct EncodingKeyHash {
Expand All @@ -150,15 +189,24 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier {

public:
DwrfStreamIdentifier()
: column_(dwio::common::MAX_UINT32), kind_(StreamKind_DATA) {}
: column_(dwio::common::MAX_UINT32),
format_(DwrfFormat::kDwrf),
kind_(StreamKind_DATA) {}

/* implicit */ DwrfStreamIdentifier(const proto::Stream& stream)
DwrfStreamIdentifier(const proto::Stream& stream)
: DwrfStreamIdentifier(
stream.node(),
stream.has_sequence() ? stream.sequence() : 0,
stream.has_column() ? stream.column() : dwio::common::MAX_UINT32,
stream.kind()) {}

DwrfStreamIdentifier(const proto::orc::Stream& stream)
: DwrfStreamIdentifier(
stream.column(),
0,
dwio::common::MAX_UINT32,
stream.kind()) {}

DwrfStreamIdentifier(
uint32_t node,
uint32_t sequence,
Expand All @@ -167,9 +215,22 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier {
: StreamIdentifier(
velox::cache::TrackingId((node << kNodeShift) | kind).id()),
column_{column},
format_(DwrfFormat::kDwrf),
kind_(kind),
encodingKey_{node, sequence} {}

DwrfStreamIdentifier(
uint32_t node,
uint32_t sequence,
uint32_t column,
StreamKindOrc kind)
: StreamIdentifier(
velox::cache::TrackingId((node << kNodeShift) | kind).id()),
column_{column},
format_(DwrfFormat::kOrc),
kindOrc_(kind),
encodingKey_{node, sequence} {}

DwrfStreamIdentifier(
uint32_t node,
uint32_t sequence,
Expand All @@ -181,6 +242,17 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier {
column,
static_cast<StreamKind>(pkind)) {}

DwrfStreamIdentifier(
uint32_t node,
uint32_t sequence,
uint32_t column,
proto::orc::Stream_Kind pkind)
: DwrfStreamIdentifier(
node,
sequence,
column,
static_cast<StreamKindOrc>(pkind)) {}

~DwrfStreamIdentifier() = default;

bool operator==(const DwrfStreamIdentifier& other) const {
Expand All @@ -189,37 +261,52 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier {
return encodingKey_ == other.encodingKey_ && kind_ == other.kind_;
}

std::size_t hash() const {
std::size_t hash() const override {
return encodingKey_.hash() ^ std::hash<uint32_t>()(kind_);
}

uint32_t column() const {
return column_;
}

DwrfFormat format() const {
return format_;
}

const StreamKind& kind() const {
return kind_;
}

const StreamKindOrc& kindOrc() const {
return kindOrc_;
}

const EncodingKey& encodingKey() const {
return encodingKey_;
}

std::string toString() const {
std::string toString() const override {
return fmt::format(
"[id={}, node={}, sequence={}, column={}, kind={}]",
"[id={}, node={}, sequence={}, column={}, format={}, kind={}]",
id_,
encodingKey_.node,
encodingKey_.sequence,
column_,
(uint32_t)format_,
static_cast<uint32_t>(kind_));
}

private:
static constexpr int32_t kNodeShift = 5;

uint32_t column_;
StreamKind kind_;

DwrfFormat format_;
union {
StreamKind kind_; // format_ == kDwrf
StreamKindOrc kindOrc_; // format_ == kOrc
};

EncodingKey encodingKey_;
};

Expand Down
5 changes: 0 additions & 5 deletions velox/dwio/dwrf/common/FileMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@

namespace facebook::velox::dwrf {

enum class DwrfFormat : uint8_t {
kDwrf = 0,
kOrc = 1,
};

class ProtoWrapperBase {
protected:
ProtoWrapperBase(DwrfFormat format, const void* impl)
Expand Down
Loading

0 comments on commit 9c331a0

Please sign in to comment.