Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic filter pushdown for selective broadcast joins #103

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "velox/core/Context.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::common {
class Filter;
}
namespace facebook::velox::core {
class ITypedExpr;
}
Expand Down Expand Up @@ -85,6 +88,14 @@ class DataSource {
// processed.
virtual RowVectorPtr next(uint64_t size) = 0;

// Adds a dynamically generated filter to this data source.
// @param outputChannel index into outputType specified in
// Connector::createDataSource() that identifies the column this filter
// applies to.
// @param filter the filter to apply to the column identified by
// 'outputChannel'.
virtual void addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) = 0;

// Returns the number of input bytes processed so far.
virtual uint64_t getCompletedBytes() = 0;

Expand Down
17 changes: 17 additions & 0 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,23 @@ bool testFilters(
}
} // namespace

// Installs a dynamically generated filter on the column identified by
// 'outputChannel' and invalidates state derived from the previous set of
// filters (cached values in the scan spec and filter caches in the column
// readers).
//
// @param outputChannel index into outputType_ naming the column to filter.
// @param filter filter to apply; it is combined with the column's existing
// filter if there is one, otherwise a copy of it becomes the column's filter.
void HiveDataSource::addDynamicFilter(
    ChannelIndex outputChannel,
    const std::shared_ptr<common::Filter>& filter) {
  common::Subfield subfield{outputType_->nameOf(outputChannel)};
  auto fieldSpec = scanSpec_->getOrCreateChild(subfield);
  if (fieldSpec->filter()) {
    // Filter::mergeWith() hands back the combined filter instead of updating
    // the receiver in place, so the result has to be installed explicitly.
    // Discarding it would silently drop the new filter.
    fieldSpec->setFilter(fieldSpec->filter()->mergeWith(filter.get()));
  } else {
    fieldSpec->setFilter(filter->clone());
  }
  scanSpec_->resetCachedValues();

  auto columnReader =
      dynamic_cast<SelectiveColumnReader*>(rowReader_->columnReader());
  // dynamic_cast yields nullptr when the reader is not a
  // SelectiveColumnReader; fail with a clear error instead of dereferencing
  // a null pointer.
  VELOX_CHECK(
      columnReader != nullptr,
      "Dynamic filters require a SelectiveColumnReader");
  columnReader->resetFilterCaches();
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
VELOX_CHECK(
split_ == nullptr,
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class HiveDataSource : public DataSource {

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

// Adds 'filter' to the scan spec entry of the output column at
// 'outputChannel', then resets cached values in the scan spec and filter
// caches in the column readers.
void addDynamicFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) override;

RowVectorPtr next(uint64_t size) override;

uint64_t getCompletedRows() override {
Expand Down Expand Up @@ -162,7 +166,7 @@ class HiveDataSource : public DataSource {
velox::memory::MemoryPool* pool_;
std::vector<std::string> regularColumns_;
std::unique_ptr<dwrf::ColumnReaderFactory> columnReaderFactory_;
std::unique_ptr<common::ScanSpec> scanSpec_ = nullptr;
std::unique_ptr<common::ScanSpec> scanSpec_;
std::shared_ptr<HiveConnectorSplit> split_;
dwio::common::ReaderOptions readerOpts_;
dwio::common::RowReaderOptions rowReaderOpts_;
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class DwrfRowReader : public DwrfRowReaderShared {
return skippedStrides_;
}

// Returns the raw pointer obtained from columnReader_.get(). Ownership is not
// transferred to the caller.
ColumnReader* columnReader() {
return columnReader_.get();
}

private:
void checkSkipStrides(const StatsContext& context, uint64_t strideSize);

Expand Down
11 changes: 11 additions & 0 deletions velox/dwio/dwrf/reader/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ class ScanSpec {
// result of runtime adaptation.
bool hasFilter() const;

// Drops the cached hasFilter_ value of this ScanSpec and of every descendant,
// then calls reorder() on each visited node. Call this after a filter was
// added or modified on this node or on any of its children.
void resetCachedValues() {
  hasFilter_.clear();

  // Children are reset before this node is reordered.
  for (auto it = children_.begin(); it != children_.end(); ++it) {
    (*it)->resetCachedValues();
  }

  reorder();
}

void setEnableFilterReorder(bool enableFilterReorder) {
enableFilterReorder_ = enableFilterReorder;
}
Expand Down
29 changes: 29 additions & 0 deletions velox/dwio/dwrf/reader/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,13 @@ class SelectiveIntegerDictionaryColumnReader : public SelectiveColumnReader {
common::ScanSpec* scanSpec,
uint32_t numBytes);

// Marks every cached filter result as unknown so that dictionary entries get
// re-evaluated against the filter currently set in the ScanSpec.
void resetFilterCaches() override {
  if (!filterCache_.empty()) {
    // Reset the whole cache buffer rather than the first dictionarySize_
    // entries. The buffer's own size is the only bound that is guaranteed to
    // match the allocation, so this can neither leave stale entries behind
    // nor write past the end if the two ever differ.
    // NOTE(review): assumes one byte per cache entry, as the original
    // byte count did - confirm against the declaration of filterCache_.
    simd::memset(
        filterCache_.data(), FilterResult::kUnknown, filterCache_.size());
  }
}

bool hasBulkPath() const override {
return true;
}
Expand Down Expand Up @@ -3003,6 +3010,13 @@ class SelectiveStringDictionaryColumnReader : public SelectiveColumnReader {
StripeStreams& stripe,
common::ScanSpec* scanSpec);

// Marks every cached filter result as unknown so that dictionary entries get
// re-evaluated against the filter currently set in the ScanSpec.
void resetFilterCaches() override {
  if (!filterCache_.empty()) {
    // Reset the whole cache buffer rather than the first dictionaryCount_
    // entries. If the cache also holds results for entries beyond
    // dictionaryCount_, those would otherwise keep verdicts computed with
    // the old filter. Using the buffer's own size also rules out writing
    // past its end.
    // NOTE(review): assumes one byte per cache entry, as the original
    // byte count did - confirm against the declaration of filterCache_.
    simd::memset(
        filterCache_.data(), FilterResult::kUnknown, filterCache_.size());
  }
}

void seekToRowGroup(uint32_t index) override {
ensureRowGroupIndex();

Expand Down Expand Up @@ -3785,6 +3799,12 @@ class SelectiveStructColumnReader : public SelectiveColumnReader {
StripeStreams& stripe,
common::ScanSpec* scanSpec);

// A struct reader keeps no filter cache of its own here; it forwards the
// reset to each of its child readers in order.
void resetFilterCaches() override {
  for (auto it = children_.begin(); it != children_.end(); ++it) {
    (*it)->resetFilterCaches();
  }
}

void seekToRowGroup(uint32_t index) override {
for (auto& child : children_) {
child->seekToRowGroup(index);
Expand Down Expand Up @@ -4278,6 +4298,10 @@ class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {
StripeStreams& stripe,
common::ScanSpec* scanSpec);

// Forwards the filter cache reset to the reader stored in child_.
void resetFilterCaches() override {
child_->resetFilterCaches();
}

void seekToRowGroup(uint32_t index) override {
ensureRowGroupIndex();

Expand Down Expand Up @@ -4402,6 +4426,11 @@ class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {
StripeStreams& stripe,
common::ScanSpec* scanSpec);

// Forwards the filter cache reset to the key reader and then to the element
// reader.
void resetFilterCaches() override {
keyReader_->resetFilterCaches();
elementReader_->resetFilterCaches();
}

void seekToRowGroup(uint32_t index) override {
ensureRowGroupIndex();

Expand Down
8 changes: 7 additions & 1 deletion velox/dwio/dwrf/reader/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class SelectiveColumnReader : public ColumnReader {
uint64_t /*numValues*/,
VectorPtr& /*result*/,
const uint64_t* /*incomingNulls*/) override {
VELOX_CHECK(false, "next() is only defined in SelectiveStructColumnReader");
VELOX_UNSUPPORTED("next() is only defined in SelectiveStructColumnReader");
}

// Creates a reader for the given stripe.
Expand All @@ -61,6 +61,12 @@ class SelectiveColumnReader : public ColumnReader {
common::ScanSpec* scanSpec,
uint32_t sequence = 0);

// Called when filters in ScanSpec change, e.g. a new filter is pushed down
// from a downstream operator. Readers that cache filter results override this
// to invalidate those caches; the default implementation does nothing.
virtual void resetFilterCaches() {
// Most readers don't have filter caches.
}

// Seeks to offset and reads the rows in 'rows' and applies
// filters and value processing as given by 'scanSpec supplied at
// construction. 'offset' is relative to start of stripe. 'rows' are
Expand Down
126 changes: 121 additions & 5 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,68 @@ Driver::Driver(
ctx_->driver = this;
}

namespace {
/// Checks if output channel is produced using identity projection and returns
/// input channel if so.
std::optional<ChannelIndex> getIdentityProjection(
const std::vector<IdentityProjection>& projections,
ChannelIndex outputChannel) {
for (const auto& projection : projections) {
if (projection.outputChannel == outputChannel) {
return projection.inputChannel;
}
}
return std::nullopt;
}
} // namespace

void Driver::pushdownFilters(int operatorIndex) {
auto op = operators_[operatorIndex].get();
const auto& filters = op->getDynamicFilters();
if (filters.empty()) {
return;
}

op->stats().addRuntimeStat("dynamicFiltersProduced", filters.size());

// Walk operator list upstream and find a place to install the filters.
for (const auto& entry : filters) {
auto channel = entry.first;
for (auto i = operatorIndex - 1; i >= 0; --i) {
auto prevOp = operators_[i].get();

if (i == 0) {
// Source operator.
VELOX_CHECK(
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
prevOp->addDynamicFilter(channel, entry.second);
prevOp->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
break;
}

const auto& identityProjections = prevOp->identityProjections();
auto inputChannel = getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
VELOX_CHECK(
prevOp->canAddDynamicFilter(),
"Cannot push down dynamic filters produced by {}",
op->toString());
prevOp->addDynamicFilter(channel, entry.second);
prevOp->stats().addRuntimeStat("dynamicFiltersAccepted", 1);
break;
}

// Continue walking upstream.
channel = inputChannel.value();
}
}

op->clearDynamicFilters();
}

core::StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>* blockingState) {
Expand Down Expand Up @@ -312,6 +374,7 @@ core::StopReason Driver::runInternal(
op->stats().outputBytes += resultBytes;
}
}
pushdownFilters(i);
if (result) {
OperationTimer timer(nextOp->stats().addInputTiming);
nextOp->stats().inputPositions += result->size();
Expand Down Expand Up @@ -354,8 +417,11 @@ core::StopReason Driver::runInternal(
// control here so it can advance. If it is again blocked,
// this will be detected when trying to add input and we
// will come back here after this is again on thread.
OperationTimer timer(op->stats().getOutputTiming);
op->getOutput();
{
OperationTimer timer(op->stats().getOutputTiming);
op->getOutput();
}
pushdownFilters(i);
continue;
}
if (i == 0) {
Expand Down Expand Up @@ -453,7 +519,7 @@ bool Driver::terminate() {
return false;
}

bool Driver::mayPushdownAggregation(Operator* aggregation) {
bool Driver::mayPushdownAggregation(Operator* aggregation) const {
for (auto i = 1; i < operators_.size(); ++i) {
auto op = operators_[i].get();
if (aggregation == op) {
Expand All @@ -463,8 +529,58 @@ bool Driver::mayPushdownAggregation(Operator* aggregation) {
return false;
}
}
VELOX_CHECK(false, "{} not found in its Driver", aggregation->toString());
return false;
VELOX_FAIL(
"Aggregation operator not found in its Driver: {}",
aggregation->toString());
}

// Determines for which of 'channels' a dynamic filter produced by
// 'filterSource' could be installed on an upstream operator.
//
// @param filterSource operator that would produce the filters; must belong
// to this Driver, otherwise a check failure is raised.
// @param channels channels to test, expressed as output channels of the
// operator immediately upstream of 'filterSource'.
// @return the subset of 'channels' whose filter would land on an operator
// that reports canAddDynamicFilter().
std::unordered_set<ChannelIndex> Driver::canPushdownFilters(
    Operator* FOLLY_NONNULL filterSource,
    const std::vector<ChannelIndex>& channels) const {
  // Locate 'filterSource' in the pipeline.
  int filterSourceIndex = -1;
  const auto numOperators = operators_.size();
  for (size_t i = 0; i < numOperators; ++i) {
    if (operators_[i].get() == filterSource) {
      filterSourceIndex = static_cast<int>(i);
      break;
    }
  }
  VELOX_CHECK_GE(
      filterSourceIndex,
      0,
      "Operator not found in its Driver: {}",
      filterSource->toString());

  std::unordered_set<ChannelIndex> supportedChannels;
  for (const auto requestedChannel : channels) {
    auto channel = requestedChannel;
    for (auto j = filterSourceIndex - 1; j >= 0; --j) {
      auto* upstream = operators_[j].get();

      // The source operator (j == 0) always terminates the walk, so its
      // identity projections are never consulted.
      std::optional<ChannelIndex> inputChannel;
      if (j != 0) {
        inputChannel =
            getIdentityProjection(upstream->identityProjections(), channel);
      }

      if (!inputChannel.has_value()) {
        // This is where the filter would be installed; record the channel
        // as requested by the caller, not the remapped one.
        if (upstream->canAddDynamicFilter()) {
          supportedChannels.emplace(requestedChannel);
        }
        break;
      }

      // Identity projection: keep walking upstream with the input channel.
      channel = inputChannel.value();
    }
  }

  return supportedChannels;
}

Operator* FOLLY_NULLABLE
Expand Down
12 changes: 11 additions & 1 deletion velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ class Driver {

// Returns true if all operators between the source and 'aggregation' are
// order-preserving and do not increase cardinality.
bool mayPushdownAggregation(Operator* FOLLY_NONNULL aggregation);
bool mayPushdownAggregation(Operator* FOLLY_NONNULL aggregation) const;

// Returns a subset of channels for which there are operators upstream from
// filterSource that accept dynamically generated filters.
// @param filterSource operator that would produce the filters; it must be one
// of this Driver's operators.
// @param channels channels to check, expressed as output channels of the
// operator immediately upstream of filterSource.
std::unordered_set<ChannelIndex> canPushdownFilters(
Operator* FOLLY_NONNULL filterSource,
const std::vector<ChannelIndex>& channels) const;

// Returns the Operator with 'planNodeId.' or nullptr if not
// found. For example, hash join probe accesses the corresponding
Expand All @@ -187,6 +193,10 @@ class Driver {

void close();

// Push down dynamic filters produced by the operator at the specified
// position in the pipeline. Each filter is installed on an upstream operator
// and the producer's filters are cleared afterwards. Does nothing if the
// operator has produced no filters.
void pushdownFilters(int operatorIndex);

std::unique_ptr<DriverCtx> ctx_;
std::shared_ptr<Task> task_;
core::CancelPoolPtr cancelPool_;
Expand Down
Loading