Skip to content

Commit

Permalink
[WIP] Dynamic filters
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Aug 31, 2021
1 parent 03b68a0 commit 13295e2
Show file tree
Hide file tree
Showing 13 changed files with 518 additions and 23 deletions.
11 changes: 11 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "velox/core/Context.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::common {
class Filter;
}
namespace facebook::velox::core {
class ITypedExpr;
}
Expand Down Expand Up @@ -85,6 +88,14 @@ class DataSource {
// processed.
virtual RowVectorPtr next(uint64_t size) = 0;

// Add dynamically generated filter.
// @param outputChannel index into outputType specified in
// Connector::createDataSource() that identifies the column this filter
// applies to.
virtual void addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) = 0;

// Returns the number of input bytes processed so far.
virtual uint64_t getCompletedBytes() = 0;

Expand Down
22 changes: 22 additions & 0 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,26 @@ bool testFilters(
}
} // namespace

void HiveDataSource::addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) {
pendingDynamicFilters_.emplace(outputChannel, filter);
}

void HiveDataSource::addPendingDynamicFilters() {
for (const auto& entry : pendingDynamicFilters_) {
common::Subfield subfield{outputType_->nameOf(entry.first)};
auto fieldSpec = scanSpec_->getOrCreateChild(subfield);
if (fieldSpec->filter()) {
fieldSpec->filter()->mergeWith(entry.second.get());
} else {
fieldSpec->setFilter(entry.second->clone());
}
}
scanSpec_->resetCachedValues();
pendingDynamicFilters_.clear();
};

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
VELOX_CHECK(
split_ == nullptr,
Expand All @@ -245,6 +265,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {

VLOG(1) << "Adding split " << split_->toString();

addPendingDynamicFilters();

fileHandle_ = fileHandleFactory_->generate(split_->filePath);
if (dataCache_) {
auto dataCacheConfig = std::make_shared<dwio::common::DataCacheConfig>();
Expand Down
12 changes: 11 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class HiveDataSource : public DataSource {

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

void addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) override;

RowVectorPtr next(uint64_t size) override;

uint64_t getCompletedRows() override {
Expand Down Expand Up @@ -157,12 +161,14 @@ class HiveDataSource : public DataSource {

void setNullConstantValue(common::ScanSpec* spec, const TypePtr& type) const;

void addPendingDynamicFilters();

const std::shared_ptr<const RowType> outputType_;
FileHandleFactory* fileHandleFactory_;
velox::memory::MemoryPool* pool_;
std::vector<std::string> regularColumns_;
std::unique_ptr<dwrf::ColumnReaderFactory> columnReaderFactory_;
std::unique_ptr<common::ScanSpec> scanSpec_ = nullptr;
std::unique_ptr<common::ScanSpec> scanSpec_;
std::shared_ptr<HiveConnectorSplit> split_;
dwio::common::ReaderOptions readerOpts_;
dwio::common::RowReaderOptions rowReaderOpts_;
Expand All @@ -173,6 +179,10 @@ class HiveDataSource : public DataSource {
std::shared_ptr<const RowType> readerOutputType_;
bool emptySplit_;

// Dynamically pushed down filters to be added to scanSpec_ on next split.
std::unordered_map<ChannelIndex, std::shared_ptr<common::Filter>>
pendingDynamicFilters_;

// Number of splits skipped based on statistics.
int64_t skippedSplits_{0};

Expand Down
9 changes: 9 additions & 0 deletions velox/dwio/dwrf/reader/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ class ScanSpec {
// result of runtime adaptation.
bool hasFilter() const;

// Resets cached values after this or children were updated, e.g. a new filter
// was added or existing filter was modified.
void resetCachedValues() const {
hasFilter_.clear();
for (auto& child : children_) {
child->resetCachedValues();
}
}

void setEnableFilterReorder(bool enableFilterReorder) {
enableFilterReorder_ = enableFilterReorder;
}
Expand Down
30 changes: 15 additions & 15 deletions velox/dwio/dwrf/reader/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) {
}
readOffset_ = offset;
} else {
VELOX_CHECK(false, "Seeking backward on a ColumnReader");
VELOX_FAIL("Seeking backward on a ColumnReader");
}
}

Expand Down Expand Up @@ -219,7 +219,7 @@ void SelectiveColumnReader::getFlatValues(
VectorPtr* result,
const TypePtr& type,
bool isFinal) {
VELOX_CHECK(valueSize_ != kNoValueSize);
VELOX_CHECK_NE(valueSize_, kNoValueSize);
VELOX_CHECK(mayGetValues_);
if (isFinal) {
mayGetValues_ = false;
Expand Down Expand Up @@ -258,7 +258,7 @@ void SelectiveColumnReader::getFlatValues<int8_t, bool>(
bool isFinal) {
constexpr int32_t kWidth = V8::VSize;
static_assert(kWidth == 32);
VELOX_CHECK(valueSize_ == sizeof(int8_t));
VELOX_CHECK_EQ(valueSize_, sizeof(int8_t));
compactScalarValues<int8_t, int8_t>(rows, isFinal);
auto boolValues =
AlignedBuffer::allocate<bool>(numValues_, &memoryPool, false);
Expand All @@ -283,12 +283,12 @@ void SelectiveColumnReader::getFlatValues<int8_t, bool>(

template <typename T, typename TVector>
void SelectiveColumnReader::upcastScalarValues(RowSet rows) {
VELOX_CHECK(rows.size() <= numValues_);
VELOX_CHECK_LE(rows.size(), numValues_);
VELOX_CHECK(!rows.empty());
if (!values_) {
return;
}
VELOX_CHECK(sizeof(TVector) > sizeof(T));
VELOX_CHECK_GT(sizeof(TVector), sizeof(T));
// Since upcast is not going to be a common path, allocate buffer to copy
// upcasted values to and then copy back to the values buffer.
std::vector<TVector> buf;
Expand Down Expand Up @@ -338,15 +338,15 @@ void SelectiveColumnReader::upcastScalarValues(RowSet rows) {

template <typename T, typename TVector>
void SelectiveColumnReader::compactScalarValues(RowSet rows, bool isFinal) {
VELOX_CHECK(rows.size() <= numValues_);
VELOX_CHECK_LE(rows.size(), numValues_);
VELOX_CHECK(!rows.empty());
if (!values_ || (rows.size() == numValues_ && sizeof(T) == sizeof(TVector))) {
if (values_) {
values_->setSize(numValues_ * sizeof(T));
}
return;
}
VELOX_CHECK(sizeof(TVector) <= sizeof(T));
VELOX_CHECK_LE(sizeof(TVector), sizeof(T));
T* typedSourceValues = reinterpret_cast<T*>(rawValues_);
TVector* typedDestValues = reinterpret_cast<TVector*>(rawValues_);
RowSet sourceRows;
Expand Down Expand Up @@ -784,7 +784,7 @@ class ColumnVisitor {
return 0;
}
if (nextNonNull < 64) {
VELOX_CHECK(rowIndex_ <= rowOfNullWord + nextNonNull);
VELOX_CHECK_LE(rowIndex_, rowOfNullWord + nextNonNull);
rowIndex_ = rowOfNullWord + nextNonNull;
current = currentRow();
return 0;
Expand Down Expand Up @@ -1101,9 +1101,8 @@ class SelectiveByteRleColumnReader : public SelectiveColumnReader {
getFlatValues<int8_t, int64_t>(rows, result);
break;
default:
VELOX_CHECK(
false,
"Result type {} not supported in ByteRLE encoding",
VELOX_FAIL(
"Result type not supported in ByteRLE encoding: {}",
requestedType_->toString());
}
}
Expand Down Expand Up @@ -2081,7 +2080,7 @@ void SelectiveIntegerDictionaryColumnReader::readWithVisitor(
RowSet rows,
ColumnVisitor visitor) {
vector_size_t numRows = rows.back() + 1;
VELOX_CHECK(rleVersion_ == RleVersion_1);
VELOX_CHECK_EQ(rleVersion_, RleVersion_1);
auto reader = reinterpret_cast<RleDecoderV1<false>*>(dataReader_.get());
if (nullsInReadRange_) {
reader->readWithVisitor<true>(nullsInReadRange_->as<uint64_t>(), visitor);
Expand Down Expand Up @@ -3595,8 +3594,9 @@ static void scatter(RowSet rows, VectorPtr* result) {
}

void ColumnLoader::load(RowSet rows, ValueHook* hook, VectorPtr* result) {
VELOX_CHECK(
version_ == structReader_->numReads(),
VELOX_CHECK_EQ(
version_,
structReader_->numReads(),
"Loading LazyVector after the enclosing reader has moved");
auto offset = structReader_->lazyVectorReadOffset();
auto incomingNulls = structReader_->nulls();
Expand Down Expand Up @@ -4204,7 +4204,7 @@ std::unique_ptr<SelectiveColumnReader> SelectiveColumnReader::build(
case TypeKind::MAP:
if (stripe.getEncoding(ek).kind() ==
proto::ColumnEncoding_Kind_MAP_FLAT) {
VELOX_CHECK(false, "SelectiveColumnReader does not support flat maps");
VELOX_UNSUPPORTED("SelectiveColumnReader does not support flat maps");
}
return std::make_unique<SelectiveMapColumnReader>(
ek, requestedType, dataType, stripe, scanSpec);
Expand Down
126 changes: 121 additions & 5 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,68 @@ Driver::Driver(
ctx_->driver = this;
}

namespace {
/// Checks if output channel is produced using identity projection and returns
/// input channel if so.
std::optional<ChannelIndex> getIdentityProjection(
const std::vector<IdentityProjection>& projections,
ChannelIndex outputChannel) {
for (const auto& projection : projections) {
if (projection.outputChannel == outputChannel) {
return projection.inputChannel;
}
}
return std::nullopt;
}
} // namespace

void Driver::pushdownFilters(int operatorIndex) {
auto op = operators_[operatorIndex].get();
const auto& filters = op->getDynamicFilters();
if (filters.empty()) {
return;
}

op->stats().addRuntimeStat("dynamicFiltersProduced", filters.size());

// Walk operator list upstream and find a place to install the filters.
for (const auto& entry : filters) {
auto channel = entry.first;
for (auto i = operatorIndex - 1; i >= 0; --i) {
auto prevOp = operators_[i].get();

if (i == 0) {
// Source operator.
VELOX_CHECK(
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
prevOp->addDynamicFilter(channel, entry.second);
prevOp->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
break;
}

const auto& identityProjections = prevOp->identityProjections();
auto inputChannel = getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
VELOX_CHECK(
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
prevOp->addDynamicFilter(channel, entry.second);
prevOp->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
break;
}

// Continue walking upstream.
channel = inputChannel.value();
}
}

op->clearDynamicFilters();
}

core::StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>* blockingState) {
Expand Down Expand Up @@ -296,6 +358,7 @@ core::StopReason Driver::runInternal(
op->stats().outputBytes += resultBytes;
}
}
pushdownFilters(i);
if (result) {
OperationTimer timer(nextOp->stats().addInputTiming);
nextOp->stats().inputPositions += result->size();
Expand Down Expand Up @@ -338,8 +401,11 @@ core::StopReason Driver::runInternal(
// control here so it can advance. If it is again blocked,
// this will be detected when trying to add input and we
// will come back here after this is again on thread.
OperationTimer timer(op->stats().getOutputTiming);
op->getOutput();
{
OperationTimer timer(op->stats().getOutputTiming);
op->getOutput();
}
pushdownFilters(i);
continue;
}
if (i == 0) {
Expand Down Expand Up @@ -437,7 +503,7 @@ bool Driver::terminate() {
return false;
}

bool Driver::mayPushdownAggregation(Operator* aggregation) {
bool Driver::mayPushdownAggregation(Operator* aggregation) const {
for (auto i = 1; i < operators_.size(); ++i) {
auto op = operators_[i].get();
if (aggregation == op) {
Expand All @@ -447,8 +513,58 @@ bool Driver::mayPushdownAggregation(Operator* aggregation) {
return false;
}
}
VELOX_CHECK(false, "{} not found in its Driver", aggregation->toString());
return false;
VELOX_FAIL(
"Aggregation operator not found in its Driver: {}",
aggregation->toString());
}

std::unordered_set<ChannelIndex> Driver::canPushdownFilters(
Operator* FOLLY_NONNULL filterSource,
const std::vector<ChannelIndex>& channels) const {
int filterSourceIndex = -1;
for (auto i = 0; i < operators_.size(); ++i) {
auto op = operators_[i].get();
if (filterSource == op) {
filterSourceIndex = i;
break;
}
}
VELOX_CHECK_GE(
filterSourceIndex,
0,
"Operator not found in its Driver: {}",
filterSource->toString());

std::unordered_set<ChannelIndex> supportedChannels;
for (auto i = 0; i < channels.size(); ++i) {
auto channel = channels[i];
for (auto j = filterSourceIndex - 1; j >= 0; --j) {
auto prevOp = operators_[j].get();

if (j == 0) {
// Source operator.
if (prevOp->canAddDynamicFilter()) {
supportedChannels.emplace(channels[i]);
}
break;
}

const auto& identityProjections = prevOp->identityProjections();
auto inputChannel = getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
if (prevOp->canAddDynamicFilter()) {
supportedChannels.emplace(channels[i]);
}
break;
}

// Continue walking upstream.
channel = inputChannel.value();
}
}

return supportedChannels;
}

Operator* FOLLY_NULLABLE
Expand Down
Loading

0 comments on commit 13295e2

Please sign in to comment.