{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "ceabfbf1-8128-4e50-ac0e-e7611cb071df", "showTitle": false, "title": "" } }, "source": [ "This notebook is available at https://github.com/databricks-industry-solutions/review-summarisation. For more information about this solution accelerator, check out our [website](https://www.databricks.com/solutions/accelerators/large-language-models-retail) and [blog post](https://www.databricks.com/blog/automated-analysis-product-reviews-using-large-language-models-llms)." ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "1524fe84-efbe-499b-9a26-db5bed6b0796", "showTitle": false, "title": "" } }, "source": [ "# Data Download\n", "We begin our project with doing the necessary data setup and downloading the dataset we need. \n", "\n", "The online retail giant [Amazon's Product Reviews](https://cseweb.ucsd.edu/~jmcauley/datasets/amazon_v2/) are publicly available via an easily downloadable route. Each row in the dataset equates a review written by a user, and also has other data points such as star ratings which we will get to explore later.. \n", "\n", "---\n", "\n", "**Setup Used:**\n", "\n", "- Runtime: 13.2 ML\n", "- Cluster:\n", " - Machine: 16 CPU + 64 GB RAM (For Driver & Worker) \n", " - 8 Workers" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "c17d91f4-4f7c-43a3-93dd-4ffe7db2f793", "showTitle": false, "title": "" } }, "source": [ "### Initial Setup\n", "\n", "Before this step, please take a look at `./config.py` which can be found in the main directory to ensure that you have the right configuration.\n", "\n", "Setting up the necessary data holding objects such as Catalogs, Databases or Volumes are a great way to start projects on Databricks. These help us organize our assets with ease.\n", "\n", "Given this, we will use the next few cells of code to create a Catalog, a Database (Schema) within that Catalog which will hold our tables, and also a Volume which will hold our files.\n", "\n", "_If Unity Catalog is not yet enabled on your workspace, please follow the instructions for alternatives. It is not required for this project_" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "implicitDf": true, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "6922837c-875e-43b0-a34f-600f96e691b2", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# Imports\n", "from config import CATALOG_NAME, SCHEMA_NAME, USE_UC, USE_VOLUMES\n", "\n", "# If UC is enabled\n", "if USE_UC:\n", " # Creating a Catalog (Optional, skip if no-UC)\n", " _ = spark.sql(f\"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME};\")\n", "\n", " # Select the Catalog as Default for this Notebook\n", " _ = spark.sql(f\"USE CATALOG {CATALOG_NAME};\")\n", "\n", " # Grant permissions so that all users can use this accelerator\n", " _ = spark.sql(f\"GRANT ALL PRIVILEGES ON CATALOG {CATALOG_NAME} TO `account users`;\")\n", "\n", "# Create a Database\n", "_ = spark.sql(f\"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};\")\n", "\n", "# Select the Database as Default\n", "_ = spark.sql(f\"USE SCHEMA {SCHEMA_NAME};\")\n", "\n", "# If Volumes are enabled\n", "if USE_VOLUMES:\n", " # Create a Volume\n", " _ = spark.sql(\"CREATE VOLUME IF NOT EXISTS data_store;\")\n", " " ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "15700134-fd9b-483b-9fbd-85f9aab3c00f", "showTitle": false, "title": "" } }, "source": [ "### Setting Up Paths\n", "\n", "We will now set up our paths, which we will use while downloading and storing the data. This code will give you the option to select a `dbfs` path or any other path you might want to use for storing the raw files." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "66bd2ba1-0a0b-4a55-80d1-24b0a904fadc", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# Import the OS system to declare a ENV variable\n", "from config import MAIN_STORAGE_PATH, MAIN_DATA_PATH\n", "import os\n", "\n", "# Setting up the storage path (please edit this if you would like to store the data somewhere else)\n", "main_storage_path = f\"{MAIN_STORAGE_PATH}/data_store\"\n", "main_data_path = f\"{MAIN_DATA_PATH}/data_store\"\n", "\n", "# Declaring as an Environment Variable \n", "os.environ[\"MAIN_STORAGE_PATH\"] = main_storage_path" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "c5fba014-4353-40ea-a0e1-7bd52f59b7ff", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "%sh\n", "\n", "# Confirming the variable made it through\n", "echo $MAIN_STORAGE_PATH" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "be7432bd-278a-43e7-8527-39930e69827e", "showTitle": false, "title": "" } }, "source": [ "### Downloading the Data\n", "Now, we can download the data from the public registry.. There are many datasets which are available in this source. They are grouped by category such as Books or Cameras. For this use case, we will focus on the books dataset as we might see reviews about the books we have read before.\n", "\n", "These datasets are in the form of compressed JSON. Our first task is going to be to download and unzip the data in the main location we have predefined, and we are going to execute this within a shell script, using the `curl` utility for download.\n", "\n", "_This part might take about 12 minutes_" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f511c1f8-f1b2-4342-a545-5f9cf1015408", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "%sh\n", "\n", "# Create a new folder in storage\n", "export AMAZON_REVIEWS_FOLDER=$MAIN_STORAGE_PATH/amazon_reviews\n", "mkdir -p $AMAZON_REVIEWS_FOLDER\n", "\n", "# Create a temporary folder on local disk\n", "export TMP_DATA_FOLDER=/local_disk0/tmp_data_save\n", "mkdir -p $TMP_DATA_FOLDER\n", "\n", "# Move to temp folder\n", "cd $TMP_DATA_FOLDER\n", "\n", "# Download the data\n", "curl -sO https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/categoryFiles/Books.json.gz &\n", "curl -sO https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/metaFiles2/meta_Books.json.gz &\n", "wait\n", "echo Download Complete\n", "\n", "# Unzip \n", "gunzip Books.json.gz &\n", "gunzip meta_Books.json.gz &\n", "wait\n", "echo Unzipping Complete\n", "\n", "# Copy to Target\n", "cp Books.json $AMAZON_REVIEWS_FOLDER/books.json &\n", "cp meta_Books.json $AMAZON_REVIEWS_FOLDER/meta_books.json &\n", "wait\n", "echo Copying Complete\n", "\n", "# Display whats there\n", "du -ah $AMAZON_REVIEWS_FOLDER" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "37251b67-861a-4004-b19c-5bdb9dfb9039", "showTitle": false, "title": "" } }, "source": [ "#### Quick View on Data\n", "\n", "At this point, we downloaded two datasets from the source:\n", "- `meta_books.json` Contains data about the products (metadata) such as title, price, ID..\n", "- `books.json` Contains the actual reviews on the products.\n", "\n", "\n", "Lets take a quick look into how many rows we have in each dataset, and what the data looks like" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "d052100b-7a74-4315-ad76-7906bb87acb7", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "%sh\n", "\n", "# Get a count of total reviews\n", "echo -e \"Reviews Count\" \n", "wc -l < $MAIN_STORAGE_PATH/amazon_reviews/books.json\n", "\n", "# Get a count of products (metadata)\n", "echo -e \"\\nMetadata Count\"\n", "wc -l < $MAIN_STORAGE_PATH/amazon_reviews/meta_books.json" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "4b63ff24-a69a-4ee4-af75-bd99f6790107", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "%sh \n", "\n", "# Preview Reviews\n", "echo -e \"Reviews Example\"\n", "head -n 1 $MAIN_STORAGE_PATH/amazon_reviews/books.json\n", "\n", "# Preview Metadata (Books)\n", "echo -e \"\\nMetadata Example\"\n", "head -n 1 $MAIN_STORAGE_PATH/amazon_reviews/meta_books.json" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e69fb93a-5cf7-4494-b193-db2ab9b151fc", "showTitle": false, "title": "" } }, "source": [ "#### Reading as PySpark Dataframes\n", "\n", "Our data is in JSON format, and from the above example, we can see what the structure of the JSON looks like. We can move on the creating schemas for each datasets and then read them as PySpark Dataframes." ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "512aae34-d1f9-4e33-921b-8df74c69d565", "showTitle": false, "title": "" } }, "source": [ "#### Reviews Table\n", "\n", "This table holds the reviews received by customers. We define a schema for it by using the information we got from above and then use spark for reading it." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "11afe1b7-85d3-4fbe-9743-7b373998600b", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# Imports\n", "from pyspark.sql.types import (\n", " StructType,\n", " StructField,\n", " StringType,\n", " FloatType,\n", " BooleanType,\n", " IntegerType,\n", " LongType,\n", ")\n", "\n", "# Define the reviews JSON schema\n", "reviews_schema = StructType(\n", " [\n", " StructField(\"overall\", FloatType(), True),\n", " StructField(\"verified\", BooleanType(), True),\n", " StructField(\"reviewTime\", StringType(), True),\n", " StructField(\"reviewerID\", StringType(), True),\n", " StructField(\"asin\", StringType(), True),\n", " StructField(\"reviewerName\", StringType(), True),\n", " StructField(\"reviewText\", StringType(), True),\n", " StructField(\"summary\", StringType(), True),\n", " StructField(\"unixReviewTime\", LongType(), True),\n", " ]\n", ")\n", "\n", "# Read the JSON file\n", "raw_reviews_df = spark.read.json(\n", " f\"{main_data_path}/amazon_reviews/books.json\",\n", " mode=\"DROPMALFORMED\",\n", " schema=reviews_schema\n", ")\n", "\n", "# Repartition\n", "raw_reviews_df = raw_reviews_df.repartition(128)\n", "\n", "# Get count\n", "print(f\"Table row count: {raw_reviews_df.count()}\")\n", "\n", "# Display\n", "display(raw_reviews_df.limit(2))\n" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "8e5b679e-9ecb-4bc8-bd78-1cf77ee4a326", "showTitle": false, "title": "" } }, "source": [ "#### Books Table\n", "\n", "This table holds metadata for the books such as author, price, etc.. We follow the same process from above." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "b63635c7-13e5-4414-8e98-3a4aad101477", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# Imports\n", "from pyspark.sql.types import (\n", " StructType,\n", " StructField,\n", " StringType,\n", " ArrayType,\n", " BooleanType,\n", ")\n", "\n", "# Define the books JSON schema\n", "books_schema_schema = StructType(\n", " [\n", " StructField(\"category\", ArrayType(StringType()), True),\n", " StructField(\"tech1\", StringType(), True),\n", " StructField(\"description\", ArrayType(StringType()), True),\n", " StructField(\"fit\", StringType(), True),\n", " StructField(\"title\", StringType(), True),\n", " StructField(\"also_buy\", ArrayType(StringType()), True),\n", " StructField(\"tech2\", StringType(), True),\n", " StructField(\"brand\", StringType(), True),\n", " StructField(\"feature\", ArrayType(StringType()), True),\n", " StructField(\"rank\", StringType(), True),\n", " StructField(\"also_view\", ArrayType(StringType()), True),\n", " StructField(\"main_cat\", StringType(), True),\n", " StructField(\"similar_item\", StringType(), True),\n", " StructField(\"date\", StringType(), True),\n", " StructField(\"price\", StringType(), True),\n", " StructField(\"asin\", StringType(), True),\n", " StructField(\"imageURL\", ArrayType(StringType()), True),\n", " StructField(\"imageURLHighRes\", ArrayType(StringType()), True),\n", " ]\n", ")\n", "\n", "# Read the JSON file\n", "raw_books_df = spark.read.json(\n", " f\"{main_data_path}/amazon_reviews/meta_books.json\",\n", " mode=\"DROPMALFORMED\",\n", " schema=books_schema_schema,\n", ")\n", "\n", "# Get row count\n", "print(f\"Table row count: {raw_books_df.count()}\")\n", "\n", "# Repartition\n", "raw_books_df = raw_books_df.repartition(128)\n", "\n", "# Display\n", "display(raw_books_df.limit(2))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "bd2bd4a4-8e41-4cda-84f3-8151458c6388", "showTitle": false, "title": "" } }, "source": [ "### Joining Two Tables\n", "By having a quick look above, we can tell that the data is in the format we expected it to be.. There are some columns which look redundant in the products (metadata) table, however we can deal with those in the next notebook where we will do pre-processing & exploration work.\n", "\n", "Whats also important is that the row counts of the dataframes are matching with the counts we got with our shell command, which means that we do not have any malformed records or data loss in the read process.\n", "\n", "Lets go ahead and join the two tables together to create a `book_reviews_df` which will have both metadata and reviews in a single place. We expect the row count of this one to be the same as the reviews row count if there are no mismatches..\n", "\n", "We can use the `asin` column to join, which is the id of the products, and execute an inner join" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f38ab912-8415-4835-ad28-413a2cf7a1cd", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# Join and Create the new df\n", "raw_book_reviews_df = raw_books_df.join(raw_reviews_df, how=\"inner\", on=[\"asin\"])\n", "\n", "# Partition\n", "raw_book_reviews_df = raw_book_reviews_df.repartition(128)\n", "\n", "# Get a count\n", "print(f\"DF row count: {raw_book_reviews_df.count()}\")\n", "\n", "# Display the dataframe\n", "display(raw_book_reviews_df.limit(2))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "9196ffda-2f0c-4fb8-90e5-ec3071c122f9", "showTitle": false, "title": "" } }, "source": [ "It looks like the number of rows have increased! This means we have some duplicates in the data which we will deal with in the next section." ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "7c696790-6684-416f-9535-60a84e9a8f9f", "showTitle": false, "title": "" } }, "source": [ "### Save All Dataframes\n", "\n", "Final step is to save all of the dataframes we have as Delta tables, in the specific Schema we have created at the very top of this notebook.\n", "\n", "Even though we will probably only need the `raw_book_reviews` dataframe in the next sections, it is important to save the other two as well just in case we need to go back to them at some points.\n", "\n", "In the following section, we will specify some code to save. We do not need to specify the schema name since we have already done so at the very top of the notebook with the `USE SCHEMA` SQL command.\n", "\n", "We will also get to run an `OPTIMISE` command to ensure that the data is layed out in an optimal way in our lake." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "8c011a65-9e3f-4ca4-ac44-fae7da60add1", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# Save Raw Reviews\n", "(\n", " raw_reviews_df\n", " .write\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"raw_reviews\")\n", ")\n", "\n", "# Save Raw Books\n", "(\n", " raw_books_df\n", " .write\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"raw_books\")\n", ")\n", "\n", "# Save Book Reviews\n", "(\n", " raw_book_reviews_df\n", " .write\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"raw_book_reviews\")\n", ")\n", "\n", "# Optimize All\n", "_ = spark.sql(\"OPTIMIZE raw_reviews;\")\n", "_ = spark.sql(\"OPTIMIZE raw_books;\")\n", "_ = spark.sql(\"OPTIMIZE raw_book_reviews;\")" ] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "dashboards": [], "language": "python", "notebookMetadata": { "mostRecentlyExecutedCommandWithImplicitDF": { "commandId": 3605256645507561, "dataframes": [ "_sqldf" ] }, "pythonIndentUnit": 4 }, "notebookName": "01-data-download", "widgets": {} }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }