Skip to content

Commit

Permalink
Folder: exec
Browse files Browse the repository at this point in the history
relative pr:

Fix hashjoin runtime issue oap-project#106
INVALID_STATE on HashJoin when spill is turned on oap-project#154
SIGABRT on DecimalAvgAggregate<UnscaleLongDecimal, UnscaleShortDecimal> when spilling is engaged oap-project#236
Support kPreceeding & kFollowing for window range frame type oap-project#287
  • Loading branch information
zhejiangxiaomai committed May 31, 2023
1 parent 681235c commit 65aee45
Show file tree
Hide file tree
Showing 25 changed files with 914 additions and 56 deletions.
1 change: 1 addition & 0 deletions velox/exec/ArrowStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/ArrowStream.h"
#include "velox/vector/arrow/Abi.h"

namespace facebook::velox::exec {

Expand Down
3 changes: 1 addition & 2 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
#include "velox/core/PlanNode.h"
#include "velox/exec/Operator.h"

#include "velox/vector/arrow/Abi.h"

struct ArrowArrayStream;
namespace facebook::velox::exec {

class ArrowStream : public SourceOperator {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ add_library(
Driver.cpp
EnforceSingleRow.cpp
Exchange.cpp
Expand.cpp
FilterProject.cpp
GroupId.cpp
GroupingSet.cpp
Expand Down Expand Up @@ -60,6 +61,7 @@ add_library(
TopN.cpp
Unnest.cpp
Values.cpp
ValueStream.cpp
VectorHasher.cpp
Window.cpp
WindowFunction.cpp
Expand Down
116 changes: 116 additions & 0 deletions velox/exec/Expand.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/Expand.h"

namespace facebook::velox::exec {

Expand::Expand(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExpandNode>& expandNode)
: Operator(
driverCtx,
expandNode->outputType(),
operatorId,
expandNode->id(),
"Expand") {
const auto& inputType = expandNode->sources()[0]->outputType();
auto numProjectSets = expandNode->projectSets().size();
projectMappings_.reserve(numProjectSets);
constantMappings_.reserve(numProjectSets);
auto numProjects = expandNode->names().size();
for (const auto& projectSet : expandNode->projectSets()) {
std::vector<column_index_t> projectMapping;
projectMapping.reserve(numProjects);
std::vector<ConstantTypedExprPtr> constantMapping;
constantMapping.reserve(numProjects);
for (const auto& project : projectSet) {
if (auto field =
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
project)) {
projectMapping.push_back(inputType->getChildIdx(field->name()));
constantMapping.push_back(nullptr);
} else if (
auto constant =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
project)) {
projectMapping.push_back(kUnMapedProject);
constantMapping.push_back(constant);
} else {
VELOX_FAIL("Unexpted expression for Expand");
}
}

projectMappings_.emplace_back(std::move(projectMapping));
constantMappings_.emplace_back(std::move(constantMapping));
}
}

bool Expand::needsInput() const {
return !noMoreInput_ && input_ == nullptr;
}

void Expand::addInput(RowVectorPtr input) {
// Load Lazy vectors.
for (auto& child : input->children()) {
child->loadedVector();
}

input_ = std::move(input);
}

RowVectorPtr Expand::getOutput() {
if (!input_) {
return nullptr;
}

// Make a copy of input for the grouping set at 'projectSetIndex_'.
auto numInput = input_->size();

std::vector<VectorPtr> outputColumns(outputType_->size());

const auto& projectMapping = projectMappings_[projectSetIndex_];
const auto& constantMapping = constantMappings_[projectSetIndex_];
auto numGroupingKeys = projectMapping.size();

for (auto i = 0; i < numGroupingKeys; ++i) {
if (projectMapping[i] == kUnMapedProject) {
auto constantExpr = constantMapping[i];
if (constantExpr->value().isNull()) {
// Add null column.
outputColumns[i] = BaseVector::createNullConstant(
outputType_->childAt(i), numInput, pool());
} else {
// Add constant column: gid, gpos, etc.
outputColumns[i] = BaseVector::createConstant(
constantExpr->type(), constantExpr->value(), numInput, pool());
}
} else {
outputColumns[i] = input_->childAt(projectMapping[i]);
}
}

++projectSetIndex_;
if (projectSetIndex_ == projectMappings_.size()) {
projectSetIndex_ = 0;
input_ = nullptr;
}

return std::make_shared<RowVector>(
pool(), outputType_, nullptr, numInput, std::move(outputColumns));
}

} // namespace facebook::velox::exec
62 changes: 62 additions & 0 deletions velox/exec/Expand.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "velox/core/Expressions.h"
#include "velox/exec/Operator.h"

namespace facebook::velox::exec {

using ConstantTypedExprPtr = std::shared_ptr<const core::ConstantTypedExpr>;

class Expand : public Operator {
public:
Expand(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::ExpandNode>& expandNode);

bool needsInput() const override;

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

BlockingReason isBlocked(ContinueFuture* /*future*/) override {
return BlockingReason::kNotBlocked;
}

bool isFinished() override {
return finished_ || (noMoreInput_ && input_ == nullptr);
}

private:
static constexpr column_index_t kUnMapedProject =
std::numeric_limits<column_index_t>::max();

bool finished_{false};

std::vector<std::vector<column_index_t>> projectMappings_;

std::vector<std::vector<ConstantTypedExprPtr>> constantMappings_;

/// 'getOutput()' returns 'input_' for one grouping set at a time.
/// 'groupingSetIndex_' contains the index of the grouping set to output in
/// the next 'getOutput' call. This index is used to generate groupId column
/// and lookup the input-to-output column mappings in the
/// projectMappings_.
int32_t projectSetIndex_{0};
};
} // namespace facebook::velox::exec
3 changes: 2 additions & 1 deletion velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
}

const auto currentUsage = pool_.currentBytes();
if (spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) {
if ((spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) ||
pool_.highUsage()) {
const int64_t bytesToSpill =
currentUsage * spillConfig_->spillableReservationGrowthPct / 100;
auto rowsToSpill = std::max<int64_t>(
Expand Down
22 changes: 18 additions & 4 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ HashBuild::HashBuild(
nullAware_{joinNode_->isNullAware()},
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
operatorCtx_->driverCtx()->splitGroupId,
planNodeId())) {
planNodeId())),
spillMemoryThreshold_(
operatorCtx_->driverCtx()
->queryConfig()
.joinSpillMemoryThreshold()) // fixme should we use
// "hashBuildSpillMemoryThreshold"
{
VELOX_CHECK(pool()->trackUsage());
VELOX_CHECK_NOT_NULL(joinBridge_);

Expand Down Expand Up @@ -92,9 +98,6 @@ HashBuild::HashBuild(
}

// Identify the non-key build side columns and make a decoder for each.
const auto numDependents = outputType->size() - numKeys;
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
for (auto i = 0; i < outputType->size(); ++i) {
if (keyChannelMap.find(i) == keyChannelMap.end()) {
dependentChannels_.emplace_back(i);
Expand Down Expand Up @@ -434,6 +437,17 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
return false;
}

const auto currentUsage = pool()->currentBytes();
if ((spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) ||
pool()->highUsage()) {
const int64_t bytesToSpill =
currentUsage * spillConfig()->spillableReservationGrowthPct / 100;
numSpillRows_ = std::max<int64_t>(
1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow));
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
}

if (freeRows > input->size() &&
(outOfLineBytes == 0 || outOfLineFreeBytes >= flatBytes)) {
// Enough free rows for input rows and enough variable length free
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ class HashBuild final : public Operator {

std::shared_ptr<SpillOperatorGroup> spillGroup_;

// The maximum memory usage that a hash build can hold before spilling.
// If it is zero, then there is no such limit.
const uint64_t spillMemoryThreshold_;

State state_{State::kRunning};

// The row type used for hash table build and disk spilling.
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,10 @@ void HashProbe::checkStateTransition(ProbeOperatorState state) {
}

RowVectorPtr HashProbe::getOutput() {
if (isFinished()) {
return nullptr;
}

checkRunning();

clearIdentityProjectedOutput();
Expand Down
14 changes: 14 additions & 0 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "velox/exec/CallbackSink.h"
#include "velox/exec/EnforceSingleRow.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/Expand.h"
#include "velox/exec/FilterProject.h"
#include "velox/exec/GroupId.h"
#include "velox/exec/HashAggregation.h"
Expand All @@ -37,6 +38,7 @@
#include "velox/exec/TableWriter.h"
#include "velox/exec/TopN.h"
#include "velox/exec/Unnest.h"
#include "velox/exec/ValueStream.h"
#include "velox/exec/Values.h"
#include "velox/exec/Window.h"

Expand Down Expand Up @@ -183,6 +185,9 @@ uint32_t maxDrivers(const DriverFactory& driverFactory) {
if (!values->isParallelizable()) {
return 1;
}
} else if (std::dynamic_pointer_cast<const core::ValueStreamNode>(node)) {
// ValueStream node must run single-threaded.
return 1;
} else if (std::dynamic_pointer_cast<const core::ArrowStreamNode>(node)) {
// ArrowStream node must run single-threaded.
return 1;
Expand Down Expand Up @@ -399,6 +404,11 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
auto valuesNode =
std::dynamic_pointer_cast<const core::ValuesNode>(planNode)) {
operators.push_back(std::make_unique<Values>(id, ctx.get(), valuesNode));
} else if (
auto valueStreamNode =
std::dynamic_pointer_cast<const core::ValueStreamNode>(planNode)) {
operators.push_back(
std::make_unique<ValueStream>(id, ctx.get(), valueStreamNode));
} else if (
auto arrowStreamNode =
std::dynamic_pointer_cast<const core::ArrowStreamNode>(planNode)) {
Expand Down Expand Up @@ -455,6 +465,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
operators.push_back(
std::make_unique<HashAggregation>(id, ctx.get(), aggregationNode));
}
} else if (
auto expandNode =
std::dynamic_pointer_cast<const core::ExpandNode>(planNode)) {
operators.push_back(std::make_unique<Expand>(id, ctx.get(), expandNode));
} else if (
auto groupIdNode =
std::dynamic_pointer_cast<const core::GroupIdNode>(planNode)) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void gatherCopy(
const std::vector<vector_size_t>& sourceIndices,
column_index_t sourceChannel) {
if (target->isScalar()) {
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
scalarGatherCopy,
target->type()->kind(),
target,
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ void OrderBy::ensureInputFits(const RowVectorPtr& input) {
}

const auto currentUsage = pool()->currentBytes();
if (spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) {
if ((spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) ||
pool()->highUsage()) {
const int64_t bytesToSpill =
currentUsage * spillConfig.spillableReservationGrowthPct / 100;
auto rowsToSpill = std::max<int64_t>(
Expand Down Expand Up @@ -266,7 +267,7 @@ void OrderBy::noMoreInput() {
returningRows_.resize(numRows_);
RowContainerIterator iter;
data_->listRows(&iter, numRows_, returningRows_.data());
std::sort(
std::stable_sort(
returningRows_.begin(),
returningRows_.end(),
[this](const char* leftRow, const char* rightRow) {
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,10 @@ void Task::removeDriver(std::shared_ptr<Task> self, Driver* driver) {
}

if (self->numFinishedDrivers_ == self->numTotalDrivers_) {
LOG(INFO) << "All drivers (" << self->numFinishedDrivers_
/*LOG(INFO) << "All drivers (" << self->numFinishedDrivers_
<< ") finished for task " << self->taskId()
<< " after running for " << self->timeSinceStartMsLocked()
<< " ms.";
<< " ms.";*/
}
}

Expand Down Expand Up @@ -1547,9 +1547,9 @@ ContinueFuture Task::terminate(TaskState terminalState) {
}
}

LOG(INFO) << "Terminating task " << taskId() << " with state "
/*LOG(INFO) << "Terminating task " << taskId() << " with state "
<< taskStateString(state_) << " after running for "
<< timeSinceStartMsLocked() << " ms.";
<< timeSinceStartMsLocked() << " ms.";*/

activateTaskCompletionNotifier(completionNotifier);

Expand Down
Loading

0 comments on commit 65aee45

Please sign in to comment.