{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "0a1b2c3d",
   "metadata": {},
   "source": [
    "# Delta Lake with Spark SQL on AWS Glue interactive sessions\n",
    "\n",
    "This notebook writes sample product data to S3 in Delta Lake format and registers it as a table in the metastore. It then uses Spark SQL to insert, update, upsert (`MERGE`), alter, and delete records, and to view the table history. Run the cells top to bottom; the clean-up section lets the notebook be re-run from a clean state."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5f6a7b8c",
   "metadata": {},
   "source": [
    "## Configure the session\n",
    "\n",
    "Run these magics before any other code so they apply when the session starts.\n",
    "\n",
    "- Fill in `%connections` with the Glue connection for your Delta Lake connector (it is left blank in this template).\n",
    "- `--extra-py-files` points to the Delta Lake JAR:\n",
    "  - `/tmp/etl-delta-core_2.12-1.0.0.jar` for a custom connector.\n",
    "  - `/tmp/delta-core_2.12-1.0.0.jar` for a marketplace connector.\n",
    "- Keep the `%%configure` body strict JSON. The alternatives are listed here rather than as `#` comments inside it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e90b576",
   "metadata": {},
   "outputs": [],
   "source": [
    "%session_id_prefix delta-sql-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%connections \n",
    "%%configure \n",
    "{\n",
    " \"--conf\": \"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n",
    " \"--extra-py-files\": \"/tmp/etl-delta-core_2.12-1.0.0.jar\"\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7d8e9f0a",
   "metadata": {},
   "source": [
    "## Imports and configuration\n",
    "\n",
    "Set `bucket_name` (required) and, optionally, `bucket_prefix`.\n",
    "\n",
    "Empty segments are skipped when building S3 keys. An empty `bucket_prefix` therefore produces neither a leading `/` in `table_prefix` nor a `//` in the `s3://` locations.\n",
    "\n",
    "**Note:** the `%%sql` cells below hard-code `delta_sql.products`. Update them if you change `database_name` or `table_name`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9a034a2e",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "import boto3\n",
    "from pyspark.sql import Row\n",
    "\n",
    "bucket_name = \"\"    # required: S3 bucket for the database and table files\n",
    "bucket_prefix = \"\"  # optional: key prefix inside the bucket (\"\" = bucket root)\n",
    "database_name = \"delta_sql\"\n",
    "table_name = \"products\"\n",
    "\n",
    "if not bucket_name:\n",
    "    raise ValueError(\"Set bucket_name to an S3 bucket name before running this notebook.\")\n",
    "\n",
    "\n",
    "def s3_join(*parts):\n",
    "    \"\"\"Join S3 key segments with single '/' separators, skipping empty segments.\"\"\"\n",
    "    return \"/\".join(p.strip(\"/\") for p in parts if p.strip(\"/\"))\n",
    "\n",
    "\n",
    "database_prefix = s3_join(bucket_prefix, database_name)\n",
    "database_location = f\"s3://{bucket_name}/{database_prefix}/\"\n",
    "table_prefix = s3_join(database_prefix, table_name)\n",
    "table_location = f\"s3://{bucket_name}/{table_prefix}/\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "429b7d93",
   "metadata": {},
   "source": [
    "## Clean up existing resources\n",
    "\n",
    "Delete the table's files from S3 and drop the table, so that a re-run starts from an empty location.\n",
    "\n",
    "Dropping the table alone is not enough. The table is created with an explicit `LOCATION`, so `DROP TABLE` leaves its files in place."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "47da9906",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Delete every object (Delta data files and _delta_log) under the table prefix\n",
    "s3 = boto3.resource('s3')\n",
    "bucket = s3.Bucket(bucket_name)\n",
    "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5eebe198",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DROP TABLE IF EXISTS delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "483e0bb9",
   "metadata": {},
   "source": [
    "## Write sample data in Delta Lake format\n",
    "\n",
    "Build a small product DataFrame and write it to `table_location` in Delta format.\n",
    "\n",
    "All rows in a batch share one `updated_at` value, taken from `time.time()` (seconds since the epoch)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6dc864b1",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "df_products = spark.createDataFrame([Row(**x) for x in product])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "92a476a4",
   "metadata": {},
   "outputs": [],
   "source": [
    "(df_products.write\n",
    "    .format(\"delta\")\n",
    "    .mode(\"overwrite\")\n",
    "    .save(table_location))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3f184165",
   "metadata": {},
   "source": [
    "## Register the Delta Lake table in the metastore\n",
    "\n",
    "Create the database, then create a table whose `LOCATION` is the Delta files written above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6cd86cb5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the database at database_location; IF NOT EXISTS leaves an existing database unchanged\n",
    "spark.sql(f\"CREATE DATABASE IF NOT EXISTS {database_name} LOCATION '{database_location}'\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "446bebbd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the table in the metastore, pointing at the existing Delta files\n",
    "query = f\"\"\"\n",
    "CREATE TABLE {database_name}.{table_name}\n",
    "USING delta\n",
    "LOCATION '{table_location}'\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a9736c18",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "USE delta_sql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ae6a7942",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SHOW TABLES"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "50a6e89e",
   "metadata": {},
   "source": [
    "## Read from Delta Lake table\n",
    "\n",
    "Read the table through the metastore."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "47777845",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "948f6bf4",
   "metadata": {},
   "source": [
    "## Insert records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "af3b1204",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "# VALUES are positional: product_id, product_name, price, category, updated_at\n",
    "query = f\"\"\"INSERT INTO {database_name}.{table_name} VALUES('00006', 'Pen', 30,'Stationery',{ut}), ('00007', 'Book', 500,'Stationery',{ut})\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c1b8c7a4",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62c4d603",
   "metadata": {},
   "source": [
    "## Update records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b6d2c312",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "query = f\"\"\"UPDATE {database_name}.{table_name} SET price=300, updated_at={ut} WHERE product_id == '00007'\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "79e66ede",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2749314e",
   "metadata": {},
   "source": [
    "## Upsert records\n",
    "\n",
    "Merge the rows in `tmp_products_updates` into the table:\n",
    "\n",
    "- Rows whose `product_id` already exists are updated.\n",
    "- All other rows are inserted."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5dc1d8aa",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut},  # Update\n",
    "    {'product_id': '00008', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut}  # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame([Row(**x) for x in product_updates])\n",
    "\n",
    "df_product_updates.createOrReplaceTempView(\"tmp_products_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "791e4cd3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "MERGE INTO delta_sql.products AS old \\\n",
    "USING tmp_products_updates AS new \\\n",
    "ON old.product_id=new.product_id \\\n",
    "WHEN MATCHED THEN \\\n",
    "UPDATE SET \\\n",
    "    old.product_name=new.product_name, \\\n",
    "    old.price=new.price, \\\n",
    "    old.category=new.category, \\\n",
    "    old.updated_at=new.updated_at \\\n",
    "WHEN NOT MATCHED \\\n",
    "THEN INSERT (product_id, product_name, price,category,updated_at) \\\n",
    "VALUES ( \\\n",
    "    new.product_id, \\\n",
    "    new.product_name, \\\n",
    "    new.price, \\\n",
    "    new.category, \\\n",
    "    new.updated_at \\\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "002f9201",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "324a5cb1",
   "metadata": {},
   "source": [
    "## Alter Delta Lake table\n",
    "\n",
    "Add a `CURRENCY` column after `PRICE`, then set it on every row."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2677931e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "ALTER TABLE delta_sql.products ADD COLUMNS (CURRENCY STRING AFTER PRICE)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "caeab22e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "UPDATE delta_sql.products SET CURRENCY =\"INR\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "203b5f1d",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a4fb1043",
   "metadata": {},
   "source": [
    "## Delete records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6133b28a",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DELETE FROM delta_sql.products WHERE product_name == \"Pen\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "96a6924e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "712ba1d3",
   "metadata": {},
   "source": [
    "## View history\n",
    "\n",
    "List the table versions produced by the operations above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "076409de",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DESCRIBE HISTORY delta_sql.products"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0c51d9c6",
   "metadata": {},
   "source": [
    "## Stop session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f0b4eed8",
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}