Skip to content

Commit

Permalink
Refactor parquet tests (#6921)
Browse files Browse the repository at this point in the history
Summary:
Merged the ParquetWriterTestBase and ParquetReaderTestBase into ParquetTestBase for code re-usability.

Pull Request resolved: #6921

Reviewed By: amitkdutta

Differential Revision: D50720884

Pulled By: pedroerp

fbshipit-source-id: 0ee9d8ccec92b2e221db8f7978697a465c6b57dd
  • Loading branch information
pdabre12 authored and facebook-github-bot committed Oct 28, 2023
1 parent b41f040 commit 40727a1
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 306 deletions.
2 changes: 2 additions & 0 deletions velox/dwio/parquet/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
set(TEST_LINK_LIBS
velox_dwio_common_test_utils
velox_vector_test_lib
velox_exec_test_lib
velox_temp_path
gtest
gtest_main
gmock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,28 @@

#pragma once

#include "velox/dwio/common/tests/utils/DataFiles.h"
#include "velox/vector/tests/utils/VectorMaker.h"

#include <gtest/gtest.h>
#include <string>
#include "velox/common/base/Fs.h"
#include "velox/dwio/common/FileSink.h"
#include "velox/dwio/common/Reader.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/dwio/parquet/writer/Writer.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

namespace facebook::velox::dwio::parquet {
namespace facebook::velox::parquet {

class ParquetReaderTestBase : public testing::Test {
class ParquetTestBase : public testing::Test, public test::VectorTestBase {
protected:
dwio::common::RowReaderOptions getReaderOpts(
const RowTypePtr& rowType,
bool fileColumnNamesReadAsLowerCase = false) {
dwio::common::RowReaderOptions rowReaderOpts;
rowReaderOpts.select(
std::make_shared<facebook::velox::dwio::common::ColumnSelector>(
rowType,
rowType->names(),
nullptr,
fileColumnNamesReadAsLowerCase));

return rowReaderOpts;
// Per-test fixture setup. Registers the LocalFileSink factory, creates the
// root and leaf memory pools (both named "ParquetTests"), and creates a
// TempDirectoryPath stored in tempPath_.
void SetUp() override {
dwio::common::LocalFileSink::registerFactory();
rootPool_ = memory::defaultMemoryManager().addRootPool("ParquetTests");
// leafPool_ is created as a child of rootPool_, so rootPool_ must be
// assigned first.
leafPool_ = rootPool_->addLeafChild("ParquetTests");
tempPath_ = exec::test::TempDirectoryPath::create();
}

static RowTypePtr sampleSchema() {
Expand All @@ -55,17 +56,39 @@ class ParquetReaderTestBase : public testing::Test {
return ROW({"a", "b"}, {BIGINT(), BIGINT()});
}

template <typename T>
VectorPtr rangeVector(size_t size, T start) {
std::vector<T> vals(size);
for (size_t i = 0; i < size; ++i) {
vals[i] = start + static_cast<T>(i);
}
return vectorMaker_->flatVector(vals);
// Constructs a ParquetReader for 'path'. A LocalReadFile built from 'path' is
// wrapped in a BufferedInput that uses the memory pool returned by
// opts.getMemoryPool(); 'opts' is also passed to the reader itself.
std::unique_ptr<facebook::velox::parquet::ParquetReader> createReader(
    const std::string& path,
    const dwio::common::ReaderOptions& opts) {
  return std::make_unique<facebook::velox::parquet::ParquetReader>(
      std::make_unique<dwio::common::BufferedInput>(
          std::make_shared<LocalReadFile>(path), opts.getMemoryPool()),
      opts);
}

// Returns RowReaderOptions whose selection is a ColumnSelector built from
// 'rowType' and all of its column names. 'fileColumnNamesReadAsLowerCase' is
// forwarded to the ColumnSelector unchanged.
dwio::common::RowReaderOptions getReaderOpts(
    const RowTypePtr& rowType,
    bool fileColumnNamesReadAsLowerCase = false) {
  using Selector = facebook::velox::dwio::common::ColumnSelector;

  dwio::common::RowReaderOptions options;
  auto columnSelector = std::make_shared<Selector>(
      rowType, rowType->names(), nullptr, fileColumnNamesReadAsLowerCase);
  options.select(std::move(columnSelector));
  return options;
}

// Check that actual vector is equal to a part of expected vector
// at a specified offset.
// Creates a ScanSpec with an empty name and adds all child fields of
// 'rowType' to it.
std::shared_ptr<velox::common::ScanSpec> makeScanSpec(
    const RowTypePtr& rowType) {
  auto spec = std::make_shared<velox::common::ScanSpec>("");
  const auto& schema = *rowType;
  spec->addAllChildFields(schema);
  return spec;
}

// Map from a string key to an owned Filter.
// NOTE(review): the key is presumably a column or subfield name — confirm
// against the callers that populate this map.
using FilterMap =
std::unordered_map<std::string, std::unique_ptr<velox::common::Filter>>;

void assertEqualVectorPart(
const VectorPtr& expected,
const VectorPtr& actual,
Expand All @@ -87,7 +110,6 @@ class ParquetReaderTestBase : public testing::Test {
memory::MemoryPool& memoryPool) {
uint64_t total = 0;
VectorPtr result = BaseVector::create(outputType, 0, &memoryPool);

while (total < expected->size()) {
auto part = reader.next(1000, result);
if (part > 0) {
Expand All @@ -101,16 +123,6 @@ class ParquetReaderTestBase : public testing::Test {
EXPECT_EQ(reader.next(1000, result), 0);
}

std::shared_ptr<velox::common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType) {
auto scanSpec = std::make_shared<velox::common::ScanSpec>("");
scanSpec->addAllChildFields(*rowType);
return scanSpec;
}

using FilterMap =
std::unordered_map<std::string, std::unique_ptr<velox::common::Filter>>;

void assertReadWithReaderAndFilters(
const std::unique_ptr<dwio::common::Reader> reader,
const std::string& /* fileName */,
Expand All @@ -126,16 +138,56 @@ class ParquetReaderTestBase : public testing::Test {
auto rowReaderOpts = getReaderOpts(fileSchema);
rowReaderOpts.setScanSpec(scanSpec);
auto rowReader = reader->createRowReader(rowReaderOpts);
assertReadExpected(fileSchema, *rowReader, expected, *pool_);
assertReadExpected(fileSchema, *rowReader, expected, *leafPool_);
}

// Creates a FileSink for the URI "file:<filePath>" with rootPool_ as its
// pool. Records gtest expectations that the sink is buffered, that
// 'filePath' exists, and that the sink is not closed, then returns the sink.
std::unique_ptr<dwio::common::FileSink> createSink(
    const std::string& filePath) {
  const auto sinkUri = fmt::format("file:{}", filePath);
  auto fileSink =
      dwio::common::FileSink::create(sinkUri, {.pool = rootPool_.get()});

  EXPECT_TRUE(fileSink->isBuffered());
  EXPECT_TRUE(fs::exists(filePath));
  EXPECT_FALSE(fileSink->isClosed());
  return fileSink;
}

// Builds a parquet Writer that writes to 'sink'.
//
// @param sink Destination sink; ownership is transferred to the Writer.
// @param flushPolicy Factory stored in WriterOptions::flushPolicyFactory.
// @param compressionKind Stored in WriterOptions::compression; defaults to
// CompressionKind_NONE.
// @return The newly constructed Writer. Its WriterOptions::memoryPool is set
// to rootPool_.
std::unique_ptr<facebook::velox::parquet::Writer> createWriter(
    std::unique_ptr<dwio::common::FileSink> sink,
    std::function<
        std::unique_ptr<facebook::velox::parquet::DefaultFlushPolicy>()>
        flushPolicy,
    facebook::velox::common::CompressionKind compressionKind =
        facebook::velox::common::CompressionKind_NONE) {
  facebook::velox::parquet::WriterOptions options;
  options.memoryPool = rootPool_.get();
  // 'flushPolicy' is a by-value parameter that is not used again, so move it
  // into the options instead of copying the std::function.
  options.flushPolicyFactory = std::move(flushPolicy);
  options.compression = compressionKind;
  return std::make_unique<facebook::velox::parquet::Writer>(
      std::move(sink), options);
}

// Generates 'numBatches' row vectors of type 'rowType' using a VectorFuzzer
// configured with 'vectorSize' and backed by leafPool_.
//
// @param rowType Row type passed to VectorFuzzer::fuzzInputFlatRow for every
// batch.
// @param numBatches Number of batches to generate.
// @param vectorSize Value for the fuzzer's 'vectorSize' option.
// @return The generated batches, in generation order.
std::vector<RowVectorPtr> createBatches(
    const RowTypePtr& rowType,
    uint64_t numBatches,
    uint64_t vectorSize) {
  std::vector<RowVectorPtr> batches;
  batches.reserve(numBatches);
  VectorFuzzer fuzzer({.vectorSize = vectorSize}, leafPool_.get());
  // The index has the same type as 'numBatches' to avoid a signed/unsigned
  // comparison and int overflow for very large batch counts.
  for (uint64_t i = 0; i < numBatches; ++i) {
    batches.emplace_back(fuzzer.fuzzInputFlatRow(rowType));
  }
  return batches;
}

// Resolves 'fileName' inside "../examples/" relative to
// "velox/dwio/parquet/tests/reader" by delegating to test::getDataFilePath.
std::string getExampleFilePath(const std::string& fileName) {
  const std::string relativePath = "../examples/" + fileName;
  return test::getDataFilePath(
      "velox/dwio/parquet/tests/reader", relativePath);
}

std::shared_ptr<memory::MemoryPool> pool_{memory::addDefaultLeafMemoryPool()};
std::unique_ptr<test::VectorMaker> vectorMaker_{
std::make_unique<test::VectorMaker>(pool_.get())};
static constexpr uint64_t kRowsInRowGroup = 10'000;
static constexpr uint64_t kBytesInRowGroup = 128 * 1'024 * 1'024;
std::shared_ptr<memory::MemoryPool> rootPool_;
std::shared_ptr<memory::MemoryPool> leafPool_;
std::shared_ptr<exec::test::TempDirectoryPath> tempPath_;
};
} // namespace facebook::velox::dwio::parquet
} // namespace facebook::velox::parquet
6 changes: 2 additions & 4 deletions velox/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
*/

#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/dwio/parquet/tests/ParquetReaderTestBase.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"

using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::dwio::parquet;
using namespace facebook::velox::parquet;

class ParquetPageReaderTest : public ParquetReaderTestBase {};
// Test fixture for this file; it inherits everything from ParquetTestBase and
// adds no members of its own.
class ParquetPageReaderTest : public ParquetTestBase {};

namespace {
auto defaultPool = memory::addDefaultLeafMemoryPool();
Expand Down
Loading

0 comments on commit 40727a1

Please sign in to comment.