Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

remove redundant project&dedup in findpath & fix path error #912

Merged
merged 6 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ workspace.*
*.py[co]
__pycache__
venv/

#lock
*.lock
33 changes: 19 additions & 14 deletions src/executor/algo/ConjunctPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ folly::Future<Status> ConjunctPathExecutor::bfsShortestPath() {
<< " right input: " << conjunct->rightInputVar();
DCHECK(!!lIter);

auto steps = conjunct->steps();
count_++;

DataSet ds;
ds.colNames = conjunct->colNames();

Expand Down Expand Up @@ -62,22 +65,24 @@ folly::Future<Status> ConjunctPathExecutor::bfsShortestPath() {
}
}

auto latest = rHist.back().iter();
isLatest = true;
backward_.emplace_back();
VLOG(1) << "Find even length path.";
auto rows = findBfsShortestPath(latest.get(), isLatest, forward_.back());
if (!rows.empty()) {
VLOG(1) << "Meet even length path.";
ds.rows = std::move(rows);
if (count_ * 2 <= steps) {
auto latest = rHist.back().iter();
isLatest = true;
backward_.emplace_back();
VLOG(1) << "Find even length path.";
auto rows = findBfsShortestPath(latest.get(), isLatest, forward_.back());
if (!rows.empty()) {
VLOG(1) << "Meet even length path.";
ds.rows = std::move(rows);
}
}
return finish(ResultBuilder().value(Value(std::move(ds))).finish());
}

std::vector<Row> ConjunctPathExecutor::findBfsShortestPath(
Iterator* iter,
bool isLatest,
std::multimap<Value, const Edge*>& table) {
std::unordered_multimap<Value, const Edge*>& table) {
std::unordered_set<Value> meets;
for (; iter->valid(); iter->next()) {
auto& dst = iter->getColumn(kVid);
Expand Down Expand Up @@ -118,10 +123,10 @@ std::vector<Row> ConjunctPathExecutor::findBfsShortestPath(
return rows;
}

std::multimap<Value, Path> ConjunctPathExecutor::buildBfsInterimPath(
std::unordered_multimap<Value, Path> ConjunctPathExecutor::buildBfsInterimPath(
std::unordered_set<Value>& meets,
std::vector<std::multimap<Value, const Edge*>>& hists) {
std::multimap<Value, Path> results;
std::vector<std::unordered_multimap<Value, const Edge*>>& hists) {
std::unordered_multimap<Value, Path> results;
for (auto& v : meets) {
VLOG(1) << "Meet at: " << v;
Path start;
Expand Down Expand Up @@ -205,7 +210,7 @@ folly::Future<Status> ConjunctPathExecutor::floydShortestPath() {
findPath(previous.get(), forwardCostPathMap, ds);
}

if (count_ * 2 < steps) {
if (count_ * 2 <= steps) {
VLOG(1) << "Find even length path.";
auto latest = rHist.back().iter();
findPath(latest.get(), forwardCostPathMap, ds);
Expand All @@ -224,7 +229,7 @@ Status ConjunctPathExecutor::conjunctPath(const List& forwardPaths,
}
for (auto& j : backwardPaths.values) {
if (!j.isPath()) {
return Status::Error("Forward Path Type Error");
return Status::Error("Backward Path Type Error");
}
Row row;
auto forward = i.getPath();
Expand Down
10 changes: 5 additions & 5 deletions src/executor/algo/ConjunctPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ class ConjunctPathExecutor final : public Executor {

std::vector<Row> findBfsShortestPath(Iterator* iter,
bool isLatest,
std::multimap<Value, const Edge*>& table);
std::unordered_multimap<Value, const Edge*>& table);

std::multimap<Value, Path> buildBfsInterimPath(
std::unordered_multimap<Value, Path> buildBfsInterimPath(
std::unordered_set<Value>& meets,
std::vector<std::multimap<Value, const Edge*>>& hist);
std::vector<std::unordered_multimap<Value, const Edge*>>& hist);

folly::Future<Status> floydShortestPath();

Expand All @@ -54,8 +54,8 @@ class ConjunctPathExecutor final : public Executor {
void delPathFromConditionalVar(const Value& start, const Value& end);

private:
std::vector<std::multimap<Value, const Edge*>> forward_;
std::vector<std::multimap<Value, const Edge*>> backward_;
std::vector<std::unordered_multimap<Value, const Edge*>> forward_;
std::vector<std::unordered_multimap<Value, const Edge*>> backward_;
size_t count_{0};
// startVid : {endVid, cost}
std::unordered_map<Value, std::unordered_map<Value, Value>> historyCostMap_;
Expand Down
103 changes: 30 additions & 73 deletions src/validator/FindPathValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,14 @@ Status FindPathValidator::singlePairPlan() {
auto* bodyStart = StartNode::make(qctx_);
auto* passThrough = PassThroughNode::make(qctx_, bodyStart);

std::string fromPathVar;
auto* forward = bfs(passThrough, from_, fromPathVar, false);
VLOG(1) << "forward: " << fromPathVar;
auto* forward = bfs(passThrough, from_, false);
VLOG(1) << "forward: " << forward->outputVar();

std::string toPathVar;
auto* backward = bfs(passThrough, to_, toPathVar, true);
VLOG(1) << "backward: " << toPathVar;
auto* backward = bfs(passThrough, to_, true);
VLOG(1) << "backward: " << backward->outputVar();

auto* conjunct =
ConjunctPath::make(qctx_, forward, backward, ConjunctPath::PathKind::kBiBFS, steps_.steps);
conjunct->setLeftVar(fromPathVar);
conjunct->setRightVar(toPathVar);
conjunct->setColNames({"_path"});

auto* loop = Loop::make(
Expand All @@ -82,45 +78,28 @@ Status FindPathValidator::singlePairPlan() {
return Status::OK();
}

PlanNode* FindPathValidator::bfs(PlanNode* dep,
Starts& starts,
std::string& pathVar,
bool reverse) {
PlanNode* FindPathValidator::bfs(PlanNode* dep, Starts& starts, bool reverse) {
std::string startVidsVar;
buildConstantInput(starts, startVidsVar);

auto* gn = GetNeighbors::make(qctx_, dep, space_.id);
gn->setSrc(starts.src);
gn->setEdgeProps(buildEdgeKey(reverse));
gn->setDedup();
gn->setInputVar(startVidsVar);

auto* bfs = BFSShortestPath::make(qctx_, gn);
bfs->setColNames({"_vid", "edge"});
pathVar = bfs->outputVar();

auto* columns = qctx_->objPool()->add(new YieldColumns());
auto* column =
new YieldColumn(new VariablePropertyExpression(new std::string("*"), new std::string(kVid)),
new std::string(kVid));
columns->addColumn(column);
auto* project = Project::make(qctx_, bfs, columns);
project->setColNames(deduceColNames(columns));

auto* dedup = Dedup::make(qctx_, project);
auto* startVidsVarPtr = qctx_->symTable()->getVar(startVidsVar);
startVidsVarPtr->colNames = project->colNames();
dedup->setOutputVar(startVidsVar);

bfs->setOutputVar(startVidsVar);
bfs->setColNames({nebula::kVid, "edge"});

DataSet ds;
ds.colNames = {"_vid", "edge"};
ds.colNames = {nebula::kVid, "edge"};
Row row;
row.values.emplace_back(starts.vids.front());
row.values.emplace_back(Value::kEmpty);
ds.rows.emplace_back(std::move(row));
qctx_->ectx()->setResult(pathVar, ResultBuilder().value(Value(std::move(ds))).finish());
qctx_->ectx()->setResult(startVidsVar, ResultBuilder().value(Value(std::move(ds))).finish());

return dedup;
return bfs;
}

Expression* FindPathValidator::buildBfsLoopCondition(uint32_t steps, const std::string& pathVar) {
Expand Down Expand Up @@ -186,9 +165,7 @@ GetNeighbors::EdgeProps FindPathValidator::buildEdgeKey(bool reverse) {
return edgeProps;
}

PlanNode* FindPathValidator::buildAllPairFirstDataSet(PlanNode* dep,
const std::string& inputVar,
const std::string& outputVar) {
PlanNode* FindPathValidator::buildAllPairFirstDataSet(PlanNode* dep, const std::string& inputVar) {
auto* vid =
new YieldColumn(new VariablePropertyExpression(new std::string("*"), new std::string(kVid)),
new std::string(kVid));
Expand All @@ -206,9 +183,9 @@ PlanNode* FindPathValidator::buildAllPairFirstDataSet(PlanNode* dep,

auto* project = Project::make(qctx_, dep, columns);
project->setInputVar(inputVar);
auto* outputVarPtr = qctx_->symTable()->getVar(outputVar);
outputVarPtr->colNames = {kVid, "path"};
project->setOutputVar(outputVar);
auto* outputVarPtr = qctx_->symTable()->getVar(inputVar);
outputVarPtr->colNames = {nebula::kVid, "path"};
project->setOutputVar(inputVar);
return project;
}

Expand All @@ -218,28 +195,25 @@ Status FindPathValidator::allPairPaths() {

std::string fromStartVidsVar;
buildStart(from_, fromStartVidsVar, false);
std::string fromPathVar;
auto* forward = allPaths(passThrough, from_, fromStartVidsVar, fromPathVar, false);
VLOG(1) << "forward: " << fromPathVar;

auto* forward = allPaths(passThrough, from_, fromStartVidsVar, false);
VLOG(1) << "forward: " << forward->outputVar();

std::string toStartVidsVar;
buildStart(to_, toStartVidsVar, true);
std::string toPathVar;
auto* backward = allPaths(passThrough, to_, toStartVidsVar, toPathVar, true);
VLOG(1) << "backward: " << toPathVar;
auto* backward = allPaths(passThrough, to_, toStartVidsVar, true);
VLOG(1) << "backward: " << backward->outputVar();

auto* conjunct = ConjunctPath::make(
qctx_, forward, backward, ConjunctPath::PathKind::kAllPaths, steps_.steps);
conjunct->setLeftVar(fromPathVar);
conjunct->setRightVar(toPathVar);
conjunct->setColNames({"_path"});
conjunct->setNoLoop(noLoop_);

PlanNode* projectFromDep = nullptr;
linkLoopDepFromTo(projectFromDep);

auto* projectFrom = buildAllPairFirstDataSet(projectFromDep, fromStartVidsVar, fromPathVar);
auto* projectTo = buildAllPairFirstDataSet(projectFrom, toStartVidsVar, toPathVar);
auto* projectFrom = buildAllPairFirstDataSet(projectFromDep, fromStartVidsVar);
auto* projectTo = buildAllPairFirstDataSet(projectFrom, toStartVidsVar);

auto* loop =
Loop::make(qctx_, projectTo, conjunct, buildAllPathsLoopCondition(steps_.steps));
Expand All @@ -256,31 +230,17 @@ Status FindPathValidator::allPairPaths() {
PlanNode* FindPathValidator::allPaths(PlanNode* dep,
Starts& starts,
std::string& startVidsVar,
std::string& pathVar,
bool reverse) {
auto* gn = GetNeighbors::make(qctx_, dep, space_.id);
gn->setSrc(starts.src);
gn->setEdgeProps(buildEdgeKey(reverse));
gn->setInputVar(startVidsVar);
gn->setDedup();

auto* allPaths = ProduceAllPaths::make(qctx_, gn);
allPaths->setColNames({kVid, "path"});
pathVar = allPaths->outputVar();

auto* columns = qctx_->objPool()->add(new YieldColumns());
auto* column =
new YieldColumn(new VariablePropertyExpression(new std::string("*"), new std::string(kVid)),
new std::string(kVid));
columns->addColumn(column);
auto* project = Project::make(qctx_, allPaths, columns);
project->setColNames(deduceColNames(columns));

auto* dedup = Dedup::make(qctx_, project);
auto* startVidsVarPtr = qctx_->symTable()->getVar(startVidsVar);
startVidsVarPtr->colNames = project->colNames();
dedup->setOutputVar(startVidsVar);

return dedup;
allPaths->setOutputVar(startVidsVar);
allPaths->setColNames({nebula::kVid, "path"});
return allPaths;
}

Expression* FindPathValidator::buildAllPathsLoopCondition(uint32_t steps) {
Expand Down Expand Up @@ -419,9 +379,10 @@ PlanNode* FindPathValidator::multiPairShortestPath(PlanNode* dep,
gn->setSrc(starts.src);
gn->setEdgeProps(buildEdgeKey(reverse));
gn->setInputVar(startVidsVar);
gn->setDedup();

auto* pssp = ProduceSemiShortestPath::make(qctx_, gn);
pssp->setColNames({kDst, kSrc, "cost", "paths"});
pssp->setColNames({nebula::kDst, nebula::kSrc, "cost", "paths"});
pathVar = pssp->outputVar();

auto* columns = qctx_->objPool()->add(new YieldColumns());
Expand All @@ -430,14 +391,10 @@ PlanNode* FindPathValidator::multiPairShortestPath(PlanNode* dep,
new std::string(kVid));
columns->addColumn(column);
auto* project = Project::make(qctx_, pssp, columns);
project->setOutputVar(startVidsVar);
project->setColNames(deduceColNames(columns));

auto* dedup = Dedup::make(qctx_, project);
auto* startVidsVarPtr = qctx_->symTable()->getVar(startVidsVar);
startVidsVarPtr->colNames = project->colNames();
dedup->setOutputVar(startVidsVar);

return dedup;
return project;
}

Expression* FindPathValidator::buildMultiPairLoopCondition(uint32_t steps,
Expand Down
12 changes: 3 additions & 9 deletions src/validator/FindPathValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,14 @@ class FindPathValidator final : public TraversalValidator {
void linkLoopDepFromTo(PlanNode*& projectDep);
// bfs
Status singlePairPlan();
PlanNode* bfs(PlanNode* dep, Starts& starts, std::string& pathVar, bool reverse);
PlanNode* bfs(PlanNode* dep, Starts& starts, bool reverse);
Expression* buildBfsLoopCondition(uint32_t steps, const std::string& pathVar);

// allPath
Status allPairPaths();
PlanNode* allPaths(PlanNode* dep,
Starts& starts,
std::string& startVidsVar,
std::string& pathVar,
bool reverse);
PlanNode* allPaths(PlanNode* dep, Starts& starts, std::string& startVidsVar, bool reverse);
Expression* buildAllPathsLoopCondition(uint32_t steps);
PlanNode* buildAllPairFirstDataSet(PlanNode* dep,
const std::string& inputVar,
const std::string& outputVar);
PlanNode* buildAllPairFirstDataSet(PlanNode* dep, const std::string& inputVar);

// multi-pair
Status multiPairPlan();
Expand Down
Loading