diff --git a/velox/dwio/dwrf/common/Common.cpp b/velox/dwio/dwrf/common/Common.cpp index 38142546bc95..0137e0ccaa57 100644 --- a/velox/dwio/dwrf/common/Common.cpp +++ b/velox/dwio/dwrf/common/Common.cpp @@ -36,6 +36,7 @@ std::string writerVersionToString(WriterVersion version) { return folly::to("future - ", version); } +/* unused std::string streamKindToString(StreamKind kind) { switch (static_cast(kind)) { case StreamKind_PRESENT: @@ -63,6 +64,7 @@ std::string streamKindToString(StreamKind kind) { } return folly::to("unknown - ", kind); } +*/ std::string columnEncodingKindToString(ColumnEncodingKind kind) { switch (static_cast(kind)) { @@ -82,6 +84,11 @@ DwrfStreamIdentifier EncodingKey::forKind(const proto::Stream_Kind kind) const { return DwrfStreamIdentifier(node, sequence, 0, kind); } +DwrfStreamIdentifier EncodingKey::forKind( + const proto::orc::Stream_Kind kind) const { + return DwrfStreamIdentifier(node, sequence, 0, kind); +} + namespace { using dwio::common::CompressionKind; diff --git a/velox/dwio/dwrf/common/Common.h b/velox/dwio/dwrf/common/Common.h index 0efa71ff39a0..2fcb0ec30394 100644 --- a/velox/dwio/dwrf/common/Common.h +++ b/velox/dwio/dwrf/common/Common.h @@ -29,6 +29,11 @@ namespace facebook::velox::dwrf { +enum class DwrfFormat : uint8_t { + kDwrf = 0, + kOrc = 1, +}; + // Writer version constexpr folly::StringPiece WRITER_NAME_KEY{"orc.writer.name"}; constexpr folly::StringPiece WRITER_VERSION_KEY{"orc.writer.version"}; @@ -54,6 +59,7 @@ constexpr WriterVersion WriterVersion_CURRENT = WriterVersion::DWRF_7_0; */ std::string writerVersionToString(WriterVersion kind); +// Stream kind of dwrf. enum StreamKind { StreamKind_PRESENT = 0, StreamKind_DATA = 1, @@ -69,15 +75,40 @@ enum StreamKind { StreamKind_IN_MAP = 11 }; +// Stream kind of orc. +enum StreamKindOrc { + StreamKindOrc_PRESENT = 0, + StreamKindOrc_DATA = 1, + StreamKindOrc_LENGTH = 2, + StreamKindOrc_DICTIONARY_DATA = 3, + StreamKindOrc_DICTIONARY_COUNT = 4, + StreamKindOrc_SECONDARY = 5, + StreamKindOrc_ROW_INDEX = 6, + StreamKindOrc_BLOOM_FILTER = 7, + StreamKindOrc_BLOOM_FILTER_UTF8 = 8, + StreamKindOrc_ENCRYPTED_INDEX = 9, + StreamKindOrc_ENCRYPTED_DATA = 10, + StreamKindOrc_STRIPE_STATISTICS = 100, + StreamKindOrc_FILE_STATISTICS = 101, + + StreamKindOrc_INVALID = -1 +}; + inline bool isIndexStream(StreamKind kind) { return kind == StreamKind::StreamKind_ROW_INDEX || kind == StreamKind::StreamKind_BLOOM_FILTER_UTF8; } +inline bool isIndexStream(StreamKindOrc kind) { + return kind == StreamKindOrc::StreamKindOrc_ROW_INDEX || + kind == StreamKindOrc::StreamKindOrc_BLOOM_FILTER || + kind == StreamKindOrc::StreamKindOrc_BLOOM_FILTER_UTF8; +} + /** * Get the string representation of the StreamKind. */ -std::string streamKindToString(StreamKind kind); +// std::string streamKindToString(StreamKind kind); class StreamInformation { public: @@ -90,6 +121,12 @@ class StreamInformation { virtual uint64_t getLength() const = 0; virtual bool getUseVInts() const = 0; virtual bool valid() const = 0; + + // providing a default implementation otherwise leading to too much compiling + // errors + virtual StreamKindOrc getKindOrc() const { + return StreamKindOrc_INVALID; + } }; enum ColumnEncodingKind { @@ -100,6 +137,7 @@ enum ColumnEncodingKind { }; class DwrfStreamIdentifier; + class EncodingKey { public: static const EncodingKey& getInvalid() { @@ -107,14 +145,13 @@ class EncodingKey { return INVALID; } - public: + uint32_t node; + uint32_t sequence; + EncodingKey() : EncodingKey(dwio::common::MAX_UINT32, dwio::common::MAX_UINT32) {} - /* implicit */ EncodingKey(uint32_t n, uint32_t s = 0) - : node{n}, sequence{s} {} - uint32_t node; - uint32_t sequence; + EncodingKey(uint32_t n, uint32_t s = 0) : node{n}, sequence{s} {} bool operator==(const EncodingKey& other) const { return node == other.node && sequence == other.sequence; @@ -133,6 +170,8 @@ class EncodingKey { } DwrfStreamIdentifier forKind(const proto::Stream_Kind kind) const; + + DwrfStreamIdentifier forKind(const proto::orc::Stream_Kind kind) const; }; struct EncodingKeyHash { @@ -150,15 +189,24 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier { public: DwrfStreamIdentifier() - : column_(dwio::common::MAX_UINT32), kind_(StreamKind_DATA) {} + : column_(dwio::common::MAX_UINT32), + format_(DwrfFormat::kDwrf), + kind_(StreamKind_DATA) {} - /* implicit */ DwrfStreamIdentifier(const proto::Stream& stream) + DwrfStreamIdentifier(const proto::Stream& stream) : DwrfStreamIdentifier( stream.node(), stream.has_sequence() ? stream.sequence() : 0, stream.has_column() ? stream.column() : dwio::common::MAX_UINT32, stream.kind()) {} + DwrfStreamIdentifier(const proto::orc::Stream& stream) + : DwrfStreamIdentifier( + stream.column(), + 0, + dwio::common::MAX_UINT32, + stream.kind()) {} + DwrfStreamIdentifier( uint32_t node, uint32_t sequence, @@ -167,9 +215,22 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier { : StreamIdentifier( velox::cache::TrackingId((node << kNodeShift) | kind).id()), column_{column}, + format_(DwrfFormat::kDwrf), kind_(kind), encodingKey_{node, sequence} {} + DwrfStreamIdentifier( + uint32_t node, + uint32_t sequence, + uint32_t column, + StreamKindOrc kind) + : StreamIdentifier( + velox::cache::TrackingId((node << kNodeShift) | kind).id()), + column_{column}, + format_(DwrfFormat::kOrc), + kindOrc_(kind), + encodingKey_{node, sequence} {} + DwrfStreamIdentifier( uint32_t node, uint32_t sequence, @@ -181,6 +242,17 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier { column, static_cast(pkind)) {} + DwrfStreamIdentifier( + uint32_t node, + uint32_t sequence, + uint32_t column, + proto::orc::Stream_Kind pkind) + : DwrfStreamIdentifier( + node, + sequence, + column, + static_cast(pkind)) {} + ~DwrfStreamIdentifier() = default; bool operator==(const DwrfStreamIdentifier& other) const { @@ -189,7 +261,7 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier { return encodingKey_ == other.encodingKey_ && kind_ == other.kind_; } - std::size_t hash() const { + std::size_t hash() const override { return encodingKey_.hash() ^ std::hash()(kind_); } @@ -197,21 +269,30 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier { return column_; } + DwrfFormat format() const { + return format_; + } + const StreamKind& kind() const { return kind_; } + const StreamKindOrc& kindOrc() const { + return kindOrc_; + } + const EncodingKey& encodingKey() const { return encodingKey_; } - std::string toString() const { + std::string toString() const override { return fmt::format( - "[id={}, node={}, sequence={}, column={}, kind={}]", + "[id={}, node={}, sequence={}, column={}, format={}, kind={}]", id_, encodingKey_.node, encodingKey_.sequence, column_, + (uint32_t)format_, static_cast(kind_)); } @@ -219,7 +300,13 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier { static constexpr int32_t kNodeShift = 5; uint32_t column_; - StreamKind kind_; + + DwrfFormat format_; + union { + StreamKind kind_; // format_ == kDwrf + StreamKindOrc kindOrc_; // format_ == kOrc + }; + EncodingKey encodingKey_; }; diff --git a/velox/dwio/dwrf/common/FileMetadata.h b/velox/dwio/dwrf/common/FileMetadata.h index 983ddecfe9e5..d5b6546091f0 100644 --- a/velox/dwio/dwrf/common/FileMetadata.h +++ b/velox/dwio/dwrf/common/FileMetadata.h @@ -25,11 +25,6 @@ namespace facebook::velox::dwrf { -enum class DwrfFormat : uint8_t { - kDwrf = 0, - kOrc = 1, -}; - class ProtoWrapperBase { protected: ProtoWrapperBase(DwrfFormat format, const void* impl) diff --git a/velox/dwio/dwrf/reader/ColumnReader.cpp b/velox/dwio/dwrf/reader/ColumnReader.cpp index a4d15bc65224..f98196d53fc0 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.cpp +++ b/velox/dwio/dwrf/reader/ColumnReader.cpp @@ -85,6 +85,19 @@ inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) { } } +inline RleVersion convertRleVersion(proto::orc::ColumnEncoding_Kind kind) { + switch (static_cast(kind)) { + case proto::orc::ColumnEncoding_Kind_DIRECT: + case proto::orc::ColumnEncoding_Kind_DICTIONARY: + return RleVersion_1; + case proto::orc::ColumnEncoding_Kind_DIRECT_V2: + case proto::orc::ColumnEncoding_Kind_DICTIONARY_V2: + return RleVersion_2; + default: + DWIO_RAISE("Unknown encoding in convertRleVersion"); + } +} + template FlatVector* resetIfWrongFlatVectorType(VectorPtr& result) { return detail::resetIfWrongVectorType>(result); @@ -139,8 +152,16 @@ ColumnReader::ColumnReader( memoryPool_(stripe.getMemoryPool()), flatMapContext_(std::move(flatMapContext)) { EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - std::unique_ptr stream = - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_PRESENT), false); + + DwrfStreamIdentifier id; + if (stripe.format() == DwrfFormat::kDwrf) { + id = encodingKey.forKind(proto::Stream_Kind_PRESENT); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + id = encodingKey.forKind(proto::orc::Stream_Kind_PRESENT); + } + + auto stream = stripe.getStream(id, false); if (stream) { notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); } @@ -208,10 +229,18 @@ class ByteRleColumnReader : public ColumnReader { : ColumnReader(std::move(nodeType), stripe, std::move(flatMapContext)), requestedType_{std::move(requestedType)} { EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - rle = creator( - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true), - encodingKey); + DwrfStreamIdentifier id; + + if (stripe.format() == DwrfFormat::kDwrf) { + id = encodingKey.forKind(proto::Stream_Kind_DATA); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + id = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + } + + rle = creator(stripe.getStream(id, true), encodingKey); } + ~ByteRleColumnReader() override = default; uint64_t skip(uint64_t numValues) override; @@ -382,16 +411,21 @@ IntegerDirectColumnReader::IntegerDirectColumnReader( : ColumnReader(std::move(nodeType), stripe, std::move(flatMapContext)), requestedType_{std::move(requestedType)} { EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - auto data = encodingKey.forKind(proto::Stream_Kind_DATA); - bool dataVInts = stripe.getUseVInts(data); + if (stripe.format() == DwrfFormat::kDwrf) { + auto data = encodingKey.forKind(proto::Stream_Kind_DATA); ints = createDirectDecoder( - stripe.getStream(data, true), dataVInts, numBytes); + stripe.getStream(data, true), stripe.getUseVInts(data), numBytes); } else { - auto encoding = stripe.getEncoding(encodingKey); - RleVersion vers = convertRleVersion(encoding.kind()); + auto data = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + auto encoding = stripe.getEncodingOrc(encodingKey); + auto vers = convertRleVersion(encoding.kind()); ints = createRleDecoder( - stripe.getStream(data, true), vers, memoryPool_, dataVInts, numBytes); + stripe.getStream(data, true), + vers, + memoryPool_, + stripe.getUseVInts(data), + numBytes); } } @@ -513,6 +547,7 @@ IntegerDictionaryColumnReader::IntegerDictionaryColumnReader( FlatMapContext flatMapContext) : ColumnReader(std::move(nodeType), stripe, std::move(flatMapContext)), requestedType_{std::move(requestedType)} { + VELOX_CHECK(stripe.format() == DwrfFormat::kDwrf); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; auto encoding = stripe.getEncoding(encodingKey); dictionarySize = encoding.dictionarysize(); @@ -630,22 +665,33 @@ TimestampColumnReader::TimestampColumnReader( FlatMapContext flatMapContext) : ColumnReader(std::move(nodeType), stripe, std::move(flatMapContext)) { EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion vers = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto data = encodingKey.forKind(proto::Stream_Kind_DATA); - bool vints = stripe.getUseVInts(data); + + RleVersion vers; + DwrfStreamIdentifier data, nanoData; + + if (stripe.format() == DwrfFormat::kDwrf) { + vers = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + data = encodingKey.forKind(proto::Stream_Kind_DATA); + nanoData = encodingKey.forKind(proto::Stream_Kind_NANO_DATA); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + vers = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + data = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + nanoData = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); + } + seconds = createRleDecoder( stripe.getStream(data, true), vers, memoryPool_, - vints, + stripe.getUseVInts(data), dwio::common::LONG_BYTE_SIZE); - auto nanoData = encodingKey.forKind(proto::Stream_Kind_NANO_DATA); - bool nanoVInts = stripe.getUseVInts(nanoData); + nano = createRleDecoder( stripe.getStream(nanoData, true), vers, memoryPool_, - nanoVInts, + stripe.getUseVInts(nanoData), dwio::common::LONG_BYTE_SIZE); } @@ -772,10 +818,16 @@ FloatingPointColumnReader::FloatingPointColumnReader( FlatMapContext flatMapContext) : ColumnReader(std::move(nodeType), stripe, std::move(flatMapContext)), requestedType_{std::move(requestedType)}, - inputStream(stripe.getStream( - EncodingKey{nodeType_->id, flatMapContext_.sequence}.forKind( - proto::Stream_Kind_DATA), - true)), + inputStream( + stripe.format() == DwrfFormat::kDwrf + ? stripe.getStream( + EncodingKey{nodeType_->id, flatMapContext_.sequence} + .forKind(proto::Stream_Kind_DATA), + true) + : stripe.getStream( + EncodingKey{nodeType_->id, flatMapContext_.sequence} + .forKind(proto::orc::Stream_Kind_DATA), + true)), bufferPointer(nullptr), bufferEnd(nullptr) { // PASS @@ -929,6 +981,100 @@ class StringDictionaryColumnReader : public ColumnReader { void ensureInitialized(); + void initOrc(StripeStreams& stripe) { + EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + RleVersion rleVersion = + convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); + + const auto dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + bool dictVInts = stripe.getUseVInts(dataId); + dictIndex = createRleDecoder( + stripe.getStream(dataId, true), + rleVersion, + memoryPool_, + dictVInts, + dwio::common::INT_BYTE_SIZE); + + const auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + bool lenVInts = stripe.getUseVInts(lenId); + lengthDecoder = createRleDecoder( + stripe.getStream(lenId, false), + rleVersion, + memoryPool_, + lenVInts, + dwio::common::INT_BYTE_SIZE); + + blobStream = stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA), false); + } + + void initDwrf(StripeStreams& stripe) { + EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + RleVersion rleVersion = + convertRleVersion(stripe.getEncoding(encodingKey).kind()); + dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); + + const auto dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + bool dictVInts = stripe.getUseVInts(dataId); + dictIndex = createRleDecoder( + stripe.getStream(dataId, true), + rleVersion, + memoryPool_, + dictVInts, + dwio::common::INT_BYTE_SIZE); + + const auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + bool lenVInts = stripe.getUseVInts(lenId); + lengthDecoder = createRleDecoder( + stripe.getStream(lenId, false), + rleVersion, + memoryPool_, + lenVInts, + dwio::common::INT_BYTE_SIZE); + + blobStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA), false); + + // handle in dictionary stream + std::unique_ptr inDictStream = + stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); + if (inDictStream) { + inDictionaryReader = + createBooleanRleDecoder(std::move(inDictStream), encodingKey); + + // stride dictionary only exists if in dictionary exists + strideDictStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); + DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); + + indexStream_ = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); + DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); + + const auto strideDictLenId = + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); + bool strideLenVInt = stripe.getUseVInts(strideDictLenId); + strideDictLengthDecoder = createRleDecoder( + stripe.getStream(strideDictLenId, true), + rleVersion, + memoryPool_, + strideLenVInt, + dwio::common::INT_BYTE_SIZE); + } + } + + void init(StripeStreams& stripe) { + auto format = stripe.format(); + if (format == DwrfFormat::kDwrf) { + initDwrf(stripe); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + initOrc(stripe); + } + } + public: StringDictionaryColumnReader( std::shared_ptr nodeType, @@ -950,59 +1096,7 @@ StringDictionaryColumnReader::StringDictionaryColumnReader( lastStrideIndex(-1), provider(stripe.getStrideIndexProvider()), returnFlatVector_(stripe.getRowReaderOptions().getReturnFlatVector()) { - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion rleVersion = - convertRleVersion(stripe.getEncoding(encodingKey).kind()); - dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); - - const auto dataId = encodingKey.forKind(proto::Stream_Kind_DATA); - bool dictVInts = stripe.getUseVInts(dataId); - dictIndex = createRleDecoder( - stripe.getStream(dataId, true), - rleVersion, - memoryPool_, - dictVInts, - dwio::common::INT_BYTE_SIZE); - - const auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVInts = stripe.getUseVInts(lenId); - lengthDecoder = createRleDecoder( - stripe.getStream(lenId, false), - rleVersion, - memoryPool_, - lenVInts, - dwio::common::INT_BYTE_SIZE); - - blobStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA), false); - - // handle in dictionary stream - std::unique_ptr inDictStream = - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); - if (inDictStream) { - inDictionaryReader = - createBooleanRleDecoder(std::move(inDictStream), encodingKey); - - // stride dictionary only exists if in dictionary exists - strideDictStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); - DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); - - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); - DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); - - const auto strideDictLenId = - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); - bool strideLenVInt = stripe.getUseVInts(strideDictLenId); - strideDictLengthDecoder = createRleDecoder( - stripe.getStream(strideDictLenId, true), - rleVersion, - memoryPool_, - strideLenVInt, - dwio::common::INT_BYTE_SIZE); - } + init(stripe); } uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) { @@ -1435,18 +1529,31 @@ StringDirectColumnReader::StringDirectColumnReader( FlatMapContext flatMapContext) : ColumnReader(std::move(nodeType), stripe, std::move(flatMapContext)) { EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - RleVersion rleVersion = - convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVInts = stripe.getUseVInts(lenId); + + RleVersion rleVersion; + DwrfStreamIdentifier lenId; + + if (stripe.format() == DwrfFormat::kDwrf) { + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + + blobStream = + stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + + blobStream = stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_DATA), true); + } + length = createRleDecoder( stripe.getStream(lenId, true), rleVersion, memoryPool_, - lenVInts, + stripe.getUseVInts(lenId), dwio::common::INT_BYTE_SIZE); - blobStream = - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true); } uint64_t StringDirectColumnReader::skip(uint64_t numValues) { @@ -1594,11 +1701,23 @@ StructColumnReader::StructColumnReader( requestedType_{requestedType} { DWIO_ENSURE_EQ(nodeType_->id, dataType->id, "working on the same node"); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - auto encoding = static_cast(stripe.getEncoding(encodingKey).kind()); - DWIO_ENSURE_EQ( - encoding, - proto::ColumnEncoding_Kind_DIRECT, - "Unknown encoding for StructColumnReader"); + + if (stripe.format() == DwrfFormat::kDwrf) { + auto encoding = + static_cast(stripe.getEncoding(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::ColumnEncoding_Kind_DIRECT, + "Unknown dwrf encoding for StructColumnReader"); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + auto encoding = + static_cast(stripe.getEncodingOrc(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::orc::ColumnEncoding_Kind_DIRECT, + "Unknown orc encoding for StructColumnReader"); + } // count the number of selected sub-columns const auto& cs = stripe.getColumnSelector(); @@ -1727,16 +1846,26 @@ ListColumnReader::ListColumnReader( requestedType_{requestedType} { DWIO_ENSURE_EQ(nodeType_->id, dataType->id, "working on the same node"); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - // count the number of selected sub-columns - RleVersion vers = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool vints = stripe.getUseVInts(lenId); + RleVersion vers; + DwrfStreamIdentifier lenId; + + if (stripe.format() == DwrfFormat::kDwrf) { + // Count the number of selected sub-columns. + vers = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + // Count the number of selected sub-columns. + vers = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + } + length = createRleDecoder( stripe.getStream(lenId, true), vers, memoryPool_, - vints, + stripe.getUseVInts(lenId), dwio::common::INT_BYTE_SIZE); const auto& cs = stripe.getColumnSelector(); @@ -1889,16 +2018,26 @@ MapColumnReader::MapColumnReader( requestedType_{requestedType} { DWIO_ENSURE_EQ(nodeType_->id, dataType->id, "working on the same node"); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - // Determine if the key and/or value columns are selected - RleVersion vers = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool vints = stripe.getUseVInts(lenId); + RleVersion vers; + DwrfStreamIdentifier lenId; + + if (stripe.format() == DwrfFormat::kDwrf) { + // Determine if the key and/or value columns are selected. + vers = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + } else { + VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); + // Determine if the key and/or value columns are selected. + vers = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + } + length = createRleDecoder( stripe.getStream(lenId, true), vers, memoryPool_, - vints, + stripe.getUseVInts(lenId), dwio::common::INT_BYTE_SIZE); const auto& cs = stripe.getColumnSelector(); @@ -2135,17 +2274,13 @@ std::unique_ptr buildIntegerReader( FlatMapContext flatMapContext, StripeStreams& stripe) { EncodingKey ek{nodeType->id, flatMapContext.sequence}; - switch (static_cast(stripe.getEncoding(ek).kind())) { - case proto::ColumnEncoding_Kind_DICTIONARY: - case proto::ColumnEncoding_Kind_DICTIONARY_V2: - return buildTypedIntegerColumnReader( - nodeType, requestedType, std::move(flatMapContext), stripe, numBytes); - case proto::ColumnEncoding_Kind_DIRECT: - case proto::ColumnEncoding_Kind_DIRECT_V2: - return buildTypedIntegerColumnReader( - nodeType, requestedType, std::move(flatMapContext), stripe, numBytes); - default: - DWIO_RAISE("buildReader unhandled string encoding"); + + if (stripe.isColumnEncodingKindDirect(ek)) { + return buildTypedIntegerColumnReader( + nodeType, requestedType, std::move(flatMapContext), stripe, numBytes); + } else { + return buildTypedIntegerColumnReader( + nodeType, requestedType, std::move(flatMapContext), stripe, numBytes); } } @@ -2180,19 +2315,15 @@ std::unique_ptr ColumnReader::build( std::move(flatMapContext), stripe); case TypeKind::VARBINARY: - case TypeKind::VARCHAR: - switch (static_cast(stripe.getEncoding(ek).kind())) { - case proto::ColumnEncoding_Kind_DICTIONARY: - case proto::ColumnEncoding_Kind_DICTIONARY_V2: - return std::make_unique( - dataType, stripe, std::move(flatMapContext)); - case proto::ColumnEncoding_Kind_DIRECT: - case proto::ColumnEncoding_Kind_DIRECT_V2: - return std::make_unique( - dataType, stripe, std::move(flatMapContext)); - default: - DWIO_RAISE("buildReader unhandled string encoding"); + case TypeKind::VARCHAR: { + if (stripe.isColumnEncodingKindDirect(ek)) { + return std::make_unique( + dataType, stripe, std::move(flatMapContext)); + } else { + return std::make_unique( + dataType, stripe, std::move(flatMapContext)); } + } case TypeKind::BOOLEAN: return buildByteRleColumnReader( dataType, requestedType->type, stripe, std::move(flatMapContext)); @@ -2202,14 +2333,18 @@ std::unique_ptr ColumnReader::build( case TypeKind::ARRAY: return std::make_unique( requestedType, dataType, stripe, std::move(flatMapContext)); - case TypeKind::MAP: - if (stripe.getEncoding(ek).kind() == - proto::ColumnEncoding_Kind_MAP_FLAT) { - return FlatMapColumnReaderFactory::create( - requestedType, dataType, stripe, std::move(flatMapContext)); + case TypeKind::MAP: { + if (stripe.format() == DwrfFormat::kDwrf) { + if (stripe.getEncoding(ek).kind() == + proto::ColumnEncoding_Kind_MAP_FLAT) { + return FlatMapColumnReaderFactory::create( + requestedType, dataType, stripe, std::move(flatMapContext)); + } } + return std::make_unique( requestedType, dataType, stripe, std::move(flatMapContext)); + } case TypeKind::ROW: return std::make_unique( requestedType, dataType, stripe, std::move(flatMapContext)); diff --git a/velox/dwio/dwrf/reader/DwrfData.cpp b/velox/dwio/dwrf/reader/DwrfData.cpp index ca431dc474ae..4d991cf602a9 100644 --- a/velox/dwio/dwrf/reader/DwrfData.cpp +++ b/velox/dwio/dwrf/reader/DwrfData.cpp @@ -20,15 +20,19 @@ namespace facebook::velox::dwrf { -DwrfData::DwrfData( - std::shared_ptr nodeType, - StripeStreams& stripe, - FlatMapContext flatMapContext) - : memoryPool_(stripe.getMemoryPool()), - nodeType_(std::move(nodeType)), - flatMapContext_(std::move(flatMapContext)), - rowsPerRowGroup_{stripe.rowsPerRowGroup()} { +void DwrfData::init(StripeStreams& stripe) { + auto format = stripe.format(); + if (format == DwrfFormat::kDwrf) { + initDwrf(stripe); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + initOrc(stripe); + } +} + +void DwrfData::initDwrf(StripeStreams& stripe) { EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + std::unique_ptr stream = stripe.getStream(encodingKey.forKind(proto::Stream_Kind_PRESENT), false); if (stream) { @@ -44,6 +48,35 @@ DwrfData::DwrfData( encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), false); } +void DwrfData::initOrc(StripeStreams& stripe) { + EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + + std::unique_ptr stream = stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_PRESENT), false); + if (stream) { + notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); + } + + // We always initialize indexStream_ because indices are needed as + // soon as there is a single filter that can trigger row group skips + // anywhere in the reader tree. This is not known at construct time + // because the first filter can come from a hash join or other run + // time pushdown. + indexStream_ = stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX), false); +} + +DwrfData::DwrfData( + std::shared_ptr nodeType, + StripeStreams& stripe, + FlatMapContext flatMapContext) + : memoryPool_(stripe.getMemoryPool()), + nodeType_(std::move(nodeType)), + flatMapContext_(std::move(flatMapContext)), + rowsPerRowGroup_{stripe.rowsPerRowGroup()} { + init(stripe); +} + uint64_t DwrfData::skipNulls(uint64_t numValues, bool /*nullsOnly*/) { if (!notNullDecoder_ && !flatMapContext_.inMapDecoder) { return numValues; diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 463a4db1bcbc..9e62f1a6923b 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -95,6 +95,10 @@ class DwrfData : public dwio::common::FormatData { entry.positions().begin(), entry.positions().end()); } + void init(StripeStreams& stripe); + void initDwrf(StripeStreams& stripe); + void initOrc(StripeStreams& stripe); + memory::MemoryPool& memoryPool_; const std::shared_ptr nodeType_; FlatMapContext flatMapContext_; @@ -152,4 +156,17 @@ inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) { } } +inline RleVersion convertRleVersion(proto::orc::ColumnEncoding_Kind kind) { + switch (static_cast(kind)) { + case proto::orc::ColumnEncoding_Kind_DIRECT: + case proto::orc::ColumnEncoding_Kind_DICTIONARY: + return RleVersion_1; + case proto::orc::ColumnEncoding_Kind_DIRECT_V2: + case proto::orc::ColumnEncoding_Kind_DICTIONARY_V2: + return RleVersion_2; + default: + DWIO_RAISE("Unknown encoding in convertRleVersion"); + } +} + } // namespace facebook::velox::dwrf diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index 7ccd475c51e2..99c96a4b6114 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -202,6 +202,7 @@ ReaderBase::ReaderBase( postScript_->cacheMode(), *footer_, std::move(cacheBuffer)); } } + if (!cache_ && input_->shouldPrefetchStripes()) { auto numStripes = getFooter().stripesSize(); for (auto i = 0; i < numStripes; i++) { @@ -214,6 +215,7 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } } + // initialize file decrypter handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get()); } diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index b089eddc1fab..49f780820537 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -100,6 +100,30 @@ class ReaderBase { } } + ReaderBase( + memory::MemoryPool& pool, + std::unique_ptr input, + std::unique_ptr ps, + const proto::orc::Footer* footer, + std::unique_ptr cache, + std::unique_ptr handler = nullptr) + : pool_{pool}, + postScript_{std::move(ps)}, + footer_{std::make_unique(footer)}, + cache_{std::move(cache)}, + handler_{std::move(handler)}, + input_{std::move(input)}, + schema_{ + std::dynamic_pointer_cast(convertType(*footer_))}, + fileLength_{0}, + psLength_{0} { + DWIO_ENSURE(footer_->getOrcPtr()->GetArena()); + DWIO_ENSURE_NOT_NULL(schema_, "invalid schema"); + if (!handler_) { + handler_ = encryption::DecryptionHandler::create(*footer_); + } + } + // for testing explicit ReaderBase(memory::MemoryPool& pool) : pool_{pool} {} diff --git a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h index 1030f65c9f6b..1bae1555cb95 100644 --- a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h @@ -22,6 +22,46 @@ namespace facebook::velox::dwrf { class SelectiveByteRleColumnReader : public dwio::common::SelectiveByteRleColumnReader { + void init(DwrfParams& params, bool isBool) { + auto format = params.stripeStreams().format(); + if (format == DwrfFormat::kDwrf) { + initDwrf(params, isBool); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + initOrc(params, isBool); + } + } + + void initDwrf(DwrfParams& params, bool isBool) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); + if (isBool) { + boolRle_ = createBooleanRleDecoder( + stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true), + encodingKey); + } else { + byteRle_ = createByteRleDecoder( + stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true), + encodingKey); + } + } + + void initOrc(DwrfParams& params, bool isBool) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); + if (isBool) { + boolRle_ = createBooleanRleDecoder( + stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_DATA), true), + encodingKey); + } else { + byteRle_ = createByteRleDecoder( + stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_DATA), true), + encodingKey); + } + } + public: using ValueType = int8_t; @@ -36,17 +76,7 @@ class SelectiveByteRleColumnReader params, scanSpec, dataType->type) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); - if (isBool) { - boolRle_ = createBooleanRleDecoder( - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true), - encodingKey); - } else { - byteRle_ = createByteRleDecoder( - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true), - encodingKey); - } + init(params, isBool); } void seekToRowGroup(uint32_t index) override { diff --git a/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp b/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp index a7d6197f4916..331d93779d81 100644 --- a/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp @@ -42,17 +42,13 @@ std::unique_ptr buildIntegerReader( common::ScanSpec& scanSpec) { EncodingKey ek{requestedType->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - switch (static_cast(stripe.getEncoding(ek).kind())) { - case proto::ColumnEncoding_Kind_DICTIONARY: - case proto::ColumnEncoding_Kind_DICTIONARY_V2: - return std::make_unique( - requestedType, dataType, params, scanSpec, numBytes); - case proto::ColumnEncoding_Kind_DIRECT: - case proto::ColumnEncoding_Kind_DIRECT_V2: - return std::make_unique( - requestedType, dataType, params, numBytes, scanSpec); - default: - DWIO_RAISE("buildReader unhandled integer encoding"); + if (stripe.isColumnEncodingKindDictionary(ek)) { + return std::make_unique( + requestedType, dataType, params, scanSpec, numBytes); + } else { + VELOX_CHECK(stripe.isColumnEncodingKindDirect(ek)); + return std::make_unique( + requestedType, dataType, params, numBytes, scanSpec); } } @@ -88,14 +84,18 @@ std::unique_ptr SelectiveDwrfReader::build( case TypeKind::ARRAY: return std::make_unique( requestedType, dataType, params, scanSpec); - case TypeKind::MAP: - if (stripe.getEncoding(ek).kind() == - proto::ColumnEncoding_Kind_MAP_FLAT) { - return createSelectiveFlatMapColumnReader( - requestedType, dataType, params, scanSpec); + case TypeKind::MAP: { + if (stripe.format() == DwrfFormat::kDwrf) { + if (stripe.getEncoding(ek).kind() == + proto::ColumnEncoding_Kind_MAP_FLAT) { + return createSelectiveFlatMapColumnReader( + requestedType, dataType, params, scanSpec); + } } + return std::make_unique( requestedType, dataType, params, scanSpec); + } case TypeKind::REAL: if (requestedType->type->kind() == TypeKind::REAL) { return std::make_unique< @@ -120,19 +120,16 @@ std::unique_ptr SelectiveDwrfReader::build( return std::make_unique( requestedType, dataType, params, scanSpec, false); case TypeKind::VARBINARY: - case TypeKind::VARCHAR: - switch (static_cast(stripe.getEncoding(ek).kind())) { - case proto::ColumnEncoding_Kind_DIRECT: - case proto::ColumnEncoding_Kind_DIRECT_V2: - return std::make_unique( - requestedType, params, scanSpec); - case proto::ColumnEncoding_Kind_DICTIONARY: - case proto::ColumnEncoding_Kind_DICTIONARY_V2: - return std::make_unique( - requestedType, params, scanSpec); - default: - DWIO_RAISE("buildReader string unknown encoding"); + case TypeKind::VARCHAR: { + if (stripe.isColumnEncodingKindDirect(ek)) { + return std::make_unique( + requestedType, params, scanSpec); + } else { + VELOX_CHECK(stripe.isColumnEncodingKindDictionary(ek)); + return std::make_unique( + requestedType, params, scanSpec); } + } case TypeKind::TIMESTAMP: return std::make_unique( requestedType, params, scanSpec); diff --git a/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h b/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h index 63216b52ceee..e6029b8a12d3 100644 --- a/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h @@ -73,7 +73,10 @@ SelectiveFloatingPointColumnReader:: decoder_(params.stripeStreams().getStream( EncodingKey{root::nodeType_->id, params.flatMapContext().sequence} .forKind(proto::Stream_Kind_DATA), - true)) {} + true)) { + VELOX_CHECK( + (int)proto::Stream_Kind_DATA == (int)proto::orc::Stream_Kind_DATA); +} template uint64_t SelectiveFloatingPointColumnReader::skip( diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp index 2f2f6cafefeb..a41efe10aafa 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp @@ -34,6 +34,7 @@ SelectiveIntegerDictionaryColumnReader::SelectiveIntegerDictionaryColumnReader( dataType->type) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); + VELOX_CHECK(stripe.format() == DwrfFormat::kDwrf); auto encoding = stripe.getEncoding(encodingKey); scanState_.dictionary.numValues = encoding.dictionarysize(); rleVersion_ = convertRleVersion(encoding.kind()); diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h index 0911c281ed61..b1a6ad007fb1 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.h @@ -24,6 +24,58 @@ namespace facebook::velox::dwrf { class SelectiveIntegerDirectColumnReader : public dwio::common::SelectiveIntegerColumnReader { + void init(DwrfParams& params, uint32_t numBytes) { + format_ = params.stripeStreams().format(); + if (format_ == DwrfFormat::kDwrf) { + initDwrf(params, numBytes); + } else { + VELOX_CHECK(format_ == DwrfFormat::kOrc); + initOrc(params, numBytes); + } + } + + void initDwrf(DwrfParams& params, uint32_t numBytes) { + auto& stripe = params.stripeStreams(); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto data = encodingKey.forKind(proto::Stream_Kind_DATA); + bool dataVInts = stripe.getUseVInts(data); + + auto decoder = createDirectDecoder( + stripe.getStream(data, true), dataVInts, numBytes); + directDecoder = + dynamic_cast*>(decoder.release()); + VELOX_CHECK(directDecoder); + ints.reset(directDecoder); + } + + void initOrc(DwrfParams& params, uint32_t numBytes) { + auto& stripe = params.stripeStreams(); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto data = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + bool dataVInts = stripe.getUseVInts(data); + + auto encoding = stripe.getEncodingOrc(encodingKey); + rleVersion_ = convertRleVersion(encoding.kind()); + auto decoder = createRleDecoder( + stripe.getStream(data, true), + rleVersion_, + params.pool(), + dataVInts, + numBytes); + if (rleVersion_ == velox::dwrf::RleVersion_1) { + rleDecoderV1 = + dynamic_cast*>(decoder.release()); + VELOX_CHECK(rleDecoderV1); + ints.reset(rleDecoderV1); + } else { + VELOX_CHECK(rleVersion_ == velox::dwrf::RleVersion_2); + rleDecoderV2 = + dynamic_cast*>(decoder.release()); + VELOX_CHECK(rleDecoderV2); + ints.reset(rleDecoderV2); + } + } + public: using ValueType = int64_t; @@ -38,43 +90,7 @@ class SelectiveIntegerDirectColumnReader params, scanSpec, dataType->type) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto data = encodingKey.forKind(proto::Stream_Kind_DATA); - auto& stripe = params.stripeStreams(); - bool dataVInts = stripe.getUseVInts(data); - - format_ = stripe.format(); - if (format_ == velox::dwrf::DwrfFormat::kDwrf) { - auto decoder = createDirectDecoder( - stripe.getStream(data, true), dataVInts, numBytes); - directDecoder = - dynamic_cast*>(decoder.release()); - VELOX_CHECK(directDecoder); - ints.reset(directDecoder); - } else if (format_ == velox::dwrf::DwrfFormat::kOrc) { - auto encoding = stripe.getEncoding(encodingKey); - rleVersion_ = convertRleVersion(encoding.kind()); - auto decoder = createRleDecoder( - stripe.getStream(data, true), - rleVersion_, - params.pool(), - dataVInts, - numBytes); - if (rleVersion_ == velox::dwrf::RleVersion_1) { - rleDecoderV1 = - dynamic_cast*>(decoder.release()); - VELOX_CHECK(rleDecoderV1); - ints.reset(rleDecoderV1); - } else { - VELOX_CHECK(rleVersion_ == velox::dwrf::RleVersion_2); - rleDecoderV2 = - dynamic_cast*>(decoder.release()); - VELOX_CHECK(rleDecoderV2); - ints.reset(rleDecoderV2); - } - } else { - VELOX_FAIL("invalid stripe format"); - } + init(params, numBytes); } bool hasBulkPath() const override { diff --git a/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h index 1549409173ca..202235c108af 100644 --- a/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h @@ -26,6 +26,49 @@ namespace facebook::velox::dwrf { class SelectiveLongDecimalColumnReader : public dwio::common::SelectiveColumnReader { + void init(DwrfParams& params) { + format_ = params.stripeStreams().format(); + if (format_ == DwrfFormat::kDwrf) { + initDwrf(params); + } else { + VELOX_CHECK(format_ == DwrfFormat::kOrc); + initOrc(params); + } + } + + void initDwrf(DwrfParams& params) { + VELOX_FAIL("dwrf unsupport decimal"); + } + + void initOrc(DwrfParams& params) { + auto& stripe = params.stripeStreams(); + + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto values = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + auto scales = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); + + bool valuesVInts = stripe.getUseVInts(values); + bool scalesVInts = stripe.getUseVInts(scales); + + auto encoding = stripe.getEncodingOrc(encodingKey); + auto encodingKind = encoding.kind(); + VELOX_CHECK( + encodingKind == proto::orc::ColumnEncoding_Kind_DIRECT || + encodingKind == proto::orc::ColumnEncoding_Kind_DIRECT_V2); + + version_ = convertRleVersion(encodingKind); + + valueDecoder_ = createDirectDecoder( + stripe.getStream(values, true), valuesVInts, sizeof(int128_t)); + + scaleDecoder_ = createRleDecoder( + stripe.getStream(scales, true), + version_, + params.pool(), + scalesVInts, + facebook::velox::dwio::common::LONG_BYTE_SIZE); + } + public: using ValueType = int128_t; @@ -37,42 +80,7 @@ class SelectiveLongDecimalColumnReader : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type) { precision_ = dataType->asLongDecimal().precision(); scale_ = dataType->asLongDecimal().scale(); - - auto& stripe = params.stripeStreams(); - - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto values = encodingKey.forKind(proto::Stream_Kind_DATA); - auto scales = encodingKey.forKind( - proto::Stream_Kind_NANO_DATA); // equal to - // orc::proto::Stream_Kind_SECONDARY - - bool valuesVInts = stripe.getUseVInts(values); - bool scalesVInts = stripe.getUseVInts(scales); - - format_ = stripe.format(); - if (format_ == velox::dwrf::DwrfFormat::kDwrf) { - VELOX_FAIL("dwrf unsupport decimal"); - } else if (format_ == velox::dwrf::DwrfFormat::kOrc) { - auto encoding = stripe.getEncoding(encodingKey); - auto encodingKind = encoding.kind(); - VELOX_CHECK( - encodingKind == proto::ColumnEncoding_Kind_DIRECT || - encodingKind == proto::ColumnEncoding_Kind_DIRECT_V2); - - version_ = convertRleVersion(encodingKind); - - valueDecoder_ = createDirectDecoder( - stripe.getStream(values, true), valuesVInts, sizeof(int128_t)); - - scaleDecoder_ = createRleDecoder( - stripe.getStream(scales, true), - version_, - params.pool(), - scalesVInts, - facebook::velox::dwio::common::LONG_BYTE_SIZE); - } else { - VELOX_FAIL("invalid stripe format"); - } + init(params); } bool hasBulkPath() const override { diff --git a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp index cbb5f07c545d..ea0d70b4cede 100644 --- a/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveRepeatedColumnReader.cpp @@ -25,15 +25,30 @@ std::unique_ptr> makeLengthDecoder( memory::MemoryPool& pool) { EncodingKey encodingKey{nodeType.id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - auto rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - bool lenVints = stripe.getUseVInts(lenId); - return createRleDecoder( - stripe.getStream(lenId, true), - rleVersion, - pool, - lenVints, - dwio::common::INT_BYTE_SIZE); + auto format = stripe.format(); + if (format == DwrfFormat::kDwrf) { + auto rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + bool lenVints = stripe.getUseVInts(lenId); + return createRleDecoder( + stripe.getStream(lenId, true), + rleVersion, + pool, + lenVints, + dwio::common::INT_BYTE_SIZE); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + auto rleVersion = + convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + bool lenVints = stripe.getUseVInts(lenId); + return createRleDecoder( + stripe.getStream(lenId, true), + rleVersion, + pool, + lenVints, + dwio::common::INT_BYTE_SIZE); + } } } // namespace diff --git a/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h index 3cd41e2332b0..4ecabbb29cd2 100644 --- a/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h @@ -26,6 +26,51 @@ namespace facebook::velox::dwrf { class SelectiveShortDecimalColumnReader : public dwio::common::SelectiveColumnReader { + void init(DwrfParams& params) { + format_ = params.stripeStreams().format(); + if (format_ == DwrfFormat::kDwrf) { + initDwrf(params); + } else { + VELOX_CHECK(format_ == DwrfFormat::kOrc); + initOrc(params); + } + } + + void initDwrf(DwrfParams& params) { + VELOX_FAIL("dwrf unsupport decimal"); + } + + void initOrc(DwrfParams& params) { + const auto& stripe = params.stripeStreams(); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + + auto values = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + auto scales = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); + + bool valuesVInts = stripe.getUseVInts(values); + bool scalesVInts = stripe.getUseVInts(scales); + + auto encoding = stripe.getEncodingOrc(encodingKey); + auto encodingKind = encoding.kind(); + VELOX_CHECK( + encodingKind == proto::orc::ColumnEncoding_Kind_DIRECT || + encodingKind == proto::orc::ColumnEncoding_Kind_DIRECT_V2); + + version_ = convertRleVersion(encodingKind); + + valueDecoder_ = createDirectDecoder( + stripe.getStream(values, true), + valuesVInts, + facebook::velox::dwio::common::LONG_BYTE_SIZE); + + scaleDecoder_ = createRleDecoder( + stripe.getStream(scales, true), + version_, + params.pool(), + scalesVInts, + facebook::velox::dwio::common::LONG_BYTE_SIZE); + } + public: using ValueType = int64_t; @@ -41,45 +86,7 @@ class SelectiveShortDecimalColumnReader : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type) { precision_ = dataType->asShortDecimal().precision(); scale_ = dataType->asShortDecimal().scale(); - - const auto& stripe = params.stripeStreams(); - - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - - auto values = encodingKey.forKind(proto::Stream_Kind_DATA); - auto scales = encodingKey.forKind( - proto::Stream_Kind_NANO_DATA); // equal to - // proto::orc::Stream_Kind_SECONDARY - - bool valuesVInts = stripe.getUseVInts(values); - bool scalesVInts = stripe.getUseVInts(scales); - - format_ = stripe.format(); - if (format_ == velox::dwrf::DwrfFormat::kDwrf) { - VELOX_FAIL("dwrf unsupport decimal"); - } else if (format_ == velox::dwrf::DwrfFormat::kOrc) { - auto encoding = stripe.getEncoding(encodingKey); - auto encodingKind = encoding.kind(); - VELOX_CHECK( - encodingKind == proto::ColumnEncoding_Kind_DIRECT || - encodingKind == proto::ColumnEncoding_Kind_DIRECT_V2); - - version_ = convertRleVersion(encodingKind); - - valueDecoder_ = createDirectDecoder( - stripe.getStream(values, true), - valuesVInts, - facebook::velox::dwio::common::LONG_BYTE_SIZE); - - scaleDecoder_ = createRleDecoder( - stripe.getStream(scales, true), - version_, - params.pool(), - scalesVInts, - facebook::velox::dwio::common::LONG_BYTE_SIZE); - } else { - VELOX_FAIL("invalid stripe format"); - } + init(params); } bool hasBulkPath() const override { diff --git a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp index a4e5f023bf87..3d4ff7cffc63 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp @@ -22,15 +22,18 @@ namespace facebook::velox::dwrf { using namespace dwio::common; -SelectiveStringDictionaryColumnReader::SelectiveStringDictionaryColumnReader( - const std::shared_ptr& nodeType, - DwrfParams& params, - common::ScanSpec& scanSpec) - : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type), - lastStrideIndex_(-1), - provider_(params.stripeStreams().getStrideIndexProvider()) { +void SelectiveStringDictionaryColumnReader::init(DwrfParams& params) { + format_ = params.stripeStreams().format(); + if (format_ == DwrfFormat::kDwrf) { + initDwrf(params); + } else { + VELOX_CHECK(format_ == DwrfFormat::kOrc); + initOrc(params); + } +} + +void SelectiveStringDictionaryColumnReader::initDwrf(DwrfParams& params) { auto& stripe = params.stripeStreams(); - format_ = stripe.format(); EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; rleVersion_ = convertRleVersion(stripe.getEncoding(encodingKey).kind()); @@ -85,6 +88,48 @@ SelectiveStringDictionaryColumnReader::SelectiveStringDictionaryColumnReader( scanState_.updateRawState(); } +void SelectiveStringDictionaryColumnReader::initOrc(DwrfParams& params) { + auto& stripe = params.stripeStreams(); + + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + rleVersion_ = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + scanState_.dictionary.numValues = + stripe.getEncodingOrc(encodingKey).dictionarysize(); + + const auto dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + bool dictVInts = stripe.getUseVInts(dataId); + dictIndex_ = createRleDecoder( + stripe.getStream(dataId, true), + rleVersion_, + memoryPool_, + dictVInts, + dwio::common::INT_BYTE_SIZE); + + const auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + bool lenVInts = stripe.getUseVInts(lenId); + lengthDecoder_ = createRleDecoder( + stripe.getStream(lenId, false), + rleVersion_, + memoryPool_, + lenVInts, + dwio::common::INT_BYTE_SIZE); + + blobStream_ = stripe.getStream( + encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA), false); + + scanState_.updateRawState(); +} + +SelectiveStringDictionaryColumnReader::SelectiveStringDictionaryColumnReader( + const std::shared_ptr& nodeType, + DwrfParams& params, + common::ScanSpec& scanSpec) + : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type), + lastStrideIndex_(-1), + provider_(params.stripeStreams().getStrideIndexProvider()) { + init(params); +} + uint64_t SelectiveStringDictionaryColumnReader::skip(uint64_t numValues) { numValues = SelectiveColumnReader::skip(numValues); dictIndex_->skip(numValues); diff --git a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h index f79b0dd36a9e..b8426e32df90 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h @@ -71,6 +71,10 @@ class SelectiveStringDictionaryColumnReader void loadStrideDictionary(); void makeDictionaryBaseVector(); + void init(DwrfParams& params); + void initDwrf(DwrfParams& params); + void initOrc(DwrfParams& params); + template void readWithVisitor(RowSet rows, TVisitor visitor); diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index a32baa539f9c..e4be7081c208 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -20,13 +20,10 @@ namespace facebook::velox::dwrf { -SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( - const std::shared_ptr& nodeType, - DwrfParams& params, - common::ScanSpec& scanSpec) - : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type) { - EncodingKey encodingKey{nodeType->id, params.flatMapContext().sequence}; +void SelectiveStringDirectColumnReader::initDwrf(DwrfParams& params) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); + RleVersion rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); auto lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); @@ -38,7 +35,33 @@ SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( lenVInts, dwio::common::INT_BYTE_SIZE); blobStream_ = - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true); + stripe.getStream(encodingKey.forKind(proto::orc::Stream_Kind_DATA), true); +} + +void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); + + RleVersion rleVersion = + convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + auto lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + bool lenVInts = stripe.getUseVInts(lenId); + lengthDecoder_ = createRleDecoder( + stripe.getStream(lenId, true), + rleVersion, + memoryPool_, + lenVInts, + dwio::common::INT_BYTE_SIZE); + blobStream_ = + stripe.getStream(encodingKey.forKind(proto::orc::Stream_Kind_DATA), true); +} + +SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( + const std::shared_ptr& nodeType, + DwrfParams& params, + common::ScanSpec& scanSpec) + : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type) { + init(params); } uint64_t SelectiveStringDirectColumnReader::skip(uint64_t numValues) { diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h index d6c2ccba885b..89eb1c671d90 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h @@ -23,6 +23,20 @@ namespace facebook::velox::dwrf { class SelectiveStringDirectColumnReader : public dwio::common::SelectiveColumnReader { + void init(DwrfParams& params) { + auto format = params.stripeStreams().format(); + if (format == DwrfFormat::kDwrf) { + initDwrf(params); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + initOrc(params); + } + } + + void initDwrf(DwrfParams& params); + + void initOrc(DwrfParams& params); + public: using ValueType = StringView; SelectiveStringDirectColumnReader( diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp index 8cd0bd2ddc59..a0efbc371ea0 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp @@ -32,13 +32,10 @@ SelectiveStructColumnReader::SelectiveStructColumnReader( dataType, params, scanSpec) { + init(params); + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); - auto encoding = static_cast(stripe.getEncoding(encodingKey).kind()); - DWIO_ENSURE_EQ( - encoding, - proto::ColumnEncoding_Kind_DIRECT, - "Unknown encoding for StructColumnReader"); const auto& cs = stripe.getColumnSelector(); // A reader tree may be constructed while the ScanSpec is being used diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h index de43a1acea36..a08306267bc6 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.h @@ -84,6 +84,38 @@ struct SelectiveStructColumnReader : SelectiveStructColumnReaderBase { common::ScanSpec& scanSpec); private: + void init(DwrfParams& params) { + auto format = params.stripeStreams().format(); + if (format == DwrfFormat::kDwrf) { + initDwrf(params); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + initOrc(params); + } + } + + void initDwrf(DwrfParams& params) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); + auto encoding = + static_cast(stripe.getEncoding(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::ColumnEncoding_Kind_DIRECT, + "Unknown dwrf encoding for StructColumnReader"); + } + + void initOrc(DwrfParams& params) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); + auto encoding = + static_cast(stripe.getEncodingOrc(encodingKey).kind()); + DWIO_ENSURE_EQ( + encoding, + proto::orc::ColumnEncoding_Kind_DIRECT, + "Unknown orc encoding for StructColumnReader"); + } + void addChild(std::unique_ptr child) { children_.push_back(child.get()); childrenOwned_.push_back(std::move(child)); diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp index 3e8953ae97f8..277a831ce68f 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp @@ -22,13 +22,10 @@ namespace facebook::velox::dwrf { using namespace dwio::common; -SelectiveTimestampColumnReader::SelectiveTimestampColumnReader( - const std::shared_ptr& nodeType, - DwrfParams& params, - common::ScanSpec& scanSpec) - : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type) { +void SelectiveTimestampColumnReader::initDwrf(DwrfParams& params) { EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); + version = convertRleVersion(stripe.getEncoding(encodingKey).kind()); auto data = encodingKey.forKind(proto::Stream_Kind_DATA); @@ -50,6 +47,39 @@ SelectiveTimestampColumnReader::SelectiveTimestampColumnReader( LONG_BYTE_SIZE); } +void SelectiveTimestampColumnReader::initOrc(DwrfParams& params) { + EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; + auto& stripe = params.stripeStreams(); + + version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + + auto data = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + bool vints = stripe.getUseVInts(data); + seconds_ = createRleDecoder( + stripe.getStream(data, true), + version, + memoryPool_, + vints, + LONG_BYTE_SIZE); + + auto nanoData = encodingKey.forKind(proto::orc::Stream_Kind_SECONDARY); + bool nanoVInts = stripe.getUseVInts(nanoData); + nano_ = createRleDecoder( + stripe.getStream(nanoData, true), + version, + memoryPool_, + nanoVInts, + LONG_BYTE_SIZE); +} + +SelectiveTimestampColumnReader::SelectiveTimestampColumnReader( + const std::shared_ptr& nodeType, + DwrfParams& params, + common::ScanSpec& scanSpec) + : SelectiveColumnReader(nodeType, params, scanSpec, nodeType->type) { + init(params); +} + uint64_t SelectiveTimestampColumnReader::skip(uint64_t numValues) { numValues = SelectiveColumnReader::skip(numValues); seconds_->skip(numValues); diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h index 20b4ead7d8bd..cdce468de4bd 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.h @@ -23,6 +23,20 @@ namespace facebook::velox::dwrf { class SelectiveTimestampColumnReader : public dwio::common::SelectiveColumnReader { + void init(DwrfParams& params) { + auto format = params.stripeStreams().format(); + if (format == DwrfFormat::kDwrf) { + initDwrf(params); + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + initOrc(params); + } + } + + void initDwrf(DwrfParams& params); + + void initOrc(DwrfParams& params); + public: // The readers produce int64_t, the vector is Timestamps. using ValueType = int64_t; diff --git a/velox/dwio/dwrf/reader/StripeReaderBase.cpp b/velox/dwio/dwrf/reader/StripeReaderBase.cpp index 2e9aef87398a..c758c5576722 100644 --- a/velox/dwio/dwrf/reader/StripeReaderBase.cpp +++ b/velox/dwio/dwrf/reader/StripeReaderBase.cpp @@ -70,16 +70,29 @@ StripeInformationWrapper StripeReaderBase::loadStripe( LogType::STRIPE_FOOTER); } + auto streamDebugInfo = fmt::format("Stripe {} Footer ", index); + // Reuse footer_'s memory to avoid expensive destruction - if (!footer_) { - footer_ = google::protobuf::Arena::CreateMessage( - reader_->arena()); - } + if (format() == DwrfFormat::kDwrf) { + if (!footer_) { + footer_ = google::protobuf::Arena::CreateMessage( + reader_->arena()); + } - auto streamDebugInfo = fmt::format("Stripe {} Footer ", index); - ProtoUtils::readProtoInto( - reader_->createDecompressedStream(std::move(stream), streamDebugInfo), - footer_); + ProtoUtils::readProtoInto( + reader_->createDecompressedStream(std::move(stream), streamDebugInfo), + footer_); + } else { // DwrfFormat::kOrc + if (!footerOrc_) { + footerOrc_ = + google::protobuf::Arena::CreateMessage( + reader_->arena()); + } + + ProtoUtils::readProtoInto( + reader_->createDecompressedStream(std::move(stream), streamDebugInfo), + footerOrc_); + } // refresh stripe encryption key if necessary loadEncryptionKeys(index); diff --git a/velox/dwio/dwrf/reader/StripeReaderBase.h b/velox/dwio/dwrf/reader/StripeReaderBase.h index b5346a81a835..c44dafd9e205 100644 --- a/velox/dwio/dwrf/reader/StripeReaderBase.h +++ b/velox/dwio/dwrf/reader/StripeReaderBase.h @@ -26,6 +26,7 @@ class StripeReaderBase { public: explicit StripeReaderBase(const std::shared_ptr& reader) : reader_{reader}, + footer_(nullptr), handler_{std::make_unique( reader_->getDecryptionHandler())} {} @@ -43,6 +44,19 @@ class StripeReaderBase { DWIO_ENSURE(footer->GetArena()); } + StripeReaderBase( + const std::shared_ptr& reader, + const proto::orc::StripeFooter* footer) + : reader_{reader}, + footerOrc_{const_cast(footer)}, + handler_{std::make_unique( + reader_->getDecryptionHandler())}, + canLoad_{false} { + // The footer is expected to be arena allocated and to stay + // live for the lifetime of 'this'. + DWIO_ENSURE(footer->GetArena()); + } + virtual ~StripeReaderBase() = default; StripeInformationWrapper loadStripe(uint32_t index, bool& preload); @@ -52,10 +66,19 @@ class StripeReaderBase { return *footer_; } + const proto::orc::StripeFooter& getStripeFooterOrc() const { + DWIO_ENSURE_NOT_NULL(footerOrc_, "stripe not loaded"); + return *footerOrc_; + } + dwio::common::BufferedInput& getStripeInput() const { return stripeInput_ ? *stripeInput_ : reader_->getBufferedInput(); } + DwrfFormat format() const { + return reader_->format(); + } + ReaderBase& getReader() const { return *reader_; } @@ -71,7 +94,12 @@ class StripeReaderBase { private: std::shared_ptr reader_; std::unique_ptr stripeInput_; - proto::StripeFooter* footer_ = nullptr; + + union { + proto::StripeFooter* footer_ = nullptr; // format() == Dwrf + proto::orc::StripeFooter* footerOrc_; // format() == Orc + }; + std::unique_ptr handler_; std::optional lastStripeIndex_; bool canLoad_{true}; diff --git a/velox/dwio/dwrf/reader/StripeStream.cpp b/velox/dwio/dwrf/reader/StripeStream.cpp index 1b6ceb64a5c0..3ac9221beb26 100644 --- a/velox/dwio/dwrf/reader/StripeStream.cpp +++ b/velox/dwio/dwrf/reader/StripeStream.cpp @@ -17,7 +17,6 @@ #include #include -#include "velox/common/base/BitSet.h" #include "velox/dwio/common/exception/Exception.h" #include "velox/dwio/dwrf/common/DecoderUtil.h" #include "velox/dwio/dwrf/common/wrap/coded-stream-wrapper.h" @@ -136,45 +135,84 @@ StripeStreamsBase::getIntDictionaryInitializerForNode( }; } -void StripeStreamsImpl::loadStreams() { - auto& footer = reader_.getStripeFooter(); +auto addStreamDwrf = [](StripeStreamsImpl* ssi, + BitSet& projectedNodes, + auto& stream, + auto& offset) { + if (stream.has_offset()) { + offset = stream.offset(); + } + if (projectedNodes.contains(stream.node())) { + ssi->getStreams()[stream] = {offset, stream}; + } + offset += stream.length(); +}; + +auto addStreamOrc = [](StripeStreamsImpl* ssi, + BitSet& projectedNodes, + auto& stream, + auto& offset) { + if (projectedNodes.contains(stream.column())) { + ssi->getStreams()[stream] = {offset, stream}; + } + offset += stream.length(); +}; +void StripeStreamsImpl::processStreams(BitSet& projectedNodes) { // HACK!!! // Column selector filters based on requested schema (ie, table schema), while // we need filter based on file schema. As a result we cannot call // shouldReadNode directly. Instead, build projected nodes set based on node // id from file schema. Column selector should really be fixed to handle file // schema properly - BitSet projectedNodes(0); auto expected = selector_.getSchemaWithId(); auto actual = reader_.getReader().getSchemaWithId(); findProjectedNodes(projectedNodes, *expected, *actual, [&](uint32_t node) { return selector_.shouldReadNode(node); }); - auto addStream = [&](auto& stream, auto& offset) { - if (stream.has_offset()) { - offset = stream.offset(); + uint64_t streamOffset = 0; + if (format() == DwrfFormat::kDwrf) { + for (auto& stream : reader_.getStripeFooter().streams()) { + addStreamDwrf(this, projectedNodes, stream, streamOffset); } - if (projectedNodes.contains(stream.node())) { - streams_[stream] = {offset, stream}; + } else { // kOrc + for (auto& stream : reader_.getStripeFooterOrc().streams()) { + addStreamOrc(this, projectedNodes, stream, streamOffset); } - offset += stream.length(); - }; - - uint64_t streamOffset = 0; - for (auto& stream : footer.streams()) { - addStream(stream, streamOffset); } +} - // update column encoding for each stream - for (uint32_t i = 0; i < footer.encoding_size(); ++i) { - auto& e = footer.encoding(i); - auto node = e.has_node() ? e.node() : i; - if (projectedNodes.contains(node)) { - encodings_[{node, e.has_sequence() ? e.sequence() : 0}] = i; +void StripeStreamsImpl::processEncodings(BitSet& projectedNodes) { + if (format() == DwrfFormat::kDwrf) { + auto& footer = reader_.getStripeFooter(); + // update column encoding for each stream + for (uint32_t i = 0; i < footer.encoding_size(); ++i) { + auto& e = footer.encoding(i); + auto node = e.has_node() ? e.node() : i; + if (projectedNodes.contains(node)) { + encodings_[{node, e.has_sequence() ? e.sequence() : 0}] = i; + } + } + } else { // kOrc + auto& footer = reader_.getStripeFooterOrc(); + // update column encoding for each stream + for (uint32_t i = 0; i < footer.columns_size(); ++i) { + if (projectedNodes.contains(i)) { + encodings_[{i, 0}] = i; + } } } +} + +void StripeStreamsImpl::processEncryptions(BitSet& projectedNodes) { + if (format() == DwrfFormat::kOrc) { + // orc doesn't contain encryption field + VELOX_CHECK(reader_.getStripeFooterOrc().encryption_size() == 0); + return; + } + + auto& footer = reader_.getStripeFooter(); // handle encrypted columns auto& handler = reader_.getDecryptionHandler(); @@ -196,10 +234,12 @@ void StripeStreamsImpl::loadStreams() { reader_.getReader().readProtoFromString( group, std::addressof(handler.getEncryptionProviderByIndex(index))); - streamOffset = 0; + + uint64_t streamOffset = 0; for (auto& stream : groupProto->streams()) { - addStream(stream, streamOffset); + addStreamDwrf(this, projectedNodes, stream, streamOffset); } + for (auto& encoding : groupProto->encoding()) { DWIO_ENSURE(encoding.has_node(), "node is required"); auto node = encoding.node(); @@ -213,6 +253,13 @@ void StripeStreamsImpl::loadStreams() { } } +void StripeStreamsImpl::loadStreams() { + BitSet projectedNodes(0); + processStreams(projectedNodes); + processEncodings(projectedNodes); + processEncryptions(projectedNodes); +} + std::unique_ptr StripeStreamsImpl::getCompressedStream(const DwrfStreamIdentifier& si) const { const auto& info = getStreamInfo(si); diff --git a/velox/dwio/dwrf/reader/StripeStream.h b/velox/dwio/dwrf/reader/StripeStream.h index b5ec8609c679..4acab535b0b0 100644 --- a/velox/dwio/dwrf/reader/StripeStream.h +++ b/velox/dwio/dwrf/reader/StripeStream.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/common/base/BitSet.h" #include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/SeekableInputStream.h" @@ -48,6 +49,7 @@ class StreamInformationImpl : public StreamInformation { } StreamInformationImpl() : streamId_{DwrfStreamIdentifier::getInvalid()} {} + StreamInformationImpl(uint64_t offset, const proto::Stream& stream) : streamId_(stream), offset_(offset), @@ -56,12 +58,22 @@ class StreamInformationImpl : public StreamInformation { // PASS } - ~StreamInformationImpl() override = default; + StreamInformationImpl(uint64_t offset, const proto::orc::Stream& stream) + : streamId_(stream), + offset_(offset), + length_(stream.length()), + useVInts_(true) { + // PASS + } StreamKind getKind() const override { return streamId_.kind(); } + StreamKindOrc getKindOrc() const override { + return streamId_.kindOrc(); + } + uint32_t getNode() const override { return streamId_.encodingKey().node; } @@ -112,6 +124,16 @@ class StripeStreams { virtual const proto::ColumnEncoding& getEncoding( const EncodingKey&) const = 0; + /** + * Get the encoding for the given column for this stripe. + * this interface is used for format Orc + */ + virtual const proto::orc::ColumnEncoding& getEncodingOrc( + const EncodingKey&) const { + static proto::orc::ColumnEncoding columnEncoding; + return columnEncoding; + } + /** * Get the stream for the given column/kind in this stripe. * @param streamId stream identifier object @@ -163,6 +185,41 @@ class StripeStreams { // Number of rows per row group. Last row group may have fewer rows. virtual uint32_t rowsPerRowGroup() const = 0; + + bool isColumnEncodingKindDirect(const EncodingKey& ek) const { + auto dwrfFormat = format(); + if (dwrfFormat == DwrfFormat::kDwrf) { + auto kind = getEncoding(ek).kind(); + if (kind == proto::ColumnEncoding_Kind_DIRECT || + kind == proto::ColumnEncoding_Kind_DIRECT_V2) { + return true; + } else if ( + kind == proto::ColumnEncoding_Kind_DICTIONARY || + kind == proto::ColumnEncoding_Kind_DICTIONARY_V2) { + return false; + } else { + DWIO_RAISE("isColumnEncodingKindDirect dwrf kind error"); + } + } else if (dwrfFormat == DwrfFormat::kOrc) { + auto kind = getEncodingOrc(ek).kind(); + if (kind == proto::orc::ColumnEncoding_Kind_DIRECT || + kind == proto::orc::ColumnEncoding_Kind_DIRECT_V2) { + return true; + } else if ( + kind == proto::orc::ColumnEncoding_Kind_DICTIONARY || + kind == proto::orc::ColumnEncoding_Kind_DICTIONARY_V2) { + return false; + } else { + DWIO_RAISE("isColumnEncodingKindDirect orc kind error"); + } + } else { + DWIO_RAISE("isColumnEncodingKindDirect dwrfFormat error"); + } + } + + bool isColumnEncodingKindDictionary(const EncodingKey& ek) const { + return !isColumnEncodingKindDirect(ek); + } }; class StripeStreamsBase : public StripeStreams { @@ -209,6 +266,10 @@ class StripeStreamsImpl : public StripeStreamsBase { const uint32_t stripeIndex_; bool readPlanLoaded_; + void processStreams(BitSet& projectedNodes); + void processEncodings(BitSet& projectedNodes); + void processEncryptions(BitSet& projectedNodes); + void loadStreams(); // map of stream id -> stream information @@ -217,7 +278,9 @@ class StripeStreamsImpl : public StripeStreamsBase { StreamInformationImpl, dwio::common::StreamIdentifierHash> streams_; + folly::F14FastMap encodings_; + folly::F14FastMap decryptedEncodings_; @@ -268,6 +331,23 @@ class StripeStreamsImpl : public StripeStreamsBase { return enc->second; } + const proto::orc::ColumnEncoding& getEncodingOrc( + const EncodingKey& ek) const override { + VELOX_CHECK(format() == DwrfFormat::kOrc); + auto index = encodings_.find(ek); + if (index != encodings_.end()) { + return reader_.getStripeFooterOrc().columns(index->second); + } + // TODO: zuochunwei + // need find from decryptedEncodings_ for Orc? + static proto::orc::ColumnEncoding columnEncoding; + return columnEncoding; + } + + auto& getStreams() { + return streams_; + } + // load data into buffer according to read plan void loadReadPlan();