Skip to content

Commit

Permalink
[WIP] Dynamic filters
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Aug 31, 2021
1 parent 9d97ce1 commit 2c86aa8
Show file tree
Hide file tree
Showing 12 changed files with 503 additions and 8 deletions.
11 changes: 11 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "velox/core/Context.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::common {
class Filter;
}
namespace facebook::velox::core {
class ITypedExpr;
}
Expand Down Expand Up @@ -85,6 +88,14 @@ class DataSource {
// processed.
virtual RowVectorPtr next(uint64_t size) = 0;

// Registers a dynamically generated filter with this data source.
// @param outputChannel index into outputType specified in
// Connector::createDataSource() that identifies the column this filter
// applies to.
// @param filter the filter to apply to the column identified by
// outputChannel.
virtual void addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) = 0;

// Returns the number of input bytes processed so far.
virtual uint64_t getCompletedBytes() = 0;

Expand Down
22 changes: 22 additions & 0 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,26 @@ bool testFilters(
}
} // namespace

// Queues 'filter' for the output column at 'outputChannel'. The filter is only
// recorded in pendingDynamicFilters_ here; addPendingDynamicFilters() is what
// transfers queued filters into scanSpec_.
//
// NOTE(review): if a filter is already queued for 'outputChannel', the
// existing entry is kept and 'filter' is not stored. Confirm that dropping the
// later filter is intended.
void HiveDataSource::addDynamicFilter(
    ChannelIndex outputChannel,
    const std::shared_ptr<common::Filter>& filter) {
  pendingDynamicFilters_.insert({outputChannel, filter});
}

// Transfers every filter queued in pendingDynamicFilters_ into scanSpec_ and
// then empties the queue.
//
// For each queued (channel, filter) pair the column name is looked up in
// outputType_ and the matching child of scanSpec_ is fetched (or created).
// If the child already has a filter, mergeWith() is invoked on it with the
// queued filter; otherwise a clone of the queued filter is installed.
// Cached values of scanSpec_ are reset afterwards.
void HiveDataSource::addPendingDynamicFilters() {
  for (const auto& [channel, filter] : pendingDynamicFilters_) {
    common::Subfield subfield{outputType_->nameOf(channel)};
    auto fieldSpec = scanSpec_->getOrCreateChild(subfield);
    if (fieldSpec->filter()) {
      // NOTE(review): the result of mergeWith() is discarded. This is only
      // correct if mergeWith() updates the existing filter in place; if it
      // returns the merged filter instead, the result must be passed to
      // setFilter(). Confirm against the Filter API.
      fieldSpec->filter()->mergeWith(filter.get());
    } else {
      fieldSpec->setFilter(filter->clone());
    }
  }
  scanSpec_->resetCachedValues();
  pendingDynamicFilters_.clear();
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
VELOX_CHECK(
split_ == nullptr,
Expand All @@ -245,6 +265,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {

VLOG(1) << "Adding split " << split_->toString();

addPendingDynamicFilters();

fileHandle_ = fileHandleFactory_->generate(split_->filePath);
if (dataCache_) {
auto dataCacheConfig = std::make_shared<dwio::common::DataCacheConfig>();
Expand Down
12 changes: 11 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class HiveDataSource : public DataSource {

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

void addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) override;

RowVectorPtr next(uint64_t size) override;

uint64_t getCompletedRows() override {
Expand Down Expand Up @@ -157,12 +161,14 @@ class HiveDataSource : public DataSource {

void setNullConstantValue(common::ScanSpec* spec, const TypePtr& type) const;

// Applies the filters queued in pendingDynamicFilters_ to scanSpec_ and
// clears the queue. Invoked from addSplit().
void addPendingDynamicFilters();

const std::shared_ptr<const RowType> outputType_;
FileHandleFactory* fileHandleFactory_;
velox::memory::MemoryPool* pool_;
std::vector<std::string> regularColumns_;
std::unique_ptr<dwrf::ColumnReaderFactory> columnReaderFactory_;
std::unique_ptr<common::ScanSpec> scanSpec_ = nullptr;
std::unique_ptr<common::ScanSpec> scanSpec_;
std::shared_ptr<HiveConnectorSplit> split_;
dwio::common::ReaderOptions readerOpts_;
dwio::common::RowReaderOptions rowReaderOpts_;
Expand All @@ -173,6 +179,10 @@ class HiveDataSource : public DataSource {
std::shared_ptr<const RowType> readerOutputType_;
bool emptySplit_;

// Dynamically pushed down filters to be added to scanSpec_ on next split.
std::unordered_map<ChannelIndex, std::shared_ptr<common::Filter>>
pendingDynamicFilters_;

// Number of splits skipped based on statistics.
int64_t skippedSplits_{0};

Expand Down
9 changes: 9 additions & 0 deletions velox/dwio/dwrf/reader/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ class ScanSpec {
// result of runtime adaptation.
bool hasFilter() const;

// Clears the cached 'hasFilter_' value of this spec and then does the same,
// recursively, for each child. Call after this spec or one of its children
// was updated, e.g. a new filter was added or an existing filter was modified.
void resetCachedValues() const {
  hasFilter_.clear();
  for (auto it = children_.begin(); it != children_.end(); ++it) {
    (*it)->resetCachedValues();
  }
}

void setEnableFilterReorder(bool enableFilterReorder) {
enableFilterReorder_ = enableFilterReorder;
}
Expand Down
126 changes: 121 additions & 5 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,68 @@ Driver::Driver(
ctx_->driver = this;
}

namespace {
/// Checks if output channel is produced using identity projection and returns
/// input channel if so.
std::optional<ChannelIndex> getIdentityProjection(
const std::vector<IdentityProjection>& projections,
ChannelIndex outputChannel) {
for (const auto& projection : projections) {
if (projection.outputChannel == outputChannel) {
return projection.inputChannel;
}
}
return std::nullopt;
}
} // namespace

// Hands the dynamic filters produced by the operator at 'operatorIndex' to an
// operator earlier in the pipeline and then clears them on the producer.
//
// Each filter is traced upstream, one operator at a time. While an operator
// forwards the filter's channel via an identity projection, the channel is
// remapped to that operator's input channel and the walk continues. The
// filter is installed on the first operator that does not forward the channel
// this way, or on the source operator (index 0) if the walk gets that far.
// The receiving operator must report canAddDynamicFilter(); otherwise the
// check fails.
void Driver::pushdownFilters(int operatorIndex) {
  auto* producer = operators_[operatorIndex].get();
  const auto& produced = producer->getDynamicFilters();
  if (produced.empty()) {
    return;
  }

  producer->stats().addRuntimeStat("dynamicFiltersProduced", produced.size());

  for (const auto& [filterChannel, filter] : produced) {
    // Channel of the filtered column expressed in terms of the output of the
    // operator currently being examined.
    auto channel = filterChannel;
    for (auto pos = operatorIndex - 1; pos >= 0; --pos) {
      auto* upstream = operators_[pos].get();

      // The source operator (pos == 0) is always the final stop; identity
      // projections are consulted only for intermediate operators.
      if (pos > 0) {
        const auto inputChannel =
            getIdentityProjection(upstream->identityProjections(), channel);
        if (inputChannel.has_value()) {
          // Column passes through unchanged; keep walking upstream.
          channel = inputChannel.value();
          continue;
        }
      }

      // Either the source operator or an operator that computes the column:
      // the filter has to be installed here.
      VELOX_CHECK(
          upstream->canAddDynamicFilter(),
          "Cannot push down dynamic filters produced by {}",
          producer->toString());
      upstream->addDynamicFilter(channel, filter);
      upstream->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
      break;
    }
  }

  producer->clearDynamicFilters();
}

core::StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>* blockingState) {
Expand Down Expand Up @@ -296,6 +358,7 @@ core::StopReason Driver::runInternal(
op->stats().outputBytes += resultBytes;
}
}
pushdownFilters(i);
if (result) {
OperationTimer timer(nextOp->stats().addInputTiming);
nextOp->stats().inputPositions += result->size();
Expand Down Expand Up @@ -338,8 +401,11 @@ core::StopReason Driver::runInternal(
// control here so it can advance. If it is again blocked,
// this will be detected when trying to add input and we
// will come back here after this is again on thread.
OperationTimer timer(op->stats().getOutputTiming);
op->getOutput();
{
OperationTimer timer(op->stats().getOutputTiming);
op->getOutput();
}
pushdownFilters(i);
continue;
}
if (i == 0) {
Expand Down Expand Up @@ -437,7 +503,7 @@ bool Driver::terminate() {
return false;
}

bool Driver::mayPushdownAggregation(Operator* aggregation) {
bool Driver::mayPushdownAggregation(Operator* aggregation) const {
for (auto i = 1; i < operators_.size(); ++i) {
auto op = operators_[i].get();
if (aggregation == op) {
Expand All @@ -447,8 +513,58 @@ bool Driver::mayPushdownAggregation(Operator* aggregation) {
return false;
}
}
VELOX_CHECK(false, "{} not found in its Driver", aggregation->toString());
return false;
VELOX_FAIL(
"Aggregation operator not found in its Driver: {}",
aggregation->toString());
}

// Determines which of 'channels' could have a dynamic filter pushed down from
// 'filterSource'.
//
// For each requested channel the pipeline is walked upstream from
// 'filterSource', following identity projections, until reaching either an
// operator that does not forward the channel as an identity projection or the
// source operator (index 0). The requested channel is included in the result
// if that operator reports canAddDynamicFilter().
//
// Fails the check if 'filterSource' is not one of this Driver's operators.
std::unordered_set<ChannelIndex> Driver::canPushdownFilters(
    Operator* FOLLY_NONNULL filterSource,
    const std::vector<ChannelIndex>& channels) const {
  // Find the position of 'filterSource' in the pipeline.
  int sourcePosition = -1;
  for (size_t pos = 0; pos < operators_.size(); ++pos) {
    if (operators_[pos].get() == filterSource) {
      sourcePosition = static_cast<int>(pos);
      break;
    }
  }
  VELOX_CHECK_GE(
      sourcePosition,
      0,
      "Operator not found in its Driver: {}",
      filterSource->toString());

  std::unordered_set<ChannelIndex> accepted;
  for (const auto requested : channels) {
    // 'requested' expressed in terms of the output of the operator currently
    // being examined.
    auto channel = requested;
    for (auto pos = sourcePosition - 1; pos >= 0; --pos) {
      auto* upstream = operators_[pos].get();

      // The source operator (pos == 0) is always the final stop; identity
      // projections are consulted only for intermediate operators.
      if (pos > 0) {
        const auto inputChannel =
            getIdentityProjection(upstream->identityProjections(), channel);
        if (inputChannel.has_value()) {
          // Column passes through unchanged; keep walking upstream.
          channel = inputChannel.value();
          continue;
        }
      }

      if (upstream->canAddDynamicFilter()) {
        accepted.emplace(requested);
      }
      break;
    }
  }

  return accepted;
}

Operator* FOLLY_NULLABLE
Expand Down
12 changes: 11 additions & 1 deletion velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,13 @@ class Driver {

// Returns true if all operators between the source and 'aggregation' are
// order-preserving and do not increase cardinality.
bool mayPushdownAggregation(Operator* FOLLY_NONNULL aggregation);
bool mayPushdownAggregation(Operator* FOLLY_NONNULL aggregation) const;

// Returns a subset of channels for which there are operators upstream from
// filterSource that accept dynamically generated filters.
std::unordered_set<ChannelIndex> canPushdownFilters(
Operator* FOLLY_NONNULL filterSource,
const std::vector<ChannelIndex>& channels) const;

// Returns the Operator with 'planNodeId.' or nullptr if not
// found. For example, hash join probe accesses the corresponding
Expand All @@ -176,6 +182,10 @@ class Driver {

void close();

// Push down dynamic filters produced by the operator at the specified
// position in the pipeline.
void pushdownFilters(int operatorIndex);

std::unique_ptr<DriverCtx> ctx_;
std::shared_ptr<Task> task_;
core::CancelPoolPtr cancelPool_;
Expand Down
33 changes: 33 additions & 0 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) {
joinType_ == core::JoinType::kSemi) {
isFinishing_ = true;
}
} else if (
joinType_ == core::JoinType::kInner &&
table_->hashMode() != BaseHashTable::HashMode::kHash) {
const auto& buildHashers = table_->hashers();
auto channels = operatorCtx_->driverCtx()->driver->canPushdownFilters(
this, keyChannels_);
dynamicFilterBuilders_.resize(keyChannels_.size());
for (auto i = 0; i < keyChannels_.size(); i++) {
auto it = channels.find(keyChannels_[i]);
if (it != channels.end()) {
dynamicFilterBuilders_[i].emplace(DynamicFilterBuilder(
*(buildHashers[i].get()), keyChannels_[i], dynamicFilters_));
}
}
}
}

Expand All @@ -195,16 +209,35 @@ void HashProbe::addInput(RowVectorPtr input) {
nonNullRows_.setAll();
deselectRowsWithNulls(*input_, keyChannels_, nonNullRows_);

auto getDynamicFilterBuilder = [&](auto i) -> DynamicFilterBuilder* {
if (!dynamicFilterBuilders_.empty()) {
auto& builder = dynamicFilterBuilders_[i];
if (builder.has_value() && builder->isActive()) {
return &(builder.value());
}
}
return nullptr;
};

activeRows_ = nonNullRows_;
lookup_->hashes.resize(input_->size());
auto mode = table_->hashMode();
auto& buildHashers = table_->hashers();
for (auto i = 0; i < keyChannels_.size(); ++i) {
auto key = input_->loadedChildAt(keyChannels_[i]);
if (mode != BaseHashTable::HashMode::kHash) {
auto* dynamicFilterBuilder = getDynamicFilterBuilder(i);
if (dynamicFilterBuilder) {
dynamicFilterBuilder->addInput(activeRows_.countSelected());
}

valueIdDecoder_.decode(*key, activeRows_);
buildHashers[i]->lookupValueIds(
valueIdDecoder_, activeRows_, deduppedHashes_, &lookup_->hashes);

if (dynamicFilterBuilder) {
dynamicFilterBuilder->addOutput(activeRows_.countSelected());
}
} else {
hashers_[i]->hash(*key, activeRows_, i > 0, &lookup_->hashes);
}
Expand Down
Loading

0 comments on commit 2c86aa8

Please sign in to comment.