## Revisit Classification on Citation Network on K8s

GraphScope is designed for processing large graphs, which are usually hard to fit in the memory of a single machine. With vineyard as the distributed in-memory data manager, GraphScope supports running on a cluster managed by Kubernetes (k8s).

Next, we revisit the example presented in the first tutorial, showing how GraphScope processes the node classification task on a Kubernetes cluster.

This tutorial has the following steps:
- Creating a session and loading graph;
- Querying graph data;
- Running graph algorithms;
- Running graph-based machine learning tasks;
- Closing the session and releasing resources.

**Please note: since this tutorial is designed to run on a k8s cluster, it may not be compatible with environments other than Playground.**

## Create a session on Kubernetes and load a graph

```python
# Import the graphscope module
import graphscope

graphscope.set_option(show_log=True)  # enable logging
```

```python
# Create a session on the Kubernetes cluster and
# mount the dataset bucket to path "/datasets" in the pods.
from graphscope.dataset import load_ogbn_mag

sess = graphscope.session(
    with_dataset=True,
    k8s_service_type="LoadBalancer",
    k8s_image_pull_policy="Always",
)
```

Behind the scenes, the session tries to launch a coordinator, which is the entry point for the back-end engines. The coordinator manages a cluster of k8s pods (2 pods by default), and the interactive, analytical, and learning engines run on them. Each pod also runs a vineyard instance that serves the distributed in-memory data.

Run the cell and take a look at the log, which shows the whole process of launching the session.

The log message **GraphScope coordinator service connected** means the session launched successfully and the current Python client has connected to it.

You can also check a session's status by evaluating the session object in a cell:

```python
sess
```

Running this cell shows a "status" field with the value "active". Along with the status, it prints other metadata about the session, such as the number of workers (pods), the coordinator endpoint for connection, and so on.

```python
# Load the ogbn_mag dataset in "sess" as a graph
graph = load_ogbn_mag(sess, "/datasets/ogbn_mag_small/")

# Print the schema of the graph
print(graph)
```

## Interactive query with Gremlin

In this example, we launch an interactive query and use graph traversal to count the number of papers two given authors have co-authored. To simplify the query, we assume the authors can be uniquely identified by IDs `2` and `4307`, respectively.

```python
# Get the entry point for submitting Gremlin queries on `graph`.
interactive = sess.gremlin(graph)

# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
    "g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)
```
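To make the traversal concrete, here is a plain-Python sketch of what the query computes: the papers reachable from author `2` via `writes` edges, filtered to those that author `4307` also wrote, then counted. The `writes` edge list is hypothetical toy data, not taken from ogbn_mag, and the sketch assumes each author writes a given paper at most once.

```python
# Hypothetical toy data: (author_id, paper_id) "writes" edges.
writes = [
    (2, "p1"), (2, "p2"), (2, "p3"),
    (4307, "p2"), (4307, "p3"), (4307, "p9"),
    (17, "p1"),
]

def count_coauthored(writes, author_a, author_b):
    """Count papers written by author_a that author_b also wrote.

    Mirrors the traversal: start at author_a, follow out('writes') to papers,
    keep papers with an incoming 'writes' edge from author_b, then count.
    """
    papers_a = {paper for author, paper in writes if author == author_a}
    papers_b = {paper for author, paper in writes if author == author_b}
    return len(papers_a & papers_b)

print(count_coauthored(writes, 2, 4307))  # 2 (papers p2 and p3)
```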

## Graph analytics with analytical engine

Continuing our example, we run graph algorithms on the graph to generate structural features. Below, we first derive a subgraph by extracting publications within a specific time range from the entire graph (using Gremlin!), and then run k-core decomposition and triangle counting to generate the structural features of each paper node.
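In Apache TinkerPop, the predicate `inside(a, b)` is exclusive at both ends, so `inside(2014, 2020)` matches years 2015 through 2019; we assume GraphScope's Gremlin engine follows the same semantics. The sketch below mimics the extraction on hypothetical toy data. It further assumes that the subgraph keeps each selected `cites` edge together with both of its endpoints, so a cited paper may fall outside the year range.

```python
# Hypothetical toy data: paper -> publication year, and (src, dst) "cites" edges.
year = {"a": 2013, "b": 2014, "c": 2015, "d": 2019, "e": 2020}
cites = [("a", "c"), ("c", "b"), ("d", "c"), ("e", "d"), ("b", "a")]

def inside(value, low, high):
    """TinkerPop-style inside(): true only when low < value < high."""
    return low < value < high

def extract_subgraph(year, cites, low, high):
    """Keep 'cites' edges whose source paper passes the year filter,
    plus every endpoint those edges touch (assumed subgraph semantics)."""
    kept_edges = [(s, d) for s, d in cites if inside(year[s], low, high)]
    kept_nodes = {n for edge in kept_edges for n in edge}
    return kept_nodes, kept_edges

nodes, edges = extract_subgraph(year, cites, 2014, 2020)
print(sorted(nodes))  # ['b', 'c', 'd']
print(edges)          # [('c', 'b'), ('d', 'c')]
```

Note that paper `b` (2014) ends up in the subgraph only because a paper from 2015 cites it.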

```python
# Extract a subgraph of publications within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# Project the subgraph to a simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})

# Compute k-core decomposition and triangle counting.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)

# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})
```
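For intuition about the two structural features, the plain-Python sketch below computes k-core membership (by repeatedly peeling vertices of degree less than k) and per-vertex triangle counts on a toy graph, treating `cites` edges as undirected. It is not GraphScope's implementation: the toy edges are hypothetical, the sketch uses `k=3` instead of `k=5` to fit the small graph, and whether the `r` column of `graphscope.k_core` holds a 0/1 membership flag, as here, is an assumption.

```python
from collections import defaultdict
from itertools import combinations

def to_undirected(edges):
    """Build an undirected adjacency map, ignoring self-loops and duplicate edges."""
    adj = defaultdict(set)
    for u, v in edges:
        if u != v:
            adj[u].add(v)
            adj[v].add(u)
    return adj

def k_core_membership(edges, k):
    """Return {vertex: 1 or 0}: 1 if the vertex survives in the k-core."""
    adj = to_undirected(edges)
    degree = {v: len(nbrs) for v, nbrs in adj.items()}
    removed = set()
    stack = [v for v, d in degree.items() if d < k]
    while stack:
        v = stack.pop()
        if v in removed:
            continue
        removed.add(v)
        # Removing v lowers its neighbors' degrees; peel any that drop below k.
        for u in adj[v]:
            if u not in removed:
                degree[u] -= 1
                if degree[u] < k:
                    stack.append(u)
    return {v: 0 if v in removed else 1 for v in adj}

def triangle_counts(edges):
    """Return {vertex: number of triangles the vertex belongs to}."""
    adj = to_undirected(edges)
    counts = {v: 0 for v in adj}
    for v, nbrs in adj.items():
        # Each connected pair of v's neighbors closes one triangle through v.
        for a, b in combinations(nbrs, 2):
            if b in adj[a]:
                counts[v] += 1
    return counts

# Hypothetical toy graph: a 4-clique {1, 2, 3, 4} plus a pendant vertex 5.
cites = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4), (4, 5)]
kc = k_core_membership(cites, k=3)
tc = triangle_counts(cites)

# Analogous to add_column: attach both results to each vertex.
columns = {v: {"kcore": kc[v], "tc": tc[v]} for v in kc}
print(columns[1])  # {'kcore': 1, 'tc': 3}
print(columns[5])  # {'kcore': 0, 'tc': 0}
```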

## Graph neural networks (GNNs)

Then, we use the generated structural features together with the original features to train a model with the learning engine.
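The model's input dimension must equal the number of feature columns: 128 original attributes plus `kcore` and `tc`, i.e. 130, which is why `--features_num` defaults to 130 in the training code. A plain-Python check of that arithmetic and of the layer widths, using hypothetical attribute values for one paper node:

```python
# Mirror the feature list passed to the learning engine:
# 128 original attributes plus the two structural columns.
paper_features = [f"feat_{i}" for i in range(128)] + ["kcore", "tc"]

def feature_vector(node_attrs, names):
    """Assemble one node's input vector in the order given by `names`."""
    return [float(node_attrs[name]) for name in names]

# Hypothetical attribute values for a single paper node.
node = {f"feat_{i}": 0.01 * i for i in range(128)}
node.update({"kcore": 1, "tc": 7})

vec = feature_vector(node, paper_features)
print(len(vec))  # 130

# Layer widths as computed in the training code: [input, hidden..., output].
nbrs_num = [5, 5]
hidden_dim, output_dim = 128, 128
dims = [len(paper_features)] + [hidden_dim] * (len(nbrs_num) - 1) + [output_dim]
print(dims)  # [130, 128, 128]
```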

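Our goal is to classify the nodes (papers) into 349 categories, each of which represents a venue (e.g., a pre-print server or a conference). The code below trains an EgoGraphSAGE model in an unsupervised manner: it learns node embeddings from the citation structure and the paper features, which can then serve as input for the classification.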
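GraphSAGE computes a node's representation from its own features and an aggregate of a fixed number of sampled neighbors, one layer per hop (`nbrs_num=[5, 5]` means two hops with 5 neighbors each). The sketch below shows one layer with a mean aggregator, `ReLU(W @ concat(h_self, mean(h_neighbors)))`, in plain Python. The features, weights, and the with-replacement `sample_neighbors` helper are hypothetical, normalization is omitted, and EgoGraphSAGE's internal layout may differ.

```python
import random

def mean(vectors):
    """Element-wise mean of equally sized vectors."""
    return [sum(col) / len(vectors) for col in zip(*vectors)]

def matvec(weight, x):
    """Multiply an (out_dim x in_dim) matrix, stored as nested lists, by vector x."""
    return [sum(w * xi for w, xi in zip(row, x)) for row in weight]

def relu(x):
    return [max(0.0, v) for v in x]

def sample_neighbors(neighbors, fanout, rng):
    """Hypothetical sampler: draw `fanout` neighbors uniformly with replacement."""
    return [rng.choice(neighbors) for _ in range(fanout)]

def sage_mean_layer(self_feat, nbr_feats, weight):
    """One GraphSAGE layer: ReLU(W @ concat(h_self, mean(h_neighbors)))."""
    return relu(matvec(weight, self_feat + mean(nbr_feats)))

# Toy 2-dimensional features; the real model's input has 130 dimensions.
features = {"p0": [1.0, 2.0], "p1": [0.0, 2.0], "p2": [2.0, 0.0]}
neighbors = {"p0": ["p1", "p2"]}
rng = random.Random(0)
sampled = sample_neighbors(neighbors["p0"], fanout=5, rng=rng)

# Hypothetical weights of shape (out_dim=2, in_dim=2 + 2).
W = [[1.0, 0.0, 0.0, 0.0],
     [0.0, -1.0, 0.0, 1.0]]
h = sage_mean_layer(features["p0"], [features[n] for n in sampled], W)
print(h)
```

Stacking two such layers, each with its own weights, gives the two-hop model whose widths are `[130, 128, 128]`.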

```python
# Define the features for learning: we choose the original 128-dimensional
# feature plus the k-core and triangle-count results as new features.
paper_features = []
for i in range(128):
    paper_features.append("feat_" + str(i))
paper_features.append("kcore")
paper_features.append("tc")

# Launch a learning engine. Here we split the dataset:
# 75% for training, 10% for validation, and 15% for testing.
lg = sess.graphlearn(
    sub_graph,
    nodes=[("paper", paper_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)

# Then we define the training process using the example EgoGraphSAGE model with TensorFlow.
try:
    # https://www.tensorflow.org/guide/migrate
    import tensorflow.compat.v1 as tf
    tf.disable_v2_behavior()
except ImportError:
    import tensorflow as tf

import argparse

import graphscope.learning.graphlearn.python.nn.tf as tfg
from graphscope.learning.examples import EgoGraphSAGE
from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader
from graphscope.learning.examples.tf.trainer import LocalTrainer


def parse_args():
    argparser = argparse.ArgumentParser("Train EgoSAGE Unsupervised.")
    argparser.add_argument('--batch_size', type=int, default=128)
    # 130 = 128 original features + kcore + tc.
    argparser.add_argument('--features_num', type=int, default=130)
    argparser.add_argument('--hidden_dim', type=int, default=128)
    argparser.add_argument('--output_dim', type=int, default=128)
    # Number of sampled neighbors per hop (one entry per hop).
    argparser.add_argument('--nbrs_num', type=int, nargs='+', default=[5, 5])
    argparser.add_argument('--neg_num', type=int, default=5)
    argparser.add_argument('--learning_rate', type=float, default=0.0001)
    argparser.add_argument('--epochs', type=int, default=1)
    argparser.add_argument('--agg_type', type=str, default="mean")
    argparser.add_argument('--drop_out', type=float, default=0.0)
    argparser.add_argument('--sampler', type=str, default='random')
    argparser.add_argument('--neg_sampler', type=str, default='in_degree')
    argparser.add_argument('--temperature', type=float, default=0.07)
    # Parse an empty list so the defaults are used: inside a notebook,
    # sys.argv holds the kernel's own arguments, which argparse would reject.
    return argparser.parse_args(args=[])


args = parse_args()

# Define the model; layer widths are [input, hidden..., output], one layer per hop.
dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]
model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)

# Prepare the training dataset.
train_data = EgoSAGEUnsupervisedDataLoader(
    lg, None, sampler=args.sampler, neg_sampler=args.neg_sampler,
    batch_size=args.batch_size, node_type='paper', edge_type='cites',
    nbrs_num=args.nbrs_num,
)
# Embed source nodes, their linked (positive) neighbors, and sampled negatives.
src_emb = model.forward(train_data.src_ego)
dst_emb = model.forward(train_data.dst_ego)
neg_dst_emb = model.forward(train_data.neg_dst_ego)
loss = tfg.unsupervised_softmax_cross_entropy_loss(
    src_emb, dst_emb, neg_dst_emb, temperature=args.temperature
)
optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)

# Start training.
trainer = LocalTrainer()
trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)
```
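The training objective is unsupervised: each source paper should score higher against a paper it is linked to (`dst_emb`) than against sampled negatives (`neg_dst_emb`). A common form of such an objective is a softmax cross-entropy over the positive and negative dot-product similarities, divided by a temperature. The plain-Python sketch below computes that generic form for a single source node; that `tfg.unsupervised_softmax_cross_entropy_loss` matches it exactly (for example, in how negatives are shared across a batch) is an assumption.

```python
import math

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def softmax_ce_loss(src, pos, negs, temperature=0.07):
    """Softmax cross-entropy with the positive pair as the target class.

    loss = -log( exp(s_pos/T) / (exp(s_pos/T) + sum_j exp(s_neg_j/T)) ),
    where s is a dot product. Uses log-sum-exp so a small T does not overflow.
    """
    logits = [dot(src, pos) / temperature]
    logits += [dot(src, neg) / temperature for neg in negs]
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[0]

src = [1.0, 0.0]
pos = [1.0, 0.0]                  # linked paper: aligned with src
negs = [[0.0, 1.0], [-1.0, 0.0]]  # sampled negatives

loss_t1 = softmax_ce_loss(src, pos, negs, temperature=1.0)
loss_small_t = softmax_ce_loss(src, pos, negs, temperature=0.07)
print(loss_t1, loss_small_t)
```

A lower temperature sharpens the softmax, so when the positive already scores highest, as here, the loss shrinks toward zero.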


Finally, since a session manages the resources in the cluster, it is important to release them when they are no longer needed. To deallocate the resources, call the **close** method on the session once all graph tasks are finished.

```python
# Close the session.
sess.close()
```
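To release the cluster resources even when a step fails midway, wrap the work in `try`/`finally`. The sketch uses a hypothetical stand-in class, `FakeSession`, instead of a real GraphScope session so that it runs anywhere; with GraphScope, the `finally` branch would call `sess.close()` on the real session.

```python
class FakeSession:
    """Hypothetical stand-in for a session; only records whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_pipeline(sess, fail=False):
    """Run some work and always release the session, even if the work raises."""
    try:
        if fail:
            raise RuntimeError("a graph task failed")
        return "done"
    finally:
        sess.close()


sess_ok = FakeSession()
print(run_pipeline(sess_ok), sess_ok.closed)  # done True
```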