Skip to content

Commit

Permalink
[WIP] Dynamic filters
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Aug 25, 2021
1 parent 059525a commit e3ef9a1
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 2 deletions.
11 changes: 11 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "velox/core/Context.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::common {
class Filter;
}
namespace facebook::velox::core {
class ITypedExpr;
}
Expand Down Expand Up @@ -85,6 +88,14 @@ class DataSource {
// processed.
virtual RowVectorPtr next(uint64_t size) = 0;

// Add a dynamically generated filter.
// @param outputChannel index into outputType specified in
// Connector::createDataSource() that identifies the column this filter
// applies to.
virtual void addFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) = 0;

// Returns the number of input bytes processed so far.
virtual uint64_t getCompletedBytes() = 0;

Expand Down
16 changes: 16 additions & 0 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,22 @@ bool testFilters(
}
} // namespace

void HiveDataSource::addFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) {
common::Subfield subfield{outputType_->nameOf(outputChannel)};
auto fieldSpec = scanSpec_->getOrCreateChild(subfield);
if (fieldSpec->filter()) {
// TODO Merge two filters
LOG(INFO) << "Skipping dynamic filter on " << subfield.toString() << ": "
<< filter->toString();
} else {
LOG(INFO) << "Adding dynamic filter on " << subfield.toString() << ": "
<< filter->toString();
fieldSpec->setFilter(filter->clone());
}
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
VELOX_CHECK(
split_ == nullptr,
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class HiveDataSource : public DataSource {

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

void addFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) override;

RowVectorPtr next(uint64_t size) override;

uint64_t getCompletedRows() override {
Expand Down Expand Up @@ -162,7 +166,7 @@ class HiveDataSource : public DataSource {
velox::memory::MemoryPool* pool_;
std::vector<std::string> regularColumns_;
std::unique_ptr<dwrf::ColumnReaderFactory> columnReaderFactory_;
std::unique_ptr<common::ScanSpec> scanSpec_ = nullptr;
std::unique_ptr<common::ScanSpec> scanSpec_;
std::shared_ptr<HiveConnectorSplit> split_;
dwio::common::ReaderOptions readerOpts_;
dwio::common::RowReaderOptions rowReaderOpts_;
Expand Down
62 changes: 62 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,63 @@ Driver::Driver(
ctx_->driver = this;
}

namespace {
/// Checks if output channel is produced using identity projection and returns
/// input channel if so.
std::optional<ChannelIndex> getIdentityProjection(
const std::vector<IdentityProjection>& projections,
ChannelIndex outputChannel) {
for (const auto& projection : projections) {
if (projection.outputChannel == outputChannel) {
return projection.inputChannel;
}
}
return std::nullopt;
}
} // namespace

void Driver::pushdownFilters() {
const int32_t numOperators = operators_.size();
for (int32_t i = numOperators - 1; i >= 0; --i) {
auto op = operators_[i].get();
const auto& filters = op->getDynamicFilters();
if (filters.empty()) {
continue;
}

// Walk operator list upstream and find a place to install the filters.
for (const auto& entry : filters) {
auto channel = entry.first;
for (int32_t j = i - 1; j >= 0; --j) {
auto prevOp = operators_[j].get();

if (j == 0) {
// Source operator.
if (!prevOp->addFilter(channel, entry.second)) {
// TODO Install FilterProject operator right after prevOp.
continue;
}
}

const auto& identityProjections = prevOp->identityProjections();
auto inputChannel = getIdentityProjection(identityProjections, channel);
if (!inputChannel.has_value()) {
// Filter channel is not an identity projection.
if (!prevOp->addFilter(channel, entry.second)) {
// TODO Install FilterProject operator right after prevOp.
continue;
}
} else {
// Continue walking upstream.
channel = inputChannel.value();
}
}
}

op->clearDynamicFilters();
}
}

core::StopReason Driver::runInternal(
std::shared_ptr<Driver>& self,
std::shared_ptr<BlockingState>* blockingState) {
Expand Down Expand Up @@ -274,6 +331,11 @@ core::StopReason Driver::runInternal(
guard.notThrown();
return core::StopReason::kBlock;
}

if (!op->getDynamicFilters().empty()) {
pushdownFilters();
}

Operator* nextOp = nullptr;
if (i < operators_.size() - 1) {
nextOp = operators_[i + 1].get();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class Driver {

void close();

void pushdownFilters();

std::unique_ptr<DriverCtx> ctx_;
std::shared_ptr<Task> task_;
core::CancelPoolPtr cancelPool_;
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,19 @@ BlockingReason HashProbe::isBlocked(ContinueFuture* future) {
joinType_ == core::JoinType::kSemi) {
isFinishing_ = true;
}
} else if (table_->hashMode() != BaseHashTable::HashMode::kHash) {
uint64_t asRange;
uint64_t asDistinct;
const auto& hashers = table_->hashers();
for (auto i = 0; i < hashers.size(); i++) {
hashers[i]->cardinality(asRange, asDistinct);

if (asDistinct < 2'048) {
auto filter = hashers[i]->getFilter();
VELOX_CHECK(filter);
dynamicFilters_.emplace(keyChannels_[i], std::move(filter));
}
}
}
}

Expand Down
26 changes: 25 additions & 1 deletion velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Driver.h"
#include "velox/type/Filter.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -246,6 +247,26 @@ class Operator {
return isFinishing_;
}

virtual const std::
unordered_map<ChannelIndex, std::shared_ptr<common::Filter>>&
getDynamicFilters() const {
return dynamicFilters_;
}

virtual void clearDynamicFilters() {
dynamicFilters_.clear();
}

virtual bool addFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) {
return false;
}

const std::vector<IdentityProjection>& identityProjections() const {
return identityProjections_;
}

// Frees all resources associated with 'this'. No other methods
// should be called after this.
virtual void close() {
Expand Down Expand Up @@ -329,7 +350,10 @@ class Operator {
// i.e. one could copy directly from input to output if no
// cardinality change.
bool isIdentityProjection_ = false;
};

std::unordered_map<ChannelIndex, std::shared_ptr<common::Filter>>
dynamicFilters_;
}; // namespace facebook::velox::exec

constexpr ChannelIndex kConstantChannel =
std::numeric_limits<ChannelIndex>::max();
Expand Down
18 changes: 18 additions & 0 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ RowVectorPtr TableScan::getOutput() {
tableHandle_,
columnHandles_,
connectorQueryCtx_.get());
for (const auto& entry : pendingDynamicFilters_) {
dataSource_->addFilter(entry.first, entry.second);
}
pendingDynamicFilters_.clear();
} else {
VELOX_CHECK(
connector_->connectorId() == connectorSplit->connectorId,
Expand Down Expand Up @@ -106,6 +110,20 @@ RowVectorPtr TableScan::getOutput() {
}
}

bool TableScan::addFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) {
if (dataSource_) {
dataSource_->addFilter(outputChannel, filter);
} else {
pendingDynamicFilters_.emplace(outputChannel, filter);
}

// TODO Add proper handling for connectors that do not support dynamic
// filters.
return true;
}

void TableScan::close() {
// TODO Implement
}
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class TableScan : public SourceOperator {
close();
}

bool addFilter(
ChannelIndex outputChannel,
const std::shared_ptr<common::Filter>& filter) override;

void close() override;

private:
Expand All @@ -63,5 +67,8 @@ class TableScan : public SourceOperator {
bool noMoreSplits_ = false;
// The bucketed group id we are in the middle of processing.
int32_t currentSplitGroupId_{-1};
// Dynamic filters to add to the data source when it gets created.
std::unordered_map<ChannelIndex, std::shared_ptr<common::Filter>>
pendingDynamicFilters_;
};
} // namespace facebook::velox::exec
24 changes: 24 additions & 0 deletions velox/exec/VectorHasher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,30 @@ void VectorHasher::copyStringToLocal(const UniqueValue* unique) {
reinterpret_cast<int64_t>(str->data() + start));
}

std::shared_ptr<common::Filter> VectorHasher::getFilter() const {
if (distinctOverflow_) {
return nullptr;
}

switch (typeKind_) {
case TypeKind::TINYINT:
case TypeKind::SMALLINT:
case TypeKind::INTEGER:
case TypeKind::BIGINT: {
std::vector<int64_t> values;
values.reserve(uniqueValues_.size());
for (const auto& value : uniqueValues_) {
values.emplace_back(value.data());
}

// TODO Handle nulls properly, e.g. do not assume there are no nulls.
return common::createBigintValues(values, false);
}
default:
return nullptr;
}
}

void VectorHasher::cardinality(uint64_t& asRange, uint64_t& asDistincts) {
if (typeKind_ == TypeKind::BOOLEAN) {
hasRange_ = true;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/VectorHasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ class VectorHasher {
return hasRange_ || !distinctOverflow_;
}

// Returns an instance of the filter corresponding to a range of values or a
// set of unique values. mayUseValueIds() must be true.
std::shared_ptr<common::Filter> getFilter() const;

template <typename T>
bool computeValueIdForRows(
char** groups,
Expand Down

0 comments on commit e3ef9a1

Please sign in to comment.