This notebook is available at https://github.com/databricks-industry-solutions/review-summarisation. For more information about this solution accelerator, check out our [website](https://www.databricks.com/solutions/accelerators/large-language-models-retail) and [blog post](https://www.databricks.com/blog/automated-analysis-product-reviews-using-large-language-models-llms).

# Data Download
We begin our project by doing the necessary data setup and downloading the dataset we need.

The online retail giant [Amazon's Product Reviews](https://cseweb.ucsd.edu/~jmcauley/datasets/amazon_v2/) are publicly available and easy to download. Each row in the dataset corresponds to a review written by a user and also includes other data points, such as star ratings, which we will explore later.

---

**Setup Used:**

- Runtime: 13.2 ML
- Cluster:
  - Machine: 16 CPU + 64 GB RAM (For Driver & Worker)
  - 8 Workers

### Initial Setup

Before this step, please take a look at `./config.py` which can be found in the main directory to ensure that you have the right configuration.

Setting up the necessary data-holding objects, such as Catalogs, Databases or Volumes, is a great way to start projects on Databricks. They help us organize our assets with ease.

Given this, we will use the next cell to create a Catalog, a Database (Schema) within that Catalog to hold our tables, and a Volume to hold our files.

_If Unity Catalog is not yet enabled on your workspace, please follow the instructions for alternatives; Unity Catalog is not required for this project._
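The setup cell issues a fixed sequence of SQL statements whose shape depends on the `USE_UC` and `USE_VOLUMES` flags from `./config.py`. As a minimal sketch, the hypothetical helper `build_setup_statements` below (not part of the accelerator) rebuilds that sequence as plain strings, which makes it easy to review what will run before handing each statement to `spark.sql`. The catalog and schema names in the example are placeholders.

```python
def build_setup_statements(catalog, schema, use_uc, use_volumes):
    """Hypothetical helper: return the setup SQL in the order the notebook runs it."""
    statements = []
    if use_uc:
        # Catalog creation, selection and grants only apply when Unity Catalog is used
        statements.append(f"CREATE CATALOG IF NOT EXISTS {catalog};")
        statements.append(f"USE CATALOG {catalog};")
        statements.append(f"GRANT ALL PRIVILEGES ON CATALOG {catalog} TO `account users`;")
    # The schema (database) is created and selected in every configuration
    statements.append(f"CREATE SCHEMA IF NOT EXISTS {schema};")
    statements.append(f"USE SCHEMA {schema};")
    if use_volumes:
        # The volume is created inside the schema selected above
        statements.append("CREATE VOLUME IF NOT EXISTS data_store;")
    return statements


# Preview the SQL for placeholder names without executing anything
for stmt in build_setup_statements("my_catalog", "my_schema", use_uc=True, use_volumes=True):
    print(stmt)
```

On Databricks, each returned string could then be passed to `spark.sql(stmt)`.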

In [0]:
# Imports
from config import CATALOG_NAME, SCHEMA_NAME, USE_UC, USE_VOLUMES

# If UC is enabled
if USE_UC:
 # Creating a Catalog (Optional, skip if no-UC)
 _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME};")

 # Select the Catalog as Default for this Notebook
 _ = spark.sql(f"USE CATALOG {CATALOG_NAME};")

 # Grant permissions so that all users can use this accelerator
 _ = spark.sql(f"GRANT ALL PRIVILEGES ON CATALOG {CATALOG_NAME} TO `account users`;")

# Create a Database
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};")

# Select the Database as Default
_ = spark.sql(f"USE SCHEMA {SCHEMA_NAME};")

# If Volumes are enabled
if USE_VOLUMES:
 # Create a Volume
 _ = spark.sql("CREATE VOLUME IF NOT EXISTS data_store;")
 

### Setting Up Paths

We will now set up the paths we will use while downloading and storing the data. The base paths are imported from `./config.py`, so you can point them at a `dbfs` path or any other location where you want to store the raw files.

In [0]:
# Imports (os is used to declare an environment variable)
from config import MAIN_STORAGE_PATH, MAIN_DATA_PATH
import os

# Setting up the storage path (please edit this if you would like to store the data somewhere else)
main_storage_path = f"{MAIN_STORAGE_PATH}/data_store"
main_data_path = f"{MAIN_DATA_PATH}/data_store"

# Declaring as an Environment Variable 
os.environ["MAIN_STORAGE_PATH"] = main_storage_path

In [0]:
%sh

# Confirming the variable made it through
echo $MAIN_STORAGE_PATH
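This notebook relies on the `%sh` cell seeing `MAIN_STORAGE_PATH`, which works because a child process inherits the environment of its parent. The standard-library sketch below shows that inheritance outside Databricks; the path value is a placeholder and a POSIX `sh` is assumed to be available.

```python
import os
import subprocess

# Placeholder value (assumption); in the notebook it is built from config.py
os.environ["MAIN_STORAGE_PATH"] = "/tmp/example/data_store"

# A child shell inherits the parent's environment, which is what the %sh cell depends on
result = subprocess.run(
    ["sh", "-c", 'echo "$MAIN_STORAGE_PATH"'],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())  # prints /tmp/example/data_store
```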

### Downloading the Data
Now we can download the data from the public registry. Many datasets are available from this source, grouped by category, such as Books or Cameras. For this use case, we will focus on the Books dataset, since we might see reviews of books we have read before.

These datasets are compressed JSON files. Our first task is to download and unzip the data into the main location we defined earlier, which we will do in a shell script that uses the `curl` utility for the download.

_This part might take about 12 minutes_

In [0]:
%sh

# Create a new folder in storage
export AMAZON_REVIEWS_FOLDER=$MAIN_STORAGE_PATH/amazon_reviews
mkdir -p $AMAZON_REVIEWS_FOLDER

# Create a temporary folder on local disk
export TMP_DATA_FOLDER=/local_disk0/tmp_data_save
mkdir -p $TMP_DATA_FOLDER

# Move to temp folder
cd $TMP_DATA_FOLDER

# Download the data
curl -sO https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/categoryFiles/Books.json.gz &
curl -sO https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/metaFiles2/meta_Books.json.gz &
wait
echo Download Complete

# Unzip 
gunzip Books.json.gz &
gunzip meta_Books.json.gz &
wait
echo Unzipping Complete

# Copy to Target
cp Books.json $AMAZON_REVIEWS_FOLDER/books.json &
cp meta_Books.json $AMAZON_REVIEWS_FOLDER/meta_books.json &
wait
echo Copying Complete

# Display what's there
du -ah $AMAZON_REVIEWS_FOLDER
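For readers who prefer Python to shell, the same download, decompress and copy flow can be sketched with the standard library. This is a hypothetical alternative, not the accelerator's code: `fetch_and_decompress` and `fetch_all` are names introduced here, and a thread pool stands in for the shell's `&` and `wait`.

```python
import gzip
import shutil
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def fetch_and_decompress(url: str, work_dir: Path, target: Path) -> Path:
    """Hypothetical helper: download a .gz file, decompress it, copy the result to target."""
    work_dir.mkdir(parents=True, exist_ok=True)
    target.parent.mkdir(parents=True, exist_ok=True)
    gz_path = work_dir / url.rsplit("/", 1)[-1]
    # Like `curl -sO <url>` run inside the temporary folder
    urllib.request.urlretrieve(url, gz_path)
    # Like `gunzip`: stream-decompress so the whole file never sits in memory
    raw_path = gz_path.with_suffix("")  # Books.json.gz -> Books.json
    with gzip.open(gz_path, "rb") as src, open(raw_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    # Like `cp` to the storage folder
    shutil.copyfile(raw_path, target)
    return target


def fetch_all(jobs, work_dir: Path):
    """Run (url, target) jobs concurrently, mirroring the `&` ... `wait` pattern."""
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        futures = [pool.submit(fetch_and_decompress, url, work_dir, target) for url, target in jobs]
        # result() re-raises any download or decompression error
        return [f.result() for f in futures]
```

A call would pass the two URLs from the shell cell together with their target paths under the storage folder, plus a local scratch directory such as `/local_disk0/tmp_data_save`.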

#### Quick View on Data

At this point, we downloaded two datasets from the source:
- `meta_books.json` contains data about the products (metadata), such as title, price and ID.
- `books.json` contains the actual reviews of the products.


Let's take a quick look at how many rows each dataset has and what the data looks like.

In [0]:
%sh

# Get a count of total reviews
echo -e "Reviews Count" 
wc -l < $MAIN_STORAGE_PATH/amazon_reviews/books.json

# Get a count of products (metadata)
echo -e "\nMetadata Count"
wc -l < $MAIN_STORAGE_PATH/amazon_reviews/meta_books.json
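Because these files hold one JSON record per line, the `wc -l` line count doubles as a record count. A minimal Python equivalent, assuming the same one-record-per-line layout, counts newline bytes in fixed-size chunks so that multi-gigabyte files are never loaded into memory at once; `count_lines` is a hypothetical name.

```python
def count_lines(path, chunk_size=1 << 20):
    """Hypothetical helper: count newline bytes, which is what `wc -l` reports."""
    count = 0
    with open(path, "rb") as f:
        # Read 1 MiB binary chunks until EOF instead of reading the whole file
        while chunk := f.read(chunk_size):
            count += chunk.count(b"\n")
    return count
```

Like `wc -l`, this counts newline characters, so a final line without a trailing newline is not counted.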

In [0]:
%sh 

# Preview Reviews
echo -e "Reviews Example"
head -n 1 $MAIN_STORAGE_PATH/amazon_reviews/books.json

# Preview Metadata (Books)
echo -e "\nMetadata Example"
head -n 1 $MAIN_STORAGE_PATH/amazon_reviews/meta_books.json
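A single-line JSON record can be hard to read in raw form. As a small sketch, assuming one JSON object per line, the hypothetical helper `preview_records` below takes the first `n` lines (like `head -n`) and pretty-prints each one, which makes the field names and nesting easier to see when writing schemas.

```python
import json
from itertools import islice


def preview_records(path, n=1):
    """Hypothetical helper: pretty-print the first n JSON Lines records."""
    with open(path, "r", encoding="utf-8") as f:
        # islice stops after n lines, so the rest of the file is never read
        return [json.dumps(json.loads(line), indent=2) for line in islice(f, n)]
```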

#### Reading as PySpark Dataframes

Our data is in JSON format, and from the example above we can see the structure of each record. We can move on to creating a schema for each dataset and then read them as PySpark DataFrames.

#### Reviews Table

This table holds the reviews written by customers. We define a schema for it using the structure we saw above, then use Spark to read it.
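A sample record can suggest candidate column types before the schema is written by hand. The sketch below is a hypothetical helper, not how the accelerator builds its schema: it maps Python values from `json.loads` to Spark SQL type names. The hand-written schema still makes its own choices, for example `FloatType` rather than a double for `overall`, and the sample values are made up.

```python
import json


def suggest_spark_types(record):
    """Hypothetical helper: map each value in a sample record to a candidate Spark SQL type name."""
    def spark_type(value):
        # bool must be checked before int, because bool is a subclass of int in Python
        if isinstance(value, bool):
            return "boolean"
        if isinstance(value, int):
            return "bigint"
        if isinstance(value, float):
            return "double"
        if isinstance(value, str):
            return "string"
        if isinstance(value, list):
            # Assumes homogeneous lists; empty or mixed lists fall back to array<string>
            inner = {spark_type(v) for v in value}
            return f"array<{inner.pop()}>" if len(inner) == 1 else "array<string>"
        # Fallback for nested objects or nulls; refine these by hand
        return "string"

    return {key: spark_type(value) for key, value in record.items()}


# Illustrative record with made-up values, shaped like a review line
sample = json.loads('{"overall": 5.0, "verified": true, "asin": "0000000000", "unixReviewTime": 1500000000}')
print(suggest_spark_types(sample))
```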

In [0]:
# Imports
from pyspark.sql.types import (
 StructType,
 StructField,
 StringType,
 FloatType,
 BooleanType,
 IntegerType,
 LongType,
)

# Define the reviews JSON schema
reviews_schema = StructType(
 [
 StructField("overall", FloatType(), True),
 StructField("verified", BooleanType(), True),
 StructField("reviewTime", StringType(), True),
 StructField("reviewerID", StringType(), True),
 StructField("asin", StringType(), True),
 StructField("reviewerName", StringType(), True),
 StructField("reviewText", StringType(), True),
 StructField("summary", StringType(), True),
 StructField("unixReviewTime", LongType(), True),
 ]
)

# Read the JSON file
raw_reviews_df = spark.read.json(
 f"{main_data_path}/amazon_reviews/books.json",
 mode="DROPMALFORMED",
 schema=reviews_schema
)

# Repartition
raw_reviews_df = raw_reviews_df.repartition(128)

# Get count
print(f"Table row count: {raw_reviews_df.count()}")

# Display
display(raw_reviews_df.limit(2))


#### Books Table

This table holds metadata for the books, such as author and price. We follow the same process as above.

In [0]:
# Imports
from pyspark.sql.types import (
 StructType,
 StructField,
 StringType,
 ArrayType,
)

# Define the books JSON schema
books_schema = StructType(
 [
 StructField("category", ArrayType(StringType()), True),
 StructField("tech1", StringType(), True),
 StructField("description", ArrayType(StringType()), True),
 StructField("fit", StringType(), True),
 StructField("title", StringType(), True),
 StructField("also_buy", ArrayType(StringType()), True),
 StructField("tech2", StringType(), True),
 StructField("brand", StringType(), True),
 StructField("feature", ArrayType(StringType()), True),
 StructField("rank", StringType(), True),
 StructField("also_view", ArrayType(StringType()), True),
 StructField("main_cat", StringType(), True),
 StructField("similar_item", StringType(), True),
 StructField("date", StringType(), True),
 StructField("price", StringType(), True),
 StructField("asin", StringType(), True),
 StructField("imageURL", ArrayType(StringType()), True),
 StructField("imageURLHighRes", ArrayType(StringType()), True),
 ]
)

# Read the JSON file
raw_books_df = spark.read.json(
 f"{main_data_path}/amazon_reviews/meta_books.json",
 mode="DROPMALFORMED",
 schema=books_schema,
)

# Get row count
print(f"Table row count: {raw_books_df.count()}")

# Repartition
raw_books_df = raw_books_df.repartition(128)

# Display
display(raw_books_df.limit(2))

### Joining Two Tables
A quick look above tells us that the data is in the format we expected. Some columns in the products (metadata) table look redundant; we will deal with those in the next notebook, where we do pre-processing and exploration.

What's also important is that the DataFrame row counts match the line counts from our shell command, which indicates that no records were dropped as malformed during the read and no data was lost.
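The comparison between line count and parsed-record count can be reproduced on a small file with plain Python. This is only a rough stand-in for Spark's `DROPMALFORMED` mode: here only JSON syntax errors and non-object lines count as bad, whereas Spark can also drop records whose values do not fit the supplied schema. `count_malformed` is a hypothetical name.

```python
import json


def count_malformed(path):
    """Hypothetical helper: return (total_lines, parseable_records) for a JSON Lines file."""
    total = good = 0
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            total += 1
            try:
                # Only JSON objects count as records, mirroring one object per line
                if isinstance(json.loads(line), dict):
                    good += 1
            except json.JSONDecodeError:
                pass  # A syntax error is what a malformed-record filter would discard
    return total, good
```

If the two numbers are equal, no line would have been discarded for syntax reasons.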

Let's go ahead and join the two tables to create a `raw_book_reviews_df`, which will have both metadata and reviews in a single place. If every review matches exactly one product, we expect its row count to equal the reviews row count.

We can join on the `asin` column, which is the product ID, using an inner join.

In [0]:
# Join and Create the new df
raw_book_reviews_df = raw_books_df.join(raw_reviews_df, how="inner", on=["asin"])

# Repartition
raw_book_reviews_df = raw_book_reviews_df.repartition(128)

# Get a count
print(f"DF row count: {raw_book_reviews_df.count()}")

# Display the dataframe
display(raw_book_reviews_df.limit(2))

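It looks like the number of rows has increased! An inner join produces one row for every matching (product, review) pair, so this means some products appear more than once in the metadata table, which duplicates their reviews. We will deal with these duplicates in the next section.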
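An inner join emits, for every `asin`, (number of metadata rows) × (number of reviews) rows. The toy sketch below uses made-up keys and hypothetical helper names to show the two effects at play: duplicated metadata keys multiply rows, while reviews with no matching metadata drop out, so the two can partially cancel.

```python
from collections import Counter


def inner_join_row_count(left_keys, right_keys):
    """Hypothetical helper: rows an inner join produces, summed per key as left x right matches."""
    left, right = Counter(left_keys), Counter(right_keys)
    # Counter returns 0 for missing keys, so unmatched keys contribute nothing
    return sum(n * right[key] for key, n in left.items())


def duplicated_keys(keys):
    """Hypothetical helper: keys that occur more than once, e.g. an asin listed twice in metadata."""
    return sorted(key for key, n in Counter(keys).items() if n > 1)


# Made-up keys: "B2" appears twice in the metadata, "B3" has no metadata at all
book_asins = ["B1", "B2", "B2"]
review_asins = ["B1", "B2", "B2", "B3"]
print(inner_join_row_count(book_asins, review_asins))  # prints 5, versus 4 reviews
print(duplicated_keys(book_asins))  # prints ['B2']
```

On the Spark side, the same duplicate check is commonly written as `raw_books_df.groupBy("asin").count().filter("count > 1")`.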

### Save All Dataframes

The final step is to save all of our DataFrames as Delta tables in the Schema we created at the very top of this notebook.

Even though we will probably only need the `raw_book_reviews` table in the next sections, it is worth saving the other two as well in case we need to go back to them at some point.

The cell below saves each DataFrame as a table. We do not need to specify the schema name, since we already selected it at the top of the notebook with the `USE SCHEMA` SQL command.

We also run an `OPTIMIZE` command on each table, which compacts small data files so that the data is laid out efficiently in our lake.

In [0]:
# Save Raw Reviews
(
 raw_reviews_df
 .write
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable("raw_reviews")
)

# Save Raw Books
(
 raw_books_df
 .write
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable("raw_books")
)

# Save Book Reviews
(
 raw_book_reviews_df
 .write
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable("raw_book_reviews")
)

# Optimize All
_ = spark.sql("OPTIMIZE raw_reviews;")
_ = spark.sql("OPTIMIZE raw_books;")
_ = spark.sql("OPTIMIZE raw_book_reviews;")
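The three writes above differ only in the DataFrame and the table name. As a refactoring sketch, a hypothetical helper `save_and_optimize` could loop over them instead. It uses only the calls already in the cell (`write`, `mode`, `option`, `saveAsTable` and `spark.sql`) and assumes the target schema has already been selected with `USE SCHEMA`.

```python
def save_and_optimize(spark, frames):
    """Hypothetical helper: overwrite each DataFrame as a table, then OPTIMIZE every table.

    `frames` maps table names to DataFrames, e.g. {"raw_reviews": raw_reviews_df}.
    """
    for table_name, df in frames.items():
        (
            df.write
            .mode("overwrite")
            .option("overwriteSchema", "true")  # Let the table schema change between runs
            .saveAsTable(table_name)
        )
    # Compact each table's files once all writes have finished
    for table_name in frames:
        spark.sql(f"OPTIMIZE {table_name};")
```

In this notebook it would be called as `save_and_optimize(spark, {"raw_reviews": raw_reviews_df, "raw_books": raw_books_df, "raw_book_reviews": raw_book_reviews_df})`.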