{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 编写自定义图分析算法" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Install graphscope package if you are NOT in the Playground\n", "\n", "!pip3 install graphscope\n", "GraphScope 的图分析引擎继承了 [GRAPE](https://dl.acm.org/doi/10.1145/3282488) , 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。\n", "\n", "与以往的系统的不同,GRAPE 支持将串行图算法自动并行化。在 GRAPE 中, 只需进行很小的更改即可轻松地将串行算法即插即用,使其并行化的运行在分布式环境,并高效地处理大规模图数据。 除了易于编程外,GRAPE 还被设计为高效且可拓展的系统,可灵活应对现实中图应用多变的规模、多样性和复杂性。\n", "\n", "在这个教程中,我们将展示如何进行自定义基于 PIE 模型或者 Pregel 模型的图分析算法。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install graphscope package if you are NOT in the Playground\n", "\n", "!pip3 install graphscope" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 自定义基于 PIE 模型的图算法" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "GraphScope 支持用户使用纯 Python 语言,以 [PIE](https://dl.acm.org/doi/10.1145/3282488) 编程模型写图算法。首先我们需要导入 GraphScope 的包和 PIE 装饰器。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the graphscope module.\n", "\n", "import graphscope\n", "from graphscope.framework.app import AppAssets\n", "from graphscope.analytical.udf.decorators import pie\n", "\n", "\n", "graphscope.set_option(show_log=False) # enable logging" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "以单源最短路径([SSSP](https://en.wikipedia.org/wiki/Shortest_path_problem))为例,编写 PIE 算法需要填写以下几个函数。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " pass\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " pass\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "装饰器 **pie** 包含两个参数 `vd_type` 和 `md_type`, 他们分别代表节点上的数据类型和消息的数据类型。\n", "\n", "您可以为您的算法指定这两个数据类型,可用的数据类型包括 `int`,`double` 和 `string`。\n", "在我们这个示例中,由于 SSSP 计算的距离和发送的消息更新均是 `double` 类型,所以我们为这两个类型都指定为 `double`。\n", "\n", "在函数 `Init`,`PEval` 和 `IncEval`中,都有 **frag** 和 **context** 这两个参数。它们分别用于在算法逻辑中访问图数据和中间结果数据。可以查看文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 了解更多。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 定义 Init 函数" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " nodes = frag.nodes(v_label_id)\n", " context.init_value(\n", " nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n", " )\n", " context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " pass\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Init` 函数有以下几个职责:1)为每个节点设置初始值; 2)定义消息传递的策略; 3)指定聚合器,以在每一轮中处理收到的消息。\n", " \n", "请注意,您定义的算法将在属性图上运行。所以我们应该首先通过 `v_label_num = frag.vertex_label_num()` 获得顶点标签,然后可以遍历所有具有相同标签的节点,\n", "并通过 `nodes = frag.nodes(v_label_id)` 和 `context.init_value(nodes, v_label_id, 1000000000.0,PIEAggregateType.kMinAggregate)` 设置初始值。\n", "\n", "由于我们正在计算源节点和其他节点之间的最短路径,在这里我们将 `PIEAggregateType.kMinAggregate` 用作消息聚合的聚合器。这意味着它将对所有收到的消息执行去 Min 操作。其他可用的聚合器包括 `kMaxAggregate`,`kSumAggregate`,`kProductAggregate` 和 `kOverwriteAggregate`。\n", "\n", "在 `Init` 函数的最后,我们向节点注册 `MessageStrategy.kSyncOnOuterVertex` 指定如何传递消息。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 定义 PEval 函数" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " nodes = frag.nodes(v_label_id)\n", " context.init_value(\n", " nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n", " )\n", " context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " src = int(context.get_config(b\"src\"))\n", " graphscope.declare(graphscope.Vertex, source)\n", " native_source = False\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " if frag.get_inner_node(v_label_id, src, source):\n", " native_source = True\n", " break\n", " if native_source:\n", " context.set_node_value(source, 0)\n", " else:\n", " return\n", " e_label_num = frag.edge_label_num()\n", " for e_label_id in range(e_label_num):\n", " edges = frag.get_outgoing_edges(source, e_label_id)\n", " for e in edges:\n", " dst = e.neighbor()\n", " distv = e.get_int(2)\n", " if context.get_node_value(dst) > distv:\n", " context.set_node_value(dst, distv)\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在 **SSSP** 的 `PEval` 函数中, 通过函数 `context.get_config(b\"src\")` 获取用户查询的 src 节点。\n", "\n", "`PEval` 会在每个分区上调用 `frag.get_inner_node(v_label_id, src, source)` 的方法查询 src 节点是否在本分区。请注意这里函数 `get_inner_node` method needs a `source` 需要一个类型为 `Vertex` 的参数, 我们通过 `graphscope.declare(graphscope.Vertex, source)` 来定义声明。\n", " \n", "如果一个分区包含查询的源节点,它将调用 `frag.get_outgoing_edges(source,e_label_id)` 遍历源节点的所有输出边。 对于每个顶点,它计算到源的距离。如果该值小于初始值,则更新该值。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 定义 IncEval 函数\n", "\n", "**SSSP** 算法的 `IncEval` 和 `PEval` 之间唯一区别是 `IncEval` 将会在每个分区上都被调用。计算完 `IncEval` 后,将会产生一些消息,这些消息将被发送到其他分区供下一轮`IncEval` 调用。一直到所有分区都不再产生消息,此时算法结束。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pie(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_PIE(AppAssets):\n", " @staticmethod\n", " def Init(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " nodes = frag.nodes(v_label_id)\n", " context.init_value(\n", " nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate\n", " )\n", " context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)\n", "\n", " @staticmethod\n", " def PEval(frag, context):\n", " src = int(context.get_config(b\"src\"))\n", " graphscope.declare(graphscope.Vertex, source)\n", " native_source = False\n", " v_label_num = frag.vertex_label_num()\n", " for v_label_id in range(v_label_num):\n", " if frag.get_inner_node(v_label_id, src, source):\n", " native_source = True\n", " break\n", " if native_source:\n", " context.set_node_value(source, 0)\n", " else:\n", " return\n", " e_label_num = frag.edge_label_num()\n", " for e_label_id in range(e_label_num):\n", " edges = frag.get_outgoing_edges(source, e_label_id)\n", " for e in edges:\n", " dst = e.neighbor()\n", " distv = e.get_int(2)\n", " if context.get_node_value(dst) > distv:\n", " context.set_node_value(dst, distv)\n", "\n", " @staticmethod\n", " def IncEval(frag, context):\n", " v_label_num = frag.vertex_label_num()\n", " e_label_num = frag.edge_label_num()\n", " for v_label_id in range(v_label_num):\n", " iv = frag.inner_nodes(v_label_id)\n", " for v in iv:\n", " v_dist = context.get_node_value(v)\n", " for e_label_id in range(e_label_num):\n", " es = frag.get_outgoing_edges(v, e_label_id)\n", " for e in es:\n", " u = e.neighbor()\n", " u_dist = v_dist + e.get_int(2)\n", " if context.get_node_value(u) > u_dist:\n", " context.set_node_value(u, u_dist)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 在图上调用您的算法" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load p2p network dataset\n", "\n", "from graphscope.dataset import load_p2p_network\n", "\n", "graph = load_p2p_network(directed=False, generate_eid=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "然后,初始化刚才定义的算法,并查询起始点为 `6` 的最短路径。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sssp = SSSP_PIE()\n", "ctx = sssp(graph, src=6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "执行这个 Cell,您的算法应该被成功执行。\n", "此时,结果存在于分布式的 pod 内存中,由 vineyard 在进行管理。我们可以通过以下方法将远程数据取过来并展示。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r1 = (\n", " ctx.to_dataframe({\"node\": \"v:host.id\", \"r\": \"r:host\"})\n", " .sort_values(by=[\"node\"])\n", " .to_numpy(dtype=float)\n", ")\n", "r1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 保存和载入您的算法\n", "\n", "您可以保存您的自定义算法供之后使用。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# specify the path you want to dump\n", "dump_path = os.path.expanduser(\"~/sssp_pie.gar\")\n", "\n", "# dump\n", "SSSP_PIE.to_gar(dump_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "现在 您可以在目录 `~/` 下看到您的算法包 `sssp_pie.gar`。下次再次使用时,只需要这样载入:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from graphscope.framework.app import load_app\n", "\n", "# specify the path you want to dump\n", "dump_path = os.path.expanduser(\"~/sssp_pie.gar\")\n", "\n", "sssp2 = load_app(dump_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 基于 Pregel 模型编写算法\n", "\n", "除了上面的 PIE 模型,您也可以通过基于点中心编程的 Pregel 模型编写自己的算法。首先我们需要导入 GraphScope 包和 **pregel** 装饰器。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import graphscope\n", "\n", "from graphscope.framework.app import AppAssets\n", "from graphscope.analytical.udf.decorators import pregel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " pass\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**pregel** 装饰器也有两个类型参数 `vd_type` 和 `md_type`,分别用于指定点上的数据类型和消息的类型,支持的类型包括 `int`,`double` 和 `string`。\n", "在我们的 **SSSP** 例子中,我们将这两个值都设置为 `double`。\n", "\n", "由于 Pregel 模型的算法逻辑是定义在点上的,它的 `Init` 和 `Compute` 函数有一个参数 `v` 用于访问点上的数据。查看文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 了解更多。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 定义 Init 函数" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " v.set_value(1000000000.0)\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Init` 函数将每个点上的初始路径设置为极大值。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 定义 Compute 函数\n", "\n", "**SSSP** 的 `Compute` 函数通过以下步骤为每个节点计算新距离:\n", "\n", "1)首先初始化值为1000000000的新值 \n", "2)如果顶点是源节点,则将其距离设置为0 \n", "3)计算接收到的消息的 Min 值,如果该值小于当前值,则设置该值。 \n", "\n", "重复这些步骤,直到不再生成新消息(距离更短)为止。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " v.set_value(1000000000.0)\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " src_id = context.get_config(b\"src\")\n", " cur_dist = v.value()\n", " new_dist = 1000000000.0\n", " if v.id() == src_id:\n", " new_dist = 0\n", " for message in messages:\n", " new_dist = min(message, new_dist)\n", " if new_dist < cur_dist:\n", " v.set_value(new_dist)\n", " for e_label_id in range(context.edge_label_num()):\n", " edges = v.outgoing_edges(e_label_id)\n", " for e in edges:\n", " v.send(e.vertex(), new_dist + e.get_int(2))\n", " v.vote_to_halt()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 可选的 Combiner 函数\n", "\n", "我们可以定义一个 Combiner 以减少消息通信的开销,注意这个 Combiner 函数是可选的,您也可以不实现。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class SSSP_Pregel(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " v.set_value(1000000000.0)\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " src_id = context.get_config(b\"src\")\n", " cur_dist = v.value()\n", " new_dist = 1000000000.0\n", " if v.id() == src_id:\n", " new_dist = 0\n", " for message in messages:\n", " new_dist = min(message, new_dist)\n", " if new_dist < cur_dist:\n", " v.set_value(new_dist)\n", " for e_label_id in range(context.edge_label_num()):\n", " edges = v.outgoing_edges(e_label_id)\n", " for e in edges:\n", " v.send(e.vertex(), new_dist + e.get_int(2))\n", " v.vote_to_halt()\n", "\n", " @staticmethod\n", " def Combine(messages):\n", " ret = 1000000000.0\n", " for m in messages:\n", " ret = min(ret, m)\n", " return ret" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 运行您的 Pregel 算法\n", "\n", "接下来让我们运行基于 Pregel 写的 SSSP 算法和查看结果。" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sssp_pregel = SSSP_Pregel()\n", "ctx = sssp_pregel(graph, src=6)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r2 = (\n", " ctx.to_dataframe({\"node\": \"v:host.id\", \"r\": \"r:host\"})\n", " .sort_values(by=[\"node\"])\n", " .to_numpy(dtype=float)\n", ")\n", "r2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pregel 模型中的 Aggregator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pregel 模型中的 aggregator 聚合器是一种用于全局通信、监视和计数的机制。 \n", "\n", "每个顶点都可以在超步 `S` 中为聚合器提供值,系统将这些值聚合在一起,使用归约算符对这些值进行计算,并在超步 `S+1` 中将所得值提供给所有顶点。\n", "GraphScope 为 Pregel 算法提供了许多预定义的聚合器,例如对数据类型的 `min`,`max` 或 `sum` 等。\n", "\n", "这里是使用内置 aggregator 的示例,您也可以在文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 中了解更多" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@pregel(vd_type=\"double\", md_type=\"double\")\n", "class Aggregators_Pregel_Test(AppAssets):\n", " @staticmethod\n", " def Init(v, context):\n", " # int\n", " context.register_aggregator(\n", " b\"int_sum_aggregator\", PregelAggregatorType.kInt64SumAggregator\n", " )\n", " context.register_aggregator(\n", " b\"int_max_aggregator\", PregelAggregatorType.kInt64MaxAggregator\n", " )\n", " context.register_aggregator(\n", " b\"int_min_aggregator\", PregelAggregatorType.kInt64MinAggregator\n", " )\n", " # double\n", " context.register_aggregator(\n", " b\"double_product_aggregator\", PregelAggregatorType.kDoubleProductAggregator\n", " )\n", " context.register_aggregator(\n", " b\"double_overwrite_aggregator\",\n", " PregelAggregatorType.kDoubleOverwriteAggregator,\n", " )\n", " # bool\n", " context.register_aggregator(\n", " b\"bool_and_aggregator\", PregelAggregatorType.kBoolAndAggregator\n", " )\n", " context.register_aggregator(\n", " b\"bool_or_aggregator\", PregelAggregatorType.kBoolOrAggregator\n", " )\n", " context.register_aggregator(\n", " b\"bool_overwrite_aggregator\", PregelAggregatorType.kBoolOverwriteAggregator\n", " )\n", " # text\n", " context.register_aggregator(\n", " b\"text_append_aggregator\", PregelAggregatorType.kTextAppendAggregator\n", " )\n", "\n", " @staticmethod\n", " def Compute(messages, v, context):\n", " if context.superstep() == 0:\n", " context.aggregate(b\"int_sum_aggregator\", 1)\n", " context.aggregate(b\"int_max_aggregator\", int(v.id()))\n", " context.aggregate(b\"int_min_aggregator\", int(v.id()))\n", " context.aggregate(b\"double_product_aggregator\", 1.0)\n", " context.aggregate(b\"double_overwrite_aggregator\", 1.0)\n", " context.aggregate(b\"bool_and_aggregator\", True)\n", " context.aggregate(b\"bool_or_aggregator\", False)\n", " context.aggregate(b\"bool_overwrite_aggregator\", True)\n", " context.aggregate(b\"text_append_aggregator\", v.id() + b\",\")\n", " else:\n", " if v.id() == b\"1\":\n", " assert context.get_aggregated_value(b\"int_sum_aggregator\") == 62586\n", " assert context.get_aggregated_value(b\"int_max_aggregator\") == 62586\n", " assert context.get_aggregated_value(b\"int_min_aggregator\") == 1\n", " assert context.get_aggregated_value(b\"double_product_aggregator\") == 1.0\n", " assert (\n", " context.get_aggregated_value(b\"double_overwrite_aggregator\") == 1.0\n", " )\n", " assert context.get_aggregated_value(b\"bool_and_aggregator\") == True\n", " assert context.get_aggregated_value(b\"bool_or_aggregator\") == False\n", " assert (\n", " context.get_aggregated_value(b\"bool_overwrite_aggregator\") == True\n", " )\n", " context.get_aggregated_value(b\"text_append_aggregator\")\n", " v.vote_to_halt()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }