{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Transforming Data Using Azure Synapse Dataflows" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "In this lab, we will be performing the following:\n", "\n", "- Copying data using a Synapse data flow\n", "- Performing data transformation using activities such as join, sort, and filter\n", "- Monitoring data flows and pipelines\n", "- Configuring partitions to optimize data flows\n", "- Parameterizing mapping data flows\n", "- Handling schema changes dynamically in data flows using schema drift\n", "\n", "By the end of the lab, you will have learned how to copy data to Parquet files using data flows, perform data transformation using data flows, build dynamic and resilient data flows using parameterization and schema drifting, and monitor data flows and pipelines. " ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Recipe 1 - Copying data using a Synapse data flow" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "In this recipe, we will convert a CSV file into the Parquet format using a Synapse data flow. We will be performing the following tasks to achieve this:\n", "\n", "- Provisioning a Synapse Analytics workspace and a Synapse integration pipeline.\n", "- Creating a data flow activity in the Synapse integration pipeline.\n", "- Building the data flow activity to copy and convert the CSV file to the Parquet format.\n", "\n", "To get started, do the following:\n", "\n", "1. Log in to https://portal.azure.com using your Azure credentials.\n", "1. Create a Synapse Analytics workspace.\n", "1. Upload the files, transaction_table-t1.csv and transaction_table-t2.csv, to the csv folder.\n", "1. Log in to portal.azure.com, go to All resources, and open/create the Synapse Analytics workspace. Click on Open Synapse Studio.\n", "1. Click on the blue cylinder (the data symbol) on the left-hand side, which will take you to the Data section. Click on the Linked tab. Expand the data lake account of the Synapse workspace (sparshadesynapse for this example). Click on the synapse (Primary) container. Click on the + New folder button and create a folder called transaction_table-t1-parquet. We will use a Parquet folder as the destination to copy the Parquet file converted from the CSV file.\n", "1. Click on the pipe-like icon on the left-hand side of Synapse Studio. It will open the integration pipeline development area. Hit the + button and click on Pipeline.\n", "1. Name the pipeline Copy_Data_Flow by editing the Name property field on the right. Search for Data flow under Activities. Drag and drop the Data flow activity into the pipeline development area.\n", "1. Scroll down and under Settings, click on the + New button to create a data flow.\n", "1. Change the data flow name to Copy_CSV_to_Parquet on the left. Click on the Add Source button.\n", "1. Scroll down and name the output stream csv. Click on the + New button to create a new dataset.\n", "1. Search for Azure Data Lake, select Azure Data Lake Storage Gen2, and click on Continue.\n", "1. Select CSV as the data format, as the input file is CSV.\n", "1. Provide the dataset name, `transactiontablet1`. Set Linked service to sparshadesynapse-WorkspaceDefaultStorage. Set File path to synapse/csv/transaction_table-t1.csv, the location to which we uploaded the CSV file. Check the First row as header checkbox.\n", "1. 
Click on the + button, search for sink, and select Sink. Sink is the destination component (transformation) that we will be copying the data to.\n", "1. Set Output stream name to Parquet. Click + New.\n", "1. We need to define the destination dataset. Search for Azure Data Lake, select Azure Data Lake Storage Gen2, and click Continue.\n", "1. Set the format to Parquet.\n", "1. Provide the dataset name, transactiontablet1parquet. Set Linked service to sparshadesynapse-WorkspaceDefaultStorage. Set File path to synapse/transaction_table-t1-parquet/, the location where we will store the Parquet file. Press OK.\n", "1. Hit the Publish all button. The data flow, datasets, and the pipeline will be published. Once they’re published, go to the Copy_Data_Flow pipeline.\n", "1. Click on the Add trigger button and click Trigger now to trigger the execution of the pipeline.\n", "1. Once the pipeline successfully runs, go to the storage section by clicking on the data icon (the blue cylinder) on the left-hand side and navigating to the synapse (Primary) folder. Right-click on the transaction_table-t1-parquet folder and click Select TOP 100 rows.\n", "1. Set File type to Parquet format for querying.\n", "1. Hit the Run button in the query window. Data is successfully read via serverless SQL pool, confirming that the data transfer successfully completed.\n", "\n", "![](https://user-images.githubusercontent.com/62965911/218272262-1640ad2d-3537-4213-898c-005af5a4e1ef.png)\n", "\n", "![](https://user-images.githubusercontent.com/62965911/218272316-db2ebd57-f5f5-4c9f-8d20-7e1e610c46a8.png)\n", "\n", "\n", "How it works…\n", "\n", "The data flow does a simple conversion of the CSV file to a Parquet file. The data flow is linked to a Synapse integration pipeline called Copy_Data_Flow. The Copy_Data_Flow pipeline uses a data flow activity that calls the Copy_CSV_to_Parquet data flow, which does the data transfer. While this task can be done using a simple Copy activity as well (instead of data flow), the key advantage of a data flow is being able to perform transformations while moving the data from the source to the sink." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Recipe 2 - Performing data transformation using activities such as join, sort, and filter" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "A common scenario in data engineering pipelines is combining two or more files based on a column, filtering by column, sorting the results, and storing them for querying. 
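A minimal PySpark sketch of the same logic is shown below, before we build it in the data flow designer.\n", "\n", "The sketch assumes a Synapse Spark notebook (where a spark session is already available) and uses hypothetical abfss:// paths; adjust the storage account, container, and folder names to your environment. It illustrates the logic only and is not part of the recipe steps.\n", "\n", "```python\n", "# Rough PySpark equivalent of the join, filter, and sort built in this recipe.\n", "base = \"abfss://synapse@sparshadesynapse.dfs.core.windows.net\"\n", "df_t1 = spark.read.options(header=\"true\", inferSchema=\"true\").csv(f\"{base}/csv/transaction_table-t1.csv\")\n", "df_t2 = spark.read.options(header=\"true\", inferSchema=\"true\").csv(f\"{base}/csv/transaction_table-t2.csv\")\n", "\n", "result = (\n", "    df_t1.join(df_t2, on=\"tid\", how=\"inner\")               # combine on the shared tid column\n", "         .filter(df_t1[\"transaction_date\"] == 20210915)    # keep a single date (yyyymmdd)\n", "         .orderBy(df_t1[\"total_cost\"].desc())              # sort by total_cost, descending\n", ")\n", "\n", "result.write.mode(\"overwrite\").parquet(f\"{base}/transaction_table-transformation-parquet/\")\n", "```\n", "\n", "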
We will perform the following actions to achieve this:\n", "\n", "- Read two CSV files\n", "- Use a join transformation to combine the two files based on a column\n", "- Use a filter transformation to filter the rows based on a condition\n", "- Sort the filtered data based on a column value and store the result in Parquet format\n", "\n", "In this recipe, we will perform the following:\n", "\n", "- Create Azure Data Lake datasets connected to the transaction_table-t1.csv and transaction_table-t2.csv files\n", "- Create a data flow activity in a pipeline\n", "- Add two source transformations for reading CSV files\n", "- Add a join transformation to combine the files based on a column named tid\n", "- Add a filter transformation on the transaction_date column to filter the data by a particular date\n", "- Add a sort transformation to sort the filtered data by the total_cost column\n", "- Create a dataset with Parquet format as destination\n", "- Add a sink and save the result in a dataset created using the Parquet format\n", "\n", "The detailed steps to carry this out are as follows:\n", "\n", "1. Log in to portal.azure.com, go to All resources, and create/open the Azure Synapse Studio.\n", "1. Click on the blue cylinder (the data symbol) on the left-hand side, which will take you to Data section. Click on the Linked tab. Expand the data lake account of the Synapse Analytics workspace (sparshadesynapse for this example). Click on the synapse (Primary) container. Click on the + New folder button and create a folder called transaction_table-transformation-parquet. We will use this folder as the destination to store the transformed data.\n", "1. Create a pipeline named DataFlow-Transformation. Add a data flow activity in the pipeline. Create a new data flow named SortFilterDataFlow by clicking on the + New button under the data flow activity settings.\n", "1. Add a source transformation named transactiontablet1, click on the + New button, and create a dataset named transactiontablet1, as created in earlier recipe.\n", "1. Click on the Add Source button.\n", "1. Name the source transformation transactiontablet2. Click on the + New button to create a new dataset to link to synapse/csv/ transaction_table-t2.csv.\n", "1. Create a dataset named transactiontablet2 linking to Azure Data Lake Storage Gen2 account attached to the Synapse Analytics workspace, with the location set to synapse/csv/transaction_table-t2.csv and the format set to csv. Refer to steps 7 to 10 in the How to do it… section from the Copying data using a Synapse data flow recipe.\n", "1. Click on the + button on the transactiontablet1 source and select the Join transformation.\n", "1. Set the Left stream dataset to transactiontablet1 and the Right stream dataset to transactiontablet2. Select Inner as the join type and join using the tid column, which exists on both the datasets.\n", "1. Click on the + button on the right-hand side of the join1 transformation and select the Filter transformation.\n", "1. Type `transactiontablet1@transaction_date==\"20210915\"` into the Filter on text box.\n", "1. Click on the + button on the right-hand side of the filter1 transformation and select the Sort transformation.\n", "1. Under Sort conditions, select `transactiontablet1@total_cost` for the filter1’s column dropdown and set Order to Descending.\n", "1. Hit the + button on the right-hand side of the sort1 transformation and add a sink. 
Under the Sink properties, click on the + New button and add a new dataset named transformationparquet linking to the Azure Data Lake Gen2 account attached to the Synapse Analytics workspace with the location set to synapse/transaction_table-transformation-parquet folder and the format set to Parquet. \n", "1. Switch to the DataFlow-Transformation pipeline and hit the Publish all button. Once published, hit the Trigger now button under Add trigger. After successful execution, go to the synapse/transaction_table-transformation-parquet folder in the Synapse Analytics workspace, right-click on it, and select the top 100 rows. The result is shown in the following screenshot.\n", "\n", "![](https://user-images.githubusercontent.com/62965911/218273081-5b346272-978e-4a43-a3a4-7cdf9a1c93a8.png)\n", "\n", "![](https://user-images.githubusercontent.com/62965911/218273292-2f3d5ebf-f26b-41d4-ba13-8f0269461225.png)\n", "\n", "How it works…\n", "\n", "A join transformation helped us to combine the transaction_table-t1.csv and transaction_table-t2.csv files based on the tid column. The combined data from both files was filtered using the filter transformation and the transaction_table-t1.csv file’s transaction_date column was filtered for the date 20210915. The sort transformation sorted the filtered rows based on the transaction_table-t1.csv file’s total_cost column. A sink transformation was linked to a dataset in Parquet format, which meant that the sorted and filtered data from the CSV file was seamlessly converted and stored in Parquet format too.\n", "\n", "Data flows have plenty of transformation options available. You can explore more examples at https://docs.microsoft.com/en-us/azure/data-factory/data-flow-transformation-overview." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Recipe 3 - Monitoring data flows and pipelines" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Azure Synapse Analytics provides a user-friendly interface out of the box for monitoring the pipeline and data flow runs. In this recipe, we will track a data flow execution and understand the transformation execution timings.\n", "\n", "Perform the following steps to monitor data flows:\n", "\n", "1. Click on Open Synapse Studio. Click on the monitor button (a speedometer-like circular button) on the left-hand side. By default, it will give you the information about pipeline runs from the last 24 hours.\n", "1. Click on the DataFlow-Transformation pipeline with the most recent run date.\n", "1. Under Activity runs, hover your mouse over the Data flow1 activity and click on the glasses icon.\n", "1. All the transformations under our activity are displayed. Click on the Stages icon to identify the transformation that took the most time.\n", "1. The total execution time was 25 seconds, of which sink and sort operations took 20 seconds. We also noticed that there were 1009 rows sorted and written to sink. Click Close.\n", "\n", "How it works…\n", "\n", "The Azure Synapse Analytics out-of-the-box monitoring solution records details about all pipeline execution runs and details about the activities inside the pipeline too. In this recipe, we saw that we could quickly identify even a slow transformation inside a data flow activity in a matter of a few clicks. The monitoring data is by default stored for 45 days and can be retained for a longer duration by integrating Synapse Analytics diagnostics data with Azure Log Analytics or an Azure Data Lake Storage account." 
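, "\n", "\n", "The same run history can also be pulled programmatically rather than through the Monitor hub. The following is a minimal sketch, assuming the azure-identity and azure-synapse-artifacts Python packages; the workspace endpoint follows this example's workspace name, and the method and model names should be verified against the current SDK reference before relying on them.\n", "\n", "```python\n", "# Query the last 24 hours of pipeline runs for the workspace (sketch; verify SDK names).\n", "from datetime import datetime, timedelta, timezone\n", "\n", "from azure.identity import DefaultAzureCredential\n", "from azure.synapse.artifacts import ArtifactsClient\n", "from azure.synapse.artifacts.models import RunFilterParameters\n", "\n", "client = ArtifactsClient(\n", "    credential=DefaultAzureCredential(),\n", "    endpoint=\"https://sparshadesynapse.dev.azuresynapse.net\",\n", ")\n", "\n", "now = datetime.now(timezone.utc)\n", "runs = client.pipeline_run.query_pipeline_runs_by_workspace(\n", "    RunFilterParameters(last_updated_after=now - timedelta(hours=24), last_updated_before=now)\n", ")\n", "for run in runs.value:\n", "    print(run.pipeline_name, run.run_id, run.status, run.duration_in_ms)\n", "```\n"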
] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Recipe 4 - Configuring partitions to optimize data flows" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Data flows, by default, create partitions behind the scenes to make transformation activities such as join, filter, and sort run faster. Partitions split the large data into multiple smaller pieces so that the backend processes in the data flows can divide and conquer their tasks and finish the execution quickly.\n", "\n", "In this recipe, we will take a slow-running data flow and adjust the partitioning to reduce the execution time.\n", "\n", "Perform the following steps to optimize data flows:\n", "\n", "1. Log in to portal.azure.com, go to All resources, and search for sparshadesynapse. Click on the workspace. Click Open Synapse Studio. Click on the monitor button (the speedometer-like circular button) on the left-hand side. Refer to steps 2 to 5 of the How to do it… section from the Monitoring data flows and pipelines recipe and perform the following actions:\n", " - Click on the latest run of the DataFlow-Transformation pipeline.\n", " - Click on the Data flow1 activity details.\n", " - Click on the Stages icon to get the transformation level breakdown of the execution times. Notice that the sort and sink operations consume 80% of the execution time, taking 20 seconds.\n", "1. Click on the Sort1 transformation in the monitoring window. Notice the following:\n", " - The total number of rows processed by the transformation is only 1,009\n", " - There are about 200 partitions\n", " - Each partition has around 4 to 6 rows\n", "\n", "Let’s make some changes to partitioning in the sort transformation. Click on the Edit transformation button at the top of the page:\n", "\n", "![B18309_09_041](https://user-images.githubusercontent.com/62965911/218273928-61e4f535-c56f-4ab5-89ba-c759afa4e4f7.jpeg)\n", "\n", "Click on the sort1 transformation. Go to the Optimize section. Select Single partition (instead of the existing setting, Use current partitioning). Hit the Publish all button:\n", "\n", "![B18309_09_042](https://user-images.githubusercontent.com/62965911/218273977-cc1b786b-779c-4e1c-a18d-d2eec3428724.jpeg)\n", "\n", "Go to DataFlow-Transformation, click on Add trigger, and click Trigger now.\n", "\n", "Check the notification section (the bell icon) on the right-hand corner of the screen. The run completion will be indicated via a notification message. Click View pipeline run.\n", "\n", "Click on the Dataflow1 activity details. Click on the Stages icon to get the transformation level breakdown of the execution times, as we did previously:\n", " - Notice that the total execution time has been reduced to 8 seconds\n", " - The sort and sink executions were reduced to 2.8 seconds, compared to the earlier duration of 20 seconds\n", "\n", "Before:\n", "\n", "![B18309_09_040](https://user-images.githubusercontent.com/62965911/218274062-b506fb18-5106-4f9a-a576-8dbb5cad0c49.jpeg)\n", "\n", "Now:\n", "\n", "![B18309_09_045](https://user-images.githubusercontent.com/62965911/218274065-156609f1-dacd-4bda-bc57-1edcd54676ff.jpeg)\n", "\n", "How it works…\n", "\n", "Behind the scenes, Synapse Analytics data flows use tasks in Spark clusters to perform data flow transformations. When observing the partition statistics on the sink1 transformation in step 2 of the How to do it… section, we noticed that 1,009 rows were split across 200 partitions. 
] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Recipe 5 - Parameterizing Synapse data flows" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Adding parameters to data flows makes them more flexible and reusable. In this recipe, we will create a data flow that accepts a parameter, filters the data based on the value passed in the parameter, and copies the data to Parquet files.\n", "\n", "We will add a parameter to the Copy_CSV_to_Parquet data flow and make it copy selective rows based on the value passed to the parameter by the Copy_Data_Flow pipeline. The Copy_CSV_to_Parquet data flow currently copies all 300,000 rows present in the transaction_table-t1.csv file; adding a parameter-based filter will result in fewer rows being copied.\n", "\n", "1. Open Synapse Studio. Click on the Develop icon (the notebook-like symbol) on the left-hand side of the screen. Expand Data flows and click Copy_CSV_to_Parquet. Under Parameters, click + New and add a new parameter with the name sid. Set Default value to 0.\n", "1. Click on the + button on the right-hand side of the source transformation named csv, search for a filter transformation, and add a filter transformation.\n", "1. Click on the Filter on text box under Filter settings. Click on the Open expression builder link found below the text box.\n", "1. In the expression text box, type the column name that we will be filtering the parameter against; in our case, it is sid. Click on == (the double equals operator). Click Parameters on the bottom-left-hand side and select the sid parameter. The expression text box should read as shown in the following screenshot. Click Save and finish:\n", "\n", "![B18309_09_049](https://user-images.githubusercontent.com/62965911/218274387-91993fb6-6077-4f0c-90f0-bce59fed9374.jpeg)\n", "\n", "1. Click on the Integrate icon (the pipe-like symbol) on the left-hand side of Synapse Studio. Click on the Copy_Data_Flow pipeline. Click on the Data flow1 task and go to the Parameters section of the task. The sid parameter we added to the data flow in step 1 appears here. Click on the Value box, select Pipeline expression, and check the Expression checkbox.\n", "1. Type in '10' under dynamic content and click OK.\n", "1. Publish the pipeline, click on Add trigger, and select Trigger now to execute the pipeline.\n", "1. Once the run completes, click on the monitoring icon in Synapse Studio. Select the latest run of the Copy_Data_Flow pipeline.\n", "1. Click on the Data flow1 activity details. Refer to steps 1 to 4 of the How to do it… section in the Monitoring data flows and pipelines recipe for a detailed example of a similar task. Notice that the pipeline only processed 28,957 rows, compared to the 300,000 rows initially present in the transaction_table-t1.csv file, as the rows were filtered by the parameter.\n", "\n", "How it works…\n", "\n", "We added a parameter named sid to the Copy_CSV_to_Parquet data flow and wrote an expression in the data flow's filter transformation that compared the sid parameter with the sid column. The comparison expression filtered the rows based on the condition we defined. We passed the value to the sid data flow parameter from the Copy_Data_Flow pipeline's data flow activity, which exposes the parameters defined inside the data flow to the pipeline. We passed the value '10' from the Copy_Data_Flow pipeline to the data flow, and all the rows with a value of 10 in the sid column were selected and copied in Parquet format to the sink destination."
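, "\n", "\n", "For comparison, the parameterized filter expression sid == $sid maps onto an ordinary variable-driven filter in PySpark. The following is a minimal sketch only; the abfss:// path is a hypothetical placeholder, the parameter value is hard-coded instead of being passed in by a pipeline, and a spark session is assumed to be available in a Synapse Spark notebook.\n", "\n", "```python\n", "# Hypothetical stand-in for the data flow parameter passed by the pipeline.\n", "sid = 10\n", "\n", "base = \"abfss://synapse@sparshadesynapse.dfs.core.windows.net\"\n", "df = spark.read.options(header=\"true\", inferSchema=\"true\").csv(f\"{base}/csv/transaction_table-t1.csv\")\n", "\n", "filtered = df.filter(df[\"sid\"] == sid)   # equivalent of the data flow expression sid == $sid\n", "filtered.write.mode(\"overwrite\").parquet(f\"{base}/transaction_table-t1-parquet/\")\n", "\n", "print(filtered.count())   # compare with the row count reported by the pipeline run\n", "```\n"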
] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Recipe 6 - Handling schema changes dynamically in data flows using schema drift" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "A common challenge in extraction, transformation, and load (ETL) projects is that when the schema changes at the source, the pipelines that read the data from the source, transform it, and ingest it into the destination start to fail. Schema drift, a feature of data flows, addresses this problem by allowing us to define the column mapping in transformations dynamically. In this recipe, we will make some changes to the schema of a data source, use schema drift to detect the changes, and handle the changes gracefully without any manual intervention.\n", "\n", "In this recipe, we will be using the Copy_CSV_to_Parquet data flow completed in the Copying data using a Synapse data flow recipe. We will perform the following tasks:\n", "\n", "- Add columns to the CSV file used as the source in the Copy_CSV_to_Parquet data flow from the Copying data using a Synapse data flow recipe\n", "- Observe the newly added columns being detected by the data flow using the schema drift feature\n", "- Split the originally expected columns and the newly detected columns into two different streams\n", "- Use rule-based column mapping to dynamically map the newly detected columns\n", "\n", "Perform the following steps to achieve this:\n", "\n", "1. Open the transaction_table-t1.csv file on your machine.\n", "1. Insert a column named d1 after the pid column but before the c1 column. Fill in some random values for the first 10 rows. Insert a column named d2 after the last column, c2. Fill in some random values for the first 10 rows. Save the file.\n", " ![B18309_09_053](https://user-images.githubusercontent.com/62965911/218274528-0713687e-8972-4d72-ac22-b08df93f2899.jpeg)\n",
"1. Upload the transaction_table-t1.csv file to the synapse/csv folder in the storage account linked with the Synapse workspace. Ensure that you overwrite the existing file.\n", "1. In Synapse Studio for the sparshadesynapse workspace, click on the Develop icon (the notebook-like symbol) on the left-hand side of the screen. Expand Data flows and click Copy_CSV_to_Parquet. Turn on the debug mode by clicking on the toggle button at the top.\n", "1. Click OK to enable the debug mode for the data flow. We will use the debug mode to see how data flows through the transformations in this recipe.\n", "1. Click on the source transformation (csv). Observe that Allow schema drift is turned on by default in Source settings.\n", " ![B18309_09_056](https://user-images.githubusercontent.com/62965911/218274581-1be4b4c2-84d1-4f77-97de-e1eb159cf8a7.jpeg)\n", "1. Once the debug mode is ready (it typically takes about 5 minutes to prepare after being turned on), go to Data preview. Hit the Refresh button. Scroll to the right. Notice that the new columns we added in the Excel sheet, namely d1 and d2, have been automatically added. Ignore the _c9, _c10, and _c11 columns that Excel has added by itself. Also, notice that even though we added the d1 column in the middle of the existing columns, the data flow has positioned it at the end, as it is not an expected column and has been newly added. Click on the Map drifted button.\n", "1. Clicking the Map drifted button creates a derived column transformation. Click on it and notice that the data flow has automatically created the mappings for the new columns. To make the column mapping dynamic, let's delete the existing column mappings first. Select all the column mappings by checking the checkboxes in the Columns list and hitting Delete.\n", "1. To make the column mapping dynamic, let's use the column pattern option. Click Add and select Add column pattern.\n", "1. Click on the column1 mapping and hit the delete button. Check the checkbox next to Each column that matches and type in position > 8. Type $$ in both text boxes, on the left and right. By specifying position > 8, the data flow will dynamically create a new column for each column after position number 8 in the list of columns read by the MapDrifted1 transformation. As all our columns after position 8 (counting the columns from left to right) are derived columns (new columns), we have configured the mapping to automatically create columns for them. $$ represents the value of the column, and by specifying $$ in the left- and right-hand text boxes, we are accepting the values in the columns as they are:\n", " ![B18309_09_060](https://user-images.githubusercontent.com/62965911/218274642-176b2240-2cda-49f2-9b7f-9dc0b23448d6.jpeg)\n", "1. A key objective is to separate the derived columns and the originally mapped columns into two streams so that the original data transformation from source to sink doesn't break and the expected columns are transferred as planned from the CSV file to the Parquet destination. To achieve this, we can configure a fixed rule mapping for the Parquet sink transformation. Click on the Parquet sink transformation and go to the Mapping section. The original columns are automatically listed. Check the checkbox next to Input columns to map all eight original columns that are expected to be copied to the Parquet destination folder.\n", "1. Now, let's get the primary key and the drifted columns into a separate stream. Click on the + symbol on the right-hand side of the MapDrifted1 transformation again and add a sink. This sink will serve as the secondary path for tracking whether there are any drifted columns coming from the source.\n", "1. For the purpose of testing, set Sink type to Cache. You can set the sink for the drifted columns to a Parquet dataset (or any integration dataset) if you wish to store the values of the drifted columns.\n", "1. Click on the newly added sink1 transformation (the cache sink) and go to the Mapping section. Disable Auto mapping. Delete all mappings except the mapping for the tid column, as shown in the following screenshot. Once done, click Add mapping and select Rule-based mapping. Rule-based mapping helps us dynamically create and define columns based on column patterns:\n", " ![B18309_09_064](https://user-images.githubusercontent.com/62965911/218274831-2a56ffc2-4536-4eb0-b5fe-cb3f0631875d.jpeg)\n", "1. For Rule-based mapping, specify the column pattern as position > 8 and the value as $$, which implies that columns at any position after 8 will be used as they are, as in step 10. Check the tid mapping and the Rule-based mapping checkboxes.\n", "1. Click Data preview and hit the Refresh button to see only the tid column and the derived columns.\n", "\n", "How it works…\n", "\n", "Schema drifting and rule-based mapping offer resilience and flexibility for pipelines to accommodate almost any kind of change made at the data source. In this recipe, we used the Map drifted option and a derived column transformation in steps 7 and 8 to identify the unplanned, newly added columns from the source. We split the new columns using rule-based mapping so that they flow to a separate sink (a cache sink for testing) and ensured that the originally expected columns flowed to the Parquet destination using fixed rule mapping. Using the preceding recipe, data engineers can not only ensure that their pipelines transfer data as expected but also track any new columns added at the source and make adjustments to their pipelines if required."
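, "\n", "\n", "The positional rule used above has a straightforward analogue when the same file is read in a Synapse Spark notebook. The following is a minimal sketch only, not part of the recipe; the abfss:// path is a hypothetical placeholder, and it assumes, as the recipe does, that the first eight columns are the expected schema and that tid is one of them.\n", "\n", "```python\n", "base = \"abfss://synapse@sparshadesynapse.dfs.core.windows.net\"\n", "df = spark.read.options(header=\"true\", inferSchema=\"true\").csv(f\"{base}/csv/transaction_table-t1.csv\")\n", "\n", "expected_cols = df.columns[:8]   # the originally mapped columns\n", "drifted_cols = df.columns[8:]    # anything new that arrived with the file (position > 8)\n", "\n", "expected = df.select(expected_cols)            # flows on to the Parquet sink as planned\n", "drifted = df.select([\"tid\"] + drifted_cols)    # keyed copy of the drifted columns\n", "\n", "print(\"Drifted columns detected:\", drifted_cols)\n", "```\n"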
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "env-spacy", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.9.7" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "343191058819caea96d5cde1bd3b1a75b4807623ce2cda0e1c8499e39ac847e3" } } }, "nbformat": 4, "nbformat_minor": 2 }