{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "ec9c4962-e6d5-4029-9913-52dfd34eefd2", "metadata": { "id": "ec9c4962-e6d5-4029-9913-52dfd34eefd2", "outputId": "cb6298b3-cf2f-486c-b30b-339f3e2bb959", "trusted": true }, "outputs": [], "source": [ "%session_id_prefix hudi-dataframe-\n", "%glue_version 3.0\n", "%idle_timeout 60\n", "%connections \n", "%%configure \n", "{\n", " \"--conf\": \"spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false\",\n", "}" ] }, { "cell_type": "code", "execution_count": null, "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192", "metadata": { "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192", "outputId": "59a307cc-ed32-406f-afc8-5bebee80bb39", "trusted": true }, "outputs": [], "source": [ "bucket_name = \"\"\n", "bucket_prefix = \"\"\n", "database_name = \"hudi_df\"\n", "table_name = \"product_cow\"\n", "table_prefix = f\"{bucket_prefix}/{database_name}/{table_name}\"\n", "table_location = f\"s3://{bucket_name}/{table_prefix}\"" ] }, { "cell_type": "markdown", "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750", "metadata": { "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750" }, "source": [ "## Clean up existing resources" ] }, { "cell_type": "code", "execution_count": null, "id": "62dc44d1-4c48-4f24-bfce-60637972914b", "metadata": { "id": "62dc44d1-4c48-4f24-bfce-60637972914b", "outputId": "c26902eb-9339-4bbe-88b2-d81cebd5f699", "trusted": true }, "outputs": [], "source": [ "import boto3\n", "\n", "## Create a database with the name hudi_df to host hudi tables if not exists.\n", "try:\n", " glue = boto3.client('glue')\n", " glue.create_database(DatabaseInput={'Name': database_name})\n", " print(f\"New database {database_name} created\")\n", "except glue.exceptions.AlreadyExistsException:\n", " print(f\"Database {database_name} already exist\")\n", "\n", "## Delete files in S3\n", "s3 = boto3.resource('s3')\n", "bucket = s3.Bucket(bucket_name)\n", "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()\n", "\n", "## Drop table in Glue Data Catalog\n", "try:\n", " glue = boto3.client('glue')\n", " glue.delete_table(DatabaseName=database_name, Name=table_name)\n", "except glue.exceptions.EntityNotFoundException:\n", " print(f\"Table {database_name}.{table_name} does not exist\")\n" ] }, { "cell_type": "markdown", "id": "08706080-9af8-4721-bdfa-0872e0407c68", "metadata": { "id": "08706080-9af8-4721-bdfa-0872e0407c68" }, "source": [ "## Create Hudi table with sample data using catalog sync" ] }, { "cell_type": "code", "execution_count": null, "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8", "metadata": { "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8", "outputId": "21c3354e-da8d-48de-8a76-6f73b24317dc", "trusted": true }, "outputs": [], "source": [ "from pyspark.sql import Row\n", "import time\n", "\n", "ut = time.time()\n", "\n", "product = [\n", " {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n", " {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n", "]\n", "\n", "df_products = spark.createDataFrame(Row(**x) for x in product)" ] }, { 
"cell_type": "code", "execution_count": null, "id": "a5c89612-6971-413a-8bec-29be15404bad", "metadata": { "id": "a5c89612-6971-413a-8bec-29be15404bad", "outputId": "f89a9f6b-87f4-472d-e176-b67d6af4b78d", "trusted": true }, "outputs": [], "source": [ "hudi_options = {\n", " 'hoodie.table.name': table_name,\n", " 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',\n", " 'hoodie.datasource.write.recordkey.field': 'product_id',\n", " 'hoodie.datasource.write.partitionpath.field': 'category',\n", " 'hoodie.datasource.write.table.name': table_name,\n", " 'hoodie.datasource.write.operation': 'upsert',\n", " 'hoodie.datasource.write.precombine.field': 'updated_at',\n", " 'hoodie.datasource.write.hive_style_partitioning': 'true',\n", " 'hoodie.upsert.shuffle.parallelism': 2,\n", " 'hoodie.insert.shuffle.parallelism': 2,\n", " 'path': table_location,\n", " 'hoodie.datasource.hive_sync.enable': 'true',\n", " 'hoodie.datasource.hive_sync.database': database_name,\n", " 'hoodie.datasource.hive_sync.table': table_name,\n", " 'hoodie.datasource.hive_sync.partition_fields': 'category',\n", " 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',\n", " 'hoodie.datasource.hive_sync.use_jdbc': 'false',\n", " 'hoodie.datasource.hive_sync.mode': 'hms'\n", "}\n" ] }, { "cell_type": "code", "execution_count": null, "id": "9e4eca5d-b71a-43f4-963a-7841fff73c8a", "metadata": { "id": "9e4eca5d-b71a-43f4-963a-7841fff73c8a", "outputId": "938e0afa-5d43-47fe-b994-b080df797aed", "trusted": true }, "outputs": [], "source": [ "df_products.write.format(\"hudi\") \\\n", " .options(**hudi_options) \\\n", " .mode(\"overwrite\") \\\n", " .save()" ] }, { "cell_type": "markdown", "id": "fda19a10-4b69-4272-99a2-1b1156e937c2", "metadata": { "id": "fda19a10-4b69-4272-99a2-1b1156e937c2" }, "source": [ "## Read from Hudi table" ] }, { "cell_type": "code", "execution_count": null, "id": "14a6f45f-1cf6-4f6f-9e63-4c89d6ce2cbd", "metadata": { "id": "14a6f45f-1cf6-4f6f-9e63-4c89d6ce2cbd", "outputId": "01e366c7-2545-434a-c972-f429eac04343", "trusted": true }, "outputs": [], "source": [ "df_products_read = spark.read \\\n", " .format(\"hudi\") \\\n", " .load(table_location)\n", "df_products_read.show()" ] }, { "cell_type": "markdown", "id": "c013d49c-da63-4910-b423-4ebd0e346e1f", "metadata": { "id": "c013d49c-da63-4910-b423-4ebd0e346e1f" }, "source": [ "## Upsert records into Hudi table" ] }, { "cell_type": "code", "execution_count": null, "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97", "metadata": { "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97", "outputId": "29c34dd9-ec88-49d9-ccaf-5a529abb0050", "trusted": true }, "outputs": [], "source": [ "ut = time.time()\n", "\n", "product_updates = [\n", " {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut}, # Update\n", " {'product_id': '00006', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut} # Insert\n", "]\n", "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)\n", "df_product_updates.write.format(\"hudi\") \\\n", " .options(**hudi_options) \\\n", " .mode(\"append\") \\\n", " .save()" ] }, { "cell_type": "code", "execution_count": null, "id": "72b1d420-a08e-4f8f-9c81-69e8ef6fea42", "metadata": { "id": "72b1d420-a08e-4f8f-9c81-69e8ef6fea42", "outputId": "9c3f2af0-4261-4205-a683-435825c00b4b", "trusted": true }, "outputs": [], "source": [ "df_product_updates_read = spark.read \\\n", " .format(\"hudi\") \\\n", " 
.load(table_location)\n", "df_product_updates_read.show()" ] }, { "cell_type": "markdown", "id": "11e8e6f3-4724-468c-8ded-0718bf272bb8", "metadata": { "id": "11e8e6f3-4724-468c-8ded-0718bf272bb8", "tags": [] }, "source": [ "## Delete a Record\n", "To hard delete a record, you can upsert an empty payload. In this case, the PAYLOAD_CLASS_OPT_KEY option specifies the EmptyHoodieRecordPayload class." ] }, { "cell_type": "code", "execution_count": null, "id": "2fb72d8f-a907-4b1f-9406-57fda424e985", "metadata": { "id": "2fb72d8f-a907-4b1f-9406-57fda424e985", "outputId": "48e37fd8-338f-475d-d284-7bc37b54a5bd", "trusted": true }, "outputs": [], "source": [ "df_delete = df_product_updates_read.where(\"product_id==00001\")" ] }, { "cell_type": "code", "execution_count": null, "id": "206e8f0d-476b-4f79-b012-45a8f56b96a2", "metadata": { "id": "206e8f0d-476b-4f79-b012-45a8f56b96a2", "outputId": "01ab8752-e7ec-484a-be3d-c4b3718d24ce", "trusted": true }, "outputs": [], "source": [ "df_delete.write \\\n", " .format(\"org.apache.hudi\") \\\n", " .option(\"hoodie.datasource.write.payload.class\", \"org.apache.hudi.common.model.EmptyHoodieRecordPayload\") \\\n", " .options(**hudi_options) \\\n", " .mode(\"append\") \\\n", " .save() " ] }, { "cell_type": "code", "execution_count": null, "id": "408fde81-25df-473a-a6c8-50f0192ba293", "metadata": { "id": "408fde81-25df-473a-a6c8-50f0192ba293", "outputId": "6f953193-a789-423d-9a05-81f8b66d6399", "trusted": true }, "outputs": [], "source": [ "df_product_delete_read = spark.read \\\n", " .format(\"hudi\") \\\n", " .load(table_location)\n", "df_product_delete_read.show()" ] }, { "cell_type": "markdown", "id": "db78cddc-082b-476b-8f0b-425876b9e15f", "metadata": { "id": "db78cddc-082b-476b-8f0b-425876b9e15f" }, "source": [ "## Point in time query\n", "Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a specific commit time and beginTime to \"000\" (denoting earliest possible commit time)." 
] }, { "cell_type": "code", "execution_count": null, "id": "677ce407-cfaa-428b-85a8-221e267b4874", "metadata": { "id": "677ce407-cfaa-428b-85a8-221e267b4874", "outputId": "20a98d31-258a-460d-8447-d9faaac77df9", "trusted": true }, "outputs": [], "source": [ "spark.read \\\n", " .format(\"hudi\") \\\n", " .load(table_location) \\\n", " .createOrReplaceTempView(\"hudi_snapshot\")" ] }, { "cell_type": "code", "execution_count": null, "id": "0d48bb59-57bc-422e-8154-70a6e75ae4a8", "metadata": { "id": "0d48bb59-57bc-422e-8154-70a6e75ae4a8", "outputId": "c9b6df32-88ce-4440-b7f7-dc439cedc76d", "trusted": true }, "outputs": [], "source": [ "# store the commit history as a list\n", "commits = list(map(lambda row: row[0], spark.sql(\"select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime\").limit(50).collect()))" ] }, { "cell_type": "code", "execution_count": null, "id": "97a00785-0019-47e6-9b01-a89ae18cf137", "metadata": { "id": "97a00785-0019-47e6-9b01-a89ae18cf137", "outputId": "894a123a-84ba-431a-ff8b-ef89f90fa6aa", "trusted": true }, "outputs": [], "source": [ "beginTime = \"000\" # Represents all commits > this time.\n", "endTime = commits[len(commits) - 2]\n", "\n", "# query point in time data\n", "point_in_time_read_options = {\n", " 'hoodie.datasource.query.type': 'incremental',\n", " 'hoodie.datasource.read.end.instanttime': endTime,\n", " 'hoodie.datasource.read.begin.instanttime': beginTime\n", "}\n", "\n", "# read the table as of the second-to-last commit (after the upsert, before the delete)\n", "df_product_point_in_time_read = spark.read.format(\"hudi\") \\\n", " .options(**point_in_time_read_options) \\\n", " .load(table_location)\n", "df_product_point_in_time_read.show()\n" ] }, { "cell_type": "markdown", "id": "b8d237d8-e586-4e22-b540-05d8f3a15627", "metadata": { "id": "b8d237d8-e586-4e22-b540-05d8f3a15627" }, "source": [ "## Incremental Query\n", "Hudi also provides the capability to obtain a stream of records that have changed since a given commit timestamp. This can be achieved using Hudi's incremental query and providing a begin time from which changes need to be streamed. We do not need to specify endTime if we want all changes after the given commit." ] }, { "cell_type": "code", "execution_count": null, "id": "d68c0e6e-d26a-48da-9460-08232ef8ea4b", "metadata": { "id": "d68c0e6e-d26a-48da-9460-08232ef8ea4b", "outputId": "48be961a-7e0d-4947-a0b9-7f567f91c0a7", "trusted": true }, "outputs": [], "source": [ "beginTime = commits[len(commits) - 2] # fetch changes committed after this instant\n", "\n", "# incrementally query data\n", "incremental_read_options = {\n", " 'hoodie.datasource.query.type': 'incremental',\n", " 'hoodie.datasource.read.begin.instanttime': beginTime\n", "}\n", "\n", "df_product_incremental_read = spark.read.format(\"hudi\") \\\n", " .options(**incremental_read_options) \\\n", " .load(table_location)\n", "df_product_incremental_read.show()" ] }, { "cell_type": "markdown", "id": "68344ffa", "metadata": {}, "source": [ "## Stop Session" ] }, { "cell_type": "code", "execution_count": null, "id": "7d6bc232-eef6-4493-8c79-812cd73a17f0", "metadata": { "id": "7d6bc232-eef6-4493-8c79-812cd73a17f0", "outputId": "2a8cd8f5-29ca-4bad-9fc5-9658776b2ee2", "trusted": true }, "outputs": [], "source": [ "%stop_session" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "hudi_dataframe.ipynb", "provenance": [] }, "kernelspec": { "display_name": "Glue PySpark", "language": "python", "name": "glue_pyspark" } }, "nbformat": 4, "nbformat_minor": 5 }