From 11356d38692505a1dc32de8952c5c04a75339bd1 Mon Sep 17 00:00:00 2001 From: xiyang Date: Tue, 7 Jan 2025 16:21:48 +0800 Subject: [PATCH 1/3] Add parallel page rank --- src/function/gds/page_rank.cpp | 315 +++++++++++++----- .../gds/weakly_connected_components.cpp | 2 +- src/include/function/gds/gds.h | 4 - 3 files changed, 241 insertions(+), 80 deletions(-) diff --git a/src/function/gds/page_rank.cpp b/src/function/gds/page_rank.cpp index dd115c63a85..048d1b4c732 100644 --- a/src/function/gds/page_rank.cpp +++ b/src/function/gds/page_rank.cpp @@ -1,13 +1,15 @@ #include "binder/binder.h" #include "binder/expression/expression_util.h" -#include "common/types/internal_id_util.h" #include "function/gds/gds.h" #include "function/gds/gds_function_collection.h" +#include "function/gds/gds_frontier.h" #include "function/gds_function.h" #include "graph/graph.h" #include "main/client_context.h" #include "processor/execution_context.h" #include "processor/result/factorized_table.h" +#include "function/gds/output_writer.h" +#include "function/gds/gds_utils.h" using namespace kuzu::processor; using namespace kuzu::common; @@ -35,35 +37,210 @@ struct PageRankBindData final : public GDSBindData { } }; -class PageRankOutputWriter { +static void addCAS(std::atomic& origin, double valToAdd) { + auto expected = origin.load(std::memory_order_relaxed); + auto desired = expected + valToAdd; + while (!origin.compare_exchange_strong(expected, desired)); +} + +class P { public: - explicit PageRankOutputWriter(main::ClientContext* context) { - auto mm = context->getMemoryManager(); - nodeIDVector = std::make_unique(LogicalType::INTERNAL_ID(), mm); - rankVector = std::make_unique(LogicalType::DOUBLE(), mm); - nodeIDVector->state = DataChunkState::getSingleValueDataChunkState(); - rankVector->state = DataChunkState::getSingleValueDataChunkState(); - vectors.push_back(nodeIDVector.get()); - vectors.push_back(rankVector.get()); - } - - void materialize(main::ClientContext* context, Graph* graph, - const common::node_id_map_t& ranks, FactorizedTable& table) const { - for (auto tableID : graph->getNodeTableIDs()) { - for (auto offset = 0u; offset < graph->getNumNodes(context->getTransaction(), tableID); - ++offset) { - auto nodeID = nodeID_t{offset, tableID}; - nodeIDVector->setValue(0, nodeID); - rankVector->setValue(0, ranks.at(nodeID)); - table.append(vectors); + P(table_id_map_t numNodesMap, storage::MemoryManager* mm, double val) { + for (const auto& [tableID, numNodes] : numNodesMap) { + valueMap.allocate(tableID, numNodes, mm); + pin(tableID); + for (auto i = 0u; i < numNodes; ++i) { + values[i].store(val, std::memory_order_relaxed); } } } + void pin(table_id_t tableID) { + values = valueMap.getData(tableID); + } + + double getValue(offset_t offset) { + return values[offset].load(std::memory_order_relaxed); + } + + void addValueCAS(offset_t offset, double val) { + addCAS(values[offset], val); + } + + void setValue(offset_t offset, double val) { + values[offset].store(val, std::memory_order_relaxed); + } + +private: + std::atomic* values = nullptr; + ObjectArraysMap> valueMap; +}; + +class PNextUpdateEdgeCompute : public EdgeCompute { +public: + PNextUpdateEdgeCompute(P* pCurrent, P* pNext) : pCurrent{pCurrent}, pNext{pNext} {} + + std::vector edgeCompute(nodeID_t boundNodeID, + graph::NbrScanState::Chunk& chunk, bool) override { + auto addVal = pCurrent->getValue(boundNodeID.offset) / chunk.size(); + chunk.forEach([&](auto nbrNodeID, auto) { + pNext->addValueCAS(nbrNodeID.offset, addVal); + }); + return {boundNodeID}; + } + + std::unique_ptr copy() override { + return std::make_unique(pCurrent, pNext); + } + +private: + P* pCurrent; + P* pNext; +}; + +class PNextUpdateVertexCompute : public VertexCompute { +public: + PNextUpdateVertexCompute(double dampingFactor, double constant, P* pNext) + : dampingFactor{dampingFactor}, constant{constant}, pNext{pNext} {} + + bool beginOnTable(table_id_t tableID) override { + pNext->pin(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { + for (auto i = startOffset; i < endOffset; ++i) { + pNext->setValue(i, pNext->getValue(i) * dampingFactor + constant); + } + } + + std::unique_ptr copy() override { + return std::make_unique(dampingFactor, constant, pNext); + } + +private: + double dampingFactor; + double constant; + P* pNext; +}; + +class PDiffVertexCompute : public VertexCompute { +public: + PDiffVertexCompute(std::atomic& diff, P* pCurrent, P* pNext) + : diff{diff}, pCurrent{pCurrent}, pNext{pNext} {} + + bool beginOnTable(table_id_t tableID) override { + pCurrent->pin(tableID); + pNext->pin(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { + for (auto i = startOffset; i < endOffset; ++i) { + auto next = pNext->getValue(i); + auto current = pCurrent->getValue(i); + if (next > current) { + addCAS(diff, next - current); + } else { + addCAS(diff, current - next); + } + } + } + + std::unique_ptr copy() override { + return std::make_unique(diff, pCurrent, pNext); + } + +private: + std::atomic& diff; + P* pCurrent; + P* pNext; +}; + +class PResetVertexCompute : public VertexCompute { +public: + explicit PResetVertexCompute(P* pCurrent) : pCurrent{pCurrent} {} + + bool beginOnTable(common::table_id_t tableID) override { + pCurrent->pin(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { + for (auto i = startOffset; i < endOffset; ++i) { + pCurrent->setValue(i, 0); + } + } + + std::unique_ptr copy() override { + return std::make_unique(pCurrent); + } + +private: + P* pCurrent; +}; + +class PageRankOutputWriter : public GDSOutputWriter { +public: + PageRankOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask, P* pNext) + : GDSOutputWriter{context, outputNodeMask}, pNext{pNext} { + nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager()); + rankVector = createVector(LogicalType::DOUBLE(), context->getMemoryManager()); + } + + void pinTableID(table_id_t tableID) override { + GDSOutputWriter::pinTableID(tableID); + pNext->pin(tableID); + } + + void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID, + FactorizedTable& table) const { + for (auto i = startOffset; i < endOffset; ++i) { + auto nodeID = nodeID_t{i, tableID}; + nodeIDVector->setValue(0, nodeID); + rankVector->setValue(0, pNext->getValue(i)); + table.append(vectors); + } + } + + std::unique_ptr copy() const { + return std::make_unique(context, outputNodeMask, pNext); + } + private: std::unique_ptr nodeIDVector; std::unique_ptr rankVector; - std::vector vectors; + P* pNext; +}; + +class OutputVertexCompute : public VertexCompute { +public: + OutputVertexCompute(storage::MemoryManager* mm, processor::GDSCallSharedState* sharedState, + std::unique_ptr outputWriter) + : mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)} { + localFT = sharedState->claimLocalTable(mm); + } + ~OutputVertexCompute() override { sharedState->returnLocalTable(localFT); } + + bool beginOnTable(table_id_t tableID) override { + outputWriter->pinTableID(tableID); + return true; + } + + void vertexCompute(offset_t startOffset, offset_t endOffset, + table_id_t tableID) override { + outputWriter->materialize(startOffset, endOffset, tableID, *localFT); + } + + std::unique_ptr copy() override { + return std::make_unique(mm, sharedState, outputWriter->copy()); + } + +private: + storage::MemoryManager* mm; + processor::GDSCallSharedState* sharedState; + std::unique_ptr outputWriter; + processor::FactorizedTable* localFT; }; class PageRank final : public GDSAlgorithm { @@ -78,7 +255,7 @@ class PageRank final : public GDSAlgorithm { * * graph::ANY */ - std::vector getParameterTypeIDs() const override { + std::vector getParameterTypeIDs() const override { return {LogicalTypeID::ANY}; } @@ -103,63 +280,51 @@ class PageRank final : public GDSAlgorithm { bindData = std::make_unique(std::move(graphEntry), nodeOutput); } - void initLocalState(main::ClientContext* context) override { - localState = std::make_unique(context); - } - void exec(processor::ExecutionContext* context) override { - auto extraData = bindData->ptrCast(); - localState = std::make_unique(context->clientContext); + auto clientContext = context->clientContext; + auto transaction = clientContext->getTransaction(); auto graph = sharedState->graph.get(); - // Initialize state. - common::node_id_map_t ranks; - auto numNodes = graph->getNumNodes(context->clientContext->getTransaction()); - for (auto tableID : graph->getNodeTableIDs()) { - for (auto offset = 0u; - offset < graph->getNumNodes(context->clientContext->getTransaction(), tableID); - ++offset) { - auto nodeID = nodeID_t{offset, tableID}; - ranks.insert({nodeID, 1.0 / numNodes}); - } - } - auto dampingValue = (1 - extraData->dampingFactor) / numNodes; - // Compute page rank. - auto nodeTableIDs = graph->getNodeTableIDs(); - auto scanState = graph->prepareMultiTableScanFwd(nodeTableIDs); - // We're using multiple overlapping iterators, both of which need access to a scan state, so - // we need multiple scan states - auto innerScanState = graph->prepareMultiTableScanFwd(nodeTableIDs); - auto numNodesInGraph = graph->getNumNodes(context->clientContext->getTransaction()); - for (auto i = 0u; i < extraData->maxIteration; ++i) { - auto change = 0.0; - for (auto tableID : nodeTableIDs) { - for (auto offset = 0u; - offset < graph->getNumNodes(context->clientContext->getTransaction(), tableID); - ++offset) { - auto nodeID = nodeID_t{offset, tableID}; - auto rank = 0.0; - auto iter = graph->scanFwd(nodeID, *scanState); - for (const auto chunk : iter) { - chunk.forEach([&](auto nbr, auto) { - auto numNbrOfNbr = graph->scanFwd(nbr, *innerScanState).count(); - if (numNbrOfNbr == 0) { - numNbrOfNbr = numNodesInGraph; - } - rank += extraData->dampingFactor * (ranks[nbr] / numNbrOfNbr); - }); - } - rank += dampingValue; - double diff = ranks[nodeID] - rank; - change += diff < 0 ? -diff : diff; - ranks[nodeID] = rank; - } - } - if (change < extraData->delta) { + auto numNodesMap = graph->getNumNodesMap(transaction); + auto numNodes = graph->getNumNodes(transaction); + auto p1 = P(numNodesMap, clientContext->getMemoryManager(), (double)1 / numNodes); + auto p2 = P(numNodesMap, clientContext->getMemoryManager(), 0); + P* pCurrent = &p1; + P* pNext = &p2; + auto pageRankBindData = bindData->ptrCast(); + auto currentIter = 1u; + auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED); + auto nextFrontier = getPathLengthsFrontier(context, 0); + auto frontierPair = std::make_unique(currentFrontier, nextFrontier); + frontierPair->setActiveNodesForNextIter(); + frontierPair->getNextSparseFrontier().disable(); + auto ec = std::make_unique(pCurrent, pNext); + auto pNextUpdateConstant = (1 - pageRankBindData->dampingFactor) * ( (double )1 / numNodes ); + auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor, pNextUpdateConstant, pNext); + std::atomic diff; + diff.store(0); + auto pDiffVC = PDiffVertexCompute(diff, pCurrent, pNext); + auto pResetVC = PResetVertexCompute(pNext); + auto computeState = GDSComputeState(std::move(frontierPair), std::move(ec), + sharedState->getOutputNodeMaskMap()); + while (currentIter < pageRankBindData->maxIteration) { + GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::FWD, + computeState.frontierPair->getCurrentIter() + 1); + GDSUtils::runVertexCompute(context, graph, pNextUpdateVC); + diff.store(0); + GDSUtils::runVertexCompute(context, graph, pDiffVC); + if (diff.load() < pageRankBindData->delta) { break; } + pCurrent = pNext; + GDSUtils::runVertexCompute(context, graph, pResetVC); + currentIter++; } - // Materialize result. - localState->materialize(context->clientContext, graph, ranks, *sharedState->fTable); + auto writer = std::make_unique(clientContext, + sharedState->getOutputNodeMaskMap(), pNext); + auto outputVC = std::make_unique(clientContext->getMemoryManager(), + sharedState.get(), std::move(writer)); + GDSUtils::runVertexCompute(context, graph, *outputVC); + sharedState->mergeLocalTables(); } std::unique_ptr copy() const override { diff --git a/src/function/gds/weakly_connected_components.cpp b/src/function/gds/weakly_connected_components.cpp index 3c8a9225703..094c629b18d 100644 --- a/src/function/gds/weakly_connected_components.cpp +++ b/src/function/gds/weakly_connected_components.cpp @@ -226,7 +226,7 @@ class WeaklyConnectedComponent final : public GDSAlgorithm { MAX_ITERATION); auto vertexCompute = std::make_unique(clientContext->getMemoryManager(), sharedState.get(), std::move(writer)); - GDSUtils::runVertexCompute(context, sharedState->graph.get(), *vertexCompute); + GDSUtils::runVertexCompute(context, graph, *vertexCompute); sharedState->mergeLocalTables(); } diff --git a/src/include/function/gds/gds.h b/src/include/function/gds/gds.h index ed361cec020..6fbcd001967 100644 --- a/src/include/function/gds/gds.h +++ b/src/include/function/gds/gds.h @@ -113,10 +113,6 @@ class KUZU_API GDSAlgorithm { } protected: - // TODO(Semih/Xiyang): See if this will be still needed after PageRank and other algorithms are - // updated. GDSAlgorithms implementing recursive joins do not use this function. - virtual void initLocalState(main::ClientContext*) {} - graph::GraphEntry bindGraphEntry(main::ClientContext& context, const std::string& name); std::shared_ptr bindNodeOutput(binder::Binder* binder, const std::vector& nodeEntries); From 2f4cee75534ac4f2b2aa4bbb405cd824aa3c660a Mon Sep 17 00:00:00 2001 From: xiyang Date: Wed, 8 Jan 2025 15:27:43 +0800 Subject: [PATCH 2/3] draft --- src/function/gds/page_rank.cpp | 37 ++++++++++++--------- src/include/processor/plan_mapper.h | 1 - test/test_files/function/gds/basic.test | 13 ++------ test/test_files/function/gds/page_rank.test | 17 ++++++++++ 4 files changed, 40 insertions(+), 28 deletions(-) create mode 100644 test/test_files/function/gds/page_rank.test diff --git a/src/function/gds/page_rank.cpp b/src/function/gds/page_rank.cpp index 048d1b4c732..ffc81a9ee74 100644 --- a/src/function/gds/page_rank.cpp +++ b/src/function/gds/page_rank.cpp @@ -22,8 +22,8 @@ namespace function { struct PageRankBindData final : public GDSBindData { double dampingFactor = 0.85; - int64_t maxIteration = 10; - double delta = 0.0001; // detect convergence + int64_t maxIteration = 20; + double delta = 0.0000001; // Convergence delta explicit PageRankBindData(graph::GraphEntry graphEntry, std::shared_ptr nodeOutput) @@ -43,6 +43,7 @@ static void addCAS(std::atomic& origin, double valToAdd) { while (!origin.compare_exchange_strong(expected, desired)); } +// Represents PageRank value class P { public: P(table_id_map_t numNodesMap, storage::MemoryManager* mm, double val) { @@ -76,16 +77,19 @@ class P { ObjectArraysMap> valueMap; }; +// Sum the weight (current rank / degree) for each incoming edge. class PNextUpdateEdgeCompute : public EdgeCompute { public: PNextUpdateEdgeCompute(P* pCurrent, P* pNext) : pCurrent{pCurrent}, pNext{pNext} {} std::vector edgeCompute(nodeID_t boundNodeID, graph::NbrScanState::Chunk& chunk, bool) override { - auto addVal = pCurrent->getValue(boundNodeID.offset) / chunk.size(); - chunk.forEach([&](auto nbrNodeID, auto) { - pNext->addValueCAS(nbrNodeID.offset, addVal); - }); + if (chunk.size() > 0) { + auto addVal = pCurrent->getValue(boundNodeID.offset) / chunk.size(); + chunk.forEach([&](auto nbrNodeID, auto) { + pNext->addValueCAS(nbrNodeID.offset, addVal); + }); + } return {boundNodeID}; } @@ -98,6 +102,7 @@ class PNextUpdateEdgeCompute : public EdgeCompute { P* pNext; }; +// Evaluate rank = above result * dampingFactor + {(1 - dampingFactor) / |V|} (constant) class PNextUpdateVertexCompute : public VertexCompute { public: PNextUpdateVertexCompute(double dampingFactor, double constant, P* pNext) @@ -297,30 +302,30 @@ class PageRank final : public GDSAlgorithm { auto frontierPair = std::make_unique(currentFrontier, nextFrontier); frontierPair->setActiveNodesForNextIter(); frontierPair->getNextSparseFrontier().disable(); - auto ec = std::make_unique(pCurrent, pNext); auto pNextUpdateConstant = (1 - pageRankBindData->dampingFactor) * ( (double )1 / numNodes ); - auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor, pNextUpdateConstant, pNext); - std::atomic diff; - diff.store(0); - auto pDiffVC = PDiffVertexCompute(diff, pCurrent, pNext); - auto pResetVC = PResetVertexCompute(pNext); - auto computeState = GDSComputeState(std::move(frontierPair), std::move(ec), + auto computeState = GDSComputeState(std::move(frontierPair), nullptr, sharedState->getOutputNodeMaskMap()); while (currentIter < pageRankBindData->maxIteration) { + auto ec = std::make_unique(pCurrent, pNext); + computeState.edgeCompute = std::move(ec); GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::FWD, computeState.frontierPair->getCurrentIter() + 1); + auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor, pNextUpdateConstant, pNext); GDSUtils::runVertexCompute(context, graph, pNextUpdateVC); + std::atomic diff; diff.store(0); + auto pDiffVC = PDiffVertexCompute(diff, pCurrent, pNext); GDSUtils::runVertexCompute(context, graph, pDiffVC); - if (diff.load() < pageRankBindData->delta) { + std::swap(pCurrent, pNext); + if (diff.load() < pageRankBindData->delta) { // Converged. break; } - pCurrent = pNext; + auto pResetVC = PResetVertexCompute(pNext); GDSUtils::runVertexCompute(context, graph, pResetVC); currentIter++; } auto writer = std::make_unique(clientContext, - sharedState->getOutputNodeMaskMap(), pNext); + sharedState->getOutputNodeMaskMap(), pCurrent); auto outputVC = std::make_unique(clientContext->getMemoryManager(), sharedState.get(), std::move(writer)); GDSUtils::runVertexCompute(context, graph, *outputVC); diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index 43e286fa695..1c48e33c7c1 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -98,7 +98,6 @@ class PlanMapper { planner::LogicalOperator* logicalOperator); std::unique_ptr mapProjection(planner::LogicalOperator* logicalOperator); std::unique_ptr mapRecursiveExtend(planner::LogicalOperator* logicalOperator); - std::unique_ptr mapScanSource(planner::LogicalOperator* logicalOperator); std::unique_ptr mapScanNodeTable(planner::LogicalOperator* logicalOperator); std::unique_ptr mapSemiMasker(planner::LogicalOperator* logicalOperator); std::unique_ptr mapSetProperty(planner::LogicalOperator* logicalOperator); diff --git a/test/test_files/function/gds/basic.test b/test/test_files/function/gds/basic.test index bcc09a02c12..eb70fe0b2e1 100644 --- a/test/test_files/function/gds/basic.test +++ b/test/test_files/function/gds/basic.test @@ -2,8 +2,8 @@ -- --CASE BasicAlgorithm +-CASE BasicAlgorithm -STATEMENT CALL create_project_graph('PK', ['person'], ['knows']) ---- ok -STATEMENT CALL create_project_graph('PK', [], []) @@ -94,16 +94,7 @@ Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|7 #|ABFsUni|0 #|CsWork|1 #|DEsWork|1 --STATEMENT CALL page_rank('PK') RETURN _node.fName, rank; ----- 8 -Alice|0.125000 -Bob|0.125000 -Carol|0.125000 -Dan|0.125000 -Elizabeth|0.022734 -Farooq|0.018750 -Greg|0.018750 -Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0.018750 + -STATEMENT CALL enable_gds = true; ---- ok diff --git a/test/test_files/function/gds/page_rank.test b/test/test_files/function/gds/page_rank.test new file mode 100644 index 00000000000..93911ab7c56 --- /dev/null +++ b/test/test_files/function/gds/page_rank.test @@ -0,0 +1,17 @@ +-DATASET CSV tinysnb + +-- + +-CASE PageRank1 +-STATEMENT CALL create_project_graph('PK', ['person'], ['knows']) +---- ok +-STATEMENT CALL page_rank('PK') RETURN _node.fName, rank; +---- 8 +Alice|0.125000 +Bob|0.125000 +Carol|0.125000 +Dan|0.125000 +Elizabeth|0.018750 +Farooq|0.026719 +Greg|0.026719 +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0.018750 From e3040b266b3f34ebc374cda4055eaea5512a05cd Mon Sep 17 00:00:00 2001 From: CI Bot Date: Wed, 8 Jan 2025 07:29:46 +0000 Subject: [PATCH 3/3] Run clang-format --- src/function/gds/page_rank.cpp | 58 +++++++++++++++------------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/src/function/gds/page_rank.cpp b/src/function/gds/page_rank.cpp index ffc81a9ee74..e6bccf0d473 100644 --- a/src/function/gds/page_rank.cpp +++ b/src/function/gds/page_rank.cpp @@ -1,15 +1,15 @@ #include "binder/binder.h" #include "binder/expression/expression_util.h" #include "function/gds/gds.h" -#include "function/gds/gds_function_collection.h" #include "function/gds/gds_frontier.h" +#include "function/gds/gds_function_collection.h" +#include "function/gds/gds_utils.h" +#include "function/gds/output_writer.h" #include "function/gds_function.h" #include "graph/graph.h" #include "main/client_context.h" #include "processor/execution_context.h" #include "processor/result/factorized_table.h" -#include "function/gds/output_writer.h" -#include "function/gds/gds_utils.h" using namespace kuzu::processor; using namespace kuzu::common; @@ -40,7 +40,8 @@ struct PageRankBindData final : public GDSBindData { static void addCAS(std::atomic& origin, double valToAdd) { auto expected = origin.load(std::memory_order_relaxed); auto desired = expected + valToAdd; - while (!origin.compare_exchange_strong(expected, desired)); + while (!origin.compare_exchange_strong(expected, desired)) + ; } // Represents PageRank value @@ -56,17 +57,11 @@ class P { } } - void pin(table_id_t tableID) { - values = valueMap.getData(tableID); - } + void pin(table_id_t tableID) { values = valueMap.getData(tableID); } - double getValue(offset_t offset) { - return values[offset].load(std::memory_order_relaxed); - } + double getValue(offset_t offset) { return values[offset].load(std::memory_order_relaxed); } - void addValueCAS(offset_t offset, double val) { - addCAS(values[offset], val); - } + void addValueCAS(offset_t offset, double val) { addCAS(values[offset], val); } void setValue(offset_t offset, double val) { values[offset].store(val, std::memory_order_relaxed); @@ -82,13 +77,12 @@ class PNextUpdateEdgeCompute : public EdgeCompute { public: PNextUpdateEdgeCompute(P* pCurrent, P* pNext) : pCurrent{pCurrent}, pNext{pNext} {} - std::vector edgeCompute(nodeID_t boundNodeID, - graph::NbrScanState::Chunk& chunk, bool) override { + std::vector edgeCompute(nodeID_t boundNodeID, graph::NbrScanState::Chunk& chunk, + bool) override { if (chunk.size() > 0) { auto addVal = pCurrent->getValue(boundNodeID.offset) / chunk.size(); - chunk.forEach([&](auto nbrNodeID, auto) { - pNext->addValueCAS(nbrNodeID.offset, addVal); - }); + chunk.forEach( + [&](auto nbrNodeID, auto) { pNext->addValueCAS(nbrNodeID.offset, addVal); }); } return {boundNodeID}; } @@ -173,7 +167,7 @@ class PResetVertexCompute : public VertexCompute { void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t) override { for (auto i = startOffset; i < endOffset; ++i) { - pCurrent->setValue(i, 0); + pCurrent->setValue(i, 0); } } @@ -187,7 +181,8 @@ class PResetVertexCompute : public VertexCompute { class PageRankOutputWriter : public GDSOutputWriter { public: - PageRankOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask, P* pNext) + PageRankOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask, + P* pNext) : GDSOutputWriter{context, outputNodeMask}, pNext{pNext} { nodeIDVector = createVector(LogicalType::INTERNAL_ID(), context->getMemoryManager()); rankVector = createVector(LogicalType::DOUBLE(), context->getMemoryManager()); @@ -232,8 +227,7 @@ class OutputVertexCompute : public VertexCompute { return true; } - void vertexCompute(offset_t startOffset, offset_t endOffset, - table_id_t tableID) override { + void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override { outputWriter->materialize(startOffset, endOffset, tableID, *localFT); } @@ -260,9 +254,7 @@ class PageRank final : public GDSAlgorithm { * * graph::ANY */ - std::vector getParameterTypeIDs() const override { - return {LogicalTypeID::ANY}; - } + std::vector getParameterTypeIDs() const override { return {LogicalTypeID::ANY}; } /* * Outputs are @@ -299,18 +291,20 @@ class PageRank final : public GDSAlgorithm { auto currentIter = 1u; auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED); auto nextFrontier = getPathLengthsFrontier(context, 0); - auto frontierPair = std::make_unique(currentFrontier, nextFrontier); + auto frontierPair = + std::make_unique(currentFrontier, nextFrontier); frontierPair->setActiveNodesForNextIter(); frontierPair->getNextSparseFrontier().disable(); - auto pNextUpdateConstant = (1 - pageRankBindData->dampingFactor) * ( (double )1 / numNodes ); - auto computeState = GDSComputeState(std::move(frontierPair), nullptr, - sharedState->getOutputNodeMaskMap()); + auto pNextUpdateConstant = (1 - pageRankBindData->dampingFactor) * ((double)1 / numNodes); + auto computeState = + GDSComputeState(std::move(frontierPair), nullptr, sharedState->getOutputNodeMaskMap()); while (currentIter < pageRankBindData->maxIteration) { auto ec = std::make_unique(pCurrent, pNext); computeState.edgeCompute = std::move(ec); - GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::FWD, - computeState.frontierPair->getCurrentIter() + 1); - auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor, pNextUpdateConstant, pNext); + GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, + ExtendDirection::FWD, computeState.frontierPair->getCurrentIter() + 1); + auto pNextUpdateVC = PNextUpdateVertexCompute(pageRankBindData->dampingFactor, + pNextUpdateConstant, pNext); GDSUtils::runVertexCompute(context, graph, pNextUpdateVC); std::atomic diff; diff.store(0);