Skip to content

Commit

Permalink
Support parquet read case sensitive mode (oap-project#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh authored and zhejiangxiaomai committed Mar 27, 2023
1 parent ed04007 commit 0956c18
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 24 deletions.
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ uint32_t HiveConfig::maxPartitionsPerWriters(const Config* config) {
return config->get<uint32_t>(kMaxPartitionsPerWriters, 100);
}

// Reads the kCaseSensitive entry from 'config' as a bool. When the entry is
// not set, the fallback value passed to Config::get (true) is used.
bool HiveConfig::isCaseSensitive(const Config* config) {
  constexpr bool kDefaultCaseSensitive = true;
  return config->get<bool>(kCaseSensitive, kDefaultCaseSensitive);
}

} // namespace facebook::velox::connector::hive
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class HiveConfig {
const Config* config);

static uint32_t maxPartitionsPerWriters(const Config* config);
static constexpr const char* kCaseSensitive = "case_sensitive";

static bool isCaseSensitive(const Config* config);
};

} // namespace facebook::velox::connector::hive
14 changes: 8 additions & 6 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ HiveDataSource::HiveDataSource(
ExpressionEvaluator* expressionEvaluator,
memory::MemoryAllocator* allocator,
const std::string& scanId,
bool caseSensitive,
folly::Executor* executor)
: outputType_(outputType),
fileHandleFactory_(fileHandleFactory),
Expand Down Expand Up @@ -340,6 +341,8 @@ HiveDataSource::HiveDataSource(
readerOutputType_ = ROW(std::move(names), std::move(types));
}

readerOpts_.setCaseSensitive(caseSensitive);

rowReaderOpts_.setScanSpec(scanSpec_);
rowReaderOpts_.setMetadataFilter(metadataFilter_);

Expand Down Expand Up @@ -401,7 +404,8 @@ template <TypeKind ToKind>
velox::variant convertFromString(const std::optional<std::string>& value) {
if (value.has_value()) {
// No need for casting if ToKind is VARCHAR or VARBINARY.
if constexpr (ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
if constexpr (
ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
return velox::variant(value.value());
}
bool nullOutput = false;
Expand Down Expand Up @@ -538,10 +542,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
scanSpec_->resetCachedValues();

// Check filters and see if the whole split can be skipped
if (!testFilters(
scanSpec_.get(),
reader_.get(),
split_->filePath)) {
if (!testFilters(scanSpec_.get(), reader_.get(), split_->filePath)) {
emptySplit_ = true;
++runtimeStats_.skippedSplits;
runtimeStats_.skippedSplitBytes += split_->length;
Expand All @@ -560,7 +561,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
static const RowTypePtr kEmpty{ROW({}, {})};
cs = std::make_shared<dwio::common::ColumnSelector>(kEmpty);
} else {
cs = std::make_shared<dwio::common::ColumnSelector>(fileType, columnNames);
cs = std::make_shared<dwio::common::ColumnSelector>(
fileType, columnNames, nullptr, readerOpts_.isCaseSensitive());
}

rowReader_ = reader_->createRowReader(
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "velox/expression/Expr.h"
#include "velox/type/Filter.h"
#include "velox/type/Subfield.h"
#include "velox/connectors/hive/HiveConfig.h"

namespace facebook::velox::connector::hive {

Expand Down Expand Up @@ -135,6 +136,7 @@ class HiveDataSource : public DataSource {
ExpressionEvaluator* FOLLY_NONNULL expressionEvaluator,
memory::MemoryAllocator* FOLLY_NONNULL allocator,
const std::string& scanId,
bool caseSensitive,
folly::Executor* FOLLY_NULLABLE executor);

void addSplit(std::shared_ptr<ConnectorSplit> split) override;
Expand Down Expand Up @@ -259,6 +261,7 @@ class HiveConnector final : public Connector {
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->allocator(),
connectorQueryCtx->scanId(),
HiveConfig::isCaseSensitive(connectorQueryCtx->config()),
executor_);
}

Expand Down
52 changes: 42 additions & 10 deletions velox/dwio/common/ColumnSelector.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,21 @@ class ColumnSelector {
*/
explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, log, caseSensitive) {}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)}, schema_{schema}, state_{ReadState::kAll} {
buildNodes(schema, contentSchema);

// no filter, read everything
setReadAll();
checkSelectColDuplicate(caseSensitive);
}

/**
Expand All @@ -77,18 +80,21 @@ class ColumnSelector {
explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::vector<std::string>& names,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, names, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, names, log, caseSensitive) {}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
const std::vector<std::string>& names,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)},
schema_{schema},
state_{names.empty() ? ReadState::kAll : ReadState::kPartial} {
acceptFilter(schema, contentSchema, names);
acceptFilter(schema, contentSchema, names, false);
checkSelectColDuplicate(caseSensitive);
}

/**
Expand All @@ -98,19 +104,23 @@ class ColumnSelector {
const std::shared_ptr<const velox::RowType>& schema,
const std::vector<uint64_t>& ids,
const bool filterByNodes = false,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, ids, filterByNodes, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, ids, filterByNodes, log, caseSensitive) {
}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
const std::vector<uint64_t>& ids,
const bool filterByNodes = false,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)},
schema_{schema},
state_{ids.empty() ? ReadState::kAll : ReadState::kPartial} {
acceptFilter(schema, contentSchema, ids, filterByNodes);
checkSelectColDuplicate(caseSensitive);
}

// set a specific node to read state
Expand Down Expand Up @@ -301,6 +311,28 @@ class ColumnSelector {
// get node ID list to be read
std::vector<uint64_t> getNodeFilter() const;

// Rejects ambiguous column selections when reading in case-insensitive mode.
//
// Counts how many nodes in nodes_ carry each name, then raises a user error
// (VELOX_USER_FAIL) if any name requested through filter_ is carried by more
// than one node. Does nothing when 'caseSensitive' is true.
//
// NOTE(review): names are compared exactly as stored; this presumably relies
// on the reader having already normalized the schema names - confirm.
void checkSelectColDuplicate(bool caseSensitive) {
  if (caseSensitive) {
    return;
  }

  // Occurrence count per node name; operator[] value-initializes to 0.
  std::unordered_map<std::string, int> nameCounts;
  for (const auto& node : nodes_) {
    ++nameCounts[node->getNode().name];
  }

  // Validate the requested columns once, after all nodes are counted. find()
  // is used so that names absent from the schema are not inserted into the
  // map as a side effect of the check.
  for (const auto& filter : filter_) {
    const auto it = nameCounts.find(filter.name);
    if (it != nameCounts.end() && it->second > 1) {
      VELOX_USER_FAIL(
          "Found duplicate field(s) {} in case-insensitive mode",
          filter.name);
    }
  }
}

// accept filter
template <typename T>
void acceptFilter(
Expand Down
15 changes: 14 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,10 @@ class ReaderOptions {
int32_t maxCoalesceDistance_{kDefaultCoalesceDistance};
SerDeOptions serDeOptions;
std::shared_ptr<encryption::DecrypterFactory> decrypterFactory_;

uint64_t directorySizeGuess{kDefaultDirectorySizeGuess};
uint64_t filePreloadThreshold{kDefaultFilePreloadThreshold};
bool caseSensitive;

public:
static constexpr int32_t kDefaultLoadQuantum = 8 << 20; // 8MB
Expand All @@ -362,7 +364,8 @@ class ReaderOptions {
fileFormat(FileFormat::UNKNOWN),
fileSchema(nullptr),
autoPreloadLength(DEFAULT_AUTO_PRELOAD_SIZE),
prefetchMode(PrefetchMode::PREFETCH) {
prefetchMode(PrefetchMode::PREFETCH),
caseSensitive(true) {
// PASS
}

Expand Down Expand Up @@ -484,6 +487,12 @@ class ReaderOptions {
return *this;
}

/**
 * Store the case-sensitivity flag in these options.
 * @param caseSensitiveMode the value to store in caseSensitive
 * @return this object, so that setter calls can be chained
 */
ReaderOptions& setCaseSensitive(bool caseSensitiveMode) {
  this->caseSensitive = caseSensitiveMode;
  return *this;
}

/**
* Get the desired tail location.
* @return if not set, return the maximum long.
Expand Down Expand Up @@ -549,6 +558,10 @@ class ReaderOptions {
uint64_t getFilePreloadThreshold() const {
return filePreloadThreshold;
}

const bool isCaseSensitive() const {
return caseSensitive;
}
};

} // namespace common
Expand Down
20 changes: 20 additions & 0 deletions velox/dwio/common/tests/TestColumnSelector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/type/fbhive/HiveTypeParser.h"
#include "velox/type/Type.h"
#include "velox/common/base/VeloxException.h"

using namespace facebook::velox::dwio::common;
using facebook::velox::RowType;
Expand Down Expand Up @@ -630,3 +631,22 @@ TEST(TestColumnSelector, testNonexistingColFilters) {
std::vector<std::string>{"id", "values", "notexists#[10,20,30,40]"}),
std::runtime_error);
}

// Building a ColumnSelector with caseSensitive == false over a schema that
// declares the selected column name twice is expected to throw.
TEST(TestColumnSelector, testCaseInsensitiveDuplicateColFilters) {
  // The schema deliberately declares "id" twice.
  const auto duplicatedSchema = std::dynamic_pointer_cast<const RowType>(
      HiveTypeParser().parse("struct<"
                             "id:bigint"
                             "id:bigint"
                             "values:array<float>"
                             "tags:map<int, string>"
                             "notes:struct<f1:int, f2:double, f3:string>"
                             "memo:string"
                             "extra:string>"));

  const std::vector<std::string> selectedColumns{"id"};
  const bool caseSensitive = false;

  EXPECT_THROW(
      ColumnSelector cs(
          duplicatedSchema, selectedColumns, nullptr, caseSensitive),
      facebook::velox::VeloxException);
}

18 changes: 12 additions & 6 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/thrift/ThriftTransport.h"

#include <folly/String.h>

namespace facebook::velox::parquet {

ReaderBase::ReaderBase(
Expand Down Expand Up @@ -113,7 +115,7 @@ void ReaderBase::initializeSchema() {
uint32_t maxSchemaElementIdx = fileMetaData_->schema.size() - 1;
schemaWithId_ = getParquetColumnInfo(
maxSchemaElementIdx, maxRepeat, maxDefine, schemaIdx, columnIdx);
schema_ = createRowType(schemaWithId_->getChildren());
schema_ = createRowType(schemaWithId_->getChildren(), isCaseSensitive());
}

std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
Expand Down Expand Up @@ -232,7 +234,7 @@ std::shared_ptr<const ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
// Row type
auto childrenCopy = children;
return std::make_shared<const ParquetTypeWithId>(
createRowType(children),
createRowType(children, isCaseSensitive()),
std::move(childrenCopy),
curSchemaIdx,
maxSchemaElementIdx,
Expand Down Expand Up @@ -430,13 +432,17 @@ TypePtr ReaderBase::convertType(
}

std::shared_ptr<const RowType> ReaderBase::createRowType(
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>>
children) {
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>> children,
bool caseSensitive) {
std::vector<std::string> childNames;
std::vector<TypePtr> childTypes;
for (auto& child : children) {
childNames.push_back(
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_);
auto childName =
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_;
if (!caseSensitive) {
folly::toLowerAscii(childName);
}
childNames.push_back(childName);
childTypes.push_back(child->type);
}
return TypeFactory<TypeKind::ROW>::create(
Expand Down
6 changes: 5 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class ReaderBase {
return schemaWithId_;
}

const bool isCaseSensitive() const {
return options_.isCaseSensitive();
}

/// Ensures that streams are enqueued and loading for the row group at
/// 'currentGroup'. May start loading one or more subsequent groups.
void scheduleRowGroups(
Expand Down Expand Up @@ -97,7 +101,7 @@ class ReaderBase {

static std::shared_ptr<const RowType> createRowType(
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>>
children);
children, bool caseSensitive = true);

memory::MemoryPool& pool_;
const uint64_t directorySizeGuess_;
Expand Down
Binary file added velox/dwio/parquet/tests/examples/upper.parquet
Binary file not shown.
20 changes: 20 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ TEST_F(ParquetReaderTest, parseSample) {
EXPECT_EQ(type->childByName("b"), col1);
}

TEST_F(ParquetReaderTest, parseInCaseSensitive) {
  // upper.parquet is expected to hold two BIGINT columns and 2 rows. With
  // case-sensitive matching turned off, the columns are looked up below by
  // the lower-case names "a" and "b".
  const std::string upperCasePath(getExampleFilePath("upper.parquet"));

  ReaderOptions options{defaultPool.get()};
  options.setCaseSensitive(false);
  ParquetReader reader = createReader(upperCasePath, options);
  EXPECT_EQ(reader.numberOfRows(), 2ULL);

  auto rootType = reader.typeWithId();
  EXPECT_EQ(rootType->size(), 2ULL);

  // Both children must be BIGINT and reachable through lower-case names.
  auto firstColumn = rootType->childAt(0);
  EXPECT_EQ(firstColumn->type->kind(), TypeKind::BIGINT);
  auto secondColumn = rootType->childAt(1);
  EXPECT_EQ(secondColumn->type->kind(), TypeKind::BIGINT);
  EXPECT_EQ(rootType->childByName("a"), firstColumn);
  EXPECT_EQ(rootType->childByName("b"), secondColumn);
}

TEST_F(ParquetReaderTest, parseEmpty) {
// empty.parquet holds two columns (a: BIGINT, b: DOUBLE) and
// 0 rows.
Expand Down

0 comments on commit 0956c18

Please sign in to comment.