diff --git a/src/function/gds/page_rank.cpp b/src/function/gds/page_rank.cpp index 21c9ee0211b..9b9c14a84fe 100644 --- a/src/function/gds/page_rank.cpp +++ b/src/function/gds/page_rank.cpp @@ -1,8 +1,10 @@ #include "binder/binder.h" #include "binder/expression/expression_util.h" -#include "common/types/internal_id_util.h" #include "function/gds/gds.h" +#include "function/gds/gds_frontier.h" #include "function/gds/gds_function_collection.h" +#include "function/gds/gds_utils.h" +#include "function/gds/output_writer.h" #include "function/gds_function.h" #include "graph/graph.h" #include "main/client_context.h" @@ -20,8 +22,8 @@ namespace function { struct PageRankBindData final : public GDSBindData { double dampingFactor = 0.85; - int64_t maxIteration = 10; - double delta = 0.0001; // detect convergence + int64_t maxIteration = 20; + double delta = 0.0000001; // Convergence delta explicit PageRankBindData(graph::GraphEntry graphEntry, std::shared_ptr nodeOutput) @@ -35,35 +37,211 @@ struct PageRankBindData final : public GDSBindData { } }; -class PageRankOutputWriter { +static void addCAS(std::atomic& origin, double valToAdd) { + auto expected = origin.load(std::memory_order_relaxed); + auto desired = expected + valToAdd; + while (!origin.compare_exchange_strong(expected, desired)) { + desired = expected + valToAdd; + } +} + +// Represents PageRank value for all nodes +class PValues { +public: + PValues(table_id_map_t numNodesMap, storage::MemoryManager* mm, double val) { + for (const auto& [tableID, numNodes] : numNodesMap) { + valueMap.allocate(tableID, numNodes, mm); + pin(tableID); + for (auto i = 0u; i < numNodes; ++i) { + values[i].store(val, std::memory_order_relaxed); + } + } + } + + void pin(table_id_t tableID) { values = valueMap.getData(tableID); } + + double getValue(offset_t offset) { return values[offset].load(std::memory_order_relaxed); } + + void addValueCAS(offset_t offset, double val) { addCAS(values[offset], val); } + + void setValue(offset_t offset, double val) { + values[offset].store(val, std::memory_order_relaxed); + } + +private: + std::atomic* values = nullptr; + ObjectArraysMap> valueMap; +}; + +// Sum the weight (current rank / degree) for each incoming edge. +class PNextUpdateEdgeCompute : public EdgeCompute { +public: + PNextUpdateEdgeCompute(PValues* pCurrent, PValues* pNext) : pCurrent{pCurrent}, pNext{pNext} {} + + std::vector edgeCompute(nodeID_t boundNodeID, graph::NbrScanState::Chunk& chunk, + bool) override { + if (chunk.size() > 0) { + auto valToAdd = pCurrent->getValue(boundNodeID.offset) / chunk.size(); + chunk.forEach( + [&](auto nbrNodeID, auto) { pNext->addValueCAS(nbrNodeID.offset, valToAdd); }); + } + return {boundNodeID}; + } + + std::unique_ptr copy() override { + return std::make_unique(pCurrent, pNext); + } + +private: + PValues* pCurrent; + PValues* pNext; +}; + +// Evaluate rank = above result * dampingFactor + {(1 - dampingFactor) / |V|} (constant) +class PNextUpdateVertexCompute : public VertexCompute { +public: + PNextUpdateVertexCompute(double dampingFactor, double constant, PValues* pNext) + : dampingFactor{dampingFactor}, constant{constant}, pNext{pNext} {} + + bool beginOnTable(table_id_t tableID) override { + pNext->pin(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { + for (auto i = startOffset; i < endOffset; ++i) { + pNext->setValue(i, pNext->getValue(i) * dampingFactor + constant); + } + } + + std::unique_ptr copy() override { + return std::make_unique(dampingFactor, constant, pNext); + } + +private: + double dampingFactor; + double constant; + PValues* pNext; +}; + +class PDiffVertexCompute : public VertexCompute { public: - explicit PageRankOutputWriter(main::ClientContext* context) { - auto mm = context->getMemoryManager(); - nodeIDVector = std::make_unique(LogicalType::INTERNAL_ID(), mm); - rankVector = std::make_unique(LogicalType::DOUBLE(), mm); - nodeIDVector->state = DataChunkState::getSingleValueDataChunkState(); - rankVector->state = DataChunkState::getSingleValueDataChunkState(); - vectors.push_back(nodeIDVector.get()); - vectors.push_back(rankVector.get()); - } - - void materialize(main::ClientContext* context, Graph* graph, - const common::node_id_map_t& ranks, FactorizedTable& table) const { - for (auto tableID : graph->getNodeTableIDs()) { - for (auto offset = 0u; offset < graph->getNumNodes(context->getTransaction(), tableID); - ++offset) { - auto nodeID = nodeID_t{offset, tableID}; - nodeIDVector->setValue(0, nodeID); - rankVector->setValue(0, ranks.at(nodeID)); - table.append(vectors); + PDiffVertexCompute(std::atomic& diff, PValues* pCurrent, PValues* pNext) + : diff{diff}, pCurrent{pCurrent}, pNext{pNext} {} + + bool beginOnTable(table_id_t tableID) override { + pCurrent->pin(tableID); + pNext->pin(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { + for (auto i = startOffset; i < endOffset; ++i) { + auto next = pNext->getValue(i); + auto current = pCurrent->getValue(i); + if (next > current) { + addCAS(diff, next - current); + } else { + addCAS(diff, current - next); } + pCurrent->setValue(i, 0); } } + std::unique_ptr copy() override { + return std::make_unique(diff, pCurrent, pNext); + } + +private: + std::atomic& diff; + PValues* pCurrent; + PValues* pNext; +}; + +// class PResetVertexCompute : public VertexCompute { +// public: +// explicit PResetVertexCompute(P* pCurrent) : pCurrent{pCurrent} {} +// +// bool beginOnTable(common::table_id_t tableID) override { +// pCurrent->pin(tableID); +// return true; +// } +// +// void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { +// for (auto i = startOffset; i < endOffset; ++i) { +// pCurrent->setValue(i, 0); +// } +// } +// +// std::unique_ptr copy() override { +// return std::make_unique(pCurrent); +// } +// +// private: +// P* pCurrent; +// }; + +class PageRankOutputWriter : public GDSOutputWriter { +public: + PageRankOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask, + PValues* pNext) + : GDSOutputWriter{context, outputNodeMask}, pNext{pNext} { + nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager()); + rankVector = createVector(LogicalType::DOUBLE(), context->getMemoryManager()); + } + + void pinTableID(table_id_t tableID) override { + GDSOutputWriter::pinTableID(tableID); + pNext->pin(tableID); + } + + void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID, + FactorizedTable& table) const { + for (auto i = startOffset; i < endOffset; ++i) { + auto nodeID = nodeID_t{i, tableID}; + nodeIDVector->setValue(0, nodeID); + rankVector->setValue(0, pNext->getValue(i)); + table.append(vectors); + } + } + + std::unique_ptr copy() const { + return std::make_unique(context, outputNodeMask, pNext); + } + private: std::unique_ptr nodeIDVector; std::unique_ptr rankVector; - std::vector vectors; + PValues* pNext; +}; + +class OutputVertexCompute : public VertexCompute { +public: + OutputVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState, + std::unique_ptr outputWriter) + : mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} { + localFT = sharedState->claimLocalTable(mm); + } + ~OutputVertexCompute() override { sharedState->returnLocalTable(localFT); } + + bool beginOnTable(table_id_t tableID) override { + outputWriter->pinTableID(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override { + outputWriter->materialize(startOffset, endOffset, tableID, *localFT); + } + + std::unique_ptr copy() override { + return std::make_unique(mm, sharedState, outputWriter->copy()); + } + +private: + storage::MemoryManager* mm; + processor::GDSCallSharedState* sharedState; + std::unique_ptr outputWriter; + processor::FactorizedTable* localFT; }; class PageRank final : public GDSAlgorithm { @@ -92,63 +270,51 @@ class PageRank final : public GDSAlgorithm { bindData = std::make_unique(std::move(graphEntry), nodeOutput); } - void initLocalState(main::ClientContext* context) override { - localState = std::make_unique(context); - } - void exec(processor::ExecutionContext* context) override { - auto extraData = bindData->ptrCast(); - localState = std::make_unique(context->clientContext); + auto clientContext = context->clientContext; + auto transaction = clientContext->getTransaction(); auto graph = sharedState->graph.get(); - // Initialize state. - common::node_id_map_t ranks; - auto numNodes = graph->getNumNodes(context->clientContext->getTransaction()); - for (auto tableID : graph->getNodeTableIDs()) { - for (auto offset = 0u; - offset < graph->getNumNodes(context->clientContext->getTransaction(), tableID); - ++offset) { - auto nodeID = nodeID_t{offset, tableID}; - ranks.insert({nodeID, 1.0 / numNodes}); - } - } - auto dampingValue = (1 - extraData->dampingFactor) / numNodes; - // Compute page rank. - auto nodeTableIDs = graph->getNodeTableIDs(); - auto scanState = graph->prepareMultiTableScanFwd(nodeTableIDs); - // We're using multiple overlapping iterators, both of which need access to a scan state, so - // we need multiple scan states - auto innerScanState = graph->prepareMultiTableScanFwd(nodeTableIDs); - auto numNodesInGraph = graph->getNumNodes(context->clientContext->getTransaction()); - for (auto i = 0u; i < extraData->maxIteration; ++i) { - auto change = 0.0; - for (auto tableID : nodeTableIDs) { - for (auto offset = 0u; - offset < graph->getNumNodes(context->clientContext->getTransaction(), tableID); - ++offset) { - auto nodeID = nodeID_t{offset, tableID}; - auto rank = 0.0; - auto iter = graph->scanFwd(nodeID, *scanState); - for (const auto chunk : iter) { - chunk.forEach([&](auto nbr, auto) { - auto numNbrOfNbr = graph->scanFwd(nbr, *innerScanState).count(); - if (numNbrOfNbr == 0) { - numNbrOfNbr = numNodesInGraph; - } - rank += extraData->dampingFactor * (ranks[nbr] / numNbrOfNbr); - }); - } - rank += dampingValue; - double diff = ranks[nodeID] - rank; - change += diff < 0 ? -diff : diff; - ranks[nodeID] = rank; - } - } - if (change < extraData->delta) { + auto numNodesMap = graph->getNumNodesMap(transaction); + auto numNodes = graph->getNumNodes(transaction); + auto p1 = PValues(numNodesMap, clientContext->getMemoryManager(), (double)1 / numNodes); + auto p2 = PValues(numNodesMap, clientContext->getMemoryManager(), 0); + PValues* pCurrent = &p1; + PValues* pNext = &p2; + auto pageRankBindData = bindData->ptrCast(); + auto currentIter = 1u; + auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED); + auto nextFrontier = getPathLengthsFrontier(context, 0); + auto frontierPair = + std::make_unique(currentFrontier, nextFrontier); + frontierPair->setActiveNodesForNextIter(); + frontierPair->getNextSparseFrontier().disable(); + auto pNextUpdateConstant = (1 - pageRankBindData->dampingFactor) * ((double)1 / numNodes); + auto computeState = + GDSComputeState(std::move(frontierPair), nullptr, sharedState->getOutputNodeMaskMap()); + while (currentIter < pageRankBindData->maxIteration) { + auto ec = std::make_unique(pCurrent, pNext); + computeState.edgeCompute = std::move(ec); + GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, + ExtendDirection::FWD, computeState.frontierPair->getCurrentIter() + 1); + auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor, + pNextUpdateConstant, pNext); + GDSUtils::runVertexCompute(context, graph, pNextUpdateVC); + std::atomic diff; + diff.store(0); + auto pDiffVC = PDiffVertexCompute(diff, pCurrent, pNext); + GDSUtils::runVertexCompute(context, graph, pDiffVC); + std::swap(pCurrent, pNext); + if (diff.load() < pageRankBindData->delta) { // Converged. break; } + currentIter++; } - // Materialize result. - localState->materialize(context->clientContext, graph, ranks, *sharedState->fTable); + auto writer = std::make_unique(clientContext, + sharedState->getOutputNodeMaskMap(), pCurrent); + auto outputVC = std::make_unique(clientContext->getMemoryManager(), + sharedState.get(), std::move(writer)); + GDSUtils::runVertexCompute(context, graph, *outputVC); + sharedState->mergeLocalTables(); } std::unique_ptr copy() const override { diff --git a/src/function/gds/weakly_connected_components.cpp b/src/function/gds/weakly_connected_components.cpp index 41771b53551..9e38c8b9720 100644 --- a/src/function/gds/weakly_connected_components.cpp +++ b/src/function/gds/weakly_connected_components.cpp @@ -219,7 +219,7 @@ class WeaklyConnectedComponent final : public GDSAlgorithm { MAX_ITERATION); auto vertexCompute = std::make_unique(clientContext->getMemoryManager(), sharedState.get(), std::move(writer)); - GDSUtils::runVertexCompute(context, sharedState->graph.get(), *vertexCompute); + GDSUtils::runVertexCompute(context, graph, *vertexCompute); sharedState->mergeLocalTables(); } diff --git a/src/graph/on_disk_graph.cpp b/src/graph/on_disk_graph.cpp index cc961093990..e97143079cc 100644 --- a/src/graph/on_disk_graph.cpp +++ b/src/graph/on_disk_graph.cpp @@ -215,38 +215,6 @@ std::unique_ptr OnDiskGraph::prepareScan(table_id_t relTableID, std::span(&relTable, 1), graphEntry, edgePropertyIndex)); } -std::unique_ptr OnDiskGraph::prepareMultiTableScanFwd( - std::span nodeTableIDs) { - std::unordered_set relTableIDSet; - std::vector tables; - for (auto nodeTableID : nodeTableIDs) { - for (auto& [tableID, table] : nodeTableIDToFwdRelTables.at(nodeTableID)) { - if (!relTableIDSet.contains(tableID)) { - relTableIDSet.insert(tableID); - tables.push_back(table); - } - } - } - return std::unique_ptr( - new OnDiskGraphNbrScanStates(context, std::span(tables), graphEntry)); -} - -std::unique_ptr OnDiskGraph::prepareMultiTableScanBwd( - std::span nodeTableIDs) { - std::unordered_set relTableIDSet; - std::vector tables; - for (auto nodeTableID : nodeTableIDs) { - for (auto& [tableID, table] : nodeTableIDToBwdRelTables.at(nodeTableID)) { - if (!relTableIDSet.contains(tableID)) { - relTableIDSet.insert(tableID); - tables.push_back(table); - } - } - } - return std::unique_ptr( - new OnDiskGraphNbrScanStates(context, std::span(tables), graphEntry)); -} - Graph::EdgeIterator OnDiskGraph::scanFwd(nodeID_t nodeID, NbrScanState& state) { auto& onDiskScanState = ku_dynamic_cast(state); onDiskScanState.srcNodeIDVector->setValue(0, nodeID); diff --git a/src/include/function/gds/gds.h b/src/include/function/gds/gds.h index ed361cec020..6fbcd001967 100644 --- a/src/include/function/gds/gds.h +++ b/src/include/function/gds/gds.h @@ -113,10 +113,6 @@ class KUZU_API GDSAlgorithm { } protected: - // TODO(Semih/Xiyang): See if this will be still needed after PageRank and other algorithms are - // updated. GDSAlgorithms implementing recursive joins do not use this function. - virtual void initLocalState(main::ClientContext*) {} - graph::GraphEntry bindGraphEntry(main::ClientContext& context, const std::string& name); std::shared_ptr bindNodeOutput(binder::Binder* binder, const std::vector& nodeEntries); diff --git a/src/include/graph/graph.h b/src/include/graph/graph.h index fd71253f819..c844e854bf2 100644 --- a/src/include/graph/graph.h +++ b/src/include/graph/graph.h @@ -203,24 +203,10 @@ class Graph { // Prepares scan on the specified relationship table (works for backwards and forwards scans) virtual std::unique_ptr prepareScan(common::table_id_t relTableID, std::optional edgePropertyIndex = std::nullopt) = 0; - // Prepares scan on all connected relationship tables using forward adjList. - virtual std::unique_ptr prepareMultiTableScanFwd( - std::span nodeTableIDs) = 0; - - // scanFwd an scanBwd scan a single source node under the assumption that many nodes in the same - // group will be scanned at once. // Get dst nodeIDs for given src nodeID using forward adjList. virtual EdgeIterator scanFwd(common::nodeID_t nodeID, NbrScanState& state) = 0; - // We don't use scanBwd currently. I'm adding them because they are the mirroring to scanFwd. - // Also, algorithm may only need adjList index in single direction so we should make double - // indexing optional. - - // Prepares scan on all connected relationship tables using backward adjList. - virtual std::unique_ptr prepareMultiTableScanBwd( - std::span nodeTableIDs) = 0; - // Get dst nodeIDs for given src nodeID tables using backward adjList. virtual EdgeIterator scanBwd(common::nodeID_t nodeID, NbrScanState& state) = 0; diff --git a/src/include/graph/on_disk_graph.h b/src/include/graph/on_disk_graph.h index d8250cc90fd..af83d6b3778 100644 --- a/src/include/graph/on_disk_graph.h +++ b/src/include/graph/on_disk_graph.h @@ -166,10 +166,6 @@ class KUZU_API OnDiskGraph final : public Graph { std::unique_ptr prepareScan(common::table_id_t relTableID, std::optional edgePropertyIndex = std::nullopt) override; - std::unique_ptr prepareMultiTableScanFwd( - std::span nodeTableIDs) override; - std::unique_ptr prepareMultiTableScanBwd( - std::span nodeTableIDs) override; std::unique_ptr prepareVertexScan(common::table_id_t tableID, const std::vector& propertiesToScan) override; diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index 43e286fa695..1c48e33c7c1 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -98,7 +98,6 @@ class PlanMapper { planner::LogicalOperator* logicalOperator); std::unique_ptr mapProjection(planner::LogicalOperator* logicalOperator); std::unique_ptr mapRecursiveExtend(planner::LogicalOperator* logicalOperator); - std::unique_ptr mapScanSource(planner::LogicalOperator* logicalOperator); std::unique_ptr mapScanNodeTable(planner::LogicalOperator* logicalOperator); std::unique_ptr mapSemiMasker(planner::LogicalOperator* logicalOperator); std::unique_ptr mapSetProperty(planner::LogicalOperator* logicalOperator); diff --git a/test/test_files/function/gds/basic.test b/test/test_files/function/gds/basic.test index 2bccfb558bb..89438ff9d66 100644 --- a/test/test_files/function/gds/basic.test +++ b/test/test_files/function/gds/basic.test @@ -103,16 +103,6 @@ Elizabeth|1 Farooq|1 Greg|1 Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0 --STATEMENT CALL page_rank('PK') RETURN _node.fName, rank; ----- 8 -Alice|0.125000 -Bob|0.125000 -Carol|0.125000 -Dan|0.125000 -Elizabeth|0.022734 -Farooq|0.018750 -Greg|0.018750 -Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0.018750 -STATEMENT CALL enable_gds = true; ---- ok diff --git a/test/test_files/function/gds/page_rank.test b/test/test_files/function/gds/page_rank.test new file mode 100644 index 00000000000..93911ab7c56 --- /dev/null +++ b/test/test_files/function/gds/page_rank.test @@ -0,0 +1,17 @@ +-DATASET CSV tinysnb + +-- + +-CASE PageRank1 +-STATEMENT CALL create_project_graph('PK', ['person'], ['knows']) +---- ok +-STATEMENT CALL page_rank('PK') RETURN _node.fName, rank; +---- 8 +Alice|0.125000 +Bob|0.125000 +Carol|0.125000 +Dan|0.125000 +Elizabeth|0.018750 +Farooq|0.026719 +Greg|0.026719 +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0.018750