# RAG-on-GKE Application

This is a Python notebook for generating the vector embeddings used by the RAG on GKE application. For full information, please check out the GitHub documentation [here](https://github.com/GoogleCloudPlatform/ai-on-gke/blob/main/applications/rag/README.md).


## Setup Kaggle Credentials

First, we will set up your Kaggle credentials and use the Kaggle CLI to download the Netflix shows dataset to the GCS bucket. Navigate to https://www.kaggle.com/settings/account and generate an API token, then replace the placeholders below with your own username and token. See https://www.kaggle.com/docs/api#authentication for how to create a token.

In [None]:
import os
os.environ['KAGGLE_USERNAME'] = "<username>"
os.environ['KAGGLE_KEY'] = "<token>"

# Download the zip file to local storage and then extract the desired contents directly to the GKE GCS CSI mounted bucket. The commands below assume the bucket is mounted at the "/data" path in the Jupyter pod.
!kaggle datasets download -d shivamb/netflix-shows -p ~/data --force
!mkdir -p /data/netflix-shows
!unzip -o ~/data/netflix-shows.zip -d /data/netflix-shows
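
Before moving on, it can help to confirm that the extracted CSV contains the columns the later cells read. The following is a minimal sketch using only the standard library; the helper name `missing_columns` is hypothetical and not part of the original notebook, and the column list is taken from the fields referenced further down.

```python
import csv

# Columns that the later cells read from each row of netflix_titles.csv.
REQUIRED_COLUMNS = ["type", "country", "title", "date_added", "director",
                    "cast", "release_year", "rating", "duration", "description"]

def missing_columns(csv_path, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent from the CSV header."""
    with open(csv_path, newline="", encoding="utf-8") as f:
        header = next(csv.reader(f), [])  # first row, or [] for an empty file
    return [name for name in required if name not in header]
```

Calling `missing_columns("/data/netflix-shows/netflix_titles.csv")` should return an empty list if the download and extraction produced the expected file.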

## Download the Data

Let's now import required modules:

In [None]:
import os
import uuid
import ray
from langchain.document_loaders import ArxivLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from typing import List
import torch
from datasets import load_dataset_builder, load_dataset, Dataset
from huggingface_hub import snapshot_download
from google.cloud.sql.connector import Connector, IPTypes
import sqlalchemy

Next, we'll set up some parameters for the dataset processing steps:

In [None]:
SHARED_DATA_BASEPATH='/data/rag/st'
SENTENCE_TRANSFORMER_MODEL = 'intfloat/multilingual-e5-small' # Transformer to use for converting text chunks to vector embeddings
SENTENCE_TRANSFORMER_MODEL_PATH_NAME='models--intfloat--multilingual-e5-small' # the downloaded model path takes this form for a given model name
SENTENCE_TRANSFORMER_MODEL_SNAPSHOT="ffdcc22a9a5c973ef0470385cef91e1ecb461d9f" # specific snapshot of the model to use
SENTENCE_TRANSFORMER_MODEL_PATH = SHARED_DATA_BASEPATH + '/' + SENTENCE_TRANSFORMER_MODEL_PATH_NAME + '/snapshots/' + SENTENCE_TRANSFORMER_MODEL_SNAPSHOT # the path where the model is downloaded one time

# The dataset has been pre-downloaded to the GCS bucket by the Kaggle cell above. Ray workers will find the dataset readily mounted.
SHARED_DATASET_BASE_PATH="/data/netflix-shows/"
REVIEWS_FILE_NAME="netflix_titles.csv"

BATCH_SIZE = 500
CHUNK_SIZE = 1000 # text chunk sizes which will be converted to vector embeddings
CHUNK_OVERLAP = 10
TABLE_NAME = 'netflix_reviews_db'  # CloudSQL table name
DIMENSION = 384  # Embeddings size
ACTOR_POOL_SIZE = 1 # number of actors for the distributed map_batches function
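
To make the interaction between `CHUNK_SIZE` and `CHUNK_OVERLAP` concrete, here is a simplified sliding-window chunker. It is an illustrative sketch only: `RecursiveCharacterTextSplitter` prefers to split on separators such as paragraphs and spaces, so its output will generally differ. The function name `sliding_window_chunks` is hypothetical.

```python
def sliding_window_chunks(text, chunk_size, chunk_overlap):
    """Split text into fixed-size windows where consecutive windows share
    chunk_overlap characters. Simplified stand-in, not LangChain's algorithm."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap  # how far each window advances
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]

# Small values make the overlap visible: each chunk starts with the
# last character of the previous one.
print(sliding_window_chunks("abcdefghij", chunk_size=4, chunk_overlap=1))
# → ['abcd', 'defg', 'ghij']
```

With the values above (`CHUNK_SIZE = 1000`, `CHUNK_OVERLAP = 10`), any text shorter than 1000 characters stays a single chunk in this simplified model.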

Now we will download the sentence transformer model to our GCS bucket:

In [None]:
# prepare the persistent shared directory to store artifacts needed for the ray workers
os.makedirs(SHARED_DATA_BASEPATH, exist_ok=True)

# One time download of the sentence transformer model to a shared persistent storage available to the ray workers
snapshot_download(repo_id=SENTENCE_TRANSFORMER_MODEL, revision=SENTENCE_TRANSFORMER_MODEL_SNAPSHOT, cache_dir=SHARED_DATA_BASEPATH)
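
The `SENTENCE_TRANSFORMER_MODEL_PATH` parameter relies on the Hugging Face Hub cache layout, in which a model repository `org/name` is stored under `models--org--name/snapshots/<revision>`. The sketch below derives that path from the model name instead of hard-coding it; the helper name `hub_snapshot_path` is hypothetical.

```python
import posixpath

def hub_snapshot_path(cache_dir, repo_id, revision):
    """Build the snapshot directory used by the Hugging Face Hub cache
    for a model repository at a specific revision."""
    folder = "models--" + repo_id.replace("/", "--")
    return posixpath.join(cache_dir, folder, "snapshots", revision)

print(hub_snapshot_path(
    "/data/rag/st",
    "intfloat/multilingual-e5-small",
    "ffdcc22a9a5c973ef0470385cef91e1ecb461d9f",
))
```

With the values from the parameters cell, this reproduces `SENTENCE_TRANSFORMER_MODEL_PATH`. `snapshot_download` also returns the local snapshot path, so capturing its return value is another way to obtain it.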

## Generating Vector Embeddings

We are ready to begin. Let's first create some code for generating the vector embeddings:

In [None]:
class Embed:
    def __init__(self):
        print("torch cuda version", torch.version.cuda)
        device = "cpu"
        if torch.cuda.is_available():
            print("device cuda found")
            device = "cuda"

        print("reading sentence transformer model from cache path:", SENTENCE_TRANSFORMER_MODEL_PATH)
        self.transformer = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_PATH, device=device)
        self.splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, length_function=len)

    # Ray passes each batch as a dict keyed by column name, so "item" holds the batch of texts.
    def __call__(self, text_batch: dict):
        text = text_batch["item"]
        # print("type(text)=", type(text), "type(text_batch)=", type(text_batch))
        chunks = []
        for data in text:
            splits = self.splitter.split_text(data)
            # print("len(data)", len(data), "len(splits)=", len(splits))
            chunks.extend(splits)

        embeddings = self.transformer.encode(
            chunks,
            batch_size=BATCH_SIZE
        ).tolist()
        print("len(chunks)=", len(chunks), ", len(emb)=", len(embeddings))
        # Pair each chunk with its embedding so they stay together in the output column.
        return {'results': list(zip(chunks, embeddings))}

Next, we will connect to the Ray cluster that will execute the remote tasks:

In [None]:
import ray

ray.init(
    address="ray://ray-cluster-kuberay-head-svc:10001",
    runtime_env={
        "pip": [               
            "langchain==0.1.10",
            "transformers==4.38.1",
            "sentence-transformers==2.5.1",
            "pyarrow",
            "datasets==2.18.0",
            "torch==2.0.1",
            "huggingface_hub==0.21.3",
        ]
    }
)

Generate vector embeddings using our Embed class above:

In [None]:
# Process the dataset first, wrap the csv file contents into a Ray dataset
ray_ds = ray.data.read_csv(SHARED_DATASET_BASE_PATH + REVIEWS_FILE_NAME)
print(ray_ds.schema())

# Distributed flat map to extract the raw text fields.
ds_batch = ray_ds.flat_map(lambda row: [{
    'item': "This is a " + str(row["type"]) + " in " + str(row["country"]) + " called " + str(row["title"]) + 
    " added at " + str(row["date_added"]) + " whose director is " + str(row["director"]) + 
    " and with cast: " + str(row["cast"]) + " released at " + str(row["release_year"]) + 
    ". Its rating is: " + str(row['rating']) + ". Its duration is " + str(row["duration"]) + 
    ". Its description is " + str(row['description']) + "."
}])
print(ds_batch.schema())

# Distributed map batches to create chunks out of each row, and fetch the vector embeddings by running inference on the sentence transformer
ds_embed = ds_batch.map_batches(
    Embed,
    compute=ray.data.ActorPoolStrategy(size=ACTOR_POOL_SIZE),
    batch_size=BATCH_SIZE,  # Large batch size to maximize GPU utilization.
    num_gpus=1,  # 1 GPU for each actor.
    # num_cpus=1,
)
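
The text template inside the `flat_map` lambda can be easier to read and test as a named function. The following is a sketch that builds the same sentence with an f-string; the name `row_to_text` and the sample row values are made up for illustration.

```python
def row_to_text(row):
    """Render one dataset row as the descriptive sentence that gets embedded."""
    return (
        f"This is a {row['type']} in {row['country']} called {row['title']}"
        f" added at {row['date_added']} whose director is {row['director']}"
        f" and with cast: {row['cast']} released at {row['release_year']}"
        f". Its rating is: {row['rating']}. Its duration is {row['duration']}"
        f". Its description is {row['description']}."
    )

# Made-up sample row, only to show the shape of the output.
sample = {
    "type": "Movie", "country": "Exampleland", "title": "Example Title",
    "date_added": "January 1, 2020", "director": "A. Director",
    "cast": "B. Actor", "release_year": 2019, "rating": "PG",
    "duration": "90 min", "description": "A placeholder description",
}
print(row_to_text(sample))
```

Because each input row produces exactly one output row, something like `ray_ds.map(lambda row: {"item": row_to_text(row)})` should be an equivalent way to build `ds_batch`.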

Retrieve the result data from Ray remote workers:

In [None]:
@ray.remote
def ray_data_task(ds_embed):
    results = []
    for row in ds_embed.iter_rows():
        data_text = row["results"][0][:65535]
        data_emb = row["results"][1]

        results.append((data_text, data_emb))
        
    return results
    
results = ray.get(ray_data_task.remote(ds_embed))

## Writing Results Back to PostgreSQL

Now that we have our vector embeddings, we can write our results to the Cloud SQL for PostgreSQL database, which stores them using the pgvector extension:

In [None]:
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Text, text
from sqlalchemy.orm import scoped_session, sessionmaker, mapped_column
from pgvector.sqlalchemy import Vector

# initialize parameters

INSTANCE_CONNECTION_NAME = os.environ["CLOUDSQL_INSTANCE_CONNECTION_NAME"]
print(f"Your instance connection name is: {INSTANCE_CONNECTION_NAME}")
DB_NAME = "pgvector-database"

# Read the database credentials from the mounted secret volume.
with open("/etc/secret-volume/username", "r") as db_username_file:
    DB_USER = db_username_file.read()

with open("/etc/secret-volume/password", "r") as db_password_file:
    DB_PASS = db_password_file.read()

# initialize Connector object
connector = Connector()

# function to return the database connection object
def getconn():
    conn = connector.connect(
        INSTANCE_CONNECTION_NAME,
        "pg8000",
        user=DB_USER,
        password=DB_PASS,
        db=DB_NAME,
        ip_type=IPTypes.PRIVATE
    )
    return conn

# create connection pool with 'creator' argument to our connection object function
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

Base = declarative_base()
DBSession = scoped_session(sessionmaker())

class TextEmbedding(Base):
    __tablename__ = TABLE_NAME
    id = Column(String(255), primary_key=True)
    text = Column(Text)
    text_embedding = mapped_column(Vector(DIMENSION))

with pool.connect() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
    conn.commit() 
    
DBSession.configure(bind=pool, autoflush=False, expire_on_commit=False)
Base.metadata.drop_all(pool)
Base.metadata.create_all(pool)

rows = []
for r in results:
    # The id column is a String, so store the UUID in its string form.
    row_id = str(uuid.uuid4())
    rows.append(TextEmbedding(id=row_id, text=r[0], text_embedding=r[1]))

DBSession.bulk_save_objects(rows)
DBSession.commit()

Finally, let's verify that our embeddings were stored in the database correctly:

In [None]:
with pool.connect() as db_conn:
  # verify results
  transformer = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL)
  query_text = "During my holiday in Marmaris we ate here to fit the food. It's really good" 
  query_emb = transformer.encode(query_text).tolist()
  query_request = "SELECT id, text, text_embedding, 1 - ('[" + ",".join(map(str, query_emb)) + "]' <=> text_embedding) AS cosine_similarity FROM " + TABLE_NAME + " ORDER BY cosine_similarity DESC LIMIT 5;" 
  query_results = db_conn.execute(sqlalchemy.text(query_request)).fetchall()
  db_conn.commit()
    
  print("Query results (the first row is the closest match):")
  for row in query_results:
    print(row)

# cleanup connector object
connector.close()
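
The verification query uses pgvector's `<=>` operator, which returns the cosine distance between two vectors, so `1 - (a <=> b)` is their cosine similarity. The sketch below shows the same computation in plain Python; the function names are hypothetical and the vectors are toy values, not real embeddings.

```python
import math

def cosine_similarity(a, b):
    """Dot product of a and b divided by the product of their lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def cosine_distance(a, b):
    """The quantity computed by pgvector's <=> operator."""
    return 1.0 - cosine_similarity(a, b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # same direction → 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal → 0.0
```

A similarity close to 1 means the stored chunk points in nearly the same direction as the query embedding, which is why the query sorts by `cosine_similarity DESC`.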