In [None]:
%session_id_prefix hudi-dataframe-
%glue_version 3.0
%idle_timeout 60
%connections 
%%configure 
{
 "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false",
}

In [None]:
# S3 location of the Hudi table. bucket_name must be filled in before running
# the notebook; bucket_prefix is an optional key prefix inside that bucket.
bucket_name = ""
bucket_prefix = ""
database_name = "hudi_df"
table_name = "product_cow"
# Join only the non-empty segments. With the default empty bucket_prefix a plain
# f-string join yields "/hudi_df/product_cow": the leading "/" puts "//" into
# table_location and (presumably, since written object keys carry no leading "/")
# makes the clean-up cell's S3 key filter match nothing. Slashes around
# bucket_prefix are stripped so "a/" and "/a" behave like "a".
table_prefix = "/".join(
    segment
    for segment in (bucket_prefix.strip("/"), database_name, table_name)
    if segment
)
table_location = f"s3://{bucket_name}/{table_prefix}"

## Create the database and clean up existing resources

In [None]:
import boto3

## Create the Glue database named by database_name to host Hudi tables, if it does not exist yet.
# One Glue client is created up front and reused by both try blocks. If it were
# created inside a `try` and boto3.client() itself raised, evaluating
# `glue.exceptions...` in the `except` clause would fail with a NameError and
# hide the real error.
glue = boto3.client('glue')
try:
    glue.create_database(DatabaseInput={'Name': database_name})
    print(f"New database {database_name} created")
except glue.exceptions.AlreadyExistsException:
    print(f"Database {database_name} already exist")

## Delete any existing table files under the table's S3 prefix.
# The trailing "/" keeps the filter from also matching sibling prefixes that
# merely start with the same characters (e.g. "product_cow_old/").
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket.objects.filter(Prefix=f"{table_prefix}/").delete()

## Drop the table definition from the Glue Data Catalog, if present.
try:
    glue.delete_table(DatabaseName=database_name, Name=table_name)
except glue.exceptions.EntityNotFoundException:
    print(f"Table {database_name}.{table_name} does not exist")


## Create Hudi table with sample data using catalog sync

In [None]:
from pyspark.sql import Row
import time

# A single timestamp shared by the whole initial batch; it fills updated_at,
# which hudi_options uses as the precombine field.
ut = time.time()

# Every product in the initial batch belongs to the Electronics category.
product_fields = ('product_id', 'product_name', 'price', 'category', 'updated_at')
product = [
    dict(zip(product_fields, (product_id, product_name, price, 'Electronics', ut)))
    for product_id, product_name, price in [
        ('00001', 'Heater', 250),
        ('00002', 'Thermostat', 400),
        ('00003', 'Television', 600),
        ('00004', 'Blender', 100),
        ('00005', 'USB charger', 50),
    ]
]

product_rows = [Row(**record) for record in product]
df_products = spark.createDataFrame(product_rows)

In [None]:
# Write settings: copy-on-write table keyed by product_id, Hive-style
# partitioned by category, with updated_at as the precombine field.
hudi_write_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'product_id',
    'hoodie.datasource.write.partitionpath.field': 'category',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.precombine.field': 'updated_at',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'path': table_location,
}

# Catalog sync settings: register the table and its category partitions in
# database_name through the metastore client ("hms" mode) instead of JDBC.
hudi_sync_options = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.partition_fields': 'category',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
}

# The two groups share no keys, so merge order does not matter.
hudi_options = {**hudi_write_options, **hudi_sync_options}


In [None]:
# Initial load: "overwrite" mode replaces anything already at table_location
# with the sample products; hive_sync in hudi_options registers the table.
(
    df_products.write
    .format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save()
)

## Read from Hudi table

In [None]:
# Read the table back from its S3 location and display it.
df_products_read = spark.read.format("hudi").load(table_location)
df_products_read.show()

## Upsert records into Hudi table

In [None]:
# New timestamp for the updated_at (precombine) field of this batch.
ut = time.time()

# One existing key with a changed price, plus one brand-new key in a new
# category partition.
product_updates = [
    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut},  # Update
    {'product_id': '00006', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut},  # Insert
]
update_rows = [Row(**change) for change in product_updates]
df_product_updates = spark.createDataFrame(update_rows)

# "append" mode with the upsert operation from hudi_options merges the batch
# into the existing table.
(
    df_product_updates.write
    .format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save()
)

In [None]:
# Re-read the table to show the updated Heater price and the new Chair row.
df_product_updates_read = spark.read.format("hudi").load(table_location)
df_product_updates_read.show()

## Delete a Record
To hard delete a record, upsert it with an empty payload. Here the `hoodie.datasource.write.payload.class` option (`PAYLOAD_CLASS_OPT_KEY` in Hudi's Scala API) is set to the `EmptyHoodieRecordPayload` class.

In [None]:
# Select the record(s) to hard-delete. product_id holds strings such as '00001',
# so compare against a quoted string literal. An unquoted 00001 is parsed as the
# integer 1 and only matches through Spark's implicit string-to-number cast,
# which would also match values like '1' or '001'.
df_delete = df_product_updates_read.where("product_id == '00001'")

In [None]:
# Hard delete: upsert the selected rows again with EmptyHoodieRecordPayload as
# the payload class, so each matching key is replaced by an empty payload.
# The payload class is merged *after* the shared hudi_options so it always takes
# effect, even if hudi_options ever gains a payload class of its own (later
# options override earlier ones on the writer).
delete_options = {
    **hudi_options,
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload',
}
(
    df_delete.write
    .format("hudi")  # same short format name as the other Hudi reads/writes here
    .options(**delete_options)
    .mode("append")
    .save()
)

In [None]:
# Re-read the table to confirm the deleted record is gone.
df_product_delete_read = spark.read.format("hudi").load(table_location)
df_product_delete_read.show()

## Point in time query
Let's look at how to query the data as of a specific point in time. To do this, set `endTime` to the commit time of interest and `beginTime` to `"000"`, which denotes the earliest possible commit time.

In [None]:
# Expose the current contents of the table to Spark SQL as "hudi_snapshot".
hudi_snapshot_df = spark.read.format("hudi").load(table_location)
hudi_snapshot_df.createOrReplaceTempView("hudi_snapshot")

In [None]:
# Distinct commit times carried by the rows in hudi_snapshot, oldest first,
# capped at 50 entries.
commit_time_rows = (
    spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime")
    .limit(50)
    .collect()
)
commits = [commit_row[0] for commit_row in commit_time_rows]

In [None]:
# Point-in-time read: an incremental query over the commits after beginTime up
# to endTime. `commits` holds the commit times found on rows still in the
# table, oldest first; commits[-2] is the second-newest of them.
# NOTE(review): the delete commit leaves no row carrying its commit time, so it
# presumably does not appear in `commits`, making commits[-2] the initial
# insert commit here — re-check if the write sequence above changes.
if len(commits) < 2:
    raise ValueError(f"Expected at least two commits in hudi_snapshot, found {len(commits)}: {commits}")
beginTime = "000"  # Represents all commits > this time.
endTime = commits[-2]

point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.end.instanttime': endTime,
    'hoodie.datasource.read.begin.instanttime': beginTime,
}

# Keep the DataFrame itself (not the None returned by .show()), then display it.
# Intended to show the initial table before the upsert and delete.
df_product_point_in_time_read = (
    spark.read.format("hudi")
    .options(**point_in_time_read_options)
    .load(table_location)
)
df_product_point_in_time_read.show()



## Incremental Query
Hudi can also return the stream of records that changed since a given commit timestamp. This is done with Hudi's incremental query, by providing the begin time from which changes should be streamed. We do not need to specify `endTime` if we want all changes after the given commit.

In [None]:
# Incremental read: every change committed after beginTime. No end instant is
# set, so the range extends to the latest commit.
if len(commits) < 2:
    raise ValueError(f"Expected at least two commits in hudi_snapshot, found {len(commits)}: {commits}")
beginTime = commits[-2]  # commit time we are interested in

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}

# Keep the DataFrame itself (not the None returned by .show()), then display it.
df_product_incremental_read = (
    spark.read.format("hudi")
    .options(**incremental_read_options)
    .load(table_location)
)
df_product_incremental_read.show()

## Stop Session

In [None]:
%stop_session