diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6298e32ec..f00b31446 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -115,6 +115,7 @@ add_subdirectory(community_detection_module) add_subdirectory(pagerank_module) add_subdirectory(uuid_module) add_subdirectory(katz_centrality_module) +add_subdirectory(hits_module) add_subdirectory(degree_centrality_module) add_subdirectory(graph_util_module) add_subdirectory(node_similarity_module) diff --git a/cpp/hits_module/CMakeLists.txt b/cpp/hits_module/CMakeLists.txt new file mode 100644 index 000000000..9c307fb14 --- /dev/null +++ b/cpp/hits_module/CMakeLists.txt @@ -0,0 +1,25 @@ +set(hits_module_src + hits_module.cpp + algorithm/hits.cpp) + +add_query_module(hits 1 "${hits_module_src}") + +# Link external libraries +target_link_libraries(hits PRIVATE mg_utility) +target_include_directories(hits PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + +################################################################ + +# Module tests +if (NOT MAGE_CUGRAPH_ENABLE) + include(GoogleTest) + set(hits_test_src + hits_test.cpp + algorithm/hits.cpp) + + add_executable(hits_test "${hits_test_src}") + target_link_libraries(hits_test PRIVATE mg_utility mage_gtest) + gtest_add_tests(TARGET hits_test) +endif() + +################################################################ diff --git a/cpp/hits_module/algorithm/hits.cpp b/cpp/hits_module/algorithm/hits.cpp new file mode 100644 index 000000000..483ad1d1f --- /dev/null +++ b/cpp/hits_module/algorithm/hits.cpp @@ -0,0 +1,231 @@ +#include +#include +#include +#include + +#include "hits.hpp" + + +namespace hits_alg { + +/// Adjacency-list helper. @tparam T Node ID data type. +namespace { + template + class AdjacencyList { + public: + AdjacencyList() = default; + + explicit AdjacencyList(std::uint64_t node_count) : list_(node_count) {} + + auto GetNodeCount() const { return list_.size(); } + /// AdjacentPair is a pair of T. 
Values of T have to be >= 0 and < node_count +/// because they represent position in the underlying std::vector. + void AddAdjacentPair(T left_node_id, T right_node_id, bool undirected = false) { + list_[left_node_id].push_back(right_node_id); + if (undirected) { + list_[right_node_id].push_back(left_node_id); + } + } + /// Be careful and don't call AddAdjacentPair while you have reference to this + /// vector because the referenced vector could be resized (moved) which means + /// that the reference is going to become invalid. + /// + /// @return A reference to std::vector of adjacent node ids. + const auto &GetAdjacentNodes(T node_id) const { return list_[node_id]; } + + private: + std::vector> list_; + }; + +/// Calculates optimal borders for dividing edges in number_of_threads +/// consecutive partitions (blocks) such that the maximal block size is minimal +/// For example: if number_of_edges = 10 and number_of_threads = 3: +/// optimal borders = {0, 3, 6, 10} so obtained blocks of edges +/// are [0, 3>, [3, 6> and [6, 10>. +/// @param graph -- graph +/// @param number_of_threads -- number of threads; must be non-zero, otherwise std::runtime_error is thrown + std::vector CalculateOptimalBorders(const HitsGraph &graph, const std::uint64_t number_of_threads) { + std::vector borders; + + if (number_of_threads == 0) { + throw std::runtime_error("Number of threads can't be zero (0)!"); + } + + for (std::uint64_t border_index = 0; border_index <= number_of_threads; border_index++) { + borders.push_back(border_index * graph.GetEdgeCount() / number_of_threads); + } + return borders; + } + +/// Calculating hub and auth scores block related to [lo, hi> interval of edges +/// Graph edges are ordered by target node ids so target nodes of an interval +/// of edges also form an interval of nodes. +/// Required hub and auth values of nodes from this interval will be sent +/// as vector of values using new_hits_promise.
+/// +/// @param graph -- graph +/// @param old_hub -- hub scores of nodes after previous iterations +/// @param old_auth -- auth scores of nodes after previous iterations +/// @param lo -- left bound of interval of edges +/// @param hi -- right bound of interval of edges +/// @param new_hits_promise -- used for sending the calculated (hub, auth) block back to the caller + void ThreadHitsIteration(const HitsGraph &graph, const std::vector &old_hub, const std::vector &old_auth, + const std::uint64_t lo, + const std::uint64_t hi, + std::promise, std::vector>> new_hits_promise) { + + std::vector new_hub(graph.GetNodeCount(), 0); + std::vector new_auth(graph.GetNodeCount(), 0); + // Calculate hub and auth scores for the entire block + for (std::size_t edge_id = lo; edge_id < hi; edge_id++) { + const auto [source, target] = graph.GetOrderedEdges()[edge_id]; + // Ordered edges are stored reversed by the HitsGraph constructor, so `target` is the node the edge leaves. + new_hub[target] += old_auth[source]; + new_auth[source] += old_hub[target]; + } + // Move both vectors into the promise so the per-node buffers are not copied. + new_hits_promise.set_value(std::make_pair(std::move(new_hub), std::move(new_auth))); + }; + +/// Merging hub and auth scores blocks from ThreadHitsIteration. +/// +/// @param graph -- graph +/// @param block -- (hub, auth) block calculated in ThreadHitsIteration +/// @param hub_next -- hub scores which will be updated +/// @param auth_next -- auth scores which will be updated + void AddCurrentBlockToNext(const HitsGraph &graph, const std::pair, std::vector> &block, + std::vector &hub_next, std::vector &auth_next) { + for (std::size_t node_index = 0; node_index < block.first.size(); node_index++) { + hub_next[node_index] += block.first[node_index]; + auth_next[node_index] += block.second[node_index]; + } + + }; + +/// Adds remaining hub and auth values +/// Adds hub and auth values of nodes that haven't been added by +/// AddCurrentBlockToNext. Those are nodes whose id is greater +/// than id of target node of the last edge in ordered edge list.
+/// +/// @param graph -- graph +/// @param hub_next -- hub scores which will be updated +/// @param auth_next -- auth scores which will be updated + void CompleteRankNext(const HitsGraph &graph, std::vector &hub_next, std::vector &auth_next) { + while (hub_next.size() < graph.GetNodeCount() && auth_next.size() < graph.GetNodeCount()) { + hub_next.push_back(0); + auth_next.push_back(0); + } + } +/// Checks whether the HITS iteration should continue +/// Iteration stops once the maximal number of iterations was reached or +/// once every component of both hub and auth changed by at most stop_epsilon. +/// NOTE(review): the caller passes not-yet-normalised new scores next to normalised old ones -- confirm intended. +/// +/// @param hub, hub_next -- hub scores before and after the last iteration (only the absolute difference is used) +/// @param auth, auth_next -- auth scores before and after the last iteration +/// @param max_iterations -- maximal number of iterations +/// @param stop_epsilon -- stop epsilon +/// @param number_of_iterations -- number of iterations performed so far + bool CheckContinueIterate(const std::vector &hub, const std::vector &hub_next, + const std::vector &auth, const std::vector &auth_next, + const std::size_t max_iterations, const double stop_epsilon, + const std::size_t number_of_iterations) { + if (number_of_iterations == max_iterations) { + return false; + } + for (std::size_t node_id = 0; node_id < hub.size(); node_id++) { + if (std::abs(hub[node_id] - hub_next[node_id]) > stop_epsilon || + std::abs(auth[node_id] - auth_next[node_id]) > stop_epsilon) { + return true; + } + } + return false; + } +/// Normalizing scores +/// Divides all values with sum of the values to get their sum equal 1 (values are left untouched when the sum is 0) +/// +/// @param scores -- hub scores or auth scores + void Normalise(std::vector &scores) { + const double sum = std::accumulate(scores.begin(), scores.end(), 0.0); + if (sum != 0) { + for (double &value: scores) { + value /= sum; + } + } + } +} // namespace + + +HitsGraph::HitsGraph(std::uint64_t number_of_nodes, std::uint64_t number_of_edges, + const std::vector &edges) : + node_count_(number_of_nodes), 
edge_count_(number_of_edges), out_degree_(number_of_nodes){ + AdjacencyList in_neighbours(number_of_nodes); + for (const auto [from, to]: edges) { + // Record each edge under its source node and count the source's out degree. + in_neighbours.AddAdjacentPair(from, to); + out_degree_[from] += 1; + } + + for (std::size_t node_id = 0; node_id < number_of_nodes; node_id++) { + const auto &adjacent_nodes = in_neighbours.GetAdjacentNodes(node_id); + for (const auto adjacent_node: adjacent_nodes) { + ordered_edges_.emplace_back(adjacent_node, node_id); + }; + } + }; + +std::uint64_t HitsGraph::GetNodeCount() const { return node_count_; }; + +std::uint64_t HitsGraph::GetEdgeCount() const { return edge_count_; }; + +const std::vector &HitsGraph::GetOrderedEdges() const { return ordered_edges_; }; + + +std::tuple, std::vector>ParallelIterativeHits(const HitsGraph &graph, std::size_t max_iterations, + double stop_epsilon){ + const std::uint64_t number_of_threads = std::max(1U, std::thread::hardware_concurrency() / 2); // hardware_concurrency() may be 0 or 1; keep at least one worker + auto borders = CalculateOptimalBorders(graph, number_of_threads); + std::vector hub(graph.GetNodeCount(), 1); + std::vector auth(graph.GetNodeCount(), 1); + bool continue_iterate = max_iterations != 0; + std::size_t number_of_iterations = 0; + while (continue_iterate) { + std::vector, std::vector>>> hits_promise(number_of_threads); + std::vector, std::vector>>> hits_future; + hits_future.reserve(number_of_threads); + + std::transform(hits_promise.begin(), hits_promise.end(), std::back_inserter(hits_future), + [](auto &pr_promise) { return pr_promise.get_future(); }); + std::vector my_thread; + my_thread.reserve(number_of_threads); + for (std::size_t cluster_id = 0; cluster_id < number_of_threads; cluster_id++) { + my_thread.emplace_back( + [&, lo = borders[cluster_id], hi = borders[cluster_id + 1]]( + auto promise) { ThreadHitsIteration(graph, hub, auth, lo, hi, std::move(promise)); }, + std::move(hits_promise[cluster_id]) + ); + } + std::vector hub_next(graph.GetNodeCount(), 0); + 
std::vector auth_next(graph.GetNodeCount(), 0); + for (std::size_t cluster_id = 0; cluster_id < number_of_threads; cluster_id++) { + std::pair, std::vector> block = hits_future[cluster_id].get(); + AddCurrentBlockToNext(graph, block, hub_next, auth_next); + + } + CompleteRankNext(graph, hub_next, auth_next); + for (std::uint64_t i = 0; i < number_of_threads; i++) { + if (my_thread[i].joinable()) { + my_thread[i].join(); + } + } + hub.swap(hub_next); + auth.swap(auth_next); + number_of_iterations++; + continue_iterate = CheckContinueIterate(hub, hub_next, auth, auth_next, max_iterations, stop_epsilon, + number_of_iterations); + Normalise(hub); + Normalise(auth); + } + return {hub, auth}; + } +}; diff --git a/cpp/hits_module/algorithm/hits.hpp b/cpp/hits_module/algorithm/hits.hpp new file mode 100644 index 000000000..fb3017429 --- /dev/null +++ b/cpp/hits_module/algorithm/hits.hpp @@ -0,0 +1,77 @@ +#pragma once + +#include + +namespace hits_alg { + +// Defines edge's from and to node index. Just an alias for user convenience. + using EdgePair = std::pair; + +/// A directed, unweighted graph. +/// Self loops and multiple edges are allowed and they will affect the result. +/// Node ids are integers from interval [0, number of nodes in graph - 1]. +/// Graph is allowed to be disconnected. + class HitsGraph { + public: + /// Creates graph with given number of nodes with node ids from interval + /// [0, number_of_nodes - 1] and with given edges between them. + /// Node ids describing edges have to be integers from + /// interval [0, number of nodes in graph - 1]. 
+ /// @param number_of_nodes -- number of nodes in graph + /// @param number_of_edges -- number of edges in graph + /// @param edges -- pairs (source, target) representing directed edges + HitsGraph(std::uint64_t number_of_nodes, std::uint64_t number_of_edges, const std::vector &edges); + + /// @return -- number of nodes in graph + std::uint64_t GetNodeCount() const; + + /// @return -- number of edges in graph + std::uint64_t GetEdgeCount() const; + + /// @return -- a reference to ordered vector of edges + const std::vector &GetOrderedEdges() const; + + private: + /// node_count equals number of nodes in graph + std::uint64_t node_count_; + /// edge_count equals number of edges in graph + std::uint64_t edge_count_; + /// edges stored as (edge target, edge source) pairs, grouped by ascending edge source + std::vector ordered_edges_; + /// out degree for each node in graph; filled by the constructor + /// (no accessor exposes it in this class) + std::vector out_degree_; + }; + +/// If we present nodes as pages and directed edges between them as links the +/// Hits algorithm outputs two numbers for each node. Authorities estimate the node +/// value based on the incoming links. Hubs estimate the node value based +/// on outgoing links. + +/// The set of highly relevant nodes is called Roots. They are potential Authorities. +/// Nodes that are not very relevant but point to pages in the Root are called Hubs. +/// So, an Authority is a page that many hubs link to whereas a Hub is a page that links to many authorities. + +/// HITS Algorithm is a Link Analysis Algorithm that rates webpages. +/// This algorithm is applied to the web link-structures to discover and rank the webpages relevant +/// for a particular search. It uses hubs and authorities ranks to define a recursive relationship +/// between webpages. + +// Let number of iterations be k. +// Each node is assigned a Hub score = 1 and an Authority score = 1.
+// Hits is computed iteratively using following formula: + +/// Hub update : Each node’s Hub score = \Sigma (Authority score of each node it points to). +/// Authority update : Each node’s Authority score = \Sigma (Hub score of each node pointing to it). +/// After every iteration hub and authority values are normalized to sum 1 to form probability distribution. + + +/// @param graph -- a directed, unweighted, not necessarily connected graph +/// which can contain multiple edges and self-loops. +/// @param max_iterations -- maximum number of iterations performed by Hits. +/// @param stop_epsilon -- stop epsilon +/// @return -- two vectors indexed by node id: hub scores and auth scores + std::tuple, std::vector> ParallelIterativeHits(const HitsGraph &graph, size_t max_iterations = 100, + double stop_epsilon = 10e-6); + +} // namespace hits_alg diff --git a/cpp/hits_module/hits_module.cpp b/cpp/hits_module/hits_module.cpp new file mode 100644 index 000000000..9d20be9ad --- /dev/null +++ b/cpp/hits_module/hits_module.cpp @@ -0,0 +1,86 @@ +#include +#include "algorithm/hits.hpp" + +namespace { + constexpr char const *kProcedureGet = "get"; + + constexpr char const *kFieldNode = "node"; + constexpr char const *kFieldHub = "hub"; + constexpr char const *kFieldAuth = "auth"; + + constexpr char const *kArgumentMaxIterations = "max_iterations"; + constexpr char const *kArgumentStopEpsilon = "stop_epsilon"; + + + void InsertHitsRecord(mgp_graph *graph, mgp_result *result, mgp_memory *memory, const std::uint64_t node_id, + double hub, double auth) { + auto *record = mgp::result_new_record(result); + + mg_utility::InsertNodeValueResult(graph, record, kFieldNode, node_id, memory); + mg_utility::InsertDoubleValueResult(record, kFieldHub, hub, memory); + mg_utility::InsertDoubleValueResult(record, kFieldAuth, auth, memory); + } + + void HitsWrapper(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + try { + auto max_iterations = mgp::value_get_int(mgp::list_at(args, 0)); + 
auto stop_epsilon = mgp::value_get_double(mgp::list_at(args, 1)); + + auto graph = mg_utility::GetGraphView(memgraph_graph, result, memory, mg_graph::GraphType::kDirectedGraph); + + const auto &graph_edges = graph->Edges(); + std::vector hits_edges; + std::transform(graph_edges.begin(), graph_edges.end(), std::back_inserter(hits_edges), + [](const mg_graph::Edge &edge) -> hits_alg::EdgePair { + return {edge.from, edge.to}; + }); + + auto number_of_nodes = graph->Nodes().size(); + + auto hits_graph = hits_alg::HitsGraph(number_of_nodes, hits_edges.size(), hits_edges); + auto hits = + hits_alg::ParallelIterativeHits(hits_graph, max_iterations, stop_epsilon); + const auto &hubs = std::get<0>(hits); + const auto &auth = std::get<1>(hits); + for (std::uint64_t node_id = 0; node_id < number_of_nodes; ++node_id) { + InsertHitsRecord(memgraph_graph, result, memory, graph->GetMemgraphNodeId(node_id), hubs[node_id], + auth[node_id]); + } + } catch (const std::exception &e) { + // We must not let any exceptions out of our module. 
+ mgp::result_set_error_msg(result, e.what()); + return; + } + } +} + +extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) { + mgp_value *default_max_iterations = nullptr; + mgp_value *default_stop_epsilon = nullptr; + try { + auto *hits_proc = mgp::module_add_read_procedure(module, kProcedureGet, HitsWrapper); + default_max_iterations = mgp::value_make_int(100, memory); + default_stop_epsilon = mgp::value_make_double(1e-5, memory); + + mgp::proc_add_opt_arg(hits_proc, kArgumentMaxIterations, mgp::type_int(), default_max_iterations); + mgp::proc_add_opt_arg(hits_proc, kArgumentStopEpsilon, mgp::type_float(), default_stop_epsilon); + + // Query module output record + mgp::proc_add_result(hits_proc, kFieldNode, mgp::type_node()); + mgp::proc_add_result(hits_proc, kFieldHub, mgp::type_float()); + mgp::proc_add_result(hits_proc, kFieldAuth, mgp::type_float()); + + } catch (const std::exception &e) { + // Destroy only the values that were actually created before the exception + if (default_max_iterations != nullptr) mgp_value_destroy(default_max_iterations); + if (default_stop_epsilon != nullptr) mgp_value_destroy(default_stop_epsilon); + return 1; + } + + mgp_value_destroy(default_max_iterations); + mgp_value_destroy(default_stop_epsilon); + + return 0; +} + +extern "C" int mgp_shutdown_module() { return 0; } diff --git a/cpp/hits_module/hits_test.cpp b/cpp/hits_module/hits_test.cpp new file mode 100644 index 000000000..1d27496cc --- /dev/null +++ b/cpp/hits_module/hits_test.cpp @@ -0,0 +1,53 @@ +#include +#include "gtest/gtest.h" +#include "mg_test_utils.hpp" + +#include "algorithm/hits.hpp" + +class HitsTests : public testing::TestWithParam, std::vector > >>{}; + +TEST_P(HitsTests, ParametrizedTest){ + auto graph =std::get<0>(GetParam()); + auto [hub, auth] = std::get<1>(GetParam()); + auto [hub_result, auth_result] = hits_alg::ParallelIterativeHits(graph); + ASSERT_TRUE(mg_test_utility::TestEqualVectors(hub_result, hub)); + ASSERT_TRUE(mg_test_utility::TestEqualVectors(auth_result, auth)); + +} + +INSTANTIATE_TEST_SUITE_P( Hits, HitsTests, + + 
testing::Values( + std::make_tuple(hits_alg::HitsGraph(1, 0, {}),std::make_tuple(std::vector{0}, std::vector{0})), + std::make_tuple(hits_alg::HitsGraph(2, 1, {{0, 1}}),std::make_tuple(std::vector{1, 0}, std::vector{0, 1})), + std::make_tuple(hits_alg::HitsGraph(0, 0, {}),std::make_tuple(std::vector{}, std::vector{})), + std::make_tuple(hits_alg::HitsGraph(1, 1, {{0, 0}}),std::make_tuple(std::vector{1}, std::vector{1})), + std::make_tuple(hits_alg::HitsGraph(2, 2, {{0, 1},{0, 1}}),std::make_tuple(std::vector{1, 0}, std::vector{0, 1})), + std::make_tuple(hits_alg::HitsGraph(2, 1, {{1, 1}}),std::make_tuple(std::vector{0, 1}, std::vector{0, 1})), + std::make_tuple(hits_alg::HitsGraph(5, 11, {{0, 2},{0, 0},{2, 3},{3, 1},{1, 3},{1, 0},{1, 2},{3, 0},{0, 1},{3, 2},{4, 0}}), + std::make_tuple(std::vector{0.293869, 0.257972, 0.0361229, 0.293869, 0.118168}, + std::vector{0.358123, 0.218354, 0.314219, 0.109304, 0.0000})), + std::make_tuple(hits_alg::HitsGraph(4, 4, {{1, 0},{3, 0},{2, 0},{3, 0}}), + std::make_tuple(std::vector{0, 0.25, 0.25, 0.5}, + std::vector{1, 0, 0, 0})), + + std::make_tuple(hits_alg::HitsGraph(7, 30, {{0, 6},{3, 0},{6, 2},{0, 3},{2, 3}, + {6, 4},{1, 1},{2, 0},{0, 3},{5, 0},{0, 4},{5, 2},{1, 5},{5, 3},{2, 3},{6, 1},{2, 0}, + {6, 1},{2, 6},{2, 2},{0, 0},{6, 0},{6, 0},{0, 6},{3, 3},{6, 3},{1, 3},{4, 0},{1, 2}, + {2, 1}}), std::make_tuple( + std::vector{0.189179, 0.0958985, 0.247642, 0.0898859, 0.0446254, 0.111874, 0.220895}, + std::vector{0.256163, 0.146562, 0.126215, 0.259812, 0.076528, 0.0178975, 0.116822})), + + std::make_tuple(hits_alg::HitsGraph(10, 9, {{8, 8}, {2, 2}, {0, 8}, {7, 8}, {1, 6}, {0, 0}, {1, 1}, {6, 3}, {9, 5}}), + std::make_tuple(std::vector{0.414214 ,0 ,0 ,0 ,0 ,0 ,0 ,0.292893 ,0.292893 ,0 }, + std::vector{0.292893 ,0 ,0 ,0 ,0 ,0 ,0 ,0 ,0.707107 ,0})) + +)); + + + + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git 
a/cpp/personalised_pagerank_module/algorithm/personalised_pagerank.cpp b/cpp/personalised_pagerank_module/algorithm/personalised_pagerank.cpp new file mode 100644 index 000000000..36d347c77 --- /dev/null +++ b/cpp/personalised_pagerank_module/algorithm/personalised_pagerank.cpp @@ -0,0 +1,268 @@ +#include +#include +#include +#include + +#include "personalised_pagerank.hpp" + + +namespace personalised_pagerank_alg { + + namespace { + + template + class AdjacencyList { + public: + AdjacencyList() = default; + explicit AdjacencyList(std::uint64_t node_count) : list_(node_count) {} + + auto GetNodeCount() const { return list_.size(); } + + /// AdjacentPair is a pair of T. Values of T have to be >= 0 and < node_count + /// because they represent position in the underlying std::vector. + void AddAdjacentPair(T left_node_id, T right_node_id, bool undirected = false) { + list_[left_node_id].push_back(right_node_id); + if (undirected) { + list_[right_node_id].push_back(left_node_id); + } + } + + /// Be careful and don't call AddAdjacentPair while you have reference to this + /// vector because the referenced vector could be resized (moved) which means + /// that the reference is going to become invalid. + /// + /// @return A reference to std::vector of adjacent node ids. + const auto &GetAdjacentNodes(T node_id) const { return list_[node_id]; } + + private: + std::vector> list_; + }; + + using EdgePair = std::pair; + +/// Calculates optimal borders for dividing edges in number_of_threads +/// consecutive partitions (blocks) such that the maximal block size is minimal +/// For example: if number_of_edges = 10 and number_of_threads = 3: +/// optimal borders = {0, 3, 6, 10} so obtained blocks of edges +/// are [0, 3>, [3, 6> and [6, 10>.
+/// +/// @param graph -- graph +/// @param number_of_threads -- number of threads; must be non-zero, otherwise std::runtime_error is thrown + std::vector CalculateOptimalBorders(const PageRankGraph &graph, const std::uint64_t number_of_threads) { + std::vector borders; + + if (number_of_threads == 0) { + throw std::runtime_error("Number of threads can't be zero (0)!"); + } + + for (std::uint64_t border_index = 0; border_index <= number_of_threads; border_index++) { + borders.push_back(border_index * graph.GetEdgeCount() / number_of_threads); + } + return borders; + } + +/// Sums the current ranks of all dangling nodes (nodes with out degree 0) for use in personalised PageRank +/// @param graph -- graph +/// @param start_rank -- current rank of every node, indexed by node id (read only) + double DangleNodeSum(const PageRankGraph &graph, const std::vector &start_rank) { + const auto dangling_nodes = graph.GetDangleNodes(); + double sum_dangle_node_rank = 0; + for (const auto dangling_node_id : dangling_nodes) { + sum_dangle_node_rank += start_rank[dangling_node_id]; + } + return sum_dangle_node_rank; + } + +/// Calculating personalised pagerank block related to [lo, hi> interval of edges +/// Graph edges are ordered by target node ids so target nodes of an interval +/// of edges also form an interval of nodes. +/// Required PageRank values of nodes from this interval will be sent +/// as vector of values using new_rank_promise.
+/// +/// @param graph -- graph +/// @param old_rank -- rank of nodes after previous iterations +/// (damping is not applied here; AddCurrentBlockToRankNext applies it when merging) +/// @param lo -- left bound of interval of edges +/// @param hi -- right bound of interval of edges +/// @param new_rank_promise -- used for sending information about calculated new block + void ThreadPageRankIteration(const PageRankGraph &graph, const std::vector &old_rank, + const std::uint64_t lo, const std::uint64_t hi, + std::promise> new_rank_promise) { + + std::vector new_rank(graph.GetNodeCount(), 0); + for (std::size_t edge_id = lo; edge_id < hi; edge_id++) { + const auto [source, target] = graph.GetOrderedEdges()[edge_id]; + new_rank[source] += old_rank[target] / graph.GetOutDegree(target); + } + new_rank_promise.set_value(std::move(new_rank)); + } + +/// Merging PageRank blocks from ThreadPageRankIteration. +/// NOTE(review): dangle_p_rank is added on every call and the caller invokes this once per thread block -- confirm intended. +/// @param graph -- graph +/// @param damping_factor -- damping factor +/// @param block -- PageRank block calculated in ThreadPageRankIteration +/// @param rank_next -- personalised pagerank which will be updated; dangle_p_rank -- per-node term added to the block before damping + void AddCurrentBlockToRankNext(const PageRankGraph &graph, const double damping_factor, + const std::vector &block, std::vector &rank_next, + std::vector &dangle_p_rank) { + for (std::size_t node_index = 0; node_index < block.size(); node_index++) { + rank_next[node_index] += damping_factor * (block[node_index] + dangle_p_rank[node_index]); + } + } + +/// Adds remaining PageRank values +/// Adds PageRank values of nodes that haven't been added by +/// AddCurrentBlockToRankNext. Those are nodes whose id is greater +/// than id of target node of the last edge in ordered edge list.
+/// +/// @param graph -- graph +/// @param damping_factor -- damping factor +/// @param rank_next -- PageRank which will be updated + void CompleteRankNext(const PageRankGraph &graph, const double damping_factor, std::vector &rank_next) { + while (rank_next.size() < graph.GetNodeCount()) { + rank_next.push_back((1.0 - damping_factor) / graph.GetNodeCount()); + } + } + +/// Checks whether PageRank algorithm should continue iterating +/// Checks if maximal number of iterations was reached or +/// if difference between every component of previous and current PageRank +/// was less than stop_epsilon. +/// +/// @param rank -- PageRank before the last iteration +/// @param rank_next -- PageRank after the last iteration +/// @param max_iterations -- maximal number of iterations +/// @param stop_epsilon -- stop epsilon +/// @param number_of_iterations -- current number of operations + bool CheckContinueIterate(const std::vector &rank, const std::vector &rank_next, + const std::size_t max_iterations, const double stop_epsilon, + const std::size_t number_of_iterations) { + if (number_of_iterations == max_iterations) { + return false; + } + for (std::size_t node_id = 0; node_id < rank.size(); node_id++) { + if (std::abs(rank[node_id] - rank_next[node_id]) > stop_epsilon) { + return true; + } + } + return false; + } + +/// Normalizing PageRank +/// Divides all values with sum of the values to get their sum equal 1; +/// a vector whose sum is 0 is left unchanged instead of being divided by zero. +/// @param rank -- PageRank + void NormalizeRank(std::vector &rank) { + const double sum = std::accumulate(rank.begin(), rank.end(), 0.0); + for (double &value: rank) { + value /= (sum != 0 ? sum : 1.0); + } + } + }// namespace + + + + PageRankGraph::PageRankGraph(const std::uint64_t number_of_nodes, const std::uint64_t number_of_edges, + const std::vector &edges) + : node_count_(number_of_nodes), edge_count_(number_of_edges), out_degree_(number_of_nodes) { + AdjacencyList in_neighbours(number_of_nodes); + + for (const auto [from, to] : edges) { + // Because PageRank needs a set 
of nodes that point to a given node. + in_neighbours.AddAdjacentPair(from, to); + out_degree_[from] += 1; + } + for (std::size_t node_id = 0; node_id < number_of_nodes; node_id++) { + const auto &adjacent_nodes = in_neighbours.GetAdjacentNodes(node_id); + for (const auto adjacent_node : adjacent_nodes) { + ordered_edges_.emplace_back(adjacent_node, node_id); + } + } + } + + std::uint64_t PageRankGraph::GetNodeCount() const { return node_count_; } + + std::uint64_t PageRankGraph::GetEdgeCount() const { return edge_count_; } + + const std::vector &PageRankGraph::GetOrderedEdges() const { return ordered_edges_; } + + std::uint64_t PageRankGraph::GetOutDegree(const std::uint64_t node_id) const { return out_degree_[node_id]; } + + std::vector PageRankGraph::GetDangleNodes() const { + std::vector DangleNodes; + if (!out_degree_.empty()) { + for (std::size_t node_id = 0; node_id < node_count_; node_id++) { + if (out_degree_[node_id] == 0) { DangleNodes.emplace_back(node_id); } + }} + return DangleNodes; + } + + + + +std::vector ParallelIterativePPageRank(const PageRankGraph &graph, const std::vector>& personalisation, + std::size_t max_iterations, + double damping_factor, double stop_epsilon) { + const std::uint64_t number_of_threads = std::max(1U, std::thread::hardware_concurrency() / 2); // hardware_concurrency() may be 0 or 1; keep at least one worker + auto borders = CalculateOptimalBorders(graph, number_of_threads); + std::vector start_rank(graph.GetNodeCount(), 1.0 / graph.GetNodeCount()); + std::vector prank(graph.GetNodeCount(), 1.0 / graph.GetNodeCount()); + if (!personalisation.empty()) { + std::vector personalised_rank(graph.GetNodeCount(), 0); + for (const auto [node, p_value]: personalisation) { + personalised_rank[node] = p_value; + } + const double sum = std::accumulate(personalised_rank.begin(), personalised_rank.end(), 0.0); // normalise by the personalisation vector's own sum + if (sum != 0) { for (double &value: personalised_rank) { value /= sum; } } + prank.swap(personalised_rank); + } + bool continue_iterate = max_iterations != 0; + std::size_t number_of_iterations = 0; + while (continue_iterate) { + double 
sum_dangle_node_rank = DangleNodeSum(graph, start_rank); + std::vector dangle_p_rank = prank; + std::transform(dangle_p_rank.begin(), dangle_p_rank.end(), dangle_p_rank.begin(), + [sum_dangle_node_rank](double &c) { return c * sum_dangle_node_rank; }); + std::vector>> page_rank_promise(number_of_threads); + std::vector>> page_rank_future; + page_rank_future.reserve(number_of_threads); + std::transform(page_rank_promise.begin(), page_rank_promise.end(), std::back_inserter(page_rank_future), + [](auto &pr_promise) { return pr_promise.get_future(); }); + std::vector my_threads; + my_threads.reserve(number_of_threads); + for (std::size_t cluster_id = 0; cluster_id < number_of_threads; cluster_id++) { + my_threads.emplace_back( + [&, lo = borders[cluster_id], hi = borders[cluster_id + 1]]( + auto promise) { ThreadPageRankIteration(graph, start_rank, lo, hi, std::move(promise)); }, + std::move(page_rank_promise[cluster_id]) + ); + } + std::vector rank_next = prank; + double damp = 1 - damping_factor; + std::transform(dangle_p_rank.begin(), dangle_p_rank.end(), dangle_p_rank.begin(), + [damp](double &c) { return c * damp; }); + for (std::size_t cluster_id = 0; cluster_id < number_of_threads; cluster_id++) { + std::vector block = page_rank_future[cluster_id].get(); + AddCurrentBlockToRankNext(graph, damping_factor, block, rank_next, dangle_p_rank); + } + CompleteRankNext(graph, damping_factor, rank_next); + + for (std::uint64_t i = 0; i < number_of_threads; i++) { + if (my_threads[i].joinable()) { + my_threads[i].join(); + } + } + start_rank.swap(rank_next); + number_of_iterations++; + continue_iterate = CheckContinueIterate(start_rank, rank_next, max_iterations, stop_epsilon, + number_of_iterations); + + + } + NormalizeRank(start_rank); + return start_rank; + +}; + +} //namespace personalised_pagerank_alg diff --git a/cpp/personalised_pagerank_module/algorithm/personalised_pagerank.hpp b/cpp/personalised_pagerank_module/algorithm/personalised_pagerank.hpp new file mode 
100644 index 000000000..e0fd4c525 --- /dev/null +++ b/cpp/personalised_pagerank_module/algorithm/personalised_pagerank.hpp @@ -0,0 +1,89 @@ +#pragma once + +#include + +namespace personalised_pagerank_alg { + +// Defines edge's from and to node index. Just an alias for user convenience. + using EdgePair = std::pair; + +/// A directed, unweighted graph. +/// Self loops and multiple edges are allowed and they will affect the result. +/// Node ids are integers from interval [0, number of nodes in graph - 1]. +/// Graph is allowed to be disconnected. + class PageRankGraph { + public: + /// Creates graph with given number of nodes with node ids from interval + /// [0, number_of_nodes - 1] and with given edges between them. + /// Node ids describing edges have to be integers from + /// interval [0, number of nodes in graph - 1]. + /// @param number_of_nodes -- number of nodes in graph + /// @param number_of_edges -- number of edges in graph + /// @param edges -- pairs (source, target) representing directed edges + PageRankGraph(std::uint64_t number_of_nodes, std::uint64_t number_of_edges, const std::vector &edges); + + /// @return -- number of nodes in graph + std::uint64_t GetNodeCount() const; + + /// @return -- number of edges in graph + std::uint64_t GetEdgeCount() const; + + /// @return -- a reference to ordered vector of edges + const std::vector &GetOrderedEdges() const; + + /// Returns out degree of node node_id + /// @param node_id -- node name + /// @return -- out degree of node node_id + std::uint64_t GetOutDegree(std::uint64_t node_id) const; + + /// @return -- vector of dangle node_id + std::vector GetDangleNodes() const; + + + private: + /// node_count equals number of nodes in graph + std::uint64_t node_count_; + /// edge_count equals number of edges in graph + std::uint64_t edge_count_; + /// directed edges (source, target) (source -> target) ordered by target + std::vector ordered_edges_; + /// out degree for each node in graph because it is required in 
calculating + /// PageRank + std::vector out_degree_; + }; + +/// If we present nodes as pages and directed edges between them as links the +/// PageRank algorithm outputs a probability distribution used to represent the +/// likelihood that a person randomly clicking on links will arrive at any +/// particular page. +/// +/// PageRank theory holds that an imaginary surfer who is randomly clicking on +/// links will eventually stop clicking. The probability, at any step, that the +/// person will continue randomly clicking on links is called a damping factor, +/// otherwise the next page is chosen randomly among all pages. + +/// The calculation starts with an initial rank of 1/N for each node. +/// However, if a personalization vector exists, a weight is assigned to each node that +/// influences the random walk restart. It biases the walk towards specific nodes. + +/// PageRank is computed iteratively using the following formula: +/// Rank(n, t + 1) = (1 - d) / number_of_nodes +/// + d * sum { Rank(in_neighbour_of_n, t) / +/// out_degree(in_neighbour_of_n)} +/// Where Rank(n, t) is PageRank of node n at iteration t +/// At the end Rank values are normalized to sum 1 to form probability +/// distribution. +/// + +/// @param graph -- a directed, unweighted, not necessarily connected graph +/// which can contain multiple edges and self-loops. +/// @param personalisation -- The "personalization vector" consisting of a vector with a +/// subset of (graph node, personalization value) pairs, for example: {{3,1},{29,1}}. +/// @param max_iterations -- maximum number of iterations performed by PageRank. 
+/// @param damping_factor -- a real number from interval [0, 1], as described above +/// @param stop_epsilon -- stop epsilon (note: the default 10e-6 equals 1e-5) +/// @return -- probability distribution, as described above + std::vector ParallelIterativePPageRank(const PageRankGraph &graph, const std::vector>& personalisation = {}, size_t max_iterations = 100, + double damping_factor = 0.85, double stop_epsilon = 10e-6); + +} // namespace personalised_pagerank_alg diff --git a/cpp/personalised_pagerank_module/personalised_pagerank_test.cpp b/cpp/personalised_pagerank_module/personalised_pagerank_test.cpp new file mode 100644 index 000000000..c94e85365 --- /dev/null +++ b/cpp/personalised_pagerank_module/personalised_pagerank_test.cpp @@ -0,0 +1,64 @@ +#include + +#include "gtest/gtest.h" +#include + +#include "algorithm/personalised_pagerank.hpp" + +class PPagerankTests : public testing::TestWithParam>,std::vector>> {}; + +TEST_P(PPagerankTests, ParametrizedTest) { + auto graph = std::get<0>(GetParam()); + auto personalisation_vector = std::get<1>(GetParam()) ; + auto expected = std::get<2>(GetParam()); + auto results = personalised_pagerank_alg::ParallelIterativePPageRank(graph, personalisation_vector); + ASSERT_TRUE(mg_test_utility::TestEqualVectors(results, expected)); +} + +INSTANTIATE_TEST_SUITE_P( + Pagerank, PPagerankTests, + /// + /// @brief Each parametrized test case is a tuple of three values: the first is the pagerank algorithm entry graph, + /// the second is the personalisation vector and the third is the expected result. 
+ /// + testing::Values( + std::make_tuple(personalised_pagerank_alg::PageRankGraph(6, 10, {{0, 1}, {1, 0}, {1, 2}, {0, 2}, {2, 3}, {3, 2}, {4, 3}, {5, 3}, {4, 5}, {5, 4}}), + std::vector>{{3,1}}, + std::vector{0.000, 0.000, 0.459459, 0.540541, 0.000, 0.000}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(1, 0, {}), std::vector> {}, std::vector{1.00}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(2, 1, {{0, 1}}), std::vector> {}, std::vector{0.350877362, 0.649122638}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(0, 0, {}), std::vector> {}, std::vector{}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(1, 1, {{0, 0}}), std::vector> {}, std::vector{1.00}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(2, 2, {{0, 1}, {0, 1}}), std::vector> {}, + std::vector{0.350877362, 0.649122638}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(2, 1, {{1, 1}}), std::vector> {}, std::vector{0.130435201, 0.869564799}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph( + 5, 10, {{0, 2}, {0, 0}, {2, 3}, {3, 1}, {1, 3}, {1, 0}, {1, 2}, {3, 0}, {0, 1}, {3, 2}}), + std::vector> {}, std::vector{0.240963851, 0.187763717, 0.240963851, 0.294163985, 0.036144598}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph( + 10, 10, {{9, 5}, {4, 4}, {3, 8}, {0, 5}, {5, 0}, {3, 0}, {7, 9}, {3, 9}, {0, 4}, {0, 4}}), + std::vector> {}, std::vector{0.114178360, 0.023587998, 0.023587998, 0.023587998, 0.588577186, + 0.098712132, 0.023587998, 0.023587998, 0.030271265, 0.050321066}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph( + 10, 9, {{8, 8}, {2, 2}, {0, 8}, {7, 8}, {1, 6}, {0, 0}, {1, 1}, {6, 3}, {9, 5}}), + std::vector> {}, std::vector{0.047683471, 0.047683471, 0.182781325, 0.067949168, 0.027417774, + 0.050723042, 0.047683471, 0.027417774, 0.473242731, 0.027417774}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(5, 25, {{3, 3}, {0, 3}, {4, 2}, {1, 1}, {3, 2}, {2, 0}, {4, 0}, + {4, 
4}, {3, 2}, {4, 1}, {2, 4}, {2, 2}, {2, 3}, {3, 3}, + {0, 0}, {1, 0}, {4, 2}, {4, 0}, {1, 2}, {1, 4}, {4, 0}, + {4, 0}, {0, 0}, {4, 0}, {3, 3}}), + std::vector> {}, std::vector{{0.304824023, 0.049593211, 0.217782046, 0.331928795, 0.095871925}}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph(4, 4, {{1, 0}, {3, 0}, {2, 0}, {3, 0}}), + std::vector> {}, std::vector{0.541985357, 0.152671548, 0.152671548, 0.152671548}), + std::make_tuple(personalised_pagerank_alg::PageRankGraph( + 7, 30, {{0, 6}, {3, 0}, {6, 2}, {0, 3}, {2, 3}, {6, 4}, {1, 1}, {2, 0}, {0, 3}, {5, 0}, + {0, 4}, {5, 2}, {1, 5}, {5, 3}, {2, 3}, {6, 1}, {2, 0}, {6, 1}, {2, 6}, {2, 2}, + {0, 0}, {6, 0}, {6, 0}, {0, 6}, {3, 3}, {6, 3}, {1, 3}, {4, 0}, {1, 2}, {2, 1}}), + std::vector> {}, std::vector{0.318471859, 0.075311781, 0.071307161, 0.295999683, 0.081155915, + 0.037432346, 0.120321254}) + ) + ); +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}