Skip to content

Commit

Permalink
Schema evolution support for reader value hooks (facebookincubator#10755)
Browse files Browse the repository at this point in the history

Summary:
X-link: facebookincubator/nimble#72

Pull Request resolved: facebookincubator#10755

Currently, the reader value hook does not take schema evolution into account at all; this change fixes that.

Reviewed By: kevinwilfong

Differential Revision: D61229494

fbshipit-source-id: 729bb90611fb3164282b524376eda20985a30194
  • Loading branch information
Yuhta authored and weiting-chen committed Nov 23, 2024
1 parent e3be552 commit ba46049
Show file tree
Hide file tree
Showing 18 changed files with 412 additions and 245 deletions.
19 changes: 9 additions & 10 deletions velox/dwio/common/ColumnVisitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ExtractToHook {

template <typename V>
void addValue(vector_size_t rowIndex, V value) {
hook_.addValue(rowIndex, &value);
hook_.addValueTyped(rowIndex, value);
}

auto& hook() {
Expand Down Expand Up @@ -126,7 +126,7 @@ class ExtractToGenericHook {

template <typename V>
void addValue(vector_size_t rowIndex, V value) {
hook_->addValue(rowIndex, &value);
hook_->addValueTyped(rowIndex, value);
}

ValueHook& hook() {
Expand Down Expand Up @@ -819,8 +819,7 @@ class DictionaryColumnVisitor
: velox::iota(super::numRows_, super::innerNonNullRows()) +
super::rowIndex_,
values,
numInput,
sizeof(T));
numInput);
super::rowIndex_ += numInput;
return;
}
Expand Down Expand Up @@ -1389,14 +1388,14 @@ class ExtractStringDictionaryToGenericHook {
// according to the index. Stride dictionary indices are offset up
// by the stripe dict size.
if (value < dictionarySize()) {
auto view = folly::StringPiece(
reinterpret_cast<const StringView*>(state_.dictionary.values)[value]);
hook_->addValue(rowIndex, &view);
auto* strings =
reinterpret_cast<const StringView*>(state_.dictionary.values);
hook_->addValue(rowIndex, strings[value]);
} else {
VELOX_DCHECK(state_.inDictionary);
auto view = folly::StringPiece(reinterpret_cast<const StringView*>(
state_.dictionary2.values)[value - dictionarySize()]);
hook_->addValue(rowIndex, &view);
auto* strings =
reinterpret_cast<const StringView*>(state_.dictionary2.values);
hook_->addValue(rowIndex, strings[value - dictionarySize()]);
}
}

Expand Down
15 changes: 9 additions & 6 deletions velox/dwio/common/DecoderUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void fixedWidthScan(
[&](T value, int32_t rowIndex) {
if (!hasFilter) {
if (hasHook) {
hook.addValue(scatterRows[rowIndex], &value);
hook.addValueTyped(scatterRows[rowIndex], value);
} else {
auto targetRow = scatter ? scatterRows[rowIndex] : rowIndex;
rawValues[targetRow] = value;
Expand Down Expand Up @@ -214,8 +214,7 @@ void fixedWidthScan(
hook.addValues(
scatterRows + rowIndex,
buffer + firstRow - rowOffset,
kStep,
sizeof(T));
kStep);
} else {
if (scatter) {
scatterDense(
Expand Down Expand Up @@ -266,7 +265,9 @@ void fixedWidthScan(
if (!hasFilter) {
if (hasHook) {
hook.addValues(
scatterRows + rowIndex, &values, kWidth, sizeof(T));
scatterRows + rowIndex,
reinterpret_cast<T*>(&values),
kWidth);
} else {
if (scatter) {
scatterDense<T>(
Expand Down Expand Up @@ -322,7 +323,9 @@ void fixedWidthScan(
if (!hasFilter) {
if (hasHook) {
hook.addValues(
scatterRows + rowIndex, &values, width, sizeof(T));
scatterRows + rowIndex,
reinterpret_cast<T*>(&values),
width);
} else {
if (scatter) {
scatterDense<T>(
Expand Down Expand Up @@ -473,7 +476,7 @@ void processFixedWidthRun(
constexpr bool hasHook = !std::is_same_v<THook, NoHook>;
if (!hasFilter) {
if (hasHook) {
hook.addValues(scatterRows + rowIndex, values, rows.size(), sizeof(T));
hook.addValues(scatterRows + rowIndex, values, rows.size());
} else if (scatter) {
scatterNonNulls(rowIndex, numInput, numValues, scatterRows, values);
numValues = scatterRows[rowIndex + numInput - 1] + 1;
Expand Down
10 changes: 3 additions & 7 deletions velox/dwio/common/SelectiveByteRleColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,13 @@ void SelectiveByteRleColumnReader::processValueHook(
ValueHook* hook) {
using namespace facebook::velox::aggregate;
switch (hook->kind()) {
case aggregate::AggregationHook::kSumBigintToBigint:
case aggregate::AggregationHook::kBigintSum:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&dwio::common::alwaysTrue(),
rows,
dwio::common::ExtractToHook<SumHook<int64_t, int64_t>>(hook));
&alwaysTrue(), rows, ExtractToHook<SumHook<int64_t>>(hook));
break;
default:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&dwio::common::alwaysTrue(),
rows,
dwio::common::ExtractToGenericHook(hook));
&alwaysTrue(), rows, ExtractToGenericHook(hook));
}
}

Expand Down
13 changes: 11 additions & 2 deletions velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -730,10 +730,19 @@ velox::common::AlwaysTrue& alwaysTrue();
} // namespace facebook::velox::dwio::common

namespace facebook::velox::dwio::common {

// Template parameter to indicate no hook in fast scan path. This is
// referenced in decoders, thus needs to be declared in a header.
struct NoHook : public ValueHook {
void addValue(vector_size_t /*row*/, const void* /*value*/) override {}
struct NoHook final : public ValueHook {
void addValue(vector_size_t /*row*/, int64_t /*value*/) final {}

void addValue(vector_size_t /*row*/, int128_t /*value*/) final {}

void addValue(vector_size_t /*row*/, float /*value*/) final {}

void addValue(vector_size_t /*row*/, double /*value*/) final {}

void addValue(vector_size_t /*row*/, folly::StringPiece /*value*/) final {}
};

} // namespace facebook::velox::dwio::common
24 changes: 7 additions & 17 deletions velox/dwio/common/SelectiveFloatingPointColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SelectiveFloatingPointColumnReader : public SelectiveColumnReader {
readCommon(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls);

void getValues(RowSet rows, VectorPtr* result) override {
getFlatValues<TRequested, TRequested>(rows, result, requestedType_);
getFlatValues<TData, TRequested>(rows, result, requestedType_);
}

protected:
Expand Down Expand Up @@ -83,7 +83,7 @@ void SelectiveFloatingPointColumnReader<TData, TRequested>::readHelper(
ExtractValues extractValues) {
reinterpret_cast<Reader*>(this)->readWithVisitor(
rows,
ColumnVisitor<TRequested, TFilter, ExtractValues, isDense>(
ColumnVisitor<TData, TFilter, ExtractValues, isDense>(
*reinterpret_cast<TFilter*>(filter), this, rows, extractValues));
}

Expand Down Expand Up @@ -145,27 +145,17 @@ void SelectiveFloatingPointColumnReader<TData, TRequested>::processValueHook(
RowSet rows,
ValueHook* hook) {
switch (hook->kind()) {
case aggregate::AggregationHook::kSumFloatToDouble:
case aggregate::AggregationHook::kDoubleSum:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&alwaysTrue(),
rows,
ExtractToHook<aggregate::SumHook<float, double>>(hook));
break;
case aggregate::AggregationHook::kSumDoubleToDouble:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&alwaysTrue(),
rows,
ExtractToHook<aggregate::SumHook<double, double>>(hook));
&alwaysTrue(), rows, ExtractToHook<aggregate::SumHook<double>>(hook));
break;
case aggregate::AggregationHook::kFloatMax:
case aggregate::AggregationHook::kDoubleMax:
case aggregate::AggregationHook::kFloatingPointMax:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&alwaysTrue(),
rows,
ExtractToHook<aggregate::MinMaxHook<TRequested, false>>(hook));
break;
case aggregate::AggregationHook::kFloatMin:
case aggregate::AggregationHook::kDoubleMin:
case aggregate::AggregationHook::kFloatingPointMin:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&alwaysTrue(),
rows,
Expand All @@ -183,7 +173,7 @@ void SelectiveFloatingPointColumnReader<TData, TRequested>::readCommon(
vector_size_t offset,
RowSet rows,
const uint64_t* incomingNulls) {
prepareRead<TRequested>(offset, rows, incomingNulls);
prepareRead<TData>(offset, rows, incomingNulls);
bool isDense = rows.back() == rows.size() - 1;
if (scanSpec_->keepValues()) {
if (scanSpec_->valueHook()) {
Expand Down
10 changes: 5 additions & 5 deletions velox/dwio/common/SelectiveIntegerColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SelectiveIntegerColumnReader : public SelectiveColumnReader {

// Switches based on the type of ValueHook between different readWithVisitor
// instantiations.
template <typename Reader, bool isDence>
template <typename Reader, bool isDense>
void processValueHook(RowSet rows, ValueHook* hook);

// Instantiates a Visitor based on type, isDense, value processing.
Expand Down Expand Up @@ -196,17 +196,17 @@ void SelectiveIntegerColumnReader::processValueHook(
RowSet rows,
ValueHook* hook) {
switch (hook->kind()) {
case aggregate::AggregationHook::kSumBigintToBigint:
case aggregate::AggregationHook::kBigintSum:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&alwaysTrue(),
rows,
ExtractToHook<aggregate::SumHook<int64_t, int64_t, false>>(hook));
ExtractToHook<aggregate::SumHook<int64_t, false>>(hook));
break;
case aggregate::AggregationHook::kSumBigintToBigintOverflow:
case aggregate::AggregationHook::kBigintSumOverflow:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&alwaysTrue(),
rows,
ExtractToHook<aggregate::SumHook<int64_t, int64_t, true>>(hook));
ExtractToHook<aggregate::SumHook<int64_t, true>>(hook));
break;
case aggregate::AggregationHook::kBigintMax:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
Expand Down
5 changes: 2 additions & 3 deletions velox/dwio/common/tests/DecoderUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ namespace facebook::velox::dwio::common {
struct NoHook {
void addValues(
const int32_t* /*rows*/,
const void* /*values*/,
int32_t /*size*/,
uint8_t /*valueWidth*/) {}
const int32_t* /*values*/,
int32_t /*size*/) {}
};

} // namespace facebook::velox::dwio::common
Expand Down
40 changes: 30 additions & 10 deletions velox/dwio/common/tests/utils/E2EFilterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,42 @@ class TestingHook : public ValueHook {
public:
explicit TestingHook(FlatVector<T>* result) : result_(result) {}

void addValue(vector_size_t row, const void* value) override {
result_->set(row, *reinterpret_cast<const T*>(value));
void addValue(vector_size_t row, int64_t value) override {
if constexpr (std::is_integral_v<T>) {
result_->set(row, value);
} else {
VELOX_FAIL();
}
}

void addValue(vector_size_t row, float value) override {
if constexpr (std::is_same_v<T, float>) {
result_->set(row, value);
} else {
VELOX_FAIL();
}
}

void addValue(vector_size_t row, double value) override {
if constexpr (std::is_same_v<T, double>) {
result_->set(row, value);
} else {
VELOX_FAIL();
}
}

void addValue(vector_size_t row, folly::StringPiece value) override {
if constexpr (std::is_same_v<T, StringView>) {
result_->set(row, StringView(value));
} else {
VELOX_FAIL();
}
}

private:
FlatVector<T>* result_;
};

template <>
inline void TestingHook<StringView>::addValue(
vector_size_t row,
const void* value) {
result_->set(
row, StringView(*reinterpret_cast<const folly::StringPiece*>(value)));
}

// Utility for checking that a subsequent batch of output does not
// overwrite internals of a possibly retained previous batch.
class OwnershipChecker {
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void SelectiveDecimalColumnReader<DataT>::read(
RowSet rows,
const uint64_t* incomingNulls) {
VELOX_CHECK(!scanSpec_->filter());
VELOX_CHECK(!scanSpec_->valueHook());
prepareRead<int64_t>(offset, rows, incomingNulls);
bool isDense = rows.back() == rows.size() - 1;
if (isDense) {
Expand Down
Loading

0 comments on commit ba46049

Please sign in to comment.