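{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Revisiting Node Classification on a Citation Network in a Kubernetes Environment\n", "\n", "GraphScope is a one-stop system for processing very large graphs. It uses [vineyard](https://github.com/v6d-io/v6d) as its distributed in-memory data manager and can run on clusters managed by Kubernetes.\n", "\n", "In this tutorial, we revisit the example from the first tutorial and show how GraphScope performs node classification on a paper citation network on a Kubernetes cluster.\n", "\n", "The tutorial consists of the following steps:\n", "\n", "- Create a session and load the graph;\n", "- Query the graph interactively with Gremlin;\n", "- Run graph algorithms for graph analytics;\n", "- Run a machine learning task on the graph data;\n", "- Close the session.\n", "\n", "**Unless you are using [GraphScope Jupyter](https://try.graphscope.app), make sure the environment running this tutorial can access and operate a [Kubernetes cluster](https://github.com/kubernetes/kubernetes).**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a session and load the ogbn_mag dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the graphscope module\n", "\n", "import graphscope\n", "\n", "graphscope.set_option(show_log=True)  # enable logging" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a session on the Kubernetes cluster and\n", "# mount the dataset bucket to the path \"/datasets\" in the pods.\n", "\n", "from graphscope.dataset import load_ogbn_mag\n", "\n", "sess = graphscope.session(with_dataset=True, k8s_service_type='LoadBalancer', k8s_image_pull_policy='Always')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After this command runs, the session first tries to launch the coordinator, which is the entry point to the backend engines. The coordinator manages a cluster of K8s pods (two pods by default), on which the interactive, analytical, and learning engines run. Each pod in the cluster runs a vineyard instance that provides distributed in-memory data access.\n", "\n", "After you run the cell above, the output log records every step of session creation.\n", "\n", "The message **GraphScope coordinator service connected** in the log means that the session was created successfully and the current Python client is connected to it.\n", "\n", "You can also check the session status with the following command." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sess" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Running this cell shows \"status: active\", which means the session is healthy. The output also includes other session metadata, such as the number of workers (pods) and the coordinator endpoint." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], 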
"source": [ "# Load the ogbn_mag dataset in \"sess\" as a graph\n", "\n", "graph = load_ogbn_mag(sess, \"/datasets/ogbn_mag_small/\")\n", "\n", "# Print the schema of the graph\n", "\n", "print(graph)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interactive query with Gremlin\n", "\n", "In this example, we launch an interactive query engine and then use a graph traversal to count the papers co-authored by two given authors. To keep the query simple, we assume the two authors are uniquely identified by the IDs 2 and 4307, respectively." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the entry point for submitting Gremlin queries on `graph`.\n", "interactive = sess.gremlin(graph)\n", "\n", "# Count the number of papers that the two authors (with ids 2 and 4307) have co-authored.\n", "papers = interactive.execute(\n", " \"g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()\"\n", ").one()\n", "print(\"result\", papers)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph analytics with the analytical engine\n", "\n", "Continuing our example, we now run graph analytics on the graph data to generate structural features for the nodes. We first derive a subgraph by extracting from the full graph the papers published within a given time range (using Gremlin!), and then run k-core decomposition and triangle counting to generate structural features for each paper node." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Extract a subgraph of papers published within a time range.\n", "# Gremlin's inside() is exclusive, so this keeps papers from 2015 to 2019.\n", "sub_graph = interactive.subgraph(\"g.V().has('year', inside(2014, 2020)).outE('cites')\")\n", "\n", "# Project the subgraph to a simple graph that keeps only papers and their citations.\n", "simple_g = sub_graph.project(vertices={\"paper\": []}, edges={\"cites\": []})\n", "# Compute the k-core decomposition (k=5) and triangle counts.\n", "kc_result = graphscope.k_core(simple_g, k=5)\n", "tc_result = graphscope.triangles(simple_g)\n", "\n", "# Add the results as new columns to the citation graph.\n", "sub_graph = sub_graph.add_column(kc_result, {\"kcore\": \"r\"})\n", "sub_graph = sub_graph.add_column(tc_result, {\"tc\": \"r\"})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph neural networks (GNNs)\n", "\n", 
"Next, we combine the generated structural features with the original features and train a model with GraphScope's learning engine.\n", "\n", "In this example, we train a GraphSAGE model on the paper nodes. The target task is to classify papers into 349 classes, each representing a venue (for example, a preprint server or a conference). Note that the cell below uses the unsupervised EgoGraphSAGE example, so it learns paper embeddings rather than directly training a 349-way classifier." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define the features for learning: the original 128-dimensional features\n", "# plus the k-core and triangle-count results as new features.\n", "paper_features = []\n", "for i in range(128):\n", " paper_features.append(\"feat_\" + str(i))\n", "paper_features.append(\"kcore\")\n", "paper_features.append(\"tc\")\n", "\n", "# Launch a learning engine and split the papers into 75% for training, 10% for validation and 15% for testing.\n", "lg = sess.graphlearn(\n", " sub_graph,\n", " nodes=[(\"paper\", paper_features)],\n", " edges=[(\"paper\", \"cites\", \"paper\")],\n", " gen_labels=[\n", " (\"train\", \"paper\", 100, (0, 75)),\n", " (\"val\", \"paper\", 100, (75, 85)),\n", " (\"test\", \"paper\", 100, (85, 100)),\n", " ],\n", ")\n", "\n", "# Define the training process with the example EgoGraphSAGE model on TensorFlow.\n", "try:\n", " # https://www.tensorflow.org/guide/migrate\n", " import tensorflow.compat.v1 as tf\n", " tf.disable_v2_behavior()\n", "except ImportError:\n", " import tensorflow as tf\n", "\n", "import argparse\n", "import graphscope.learning.graphlearn.python.nn.tf as tfg\n", "from graphscope.learning.examples import EgoGraphSAGE\n", "from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader\n", "from graphscope.learning.examples.tf.trainer import LocalTrainer\n", "\n", "def parse_args():\n", " argparser = argparse.ArgumentParser(\"Train EgoSAGE Unsupervised.\")\n", " argparser.add_argument('--batch_size', type=int, default=128)\n", " argparser.add_argument('--features_num', type=int, default=130)  # 128 original features + kcore + tc\n", " argparser.add_argument('--hidden_dim', type=int, default=128)\n", " argparser.add_argument('--output_dim', type=int, default=128)\n", " argparser.add_argument('--nbrs_num', type=list, default=[5, 5])\n", 
argparser.add_argument('--neg_num', type=int, default=5)\n", " argparser.add_argument('--learning_rate', type=float, default=0.0001)\n", " argparser.add_argument('--epochs', type=int, default=1)\n", " argparser.add_argument('--agg_type', type=str, default=\"mean\")\n", " argparser.add_argument('--drop_out', type=float, default=0.0)\n", " argparser.add_argument('--sampler', type=str, default='random')\n", " argparser.add_argument('--neg_sampler', type=str, default='in_degree')\n", " argparser.add_argument('--temperature', type=float, default=0.07)\n", " # Inside Jupyter, sys.argv holds the kernel's own options (such as -f),\n", " # which argparse would reject, so parse an empty list and use the defaults.\n", " return argparser.parse_args(args=[])\n", "\n", "args = parse_args()\n", "\n", "# Define the model, with one layer per sampling hop in nbrs_num.\n", "dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]\n", "model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)\n", "\n", "# Prepare the training data with positive and negative samples.\n", "train_data = EgoSAGEUnsupervisedDataLoader(lg, None, sampler=args.sampler,\n", " neg_sampler=args.neg_sampler, batch_size=args.batch_size,\n", " node_type='paper', edge_type='cites', nbrs_num=args.nbrs_num)\n", "src_emb = model.forward(train_data.src_ego)\n", "dst_emb = model.forward(train_data.dst_ego)\n", "neg_dst_emb = model.forward(train_data.neg_dst_ego)\n", "loss = tfg.unsupervised_softmax_cross_entropy_loss(\n", " src_emb, dst_emb, neg_dst_emb, temperature=args.temperature)\n", "optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)\n", "\n", "# Start training\n", "trainer = LocalTrainer()\n", "trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, since the session manages the cluster's resources, close it to release them once you are done." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Close the session.\n", "sess.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": 
".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }