{ "nbformat": 4, "nbformat_minor": 2, "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Transform data by using Spark\n", "\n", "Apache Spark provides a distributed data processing platform that you can use to perform complex data transformations at scale.\n", "\n", "\n", "## Load source data\n", "\n", "Let's start by loading some historical sales order data into a dataframe.\n", "\n", "Review the code in the cell below, which loads the sales order from all of the csv files within the **data** directory. Then click the **▷** button to the left of the cell to run it.\n", "\n", "> **Note**: The first time you run a cell in a notebook, the Spark pool must be started; which can take several minutes." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "collapsed": false }, "source": [ "order_details = spark.read.csv('/data/*.csv', header=True, inferSchema=True)\n", "display(order_details.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Transform the data structure\r\n", "\r\n", "The source data includes a **CustomerName** field, that contains the customer's first and last name. Let's modify the dataframe to separate this field into separate **FirstName** and **LastName** fields." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } }, "collapsed": false }, "source": [ "from pyspark.sql.functions import split, col\r\n", "\r\n", "# Create the new FirstName and LastName fields\r\n", "transformed_df = order_details.withColumn(\"FirstName\", split(col(\"CustomerName\"), \" \").getItem(0)).withColumn(\"LastName\", split(col(\"CustomerName\"), \" \").getItem(1))\r\n", "\r\n", "# Remove the CustomerName field\r\n", "transformed_df = transformed_df.drop(\"CustomerName\")\r\n", "\r\n", "display(transformed_df.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "The code above creates a new dataframe with the **CustomerName** field removed and two new **FirstName** and **LastName** fields.\r\n", "\r\n", "You can use the full power of the Spark SQL library to transform the data by filtering rows, deriving, removing, renaming columns, and any applying other required data modifications.\r\n", "\r\n", "## Save the transformed data\r\n", "\r\n", "After making the required changes to the data, you can save the results in a supported file format.\r\n", "\r\n", "> **Note**: Commonly, *Parquet* format is preferred for data files that you will use for further analysis or ingestion into an analytical store. Parquet is a very efficient format that is supported by most large scale data analytics systems. In fact, sometimes your data transformation requirement may simply be to convert data from another format (such as CSV) to Parquet!\r\n", "\r\n", "Use the following code to save the transformed dataframe in Parquet format (Overwriting the data if it already exists)." 
] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "source": [ "transformed_df.write.mode(\"overwrite\").parquet('/transformed_data/orders.parquet')\r\n", "print (\"Transformed data saved!\")" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "In the **files** tab (which should still be open above), navigate to the root **files** container and verify that a new folder named **transformed_data** has been created, containing a file named **orders.parquet**. Then return to this notebook." ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Partition data\n", "\n", "A common way to optimize performance when dealing with large volumes of data is to partition the data files based on one or more field values. This can significant improve performance and make it easier to filter data.\n", "\n", "Use the following cell to derive new **Year** and **Month** fields and then save the resulting data in Parquet format, partitioned by year and month." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } }, "collapsed": false }, "source": [ "from pyspark.sql.functions import year, month, col\r\n", "\r\n", "dated_df = transformed_df.withColumn(\"Year\", year(col(\"OrderDate\"))).withColumn(\"Month\", month(col(\"OrderDate\")))\r\n", "display(dated_df.limit(5))\r\n", "dated_df.write.partitionBy(\"Year\",\"Month\").mode(\"overwrite\").parquet(\"/partitioned_data\")\r\n", "print (\"Transformed data saved!\")" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "In the **files** tab (which should still be open above), navigate to the root **files** container and verify that a new folder named **partitioned_data** has been created, containing a hierachy of folders in the format **Year=*NNNN*** / **Month=*N***, each containing a .parquet file for the orders placed in the corresponding year and month. Then return to this notebook.\r\n", "\r\n", "You can read this data into a dataframe from any folder in the hierarchy, using explicit values or wildcards for partitioning fields. For example, use the following code to get the sales orders placed in 2020 for all months." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } }, "collapsed": false }, "source": [ "orders_2020 = spark.read.parquet('/partitioned_data/Year=2020/Month=*')\r\n", "display(orders_2020.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Note that the partitioning columns specified in the file path are omitted in the resulting dataframe.\r\n", "\r\n", "## Use SQL to transform data\r\n", "\r\n", "Spark is a very flexible platform, and the **SQL** library that provides the dataframe also enables you to work with data using SQL semantics. 
"You can query and transform data in dataframes by using SQL queries, and persist the results as tables - which are metadata abstractions over files.\r\n", "\r\n", "First, use the following code to save the original sales orders data (loaded from CSV files) as a table. Technically, this is an *external* table because the **path** parameter is used to specify where the data files for the table are stored (an *internal* table is stored in the system storage for the Spark metastore and managed automatically)." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": {}, "nteract": { "transient": { "deleting": false } } }, "source": [ "order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "In the **files** tab (which should still be open above), navigate to the root **files** container and verify that a new folder named **sales_orders_table** has been created, containing Parquet files for the table data. Then return to this notebook.\r\n", "\r\n", "Now that the table has been created, you can use SQL to transform it. For example, the following code derives new **Year** and **Month** columns and then saves the results as a partitioned external table." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } }, "collapsed": false }, "source": [ "sql_transform = spark.sql(\"SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders\")\r\n", "display(sql_transform.limit(5))\r\n", "sql_transform.write.partitionBy(\"Year\",\"Month\").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the **files** tab (which should still be open above), navigate to the root **files** container and verify that a new folder named **transformed_orders_table** has been created, containing a hierarchy of folders in the format **Year=*NNNN*** / **Month=*N***, each containing a .parquet file for the orders placed in the corresponding year and month. Then return to this notebook.\n", "\n", "Essentially, you've performed the same data transformation into partitioned Parquet files as before, but by using SQL instead of native dataframe methods.\n", "\n", "You can read this data into a dataframe from any folder in the hierarchy as before, but because the data files are also abstracted by a table in the metastore, you can query the data directly using SQL." ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "microsoft": { "language": "sparksql" }, "collapsed": false }, "source": [ "%%sql\r\n", "\r\n", "SELECT * FROM transformed_orders\r\n", "WHERE Year = 2021\r\n", "  AND Month = 1" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Because these are *external* tables, you can drop the tables from the metastore without deleting the files - so the transformed data remains available for other downstream data analytics or ingestion processes.\r\n"
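, "\r\n", "For example, after you drop the tables in the next cell, you could still load the saved data directly from its folder. A minimal sketch (reusing the **/transformed_orders_table** path from the earlier step - run it after the drop if you want to verify):\r\n", "\r\n", "```python\r\n", "# Sketch only: the Parquet files written for the external table remain after DROP TABLE,\r\n", "# so they can still be read directly from storage\r\n", "remaining_df = spark.read.parquet('/transformed_orders_table')\r\n", "display(remaining_df.limit(5))\r\n", "```"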
] }, { "cell_type": "code", "execution_count": null, "outputs": [], "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } }, "microsoft": { "language": "sparksql" }, "collapsed": false }, "source": [ "%%sql\r\n", "\r\n", "DROP TABLE transformed_orders;\r\n", "DROP TABLE sales_orders;" ] } ], "metadata": { "description": null, "save_output": true, "kernelspec": { "name": "synapse_pyspark", "display_name": "Synapse PySpark" }, "language_info": { "name": "python" }, "synapse_widget": { "version": "0.1", "state": {} } } }