{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Writing Your Own Graph Algorithms" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The analytical engine in GraphScope derives from [GRAPE](https://dl.acm.org/doi/10.1145/3282488), a graph processing system proposed on SIGMOD-2017. GRAPE differs from prior systems in its ability to parallelize sequential graph algorithms as a whole. In GRAPE, sequential algorithms can be easily **plugged into** with only minor changes and get parallelized to handle large graphs efficiently. \n", "\n", "In this tutorial, we will show how to define and run your own algorithm in PIE and Pregel models.\n", "\n", "Sounds like fun? Excellent, here we go!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install graphscope package if you are NOT in the Playground\n", "\n", "!pip3 install graphscope" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Writing algorithm in PIE model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "GraphScope enables users to write algorithms in the [PIE](https://dl.acm.org/doi/10.1145/3282488) programming model in a pure Python mode, first of all, you should import **graphscope** package and the **pie** decorator." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the graphscope module.\n", "\n", "import graphscope\n", "from graphscope.framework.app import AppAssets\n", "from graphscope.analytical.udf.decorators import pie\n", "\n", "\n", "graphscope.set_option(show_log=False) # enable logging" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use the single source shortest path ([SSSP](https://en.wikipedia.org/wiki/Shortest_path_problem)) algorithm as an example. To implement the PIE model, you just need to **fulfill this class**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " pass\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " pass\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The **pie** decorator contains two params named `vd_type` and `md_type` , which represent the vertex data type and message type respectively. \n", "\n", "You may specify types for your own algorithms, optional values are `int`, `double`, and `string`. \n", "In our **SSSP** case, we compute the shortest distance to the source for all nodes, so we use `double` value for `vd_type` and `md_type` both.\n", "\n", "In `Init`, `PEval`, and `IncEval`, it has **frag** and **context** as parameters. You can use these two parameters to access the fragment data and intermediate results. Detail usage please refer to [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html).\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fulfill Init Function" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " nodes = frag.nodes(v_label_id)\n", " context.init_value(\n", " nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n", " )\n", " context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " pass\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `Init` function are responsable for 1) setting the initial value for each node; 2) defining the strategy of message passing; and 3) specifing aggregator for handing received message on each rounds. \n", "\n", "Note that the algorithm you defined will run on a property graph. So we should get the vertex label first by `v_label_num = frag.vertex_label_num()`, then we can traverse all nodes with the same label \n", "and set the initial value by `nodes = frag.nodes(v_label_id)` and `context.init_value(nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate)`. \n", "\n", "Since we are computing the shorest path between the source node and others nodes. So we use `PIEAggregateType.kMinAggregate` as the aggregator for mesaage aggregation, which means it will \n", "perform `min` operation upon all received messages. Other avaliable aggregators are `kMaxAggregate`, `kSumAggregate`, `kProductAggregate`, and `kOverwriteAggregate`.\n", "\n", "At the end of `Init` function, we register the sync buffer for each node with `MessageStrategy.kSyncOnOuterVertex`, which tells the engine how to pass the message." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fulfill PEval Function" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " nodes = frag.nodes(v_label_id)\n", " context.init_value(\n", " nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n", " )\n", " context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " src = int(context.get_config(b\"src\"))\n", " graphscope.declare(graphscope.Vertex, source)\n", " native_source = False\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " if frag.get_inner_node(v_label_id, src, source):\n", " native_source = True\n", " break\n", " if native_source:\n", " context.set_node_value(source, 0)\n", " else:\n", " return\n", " e_label_num = frag.edge_label_num()\n", " for e_label_id in range(e_label_num):\n", " edges = frag.get_outgoing_edges(source, e_label_id)\n", " for e in edges:\n", " dst = e.neighbor()\n", " distv = e.get_int(2)\n", " if context.get_node_value(dst) > distv:\n", " context.set_node_value(dst, distv)\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In `PEval` of **SSSP**, it gets the queried source node by `context.get_config(b\"src\")`. \n", "\n", "`PEval` checks each fragment whether it contains source node by `frag.get_inner_node(v_label_id, src, source)`. Note that the `get_inner_node` method needs a `source` parameter in type `Vertex`, which you can declare by `graphscope.declare(graphscope.Vertex, source)`\n", " \n", "If a fragment contains the source node, it will traverse the outgoing edges of the source with `frag.get_outgoing_edges(source, e_label_id)`. For each vertex, it computes the distance from the source, and updates the value if the it less than the initial value." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fulfill IncEval Function" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " nodes = frag.nodes(v_label_id)\n", " context.init_value(\n", " nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n", " )\n", " context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " src = int(context.get_config(b\"src\"))\n", " graphscope.declare(graphscope.Vertex, source)\n", " native_source = False\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " if frag.get_inner_node(v_label_id, src, source):\n", " native_source = True\n", " break\n", " if native_source:\n", " context.set_node_value(source, 0)\n", " else:\n", " return\n", " e_label_num = frag.edge_label_num()\n", " for e_label_id in range(e_label_num):\n", " edges = frag.get_outgoing_edges(source, e_label_id)\n", " for e in edges:\n", " dst = e.neighbor()\n", " distv = e.get_int(2)\n", " if context.get_node_value(dst) > distv:\n", " context.set_node_value(dst, distv)\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " e_label_num = frag.edge_label_num()\n", " for v_label_id in range(v_label_num):\n", " iv = frag.inner_nodes(v_label_id)\n", " for v in iv:\n", " v_dist = context.get_node_value(v)\n", " for e_label_id in range(e_label_num):\n", " es = frag.get_outgoing_edges(v, e_label_id)\n", " for e in es:\n", " u = e.neighbor()\n", " u_dist = v_dist + e.get_int(2)\n", " if context.get_node_value(u) > u_dist:\n", " context.set_node_value(u, u_dist)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The only difference between `IncEval` and `PEval` of **SSSP** algorithm is that `IncEval` are invoked\n", " on each fragment, rather than only the fragment with source node. A fragment will repeat the `IncEval` until there is no messages received. When all the fragments are finished computation, the algorithm is terminated. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run Your Algorithm on the p2p network Graph." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load p2p network dataset\n", "\n", "from graphscope.dataset import load_p2p_network\n", "\n", "graph = load_p2p_network(directed=False, generate_eid=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then initialize your algorithm and query the shorest path from vertex `6` over the graph." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sssp = SSSP_PIE()\n", "ctx = sssp(graph, src=6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Runing this cell, your algorithm should evaluate successfully. The results are stored in vineyard in the distributed machies. Let's fetch and check the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r1 = (\n", " ctx.to_dataframe({\"node\": \"v:host.id\", \"r\": \"r:host\"})\n", " .sort_values(by=[\"node\"])\n", " .to_numpy(dtype=float)\n", ")\n", "r1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dump and Reload Your Algorithm\n", "\n", "You can dump and save your define algorithm for future use." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# specify the path you want to dump\n", "dump_path = os.path.expanduser(\"~/sssp_pie.gar\")\n", "\n", "# dump\n", "SSSP_PIE.to_gar(dump_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, you can find a package named `sssp_pie.gar` in your `~/`. Reload this algorithm with following code. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from graphscope.framework.app import load_app\n", "\n", "# specify the path you want to dump\n", "dump_path = os.path.expanduser(\"~/sssp_pie.gar\")\n", "\n", "sssp2 = load_app(dump_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Write Algorithm in Pregel Model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In addition to the sub-graph based PIE model, GraphScope supports vertex-centric Pregel model. To define a Pregel algorithm, you should import **pregel** decorator and fulfil the functions defined on vertex." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import graphscope\n", "from graphscope.framework.app import AppAssets\n", "from graphscope.analytical.udf.decorators import pregel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " pass\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The **pregel** decorator has two parameters named `vd_type` and `md_type`, which represent the vertex data type and message type respectively. \n", "\n", "You can specify the types for your algorithm, options are `int`, `double`, and `string`. For **SSSP**, we set both to `double`.\n", "\n", "Since Pregel model are defined on vertex, the `Init` and `Compute` functions has a parameter `v` to access the vertex data. See more details in [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fulfill Init Function¶" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " v.set_value(1000000000.0)\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `Init` function sets the initial value for each node by `v.set_value(1000000000.0)`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fulfill Compute function¶" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " v.set_value(1000000000.0)\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " src_id = context.get_config(b\"src\")\n", " cur_dist = v.value()\n", " new_dist = 1000000000.0\n", " if v.id() == src_id:\n", " new_dist = 0\n", " for message in messages:\n", " new_dist = min(message, new_dist)\n", " if new_dist < cur_dist:\n", " v.set_value(new_dist)\n", " for e_label_id in range(context.edge_label_num()):\n", " edges = v.outgoing_edges(e_label_id)\n", " for e in edges:\n", " v.send(e.vertex(), new_dist + e.get_int(2))\n", " v.vote_to_halt()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `Compute` function for **SSSP** computes the new distance for each node by the following steps:\n", "\n", "1) Initialize the new value with value 1000000000 \n", "2) If the vertex is source node, set its distance to 0. \n", "3) Compute the `min` value of messages received, and set the value if it less than the current value.\n", "\n", "Repeat these, until no more new messages(shorter distance) are generated." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Optional Combiner" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, we can define a combiner to reduce the message communication overhead." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " v.set_value(1000000000.0)\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " src_id = context.get_config(b\"src\")\n", " cur_dist = v.value()\n", " new_dist = 1000000000.0\n", " if v.id() == src_id:\n", " new_dist = 0\n", " for message in messages:\n", " new_dist = min(message, new_dist)\n", " if new_dist < cur_dist:\n", " v.set_value(new_dist)\n", " for e_label_id in range(context.edge_label_num()):\n", " edges = v.outgoing_edges(e_label_id)\n", " for e in edges:\n", " v.send(e.vertex(), new_dist + e.get_int(2))\n", " v.vote_to_halt()\n", "\n", " @staticmethod\n", " def Combine(messages):\n", " ret = 1000000000.0\n", " for m in messages:\n", " ret = min(ret, m)\n", " return ret" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run Your Pregel Algorithm on Graph." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's run your Pregel algorithm on the graph, and check the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sssp_pregel = SSSP_Pregel()\n", "ctx = sssp_pregel(graph, src=6)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r2 = (\n", " ctx.to_dataframe({\"node\": \"v:host.id\", \"r\": \"r:host\"})\n", " .sort_values(by=[\"node\"])\n", " .to_numpy(dtype=float)\n", ")\n", "r2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Aggregator in Pregel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pregel aggregators are a mechanism for global communication, monitoring, and counting. Each vertex can provide a value to an aggregator in superstep `S`, the system combines these \n", "values using a reducing operator, and the resulting value is made available to all vertices in superstep `S+1`. GraphScope provides a number of predefined aggregators for Pregel algorithms, such as `min`, `max`, or `sum` operations on data types.\n", "\n", "Here is a example for use a builtin aggregator, more details can be found in [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class Aggregators_Pregel_Test(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " # int\n", " context.register_aggregator(\n", " b\"int_sum_aggregator\", PregelAggregatorType.kInt64SumAggregator\n", " )\n", " context.register_aggregator(\n", " b\"int_max_aggregator\", PregelAggregatorType.kInt64MaxAggregator\n", " )\n", " context.register_aggregator(\n", " b\"int_min_aggregator\", PregelAggregatorType.kInt64MinAggregator\n", " )\n", " # double\n", " context.register_aggregator(\n", " b\"double_product_aggregator\", PregelAggregatorType.kDoubleProductAggregator\n", " )\n", " context.register_aggregator(\n", " b\"double_overwrite_aggregator\",\n", " PregelAggregatorType.kDoubleOverwriteAggregator,\n", " )\n", " # bool\n", " context.register_aggregator(\n", " b\"bool_and_aggregator\", PregelAggregatorType.kBoolAndAggregator\n", " )\n", " context.register_aggregator(\n", " b\"bool_or_aggregator\", PregelAggregatorType.kBoolOrAggregator\n", " )\n", " context.register_aggregator(\n", " b\"bool_overwrite_aggregator\", PregelAggregatorType.kBoolOverwriteAggregator\n", " )\n", " # text\n", " context.register_aggregator(\n", " b\"text_append_aggregator\", PregelAggregatorType.kTextAppendAggregator\n", " )\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " if context.superstep() == 0:\n", " context.aggregate(b\"int_sum_aggregator\", 1)\n", " context.aggregate(b\"int_max_aggregator\", int(v.id()))\n", " context.aggregate(b\"int_min_aggregator\", int(v.id()))\n", " context.aggregate(b\"double_product_aggregator\", 1.0)\n", " context.aggregate(b\"double_overwrite_aggregator\", 1.0)\n", " context.aggregate(b\"bool_and_aggregator\", True)\n", " context.aggregate(b\"bool_or_aggregator\", False)\n", " context.aggregate(b\"bool_overwrite_aggregator\", True)\n", " context.aggregate(b\"text_append_aggregator\", v.id() + b\",\")\n", " else:\n", " if v.id() == b\"1\":\n", " assert context.get_aggregated_value(b\"int_sum_aggregator\") == 62586\n", " assert context.get_aggregated_value(b\"int_max_aggregator\") == 62586\n", " assert context.get_aggregated_value(b\"int_min_aggregator\") == 1\n", " assert context.get_aggregated_value(b\"double_product_aggregator\") == 1.0\n", " assert (\n", " context.get_aggregated_value(b\"double_overwrite_aggregator\") == 1.0\n", " )\n", " assert context.get_aggregated_value(b\"bool_and_aggregator\") == True\n", " assert context.get_aggregated_value(b\"bool_or_aggregator\") == False\n", " assert (\n", " context.get_aggregated_value(b\"bool_overwrite_aggregator\") == True\n", " )\n", " context.get_aggregated_value(b\"text_append_aggregator\")\n", " v.vote_to_halt()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }