Skip to content

Commit

Permalink
Folder: dwio
Browse files Browse the repository at this point in the history
Related PRs:

add support for reading ORC facebookincubator#229
Parquet: Optimize parquet write perf facebookincubator#238
Expand timestamps in page reader facebookincubator#260
Add processedStrides and processedSplits metrics facebookincubator#264
  • Loading branch information
zhejiangxiaomai committed Jul 4, 2023
1 parent 058f29c commit 7aed2b4
Show file tree
Hide file tree
Showing 46 changed files with 2,819 additions and 195 deletions.
52 changes: 42 additions & 10 deletions velox/dwio/common/ColumnSelector.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,21 @@ class ColumnSelector {
*/
explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, log, caseSensitive) {}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)}, schema_{schema}, state_{ReadState::kAll} {
buildNodes(schema, contentSchema);

// no filter, read everything
setReadAll();
checkSelectColDuplicate(caseSensitive);
}

/**
Expand All @@ -77,18 +80,21 @@ class ColumnSelector {
explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::vector<std::string>& names,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, names, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, names, log, caseSensitive) {}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
const std::vector<std::string>& names,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)},
schema_{schema},
state_{names.empty() ? ReadState::kAll : ReadState::kPartial} {
acceptFilter(schema, contentSchema, names);
acceptFilter(schema, contentSchema, names, false);
checkSelectColDuplicate(caseSensitive);
}

/**
Expand All @@ -98,19 +104,23 @@ class ColumnSelector {
const std::shared_ptr<const velox::RowType>& schema,
const std::vector<uint64_t>& ids,
const bool filterByNodes = false,
const MetricsLogPtr& log = nullptr)
: ColumnSelector(schema, schema, ids, filterByNodes, log) {}
const MetricsLogPtr& log = nullptr,
const bool caseSensitive = true)
: ColumnSelector(schema, schema, ids, filterByNodes, log, caseSensitive) {
}

explicit ColumnSelector(
const std::shared_ptr<const velox::RowType>& schema,
const std::shared_ptr<const velox::RowType>& contentSchema,
const std::vector<uint64_t>& ids,
const bool filterByNodes = false,
MetricsLogPtr log = nullptr)
MetricsLogPtr log = nullptr,
const bool caseSensitive = true)
: log_{std::move(log)},
schema_{schema},
state_{ids.empty() ? ReadState::kAll : ReadState::kPartial} {
acceptFilter(schema, contentSchema, ids, filterByNodes);
checkSelectColDuplicate(caseSensitive);
}

// set a specific node to read state
Expand Down Expand Up @@ -301,6 +311,28 @@ class ColumnSelector {
// get node ID list to be read
std::vector<uint64_t> getNodeFilter() const;

// Rejects a column selection that is ambiguous in case-insensitive mode.
//
// Counts how many nodes in nodes_ carry each name, then fails with a user
// error if any entry in filter_ refers to a name that occurs more than once.
// Does nothing when caseSensitive is true.
//
// NOTE(review): names are compared exactly as stored; no case folding happens
// here. This presumably relies on node/filter names having been lower-cased
// before this check runs -- confirm against the callers.
//
// @param caseSensitive When true the check is skipped entirely.
void checkSelectColDuplicate(bool caseSensitive) {
  if (caseSensitive) {
    return;
  }

  // First pass: tally occurrences of every node name. operator[] value-
  // initializes a missing counter to 0, so a plain increment is enough.
  std::unordered_map<std::string, int> nameCounts;
  for (const auto& node : nodes_) {
    ++nameCounts[node->getNode().name];
  }

  // Second pass: look at each selected filter once, after all counts are
  // final. find() is used so the lookup never inserts into the map.
  for (const auto& filter : filter_) {
    const auto it = nameCounts.find(filter.name);
    if (it != nameCounts.end() && it->second > 1) {
      VELOX_USER_FAIL(
          "Found duplicate field(s) {} in case-insensitive mode",
          filter.name);
    }
  }
}

// accept filter
template <typename T>
void acceptFilter(
Expand Down
29 changes: 27 additions & 2 deletions velox/dwio/common/ColumnVisitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,19 @@ class ColumnVisitor {
SelectiveColumnReader* reader,
const RowSet& rows,
ExtractValues values)
: ColumnVisitor(filter, reader, &rows[0], rows.size(), values) {}

ColumnVisitor(
TFilter& filter,
SelectiveColumnReader* reader,
const vector_size_t* rows,
vector_size_t numRows,
ExtractValues values)
: filter_(filter),
reader_(reader),
allowNulls_(!TFilter::deterministic || filter.testNull()),
rows_(&rows[0]),
numRows_(rows.size()),
rows_(rows),
numRows_(numRows),
rowIndex_(0),
values_(values) {}

Expand Down Expand Up @@ -417,6 +425,10 @@ class ColumnVisitor {
return values_.hook();
}

// Returns, by value, a copy of the ExtractValues object this visitor was
// constructed with (the values_ member).
ExtractValues extractValues() const {
return values_;
}

T* rawValues(int32_t size) {
return reader_->mutableValues<T>(size);
}
Expand Down Expand Up @@ -1386,6 +1398,19 @@ class DirectRleColumnVisitor
rows,
values) {}

// Constructs the visitor from a raw pointer to the row numbers plus an
// explicit row count. All arguments are forwarded unchanged to the matching
// ColumnVisitor base-class constructor; this constructor adds no state of
// its own.
DirectRleColumnVisitor(
TFilter& filter,
SelectiveColumnReader* reader,
const vector_size_t* rows,
vector_size_t numRows,
ExtractValues values)
: ColumnVisitor<T, TFilter, ExtractValues, isDense>(
filter,
reader,
rows,
numRows,
values) {}

// Use for replacing all rows with non-null rows for fast path with
// processRun and processRle.
void setRows(folly::Range<const int32_t*> newRows) {
Expand Down
14 changes: 9 additions & 5 deletions velox/dwio/common/DataBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DataBuffer {
return data()[i];
}

void reserve(uint64_t capacity) {
void reserve(uint64_t capacity, uint32_t growRatio = 1) {
if (capacity <= capacity_) {
// After resetting the buffer, capacity always resets to zero.
DWIO_ENSURE_NOT_NULL(buf_);
Expand All @@ -105,15 +105,15 @@ class DataBuffer {
if (veloxRef_ != nullptr) {
DWIO_RAISE("Can't reserve on a referenced buffer");
}
const auto newSize = sizeInBytes(capacity);
const auto newSize = sizeInBytes(capacity) * growRatio;
if (buf_ == nullptr) {
buf_ = reinterpret_cast<T*>(pool_->allocate(newSize));
} else {
buf_ = reinterpret_cast<T*>(
pool_->reallocate(buf_, sizeInBytes(capacity_), newSize));
}
DWIO_ENSURE(buf_ != nullptr || newSize == 0);
capacity_ = capacity;
capacity_ = capacity * growRatio;
}

void extend(uint64_t size) {
Expand Down Expand Up @@ -141,8 +141,12 @@ class DataBuffer {
append(offset, src.data() + srcOffset, items);
}

void append(uint64_t offset, const T* FOLLY_NONNULL src, uint64_t items) {
reserve(offset + items);
void append(
uint64_t offset,
const T* FOLLY_NONNULL src,
uint64_t items,
uint32_t growRatio = 1) {
reserve(offset + items, growRatio);
unsafeAppend(offset, src, items);
}

Expand Down
136 changes: 136 additions & 0 deletions velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ class IntDecoder {
uint64_t readVuLong();
int64_t readVsLong();
int64_t readLongLE();
uint128_t readVuInt128();
int128_t readVsInt128();
int128_t readInt128();
template <typename cppType>
cppType readLittleEndianFromBigEndian();
Expand Down Expand Up @@ -302,11 +304,138 @@ FOLLY_ALWAYS_INLINE uint64_t IntDecoder<isSigned>::readVuLong() {
}
}

/// Reads one unsigned base-128 varint of up to 128 bits.
///
/// Every encoded byte carries 7 payload bits, least-significant group first;
/// the high bit of a byte is set when another byte follows. A 128-bit value
/// therefore occupies at most 19 bytes (ceil(128 / 7)), and the 19th byte can
/// only contribute the top 2 bits.
///
/// @return The decoded value.
/// Raises through DWIO_RAISE when the encoding does not terminate within 19
/// bytes, which indicates corrupt input.
template <bool isSigned>
FOLLY_ALWAYS_INLINE uint128_t IntDecoder<isSigned>::readVuInt128() {
  // Longest legal encoding of a 128-bit varint, in bytes.
  constexpr int32_t kMaxVarint128Bytes = 19;
  constexpr int32_t kPayloadBits = 7;

  // Guard on the number of bytes the loop below may dereference, so the fast
  // path never reads past bufferEnd.
  if (LIKELY(bufferEnd - bufferStart >= kMaxVarint128Bytes)) {
    // Fast path: decode straight out of the buffer. The trip count is a
    // compile-time constant, so the compiler is free to unroll the loop.
    const char* p = bufferStart;
    uint128_t val = 0;
    signed char ch = 0;
    int32_t index = 0;
    do {
      // Cast explicitly so the continuation-bit test (ch < 0) does not depend
      // on whether plain char is signed on the target platform.
      ch = static_cast<signed char>(*p++);
      // Only 2 bits remain for the final byte: 18 * 7 = 126.
      const int32_t mask = (index == kMaxVarint128Bytes - 1) ? 0x03 : 0x7f;
      // Widen before shifting; the shift amount reaches 126 bits.
      val |= static_cast<uint128_t>(ch & mask) << (index * kPayloadBits);
      ++index;
    } while (ch < 0 && index < kMaxVarint128Bytes);

    if (UNLIKELY(ch < 0)) {
      // The 19th byte still had its continuation bit set.
      DWIO_RAISE(fmt::format(
          "Invalid encoding: likely corrupt data. bytes remaining: {} , useVInts: {}, numBytes: {}, Input Stream Name: {}, byte: {}, val: {}",
          bufferEnd - bufferStart,
          useVInts,
          numBytes,
          inputStream->getName(),
          static_cast<int128_t>(ch),
          val));
    }
    bufferStart = p;
    return val;
  }

  // Slow path: fewer than 19 bytes are buffered, so fetch bytes one at a
  // time through readByte().
  uint128_t result = 0;
  int32_t offset = 0;
  signed char ch;
  do {
    if (UNLIKELY(offset >= 128)) {
      // Shifting a 128-bit value by >= 128 is undefined; a varint this long
      // cannot be a valid 128-bit encoding.
      DWIO_RAISE("Invalid encoding: 128-bit varint is longer than 19 bytes");
    }
    ch = readByte();
    // Widen before shifting so groups beyond bit 63 are not lost.
    result |= static_cast<uint128_t>(ch & BASE_128_MASK) << offset;
    offset += kPayloadBits;
  } while (ch < 0);
  return result;
}

/// Reads a signed 64-bit varint: fetches the unsigned varint with
/// readVuLong() and passes it through ZigZag::decode.
template <bool isSigned>
FOLLY_ALWAYS_INLINE int64_t IntDecoder<isSigned>::readVsLong() {
  const auto encoded = readVuLong();
  return ZigZag::decode(encoded);
}

/// Reads a signed 128-bit varint: fetches the unsigned varint with
/// readVuInt128() and passes it through ZigZag::decode.
template <bool isSigned>
FOLLY_ALWAYS_INLINE int128_t IntDecoder<isSigned>::readVsInt128() {
  const auto encoded = readVuInt128();
  return ZigZag::decode(encoded);
}

template <bool isSigned>
inline int64_t IntDecoder<isSigned>::readLongLE() {
int64_t result = 0;
Expand Down Expand Up @@ -415,6 +544,13 @@ inline int64_t IntDecoder<isSigned>::readLong() {

template <bool isSigned>
inline int128_t IntDecoder<isSigned>::readInt128() {
if (useVInts) {
if constexpr (isSigned) {
return readVsInt128();
} else {
return static_cast<int128_t>(readVuInt128());
}
}
if (!bigEndian) {
VELOX_NYI();
}
Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/common/MetadataFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ std::unique_ptr<MetadataFilter::Node> MetadataFilter::Node::fromExpression(
if (call->name() == "not") {
return fromExpression(*call->inputs()[0], evaluator, !negated);
}
if (call->name() == "endswith" || call->name() == "contains" ||
call->name() == "like" || call->name() == "startswith" ||
call->name() == "rlike" || call->name() == "isnotnull" ||
call->name() == "coalesce" || call->name() == "might_contain") {
return nullptr;
}
try {
Subfield subfield;
auto filter =
Expand Down
Loading

0 comments on commit 7aed2b4

Please sign in to comment.