Add parallel page rank #4689

Open · wants to merge 3 commits into master
322 changes: 243 additions & 79 deletions src/function/gds/page_rank.cpp
@@ -1,8 +1,10 @@
#include "binder/binder.h"
#include "binder/expression/expression_util.h"
#include "common/types/internal_id_util.h"
#include "function/gds/gds.h"
#include "function/gds/gds_frontier.h"
#include "function/gds/gds_function_collection.h"
#include "function/gds/gds_utils.h"
#include "function/gds/output_writer.h"
#include "function/gds_function.h"
#include "graph/graph.h"
#include "main/client_context.h"
@@ -20,8 +22,8 @@ namespace function {

struct PageRankBindData final : public GDSBindData {
double dampingFactor = 0.85;
int64_t maxIteration = 10;
double delta = 0.0001; // detect convergence
int64_t maxIteration = 20;
double delta = 0.0000001; // Convergence threshold on the total rank change per iteration

explicit PageRankBindData(graph::GraphEntry graphEntry,
std::shared_ptr<binder::Expression> nodeOutput)
@@ -35,35 +37,209 @@ struct PageRankBindData final : public GDSBindData {
}
};

class PageRankOutputWriter {
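// Atomically adds valToAdd to origin. std::atomic<double> has no fetch_add before C++20,
// so emulate it with a compare-exchange loop.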
static void addCAS(std::atomic<double>& origin, double valToAdd) {
auto expected = origin.load(std::memory_order_relaxed);
auto desired = expected + valToAdd;
while (!origin.compare_exchange_strong(expected, desired))
;
}

// Per-node PageRank values, stored as one atomic array per node table.
class P {
public:
explicit PageRankOutputWriter(main::ClientContext* context) {
auto mm = context->getMemoryManager();
nodeIDVector = std::make_unique<ValueVector>(LogicalType::INTERNAL_ID(), mm);
rankVector = std::make_unique<ValueVector>(LogicalType::DOUBLE(), mm);
nodeIDVector->state = DataChunkState::getSingleValueDataChunkState();
rankVector->state = DataChunkState::getSingleValueDataChunkState();
vectors.push_back(nodeIDVector.get());
vectors.push_back(rankVector.get());
}

void materialize(main::ClientContext* context, Graph* graph,
const common::node_id_map_t<double>& ranks, FactorizedTable& table) const {
for (auto tableID : graph->getNodeTableIDs()) {
for (auto offset = 0u; offset < graph->getNumNodes(context->getTransaction(), tableID);
++offset) {
auto nodeID = nodeID_t{offset, tableID};
nodeIDVector->setValue<nodeID_t>(0, nodeID);
rankVector->setValue<double>(0, ranks.at(nodeID));
table.append(vectors);
P(table_id_map_t<offset_t> numNodesMap, storage::MemoryManager* mm, double val) {
for (const auto& [tableID, numNodes] : numNodesMap) {
valueMap.allocate(tableID, numNodes, mm);
pin(tableID);
for (auto i = 0u; i < numNodes; ++i) {
values[i].store(val, std::memory_order_relaxed);
}
}
}

void pin(table_id_t tableID) { values = valueMap.getData(tableID); }

double getValue(offset_t offset) { return values[offset].load(std::memory_order_relaxed); }

void addValueCAS(offset_t offset, double val) { addCAS(values[offset], val); }

void setValue(offset_t offset, double val) {
values[offset].store(val, std::memory_order_relaxed);
}

private:
std::atomic<double>* values = nullptr;
ObjectArraysMap<std::atomic<double>> valueMap;
};

// For each edge, add the source's weight (current rank / out-degree) to the destination's next rank.
class PNextUpdateEdgeCompute : public EdgeCompute {
public:
PNextUpdateEdgeCompute(P* pCurrent, P* pNext) : pCurrent{pCurrent}, pNext{pNext} {}

std::vector<nodeID_t> edgeCompute(nodeID_t boundNodeID, graph::NbrScanState::Chunk& chunk,
bool) override {
if (chunk.size() > 0) {
auto addVal = pCurrent->getValue(boundNodeID.offset) / chunk.size();
chunk.forEach(
[&](auto nbrNodeID, auto) { pNext->addValueCAS(nbrNodeID.offset, addVal); });
}
return {boundNodeID};
}

std::unique_ptr<EdgeCompute> copy() override {
return std::make_unique<PNextUpdateEdgeCompute>(pCurrent, pNext);
}

private:
P* pCurrent;
P* pNext;
};

// Apply damping: rank = dampingFactor * (sum accumulated above) + (1 - dampingFactor) / |V|, where the second term is the precomputed constant.
class PNextUpdateVertexCompute : public VertexCompute {
public:
PNextUpdateVertexCompute(double dampingFactor, double constant, P* pNext)
: dampingFactor{dampingFactor}, constant{constant}, pNext{pNext} {}

bool beginOnTable(table_id_t tableID) override {
pNext->pin(tableID);
return true;
}

void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override {
for (auto i = startOffset; i < endOffset; ++i) {
pNext->setValue(i, pNext->getValue(i) * dampingFactor + constant);
}
}

std::unique_ptr<VertexCompute> copy() override {
return std::make_unique<PNextUpdateVertexCompute>(dampingFactor, constant, pNext);
}

private:
double dampingFactor;
double constant;
P* pNext;
};

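// Accumulates the total absolute difference between the current and next rank values; used for the convergence check.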
class PDiffVertexCompute : public VertexCompute {
public:
PDiffVertexCompute(std::atomic<double>& diff, P* pCurrent, P* pNext)
: diff{diff}, pCurrent{pCurrent}, pNext{pNext} {}

bool beginOnTable(table_id_t tableID) override {
pCurrent->pin(tableID);
pNext->pin(tableID);
return true;
}

void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override {
for (auto i = startOffset; i < endOffset; ++i) {
auto next = pNext->getValue(i);
auto current = pCurrent->getValue(i);
if (next > current) {
addCAS(diff, next - current);
} else {
addCAS(diff, current - next);
}
}
}

std::unique_ptr<VertexCompute> copy() override {
return std::make_unique<PDiffVertexCompute>(diff, pCurrent, pNext);
}

private:
std::atomic<double>& diff;
P* pCurrent;
P* pNext;
};

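// Resets rank values to 0 so the array can be reused as the accumulator in the next iteration.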
class PResetVertexCompute : public VertexCompute {
public:
explicit PResetVertexCompute(P* pCurrent) : pCurrent{pCurrent} {}

bool beginOnTable(common::table_id_t tableID) override {
pCurrent->pin(tableID);
return true;
}

void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override {
for (auto i = startOffset; i < endOffset; ++i) {
pCurrent->setValue(i, 0);
}
}

std::unique_ptr<VertexCompute> copy() override {
return std::make_unique<PResetVertexCompute>(pCurrent);
}

private:
P* pCurrent;
};

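// Writes (node, rank) pairs for the final rank values into a FactorizedTable.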
class PageRankOutputWriter : public GDSOutputWriter {
public:
PageRankOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask,
P* pNext)
: GDSOutputWriter{context, outputNodeMask}, pNext{pNext} {
nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager());
rankVector = createVector(LogicalType::DOUBLE(), context->getMemoryManager());
}

void pinTableID(table_id_t tableID) override {
GDSOutputWriter::pinTableID(tableID);
pNext->pin(tableID);
}

void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID,
FactorizedTable& table) const {
for (auto i = startOffset; i < endOffset; ++i) {
auto nodeID = nodeID_t{i, tableID};
nodeIDVector->setValue<nodeID_t>(0, nodeID);
rankVector->setValue<double>(0, pNext->getValue(i));
table.append(vectors);
}
}

std::unique_ptr<PageRankOutputWriter> copy() const {
return std::make_unique<PageRankOutputWriter>(context, outputNodeMask, pNext);
}

private:
std::unique_ptr<ValueVector> nodeIDVector;
std::unique_ptr<ValueVector> rankVector;
std::vector<ValueVector*> vectors;
P* pNext;
};

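// Runs the output writer over each node-table partition and appends results to a thread-local table.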
class OutputVertexCompute : public VertexCompute {
public:
OutputVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState,
std::unique_ptr<PageRankOutputWriter> outputWriter)
: mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} {
localFT = sharedState->claimLocalTable(mm);
}
~OutputVertexCompute() override { sharedState->returnLocalTable(localFT); }

bool beginOnTable(table_id_t tableID) override {
outputWriter->pinTableID(tableID);
return true;
}

void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override {
outputWriter->materialize(startOffset, endOffset, tableID, *localFT);
}

std::unique_ptr<VertexCompute> copy() override {
return std::make_unique<OutputVertexCompute>(mm, sharedState, outputWriter->copy());
}

private:
storage::MemoryManager* mm;
processor::GDSCallSharedState* sharedState;
std::unique_ptr<PageRankOutputWriter> outputWriter;
processor::FactorizedTable* localFT;
};

class PageRank final : public GDSAlgorithm {
@@ -78,9 +254,7 @@ class PageRank final : public GDSAlgorithm {
*
* graph::ANY
*/
std::vector<common::LogicalTypeID> getParameterTypeIDs() const override {
return {LogicalTypeID::ANY};
}
std::vector<LogicalTypeID> getParameterTypeIDs() const override { return {LogicalTypeID::ANY}; }

/*
* Outputs are
@@ -103,63 +277,53 @@ class PageRank final : public GDSAlgorithm {
bindData = std::make_unique<PageRankBindData>(std::move(graphEntry), nodeOutput);
}

void initLocalState(main::ClientContext* context) override {
localState = std::make_unique<PageRankOutputWriter>(context);
}

void exec(processor::ExecutionContext* context) override {
auto extraData = bindData->ptrCast<PageRankBindData>();
localState = std::make_unique<PageRankOutputWriter>(context->clientContext);
auto clientContext = context->clientContext;
auto transaction = clientContext->getTransaction();
auto graph = sharedState->graph.get();
// Initialize state.
common::node_id_map_t<double> ranks;
auto numNodes = graph->getNumNodes(context->clientContext->getTransaction());
for (auto tableID : graph->getNodeTableIDs()) {
for (auto offset = 0u;
offset < graph->getNumNodes(context->clientContext->getTransaction(), tableID);
++offset) {
auto nodeID = nodeID_t{offset, tableID};
ranks.insert({nodeID, 1.0 / numNodes});
}
}
auto dampingValue = (1 - extraData->dampingFactor) / numNodes;
// Compute page rank.
auto nodeTableIDs = graph->getNodeTableIDs();
auto scanState = graph->prepareMultiTableScanFwd(nodeTableIDs);
// We're using multiple overlapping iterators, both of which need access to a scan state, so
// we need multiple scan states
auto innerScanState = graph->prepareMultiTableScanFwd(nodeTableIDs);
auto numNodesInGraph = graph->getNumNodes(context->clientContext->getTransaction());
for (auto i = 0u; i < extraData->maxIteration; ++i) {
auto change = 0.0;
for (auto tableID : nodeTableIDs) {
for (auto offset = 0u;
offset < graph->getNumNodes(context->clientContext->getTransaction(), tableID);
++offset) {
auto nodeID = nodeID_t{offset, tableID};
auto rank = 0.0;
auto iter = graph->scanFwd(nodeID, *scanState);
for (const auto chunk : iter) {
chunk.forEach([&](auto nbr, auto) {
auto numNbrOfNbr = graph->scanFwd(nbr, *innerScanState).count();
if (numNbrOfNbr == 0) {
numNbrOfNbr = numNodesInGraph;
}
rank += extraData->dampingFactor * (ranks[nbr] / numNbrOfNbr);
});
}
rank += dampingValue;
double diff = ranks[nodeID] - rank;
change += diff < 0 ? -diff : diff;
ranks[nodeID] = rank;
}
}
if (change < extraData->delta) {
auto numNodesMap = graph->getNumNodesMap(transaction);
auto numNodes = graph->getNumNodes(transaction);
auto p1 = P(numNodesMap, clientContext->getMemoryManager(), (double)1 / numNodes);
auto p2 = P(numNodesMap, clientContext->getMemoryManager(), 0);
P* pCurrent = &p1;
P* pNext = &p2;
auto pageRankBindData = bindData->ptrCast<PageRankBindData>();
auto currentIter = 1u;
auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED);
auto nextFrontier = getPathLengthsFrontier(context, 0);
auto frontierPair =
std::make_unique<DoublePathLengthsFrontierPair>(currentFrontier, nextFrontier);
frontierPair->setActiveNodesForNextIter();
frontierPair->getNextSparseFrontier().disable();
auto pNextUpdateConstant = (1 - pageRankBindData->dampingFactor) * ((double)1 / numNodes);
auto computeState =
GDSComputeState(std::move(frontierPair), nullptr, sharedState->getOutputNodeMaskMap());
while (currentIter < pageRankBindData->maxIteration) {
auto ec = std::make_unique<PNextUpdateEdgeCompute>(pCurrent, pNext);
computeState.edgeCompute = std::move(ec);
GDSUtils::runFrontiersUntilConvergence(context, computeState, graph,
ExtendDirection::FWD, computeState.frontierPair->getCurrentIter() + 1);
auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor,
pNextUpdateConstant, pNext);
GDSUtils::runVertexCompute(context, graph, pNextUpdateVC);
std::atomic<double> diff;
diff.store(0);
auto pDiffVC = PDiffVertexCompute(diff, pCurrent, pNext);
GDSUtils::runVertexCompute(context, graph, pDiffVC);
std::swap(pCurrent, pNext);
if (diff.load() < pageRankBindData->delta) { // Converged.
break;
}
auto pResetVC = PResetVertexCompute(pNext);
GDSUtils::runVertexCompute(context, graph, pResetVC);
currentIter++;
}
// Materialize result.
localState->materialize(context->clientContext, graph, ranks, *sharedState->fTable);
auto writer = std::make_unique<PageRankOutputWriter>(clientContext,
sharedState->getOutputNodeMaskMap(), pCurrent);
auto outputVC = std::make_unique<OutputVertexCompute>(clientContext->getMemoryManager(),
sharedState.get(), std::move(writer));
GDSUtils::runVertexCompute(context, graph, *outputVC);
sharedState->mergeLocalTables();
}

std::unique_ptr<GDSAlgorithm> copy() const override {
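For orientation, the passes above split the classic PageRank update into stages: PNextUpdateEdgeCompute accumulates rank shares along out-edges, PNextUpdateVertexCompute applies the damping factor, PDiffVertexCompute sums the per-node change for the convergence check, and PResetVertexCompute clears the accumulator before the next round. The single-threaded sketch below shows the same scheme on a plain adjacency list; it is illustrative only, and none of its names exist in this codebase.

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <utility>
#include <vector>

// Sequential reference for the iteration implemented above. outAdj[u] lists the
// out-neighbours of node u; all names here are hypothetical.
std::vector<double> pageRankReference(const std::vector<std::vector<std::size_t>>& outAdj,
    double dampingFactor = 0.85, std::size_t maxIteration = 20, double delta = 1e-7) {
    const std::size_t n = outAdj.size();
    std::vector<double> current(n, 1.0 / n); // plays the role of pCurrent
    std::vector<double> next(n, 0.0);        // plays the role of pNext
    const double constant = (1.0 - dampingFactor) / n;
    for (std::size_t iter = 1; iter < maxIteration; ++iter) {
        // Edge pass: each node sends current rank / out-degree to every out-neighbour.
        for (std::size_t u = 0; u < n; ++u) {
            if (outAdj[u].empty()) {
                continue;
            }
            const double share = current[u] / outAdj[u].size();
            for (auto v : outAdj[u]) {
                next[v] += share;
            }
        }
        // Vertex pass: apply damping, then sum the absolute per-node change.
        double diff = 0.0;
        for (std::size_t v = 0; v < n; ++v) {
            next[v] = next[v] * dampingFactor + constant;
            diff += std::fabs(next[v] - current[v]);
        }
        std::swap(current, next);
        if (diff < delta) { // Converged.
            break;
        }
        std::fill(next.begin(), next.end(), 0.0); // Mirrors PResetVertexCompute.
    }
    return current;
}

As in the PR, the iteration counter starts at 1 and the loop stops early once the summed rank change falls below delta.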
2 changes: 1 addition & 1 deletion src/function/gds/weakly_connected_components.cpp
@@ -226,7 +226,7 @@ class WeaklyConnectedComponent final : public GDSAlgorithm {
MAX_ITERATION);
auto vertexCompute = std::make_unique<WCCVertexCompute>(clientContext->getMemoryManager(),
sharedState.get(), std::move(writer));
GDSUtils::runVertexCompute(context, sharedState->graph.get(), *vertexCompute);
GDSUtils::runVertexCompute(context, graph, *vertexCompute);
sharedState->mergeLocalTables();
}

4 changes: 0 additions & 4 deletions src/include/function/gds/gds.h
@@ -113,10 +113,6 @@ class KUZU_API GDSAlgorithm {
}

protected:
// TODO(Semih/Xiyang): See if this will be still needed after PageRank and other algorithms are
// updated. GDSAlgorithms implementing recursive joins do not use this function.
virtual void initLocalState(main::ClientContext*) {}

graph::GraphEntry bindGraphEntry(main::ClientContext& context, const std::string& name);
std::shared_ptr<binder::Expression> bindNodeOutput(binder::Binder* binder,
const std::vector<catalog::TableCatalogEntry*>& nodeEntries);
1 change: 0 additions & 1 deletion src/include/processor/plan_mapper.h
@@ -98,7 +98,6 @@ class PlanMapper {
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapProjection(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapRecursiveExtend(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapScanSource(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapScanNodeTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapSemiMasker(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapSetProperty(planner::LogicalOperator* logicalOperator);