In [None]:
%session_id_prefix delta-sql-
%glue_version 3.0
%idle_timeout 60
%connections 
%%configure 
{
 "--conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
 "--extra-py-files": "/tmp/etl-delta-core_2.12-1.0.0.jar"
}

In [None]:
# --- Values to adjust before running -------------------------------------
# bucket_name:   S3 bucket that will hold the demo table (required).
# bucket_prefix: optional key prefix inside the bucket; may be left empty.
#
# NOTE: the session configuration cell points "--extra-py-files" at
# /tmp/etl-delta-core_2.12-1.0.0.jar, which is the jar for the custom
# connector. With the marketplace connector use
# /tmp/delta-core_2.12-1.0.0.jar there instead.
bucket_name = ""
bucket_prefix = ""
database_name = "delta_sql"
table_name = "products"

# Join only the non-empty path segments and strip stray slashes, so an empty
# (or slash-terminated) bucket_prefix does not produce keys that start with
# "/" or locations that contain "//". The same prefix is used both for the
# boto3 cleanup filter and for the Spark table location, so they must agree.
database_prefix = "/".join(
    segment for segment in (bucket_prefix.strip("/"), database_name) if segment
)
database_location = f"s3://{bucket_name}/{database_prefix}/"
table_prefix = f"{database_prefix}/{table_name}"
table_location = f"s3://{bucket_name}/{table_prefix}/"

## Clean up existing resources

In [None]:
import boto3

## Delete files in S3
# Remove every object stored under the table's key prefix so that objects
# left behind by a previous run are gone before the table is written again.
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
stale_objects = bucket.objects.filter(Prefix=table_prefix + "/")
stale_objects.delete()

In [None]:
%%sql
DROP TABLE IF EXISTS delta_sql.products

## Create Delta table with sample data

In [None]:
from pyspark.sql import Row
import time

# All seed rows share one timestamp (seconds since the epoch, as a float).
ut = time.time()

# Seed catalogue, one tuple per product, in the column order given below.
_seed_columns = ('product_id', 'product_name', 'price', 'category')
_seed_values = [
    ('00001', 'Heater', 250, 'Electronics'),
    ('00002', 'Thermostat', 400, 'Electronics'),
    ('00003', 'Television', 600, 'Electronics'),
    ('00004', 'Blender', 100, 'Electronics'),
    ('00005', 'USB charger', 50, 'Electronics'),
]

# Expand each tuple into a dict and append the shared timestamp last, so the
# key order is product_id, product_name, price, category, updated_at.
product = [
    {**dict(zip(_seed_columns, values)), 'updated_at': ut}
    for values in _seed_values
]

# One Row per dict; the schema is inferred by Spark from the Row fields.
df_products = spark.createDataFrame([Row(**record) for record in product])

In [None]:
# Write the seed DataFrame to table_location in Delta format, replacing
# whatever a previous run may have written there.
(
    df_products.write
    .format("delta")
    .mode("overwrite")
    .save(table_location)
)

## Register the Delta Lake table in the metastore

In [None]:
%%sql
CREATE DATABASE IF NOT EXISTS delta_sql

In [None]:
# Register the Delta files written to table_location as a metastore table.
# No column list is given: the table is created over the existing location.
# IF NOT EXISTS keeps this cell re-runnable (a second run is a no-op instead
# of a "table already exists" failure), matching CREATE DATABASE IF NOT EXISTS.
query = f"""
CREATE TABLE IF NOT EXISTS {database_name}.{table_name}
USING delta
LOCATION '{table_location}'
"""
spark.sql(query)

In [None]:
%%sql
USE delta_sql

In [None]:
%%sql
SHOW TABLES

## Read from Delta Lake table

In [None]:
%%sql #Read table from metastore
SELECT * FROM delta_sql.products

## Insert records

In [None]:
# Append two Stationery products; both rows get the same fresh timestamp.
ut = time.time()

# One VALUES tuple per new row. Values are positional, so they must follow
# the table's column order.
new_rows = [
    f"('00006', 'Pen', 30,'Stationery',{ut})",
    f"('00007', 'Book', 500,'Stationery',{ut})",
]
query = f"INSERT INTO {database_name}.{table_name} VALUES" + ", ".join(new_rows)
spark.sql(query)


In [None]:
%%sql
SELECT * FROM delta_sql.products

## Update records

In [None]:
# Reprice product 00007 to 300 and refresh its updated_at timestamp.
ut = time.time()
query = "UPDATE {}.{} SET price=300, updated_at={} WHERE product_id == '00007'".format(
    database_name, table_name, ut
)
spark.sql(query)

In [None]:
%%sql
SELECT * FROM delta_sql.products

## Upsert records

In [None]:
# Source rows for the MERGE: both carry the same fresh timestamp.
ut = time.time()

# Product 00001 already exists in the table, so the merge updates it.
_heater_change = dict(
    product_id='00001', product_name='Heater', price=400,
    category='Electronics', updated_at=ut,
)
# Product 00008 is new, so the merge inserts it.
_chair_change = dict(
    product_id='00008', product_name='Chair', price=50,
    category='Furniture', updated_at=ut,
)
product_updates = [_heater_change, _chair_change]

df_product_updates = spark.createDataFrame(
    [Row(**change) for change in product_updates]
)

# Expose the changes to SQL under the name the MERGE statement reads from.
df_product_updates.createOrReplaceTempView("tmp_products_updates")

In [None]:
%%sql
MERGE INTO delta_sql.products AS old \
USING tmp_products_updates AS new \
ON old.product_id=new.product_id \
WHEN MATCHED THEN \
UPDATE SET \
 old.product_name=new.product_name, \
 old.price=new.price, \
 old.category=new.category, \
 old.updated_at=new.updated_at \
WHEN NOT MATCHED \
THEN INSERT (product_id, product_name, price,category,updated_at) \
VALUES ( \
 new.product_id, \
 new.product_name, \
 new.price, \
 new.category, \
 new.updated_at \
)

In [None]:
%%sql
SELECT * FROM delta_sql.products

## Alter Delta Lake table

In [None]:
%%sql
ALTER TABLE delta_sql.products ADD COLUMNS (CURRENCY STRING AFTER PRICE)

In [None]:
%%sql
UPDATE delta_sql.products SET CURRENCY ="INR"

In [None]:
%%sql
SELECT * FROM delta_sql.products

## Delete records

In [None]:
%%sql
DELETE FROM delta_sql.products WHERE product_name == "Pen"

In [None]:
%%sql
SELECT * FROM delta_sql.products

## View History

In [None]:
%%sql
DESCRIBE HISTORY delta_sql.products

## Stop Session

In [None]:
%stop_session