From c560aaf918c9fbaeda7874d03df4ea28bb753d58 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Tue, 25 Feb 2025 22:19:57 -0800 Subject: [PATCH] feat: Add HashStringAllocator::InputStream (#12364) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12364 When we get `ByteInputStream` from `HashStringAllocator`, we used to have to materialize all the byte ranges in a vector, which is not efficient. This change improves the efficiency by creating a `ByteInputStream` directly over the linked list of a multi-part allocation. Reviewed By: kevinwilfong Differential Revision: D69750088 fbshipit-source-id: a87609a91544ee34f008d93cde178edb5ce08c36 --- velox/common/file/FileInputStream.cpp | 6 +- velox/common/file/FileInputStream.h | 2 + velox/common/memory/ByteStream.h | 18 +-- velox/common/memory/HashStringAllocator.cpp | 29 +--- velox/common/memory/HashStringAllocator.h | 147 +++++++++++++++++- .../memory/tests/HashStringAllocatorTest.cpp | 68 ++++++-- velox/exec/AddressableNonNullValueList.cpp | 12 +- velox/exec/RowContainer.cpp | 20 +-- velox/exec/RowContainer.h | 4 +- velox/exec/SortedAggregations.cpp | 4 +- velox/exec/prefixsort/PrefixSortEncoder.h | 4 +- velox/exec/tests/ContainerRowSerdeTest.cpp | 41 +++-- .../lib/aggregates/SingleValueAccumulator.cpp | 8 +- velox/functions/lib/aggregates/ValueList.cpp | 8 +- velox/functions/lib/aggregates/ValueList.h | 4 +- velox/functions/lib/aggregates/ValueSet.cpp | 4 +- .../UnsafeRowSerializeBenchmark.cpp | 4 +- 17 files changed, 264 insertions(+), 119 deletions(-) diff --git a/velox/common/file/FileInputStream.cpp b/velox/common/file/FileInputStream.cpp index 56efdc4e1c3a..87e60b8717c1 100644 --- a/velox/common/file/FileInputStream.cpp +++ b/velox/common/file/FileInputStream.cpp @@ -52,7 +52,6 @@ FileInputStream::~FileInputStream() { void FileInputStream::readNextRange() { VELOX_CHECK(current_ == nullptr || current_->availableBytes() == 0); - ranges_.clear(); current_ = nullptr; int32_t readBytes{0}; @@ -77,9 +76,8 @@ void FileInputStream::readNextRange() { } } - ranges_.resize(1); - ranges_[0] = {buffer()->asMutable(), readBytes, 0}; - current_ = ranges_.data(); + range_ = {buffer()->asMutable(), readBytes, 0}; + current_ = &range_; fileOffset_ += readBytes; updateStats(readBytes, readTimeNs); diff --git a/velox/common/file/FileInputStream.h b/velox/common/file/FileInputStream.h index 6daf9f84e109..086eb26b4d41 100644 --- a/velox/common/file/FileInputStream.h +++ b/velox/common/file/FileInputStream.h @@ -123,6 +123,8 @@ class FileInputStream : public ByteInputStream { folly::SemiFuture readAheadWait_{ folly::SemiFuture::makeEmpty()}; + ByteRange range_; + Stats stats_; }; } // namespace facebook::velox::common diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 370737288366..0c7eef4a967c 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -191,18 +191,9 @@ class ByteInputStream { current_->position += sizeof(T); return folly::loadUnaligned(source); } - // The number straddles two buffers. We read byte by byte and make a - // little-endian uint64_t. The bytes can be cast to any integer or floating - // point type since the wire format has the machine byte order. - static_assert(sizeof(T) <= sizeof(uint64_t)); - union { - uint64_t bits; - T typed; - } value{}; - for (int32_t i = 0; i < sizeof(T); ++i) { - value.bits |= static_cast(readByte()) << (i * 8); - } - return value.typed; + T value; + readBytes(&value, sizeof(T)); + return value; } template @@ -222,7 +213,6 @@ class ByteInputStream { protected: // Points to the current buffered byte range. ByteRange* current_{nullptr}; - std::vector ranges_; }; /// Read-only input stream backed by a set of buffers. @@ -268,6 +258,8 @@ class BufferInputStream : public ByteInputStream { const std::vector& ranges() const { return ranges_; } + + std::vector ranges_; }; template <> diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp index 9b24c9f04c9f..2be95f80bbf4 100644 --- a/velox/common/memory/HashStringAllocator.cpp +++ b/velox/common/memory/HashStringAllocator.cpp @@ -165,31 +165,6 @@ void HashStringAllocator::freeToPool(void* ptr, size_t size) { pool()->free(ptr, size); } -// static -std::unique_ptr HashStringAllocator::prepareRead( - const Header* begin, - size_t maxBytes) { - std::vector ranges; - auto* header = const_cast(begin); - - size_t totalBytes{0}; - for (;;) { - ranges.push_back(ByteRange{ - reinterpret_cast(header->begin()), header->usableSize(), 0}); - totalBytes += ranges.back().size; - if (!header->isContinued()) { - break; - } - - if (totalBytes >= maxBytes) { - break; - } - - header = header->nextContinued(); - } - return std::make_unique(std::move(ranges)); -} - HashStringAllocator::Position HashStringAllocator::newWrite( ByteOutputStream& stream, int32_t preferredSize) { @@ -363,9 +338,9 @@ StringView HashStringAllocator::contiguousString( return view; } - auto stream = prepareRead(headerOf(view.data())); + InputStream stream(headerOf(view.data())); storage.resize(view.size()); - stream->readBytes(storage.data(), view.size()); + stream.ByteInputStream::readBytes(storage.data(), view.size()); return StringView(storage); } diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index 485da69966d7..980e6f293b8b 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -233,13 +233,7 @@ class HashStringAllocator : public StreamArena { return header->size() + kHeaderSize; } - /// Returns ByteInputStream over the data in the range of 'header' and - /// possible continuation ranges. - /// @param maxBytes If provided, the returned stream will cover at most that - /// many bytes. - static std::unique_ptr prepareRead( - const Header* header, - size_t maxBytes = std::numeric_limits::max()); + class InputStream; /// Returns the number of payload bytes between 'header->begin()' and /// 'position'. @@ -516,6 +510,145 @@ class HashStringAllocator : public StreamArena { State state_; }; +/// A ByteInputStream over the data in the range of begin and possible +/// continuation ranges. +class HashStringAllocator::InputStream : public ByteInputStream { + public: + explicit InputStream(const Header* begin) + : begin_(const_cast(begin)) { + setHeader(begin_); + current_ = &range_; + } + + InputStream(const InputStream& other) { + *this = other; + } + + InputStream& operator=(const InputStream& other) { + begin_ = other.begin_; + header_ = other.header_; + range_ = other.range_; + current_ = &range_; + return *this; + } + + size_t size() const final { + auto* header = begin_; + size_t total = 0; + for (;;) { + total += header->usableSize(); + if (!header->isContinued()) { + break; + } + header = header->nextContinued(); + } + return total; + } + + bool atEnd() const final { + return range_.position == range_.size && !header_->isContinued(); + } + + std::streampos tellp() const final { + auto* header = begin_; + int64_t pos = 0; + while (header != header_) { + pos += header->usableSize(); + header = header->nextContinued(); + } + return pos + range_.position; + } + + void seekp(std::streampos pos) final { + setHeader(begin_); + skipImpl(pos); + } + + void skip(int32_t size) final { + nextHeaderIfNeed(); + skipImpl(size); + } + + size_t remainingSize() const final { + return size() - tellp(); + } + + uint8_t readByte() final { + uint8_t byte; + readBytes(&byte, 1); + return byte; + } + + void readBytes(uint8_t* bytes, int32_t size) final { + nextHeaderIfNeed(); + for (;;) { + auto available = range_.size - range_.position; + if (size <= available) { + std::memcpy(bytes, range_.buffer + range_.position, size); + range_.position += size; + return; + } + std::memcpy(bytes, range_.buffer + range_.position, available); + bytes += available; + size -= available; + VELOX_CHECK(header_->isContinued(), "Reading past end of stream"); + setHeader(header_->nextContinued()); + } + } + + std::string_view nextView(int32_t size) final { + if (atEnd()) { + return {}; + } + nextHeaderIfNeed(); + size = std::min(size, range_.size - range_.position); + std::string_view result( + reinterpret_cast(range_.buffer) + range_.position, size); + range_.position += size; + return result; + } + + std::string toString() const final { + return fmt::format( + "HashStringAllocator::InputStream: begin_={} header_={} range_={}", + begin_->toString(), + header_->toString(), + range_.toString()); + } + + private: + void setHeader(Header* header) { + VELOX_DCHECK_GT(header->usableSize(), 0); + header_ = header; + range_.buffer = reinterpret_cast(header_->begin()); + range_.size = header_->usableSize(); + range_.position = 0; + } + + void nextHeaderIfNeed() { + if (range_.position == range_.size && header_->isContinued()) { + setHeader(header_->nextContinued()); + } + } + + void skipImpl(int64_t size) { + for (;;) { + auto available = range_.size - range_.position; + if (size <= available) { + range_.position += size; + return; + } + size -= available; + VELOX_CHECK(header_->isContinued(), "Seeking past end of stream"); + setHeader(header_->nextContinued()); + } + } + + Header* begin_; + Header* header_; + ByteRange range_; +}; + /// Utility for keeping track of allocation between two points in time. A /// counter on a row supplied at construction is incremented by the change in /// allocation between construction and destruction. This is a scoped guard to diff --git a/velox/common/memory/tests/HashStringAllocatorTest.cpp b/velox/common/memory/tests/HashStringAllocatorTest.cpp index 124220d4d57f..7f79bf4150e6 100644 --- a/velox/common/memory/tests/HashStringAllocatorTest.cpp +++ b/velox/common/memory/tests/HashStringAllocatorTest.cpp @@ -254,15 +254,15 @@ TEST_F(HashStringAllocatorTest, finishWrite) { replaceStart.offset() + 4); // Read back long and short strings. - auto inputStream = HSA::prepareRead(longStart.header); + HSA::InputStream inputStream(longStart.header); std::string copy; copy.resize(longString.size()); - inputStream->readBytes(copy.data(), copy.size()); + inputStream.ByteInputStream::readBytes(copy.data(), copy.size()); ASSERT_EQ(copy, longString); copy.resize(4); - inputStream->readBytes(copy.data(), 4); + inputStream.ByteInputStream::readBytes(copy.data(), 4); ASSERT_EQ(copy, "abcd"); auto allocatedBytes = allocator_->checkConsistency(); @@ -277,10 +277,10 @@ TEST_F(HashStringAllocatorTest, finishWrite) { stream.appendStringView(largeString); allocator_->finishWrite(stream, 0); - auto inStream = HSA::prepareRead(start.header); + HSA::InputStream inStream(start.header); std::string copy; copy.resize(largeString.size()); - inStream->readBytes(copy.data(), copy.size()); + inStream.ByteInputStream::readBytes(copy.data(), copy.size()); ASSERT_EQ(copy, largeString); allocatedBytes = allocator_->checkConsistency(); ASSERT_EQ(allocatedBytes, allocator_->currentBytes()); @@ -396,10 +396,10 @@ TEST_F(HashStringAllocatorTest, rewrite) { stream.appendOne(67890LL); position = allocator_->finishWrite(stream, 0).second; EXPECT_EQ(3 * sizeof(int64_t), HSA::offset(header, position)); - auto inStream = HSA::prepareRead(header); - EXPECT_EQ(123456789012345LL, inStream->read()); - EXPECT_EQ(12345LL, inStream->read()); - EXPECT_EQ(67890LL, inStream->read()); + HSA::InputStream inStream(header); + EXPECT_EQ(123456789012345LL, inStream.read()); + EXPECT_EQ(12345LL, inStream.read()); + EXPECT_EQ(67890LL, inStream.read()); } // The stream contains 3 int64_t's. auto end = HSA::seek(header, 3 * sizeof(int64_t)); @@ -765,5 +765,55 @@ TEST_F(HashStringAllocatorTest, freezeAndExecute) { // mutable. allocator_->freezeAndExecute([&]() { allocator_->currentBytes(); }); } + +TEST_F(HashStringAllocatorTest, inputStream) { + std::string expected; + ByteOutputStream out(allocator_.get()); + auto start = allocator_->newWrite(out, 1); + out.appendStringView(std::string_view("a")); + expected += "a"; + auto last = allocator_->finishWrite(out, 0).second; + for (int i = 1; i < 10; ++i) { + allocator_->extendWrite(last, out); + std::string data(i + 1, 'a' + i); + out.appendStringView(data); + expected += data; + last = allocator_->finishWrite(out, 0).second; + } + ASSERT_TRUE(start.header->isContinued()); + HSA::InputStream in(start.header); + ASSERT_GE(in.size(), out.size()); + ASSERT_EQ(in.tellp(), 0); + ASSERT_FALSE(in.atEnd()); + for (int i = 10, j = 0; i >= 1; --i) { + if (i % 2 == 0) { + char actual[10]; + in.ByteInputStream::readBytes(actual, i); + ASSERT_LE(j + i, expected.size()); + ASSERT_EQ( + std::string_view(actual, i), + std::string_view(expected.data() + j, i)); + } else { + in.skip(i); + } + j += i; + } + ASSERT_EQ(in.tellp(), 55); + ASSERT_EQ(in.size(), 55 + in.remainingSize()); + in.seekp(5); + ASSERT_EQ(in.tellp(), 5); + ASSERT_FALSE(in.atEnd()); + for (int j = 5; j < expected.size();) { + auto actual = in.nextView(5); + auto size = std::min(actual.size(), expected.size() - j); + ASSERT_EQ( + actual.substr(0, size), std::string_view(expected.data() + j, size)); + j += size; + } + in.skip(in.remainingSize()); + ASSERT_TRUE(in.atEnd()); + ASSERT_EQ(in.tellp(), in.size()); +} + } // namespace } // namespace facebook::velox diff --git a/velox/exec/AddressableNonNullValueList.cpp b/velox/exec/AddressableNonNullValueList.cpp index 5e4cc84d3691..3fe010f8ec66 100644 --- a/velox/exec/AddressableNonNullValueList.cpp +++ b/velox/exec/AddressableNonNullValueList.cpp @@ -84,13 +84,13 @@ HashStringAllocator::Position AddressableNonNullValueList::appendSerialized( namespace { -std::unique_ptr prepareRead( +HashStringAllocator::InputStream prepareRead( const AddressableNonNullValueList::Entry& entry) { auto header = entry.offset.header; auto seek = entry.offset.position - header->begin(); - auto stream = HashStringAllocator::prepareRead(header, entry.size + seek); - stream->seekp(seek); + HashStringAllocator::InputStream stream(header); + stream.seekp(seek); return stream; } } // namespace @@ -110,7 +110,7 @@ bool AddressableNonNullValueList::equalTo( CompareFlags compareFlags = CompareFlags::equality(CompareFlags::NullHandlingMode::kNullAsValue); return exec::ContainerRowSerde::compare( - *leftStream, *rightStream, type.get(), compareFlags) == 0; + leftStream, rightStream, type.get(), compareFlags) == 0; } // static @@ -119,7 +119,7 @@ void AddressableNonNullValueList::read( BaseVector& result, vector_size_t index) { auto stream = prepareRead(position); - exec::ContainerRowSerde::deserialize(*stream, index, &result); + exec::ContainerRowSerde::deserialize(stream, index, &result); } // static @@ -127,7 +127,7 @@ void AddressableNonNullValueList::readSerialized( const Entry& position, char* dest) { auto stream = prepareRead(position); - stream->readBytes(dest, position.size); + stream.ByteInputStream::readBytes(dest, position.size); } } // namespace facebook::velox::aggregate::prestosql diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index 0cd6d3d83173..3f9d08f2130a 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -629,13 +629,13 @@ void RowContainer::store( } } -std::unique_ptr RowContainer::prepareRead( +HashStringAllocator::InputStream RowContainer::prepareRead( const char* row, int32_t offset) { const auto& view = reinterpret_cast(row + offset); // We set 'stream' to range over the ranges that start at the Header // immediately below the first character in the std::string_view. - return HashStringAllocator::prepareRead( + return HashStringAllocator::InputStream( HashStringAllocator::headerOf(view->data())); } @@ -684,9 +684,9 @@ int32_t RowContainer::extractVariableSizeAt( .size() >= value.size()) { ::memcpy(output + 4, value.data(), size); } else { - auto stream = HashStringAllocator::prepareRead( + HashStringAllocator::InputStream stream( HashStringAllocator::headerOf(value.data())); - stream->readBytes(output + 4, size); + stream.ByteInputStream::readBytes(output + 4, size); } return 4 + size; } @@ -697,7 +697,7 @@ int32_t RowContainer::extractVariableSizeAt( auto stream = prepareRead(row, rowColumn.offset()); ::memcpy(output, &size, 4); - stream->readBytes(output + 4, size); + stream.ByteInputStream::readBytes(output + 4, size); return 4 + size; } @@ -844,9 +844,9 @@ void RowContainer::extractString( return; } auto rawBuffer = values->getRawStringBufferWithSpace(value.size()); - auto stream = HashStringAllocator::prepareRead( + HashStringAllocator::InputStream stream( HashStringAllocator::headerOf(value.data())); - stream->readBytes(rawBuffer, value.size()); + stream.ByteInputStream::readBytes(rawBuffer, value.size()); values->setNoCopy(index, StringView(rawBuffer, value.size())); } @@ -896,7 +896,7 @@ int RowContainer::compareComplexType( VELOX_DCHECK(flags.nullAsValue(), "not supported null handling mode"); auto stream = prepareRead(row, offset); - return ContainerRowSerde::compare(*stream, decoded, index, flags); + return ContainerRowSerde::compare(stream, decoded, index, flags); } int32_t RowContainer::compareStringAsc(StringView left, StringView right) { @@ -917,7 +917,7 @@ int32_t RowContainer::compareComplexType( auto leftStream = prepareRead(left, leftOffset); auto rightStream = prepareRead(right, rightOffset); - return ContainerRowSerde::compare(*leftStream, *rightStream, type, flags); + return ContainerRowSerde::compare(leftStream, rightStream, type, flags); } int32_t RowContainer::compareComplexType( @@ -958,7 +958,7 @@ void RowContainer::hashTyped( Kind == TypeKind::ROW || Kind == TypeKind::ARRAY || Kind == TypeKind::MAP) { auto in = prepareRead(row, offset); - hash = ContainerRowSerde::hash(*in, type); + hash = ContainerRowSerde::hash(in, type); } else if constexpr (typeProvidesCustomComparison) { hash = static_cast*>(type) ->hash(valueAt(row, offset)); diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 8e44bc1e0449..5da2525fb3f1 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -1168,7 +1168,7 @@ class RowContainer { } } - static std::unique_ptr prepareRead( + static HashStringAllocator::InputStream prepareRead( const char* row, int32_t offset); @@ -1399,7 +1399,7 @@ class RowContainer { result->setNull(resultIndex, true); } else { auto stream = prepareRead(row, offset); - ContainerRowSerde::deserialize(*stream, resultIndex, result.get()); + ContainerRowSerde::deserialize(stream, resultIndex, result.get()); } } } diff --git a/velox/exec/SortedAggregations.cpp b/velox/exec/SortedAggregations.cpp index 560edaf0c93d..175f669e08b4 100644 --- a/velox/exec/SortedAggregations.cpp +++ b/velox/exec/SortedAggregations.cpp @@ -51,10 +51,10 @@ struct RowPointers { } void read(folly::Range rows) { - auto stream = HashStringAllocator::prepareRead(firstBlock); + HashStringAllocator::InputStream stream(firstBlock); for (auto i = 0; i < size; ++i) { - rows[i] = reinterpret_cast(stream->read()); + rows[i] = reinterpret_cast(stream.read()); } } }; diff --git a/velox/exec/prefixsort/PrefixSortEncoder.h b/velox/exec/prefixsort/PrefixSortEncoder.h index a8102ea2c141..945b1de5e160 100644 --- a/velox/exec/prefixsort/PrefixSortEncoder.h +++ b/velox/exec/prefixsort/PrefixSortEncoder.h @@ -294,9 +294,9 @@ FOLLY_ALWAYS_INLINE void PrefixSortEncoder::encodeNoNulls( } else { // 'data' is stored in non-contiguous allocation pieces in the row // container, we only read prefix size data out. - auto stream = HashStringAllocator::prepareRead( + HashStringAllocator::InputStream stream( HashStringAllocator::headerOf(value.data())); - stream->readBytes(dest, copySize); + stream.ByteInputStream::readBytes(dest, copySize); } if (value.size() < encodeSize) { diff --git a/velox/exec/tests/ContainerRowSerdeTest.cpp b/velox/exec/tests/ContainerRowSerdeTest.cpp index 91e82cd790f5..ebf47fc20dec 100644 --- a/velox/exec/tests/ContainerRowSerdeTest.cpp +++ b/velox/exec/tests/ContainerRowSerdeTest.cpp @@ -81,9 +81,9 @@ class ContainerRowSerdeTest : public testing::Test, data->setNull(i, true); } - auto in = HashStringAllocator::prepareRead(position.header); + HashStringAllocator::InputStream in(position.header); for (auto i = 0; i < numRows; ++i) { - ContainerRowSerde::deserialize(*in, i, data.get()); + ContainerRowSerde::deserialize(in, i, data.get()); } return data; } @@ -112,19 +112,19 @@ class ContainerRowSerdeTest : public testing::Test, mode}; for (auto i = 0; i < expected.size(); ++i) { - auto stream = HashStringAllocator::prepareRead(positions.at(i).header); + HashStringAllocator::InputStream stream(positions.at(i).header); if (expected.at(i) == kIndeterminate && mode == CompareFlags::NullHandlingMode::kNullAsIndeterminate && !equalsOnly) { VELOX_ASSERT_THROW( ContainerRowSerde::compareWithNulls( - *stream, decodedVector, i, compareFlags), + stream, decodedVector, i, compareFlags), "Ordering nulls is not supported"); } else { ASSERT_EQ( expected.at(i), ContainerRowSerde::compareWithNulls( - *stream, decodedVector, i, compareFlags)); + stream, decodedVector, i, compareFlags)); } } } @@ -146,22 +146,20 @@ class ContainerRowSerdeTest : public testing::Test, mode}; for (auto i = 0; i < expected.size(); ++i) { - auto leftStream = - HashStringAllocator::prepareRead(leftPositions.at(i).header); - auto rightStream = - HashStringAllocator::prepareRead(rightPositions.at(i).header); + HashStringAllocator::InputStream leftStream(leftPositions.at(i).header); + HashStringAllocator::InputStream rightStream(rightPositions.at(i).header); if (expected.at(i) == kIndeterminate && mode == CompareFlags::NullHandlingMode::kNullAsIndeterminate && !equalsOnly) { VELOX_ASSERT_THROW( ContainerRowSerde::compareWithNulls( - *leftStream, *rightStream, type.get(), compareFlags), + leftStream, rightStream, type.get(), compareFlags), "Ordering nulls is not supported"); } else { ASSERT_EQ( expected.at(i), ContainerRowSerde::compareWithNulls( - *leftStream, *rightStream, type.get(), compareFlags)); + leftStream, rightStream, type.get(), compareFlags)); } } } @@ -181,37 +179,34 @@ class ContainerRowSerdeTest : public testing::Test, for (auto i = 0; i < positionsActual.size(); ++i) { // Test comparing reading from a ByteInputStream and a DecodedVector. - auto actualStream = - HashStringAllocator::prepareRead(positionsActual.at(i).header); + HashStringAllocator::InputStream actualStream( + positionsActual.at(i).header); ASSERT_EQ( 0, ContainerRowSerde::compare( - *actualStream, decodedVector, i, compareFlags)) + actualStream, decodedVector, i, compareFlags)) << "at " << i << ": " << actual->toString(i) << " " << expected->toString(i); // Test comparing reading from two ByteInputStreams. actualStream = - HashStringAllocator::prepareRead(positionsActual.at(i).header); - auto expectedStream = - HashStringAllocator::prepareRead(positionsExpected.at(i).header); + HashStringAllocator::InputStream(positionsActual.at(i).header); + HashStringAllocator::InputStream expectedStream( + positionsExpected.at(i).header); ASSERT_EQ( 0, ContainerRowSerde::compare( - *actualStream, - *expectedStream, - actual->type().get(), - compareFlags)) + actualStream, expectedStream, actual->type().get(), compareFlags)) << "at " << i << ": " << actual->toString(i) << " " << expected->toString(i); // Test comparing hashes. actualStream = - HashStringAllocator::prepareRead(positionsActual.at(i).header); + HashStringAllocator::InputStream(positionsActual.at(i).header); ASSERT_EQ( expected->hashValueAt(i), - ContainerRowSerde::hash(*actualStream, actual->type().get())) + ContainerRowSerde::hash(actualStream, actual->type().get())) << "at " << i << ": " << actual->toString(i) << " " << expected->toString(i); } diff --git a/velox/functions/lib/aggregates/SingleValueAccumulator.cpp b/velox/functions/lib/aggregates/SingleValueAccumulator.cpp index 2e47cbf382b3..f2615af98ef8 100644 --- a/velox/functions/lib/aggregates/SingleValueAccumulator.cpp +++ b/velox/functions/lib/aggregates/SingleValueAccumulator.cpp @@ -41,8 +41,8 @@ void SingleValueAccumulator::read(const VectorPtr& vector, vector_size_t index) const { VELOX_CHECK_NOT_NULL(start_.header); - auto stream = HashStringAllocator::prepareRead(start_.header); - exec::ContainerRowSerde::deserialize(*stream, index, vector.get()); + HashStringAllocator::InputStream stream(start_.header); + exec::ContainerRowSerde::deserialize(stream, index, vector.get()); } bool SingleValueAccumulator::hasValue() const { @@ -55,9 +55,9 @@ std::optional SingleValueAccumulator::compare( CompareFlags compareFlags) const { VELOX_CHECK_NOT_NULL(start_.header); - auto stream = HashStringAllocator::prepareRead(start_.header); + HashStringAllocator::InputStream stream(start_.header); return exec::ContainerRowSerde::compareWithNulls( - *stream, decoded, index, compareFlags); + stream, decoded, index, compareFlags); } void SingleValueAccumulator::destroy(HashStringAllocator* allocator) { diff --git a/velox/functions/lib/aggregates/ValueList.cpp b/velox/functions/lib/aggregates/ValueList.cpp index c3e7e97a421a..2d602db44c74 100644 --- a/velox/functions/lib/aggregates/ValueList.cpp +++ b/velox/functions/lib/aggregates/ValueList.cpp @@ -106,20 +106,20 @@ ValueListReader::ValueListReader(ValueList& values) : size_{values.size()}, lastNullsStart_{size_ % 64 == 0 ? size_ - 64 : size_ - size_ % 64}, lastNulls_{values.lastNulls()}, - dataStream_{HashStringAllocator::prepareRead(values.dataBegin())}, - nullsStream_{HashStringAllocator::prepareRead(values.nullsBegin())} {} + dataStream_{values.dataBegin()}, + nullsStream_{values.nullsBegin()} {} bool ValueListReader::next(BaseVector& output, vector_size_t outputIndex) { if (pos_ == lastNullsStart_) { nulls_ = lastNulls_; } else if (pos_ % 64 == 0) { - nulls_ = nullsStream_->read(); + nulls_ = nullsStream_.read(); } if (nulls_ & (1UL << (pos_ % 64))) { output.setNull(outputIndex, true); } else { - exec::ContainerRowSerde::deserialize(*dataStream_, outputIndex, &output); + exec::ContainerRowSerde::deserialize(dataStream_, outputIndex, &output); } pos_++; diff --git a/velox/functions/lib/aggregates/ValueList.h b/velox/functions/lib/aggregates/ValueList.h index 74aa4731e921..afd9e0218de4 100644 --- a/velox/functions/lib/aggregates/ValueList.h +++ b/velox/functions/lib/aggregates/ValueList.h @@ -127,8 +127,8 @@ class ValueListReader { const vector_size_t size_; const vector_size_t lastNullsStart_; const uint64_t lastNulls_; - std::unique_ptr dataStream_; - std::unique_ptr nullsStream_; + HashStringAllocator::InputStream dataStream_; + HashStringAllocator::InputStream nullsStream_; uint64_t nulls_; vector_size_t pos_{0}; }; diff --git a/velox/functions/lib/aggregates/ValueSet.cpp b/velox/functions/lib/aggregates/ValueSet.cpp index 24450a086866..ff06f0da35dc 100644 --- a/velox/functions/lib/aggregates/ValueSet.cpp +++ b/velox/functions/lib/aggregates/ValueSet.cpp @@ -54,8 +54,8 @@ void ValueSet::read( const HashStringAllocator::Header* header) const { VELOX_CHECK_NOT_NULL(header); - auto stream = HashStringAllocator::prepareRead(header); - exec::ContainerRowSerde::deserialize(*stream, index, vector); + HashStringAllocator::InputStream stream(header); + exec::ContainerRowSerde::deserialize(stream, index, vector); } void ValueSet::free(HashStringAllocator::Header* header) const { diff --git a/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp b/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp index 4bc4027798c2..508cc466b739 100644 --- a/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp +++ b/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp @@ -110,9 +110,9 @@ class SerializeBenchmark { auto copy = BaseVector::create(rowType, data->size(), pool()); - auto in = HashStringAllocator::prepareRead(position.header); + HashStringAllocator::InputStream in(position.header); for (auto i = 0; i < data->size(); ++i) { - exec::ContainerRowSerde::deserialize(*in, i, copy.get()); + exec::ContainerRowSerde::deserialize(in, i, copy.get()); } VELOX_CHECK_EQ(copy->size(), data->size());