{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Revisit Classification on Citation Network on K8s\n", "\n", "GraphScope is designed for processing large graphs, which are usually hard to fit in the memory of a single machine. With vineyard as the distributed in-memory data manager, GraphScope supports running on a cluster managed by Kubernetes (k8s).\n", "\n", "Next, we revisit the example presented in the first tutorial, showing how GraphScope processes the node classification task on a Kubernetes cluster.\n", "\n", "This tutorial has the following steps:\n", "- Creating a session and loading a graph;\n", "- Querying graph data;\n", "- Running graph algorithms;\n", "- Running graph-based machine learning tasks;\n", "- Closing the session and releasing resources.\n", "\n", "**Please note: since this tutorial is designed to run on a k8s cluster, it may not be compatible with environments other than Playground.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a session on Kubernetes and load a graph" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the graphscope module\n", "\n", "import graphscope\n", "\n", "graphscope.set_option(show_log=False)  # logging is disabled; set show_log=True to print the session launch log" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a session on a Kubernetes cluster and\n", "# mount the dataset bucket at path \"/datasets\" in each pod.\n", "\n", "from graphscope.dataset import load_ogbn_mag\n", "\n", "sess = graphscope.session(with_dataset=True, k8s_service_type='LoadBalancer', k8s_image_pull_policy='Always')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Behind the scenes, the session tries to launch a coordinator, which is the entry point for the back-end engines. The coordinator manages a cluster of k8s pods (2 pods by default), and the interactive/analytical/learning engines run on them. 
For each pod in the cluster, a vineyard instance serves the distributed in-memory data.\n", "\n", "Run the cell and take a look at the log. If logging is enabled with `show_log=True`, it prints the whole session launch process.\n", "\n", "The log message **GraphScope coordinator service connected** means the session has launched successfully and the current Python client has connected to it.\n", "\n", "You can also check the session's status by evaluating the session object:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sess" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After running this cell, you should find a \"status\" field with the value \"active\". Together with the status, it also prints other meta-information about the session, such as the number of workers (pods) and the coordinator endpoint for connection." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load the ogbn_mag dataset in \"sess\" as a graph\n", "\n", "graph = load_ogbn_mag(sess, \"/datasets/ogbn_mag_small/\")\n", "\n", "# Print the schema of the graph\n", "print(graph)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interactive query with Gremlin\n", "\n", "In this example, we launch an interactive query and use graph traversal to count the number of papers two given authors have co-authored. To simplify the query, we assume the authors can be uniquely identified by the IDs `2` and `4307`, respectively." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the entrypoint for submitting Gremlin queries on the graph.\n", "interactive = sess.gremlin(graph)\n", "\n", "# Count the number of papers two authors (with id 2 and 4307) have co-authored.\n", "papers = interactive.execute(\n", " \"g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()\"\n", ").one()\n", "print(\"result\", papers)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph analytics with analytical engine\n", "\n", "Continuing our example, we run graph algorithms on the graph to generate structural features. Below, we first derive a subgraph by extracting publications within a specific time range from the entire graph (using Gremlin!), and then run k-core decomposition and triangle counting to generate the structural features of each paper node." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Extract a subgraph of publications within a time range.\n", "sub_graph = interactive.subgraph(\"g.V().has('year', inside(2014, 2020)).outE('cites')\")\n", "\n", "# Project the subgraph to a simple graph by selecting papers and their citations.\n", "simple_g = sub_graph.project(vertices={\"paper\": []}, edges={\"cites\": []})\n", "# Compute the k-core and triangle counts.\n", "kc_result = graphscope.k_core(simple_g, k=5)\n", "tc_result = graphscope.triangles(simple_g)\n", "\n", "# Add the results as new columns to the citation graph.\n", "sub_graph = sub_graph.add_column(kc_result, {\"kcore\": \"r\"})\n", "sub_graph = sub_graph.add_column(tc_result, {\"tc\": \"r\"})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph neural networks (GNNs)\n", "\n", "Then, we use the generated structural features and the original features to train a learning model with the learning engine.\n", "\n", "In our example, we train a supervised GraphSAGE model to classify the nodes (papers) into 
349 categories,\n", "each of which represents a venue (e.g., pre-print or conference)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define the features for learning.\n", "# We choose the original 128-dimensional features, plus the k-core and triangle-counting results, as features.\n", "paper_features = []\n", "for i in range(128):\n", " paper_features.append(\"feat_\" + str(i))\n", "paper_features.append(\"kcore\")\n", "paper_features.append(\"tc\")\n", "\n", "# Launch a learning engine. Here we split the dataset: 75% for training, 10% for validation, and 15% for testing.\n", "lg = sess.graphlearn(\n", " sub_graph,\n", " nodes=[(\"paper\", paper_features)],\n", " edges=[(\"paper\", \"cites\", \"paper\")],\n", " gen_labels=[\n", " (\"train\", \"paper\", 100, (0, 75)),\n", " (\"val\", \"paper\", 100, (75, 85)),\n", " (\"test\", \"paper\", 100, (85, 100)),\n", " ],\n", ")\n", "\n", "# Then we define the training process using the example EgoGraphSAGE model with TensorFlow.\n", "try:\n", " # https://www.tensorflow.org/guide/migrate\n", " import tensorflow.compat.v1 as tf\n", " tf.disable_v2_behavior()\n", "except ImportError:\n", " import tensorflow as tf\n", "\n", "import argparse\n", "import graphscope.learning.graphlearn.python.nn.tf as tfg\n", "from graphscope.learning.examples import EgoGraphSAGE\n", "from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader\n", "from graphscope.learning.examples.tf.trainer import LocalTrainer\n", "\n", "def parse_args():\n", " argparser = argparse.ArgumentParser(\"Train EgoSAGE Unsupervised.\")\n", " argparser.add_argument('--batch_size', type=int, default=128)\n", " argparser.add_argument('--features_num', type=int, default=130)\n", " argparser.add_argument('--hidden_dim', type=int, default=128)\n", " argparser.add_argument('--output_dim', type=int, default=128)\n", " argparser.add_argument('--nbrs_num', type=list, default=[5, 5])\n", " argparser.add_argument('--neg_num', type=int, 
default=5)\n", " argparser.add_argument('--learning_rate', type=float, default=0.0001)\n", " argparser.add_argument('--epochs', type=int, default=1)\n", " argparser.add_argument('--agg_type', type=str, default=\"mean\")\n", " argparser.add_argument('--drop_out', type=float, default=0.0)\n", " argparser.add_argument('--sampler', type=str, default='random')\n", " argparser.add_argument('--neg_sampler', type=str, default='in_degree')\n", " argparser.add_argument('--temperature', type=float, default=0.07)\n", " # A Jupyter kernel passes its own command-line arguments (e.g. -f <connection file>),\n", " # so ignore unknown arguments instead of failing on them.\n", " return argparser.parse_known_args()[0]\n", "\n", "args = parse_args()\n", "\n", "# Define the model\n", "dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]\n", "model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)\n", "\n", "# Prepare the training dataset\n", "train_data = EgoSAGEUnsupervisedDataLoader(lg, None, sampler=args.sampler,\n", " neg_sampler=args.neg_sampler, batch_size=args.batch_size,\n", " node_type='paper', edge_type='cites', nbrs_num=args.nbrs_num)\n", "src_emb = model.forward(train_data.src_ego)\n", "dst_emb = model.forward(train_data.dst_ego)\n", "neg_dst_emb = model.forward(train_data.neg_dst_ego)\n", "loss = tfg.unsupervised_softmax_cross_entropy_loss(\n", " src_emb, dst_emb, neg_dst_emb, temperature=args.temperature)\n", "optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)\n", "\n", "# Start training\n", "trainer = LocalTrainer()\n", "trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, a session manages the resources in the cluster, so it is important to release these resources when they are no longer required. To deallocate the resources, call the **close** method on the session once all the graph tasks are finished." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Close the session.\n", "sess.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5 (default, Jul 28 2020, 12:59:40) \n[GCC 9.3.0]" }, "vscode": { "interpreter": { "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" } } }, "nbformat": 4, "nbformat_minor": 4 }