{"cells":[{"cell_type":"markdown","source":["## Schema Evolution in Merge Operations\nThis notebook provides an example of how to perform schema evolution in merge operations for the [2019 Novel Coronavirus COVID-19 (2019-nCoV) Data Repository by Johns Hopkins CSSE](https://github.com/CSSEGISandData/COVID-19) dataset. This primer example allows you to create a Delta Lake table (instead of native Parquet) to track the changes of the this dataset to support the [Johns Hopkins COVID-19 Data Analysis Sample Notebook](https://github.com/databricks/tech-talks/blob/master/samples/JHU%20COVID-19%20Analysis.html).\n\nThe data is updated in the `/databricks-datasets/COVID/CSSEGISandData/` location regularly so you can access the data directly."],"metadata":{}},{"cell_type":"code","source":["# PySpark\nfrom pyspark.sql.functions import input_file_name, lit, col\nfrom pyspark.sql.types import IntegerType, StringType"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":2},{"cell_type":"markdown","source":["## Create Different DataFrames for Different Schemas\nAs noted in the [Johns Hopkins COVID-19 Analysis](https://github.com/databricks/tech-talks/blob/master/samples/JHU%20COVID-19%20Analysis.html) notebook, as of this writing, there are three different schemas for this dataset; for this example, we will focus on the last two schema changes.\n\n| id | Schema String List | Date Range | \n| -- | ------------------ | ---------- |\n| 1 | `Province/State Country/Region Last Update Confirmed Deaths Recovered Latitude Longitude` | 03-01-2020 to 03-21-2020 |\n| 2 | `FIPS Admin2 Province_State Country_Region Last_Update Lat Long_ Confirmed Deaths Recovered Active Combined_Key` | 03-22-2020 to current |\n\nThe following is an example of:\n* How to run merge operations with schema evolution for representative files of the different schemas\n* We will focus on only Washington State data for this example"],"metadata":{}},{"cell_type":"code","source":["# File paths\n# Two representative files of the two different schemas\nfile_1 = '/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/03-21-2020.csv'\nfile_2 = '/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/04-11-2020.csv'\n\n# Schema 1: Representing datasets between 2020-03-01 and 2020-03-21 (8 columns originally)\n# Renaming some of the columns for better standardization\nold_data = (spark.read\n .option(\"inferSchema\", True)\n .option(\"header\", True)\n .csv(file_1)\n .withColumnRenamed(\"Last Update\", \"Last_Update\")\n .withColumnRenamed(\"Province/State\", \"Province_State\")\n .withColumnRenamed(\"Country/Region\", \"Country_Region\")\n .withColumn(\"process_date\", lit('2020-03-21')) # Date determined by the filename, manually entered in this example\n .withColumn(\"level\", lit(2)) # Specify the level: 1 - Country, 2 - Province/State, 3 - County\n .where(\"Province_State == 
'Washington'\")) # Filter by only Washington State (expected output: 1 row)\n\n# Schema 2: Latest schema representing data from 2020-03-22 onwards (12 columns originally)\n# Renaming some of the columns for better standardization\nnew_data = (spark.read\n .option(\"inferSchema\", True)\n .option(\"header\", True)\n .csv(file_2)\n .withColumnRenamed(\"Lat\", \"Latitude\")\n .withColumnRenamed(\"Long_\", \"Longitude\")\n .withColumn(\"process_date\", lit('2020-04-11')) # Date determined by the filename, manually entered in this example\n .withColumn(\"level\", lit(3)) # Specify the level: 1 - Country, 2 - Province/State, 3 - County\n .where(\"Province_State == 'Washington'\")) # Filter by only Washington State (expected output: 39 rows)\n\n# Notes: Expand each DataFrame below to review the schema"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":4},{"cell_type":"code","source":["# Old Data Schema\nold_data.printSchema()"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
root\n-- Province_State: string (nullable = true)\n-- Country_Region: string (nullable = true)\n-- Last_Update: timestamp (nullable = true)\n-- Confirmed: integer (nullable = true)\n-- Deaths: integer (nullable = true)\n-- Recovered: integer (nullable = true)\n-- Latitude: double (nullable = true)\n-- Longitude: double (nullable = true)\n-- process_date: string (nullable = false)\n-- level: integer (nullable = false)\n\n
"]}}],"execution_count":5},{"cell_type":"code","source":["# New Data Schema\nnew_data.printSchema()"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
root\n-- FIPS: integer (nullable = true)\n-- Admin2: string (nullable = true)\n-- Province_State: string (nullable = true)\n-- Country_Region: string (nullable = true)\n-- Last_Update: string (nullable = true)\n-- Latitude: double (nullable = true)\n-- Longitude: double (nullable = true)\n-- Confirmed: integer (nullable = true)\n-- Deaths: integer (nullable = true)\n-- Recovered: integer (nullable = true)\n-- Active: integer (nullable = true)\n-- Combined_Key: string (nullable = true)\n-- process_date: string (nullable = false)\n-- level: integer (nullable = false)\n\n
"]}}],"execution_count":6},{"cell_type":"markdown","source":["The difference between these two schemas are:\n* Columns that were renamed: `Province/State -> Province_State`, `Country/Region -> Country_Region`, `Latitude -> Lat`, `Longitude -> Long_`. To resolve this issue, we've standardized the column names\n* Columns that were added:\n * `FIPS`: The Federal Information Processing Standard Publication 6-4 (FIPS 6-4) was a five-digit Federal Information Processing Standards code which uniquely identified counties and county equivalents in the United States, certain U.S. possessions, and certain freely associated states ([source](https://en.wikipedia.org/wiki/FIPS_county_code)) that is commonly used for US topological maps. This code has been supplanted with the [INCITS 31 – 2009](https://en.wikipedia.org/wiki/International_Committee_for_Information_Technology_Standards) codes. \n * `Admin2`: Contains more granular region name, e.g. within the United States this would be the county name.\n * `Combined_Key`: Comma concatenation of `Admin2`, `Province_State`, `Country_Region`.\n * `Active`: Active COVID-19 cases\n \nWe also added the following columns:\n* `process_date`: The date of the confirmed cases (when the tests were processed) which is not in the data itself but within the file name\n* `level`: Describing the level of granuality of the data: `old_data` is at the state/province level (`level = 2`) while `new_data` is at the county level (`level = 3`)"],"metadata":{}},{"cell_type":"markdown","source":["### Create File Path for Delta Lake Table\nRemoving if exists and creating the following file path for our Delta Lake Table"],"metadata":{}},{"cell_type":"code","source":["%sh\nrm -fR /dbfs/tmp/dennylee/COVID/df_jhu/ && mkdir -p /dbfs/tmp/dennylee/COVID/df_jhu/ && ls -lsgA /dbfs/tmp/dennylee/COVID/df_jhu/"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
total 0\n
"]}}],"execution_count":9},{"cell_type":"code","source":["# Create our initial Delta Lake table\nDELTA_PATH = \"/tmp/dennylee/COVID/df_jhu/\"\nold_data.write.format(\"delta\").save(DELTA_PATH)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":10},{"cell_type":"markdown","source":["## Simulate an Updated Entry\nIn this example scenario, on April 27th, 2020 we needed to updated the `Last_Updated` column for data for March 21st, 2020 which was stored in the older schema (`old_data`).\n\nBut this **update** entry is included in the `new_data` with a newer schema including:\n* An updated `Last_Update` value\n* Including the FIPS county code for Washington State"],"metadata":{}},{"cell_type":"code","source":["# Simulate an Updated Entry\nitems = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)]\ncols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level']\nsimulated_update = spark.createDataFrame(items, cols)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":12},{"cell_type":"code","source":["# Add this updated entry into the new_data\nnew_data = new_data.union(simulated_update)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":13},{"cell_type":"markdown","source":["## Review the Data\nLet's review the data for our **Schema Evolution in Merge Operations** example:\n1. `old_data` contains the original schema \n2. `new_data` contains a new schema which includes the columns: `FIPS`, `Admin2`, `Active`, `Combined_Key`\n3. `new_data` also contains our simulated update entry originally inserted with the old schema (`old_data`) which includes an updated `Last_Update` and `FIPS` value"],"metadata":{}},{"cell_type":"code","source":["print(\"Number of rows for old_data: %s, new_data: %s \" % (old_data.count(), new_data.count()))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Number of rows for old_data: 1, new_data: 40 \n
"]}}],"execution_count":15},{"cell_type":"code","source":["# In the old data, there is a single row with Washington state with 10 columns\ndisplay(old_data.where(col(\"Province_State\") == \"Washington\"))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
Province_StateCountry_RegionLast_UpdateConfirmedDeathsRecoveredLatitudeLongitudeprocess_datelevel
WashingtonUS2020-03-21T22:43:04.000+0000179394047.4009-121.49052020-03-212
"]}}],"execution_count":16},{"cell_type":"code","source":["# In the second DataFrame, there are:\n# - multiple rows with Washington State\n# - It now contains 14 columns\n# - An additional row for , we've updated the `Last_Update` value\n# - Note this row has level = 2 (other rows has level = 3)\ndisplay(new_data.where(col(\"Province_State\") == \"Washington\").sort(col(\"FIPS\")))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
FIPSAdmin2Province_StateCountry_RegionLast_UpdateLatitudeLongitudeConfirmedDeathsRecoveredActiveCombined_Keyprocess_datelevel
53WashingtonUS2020-04-27T19:00:0047.4009-121.490517939402020-03-212
53001AdamsWashingtonUS2020-04-11 22:45:3346.98299757-118.5601734000000130000Adams, Washington, US2020-04-113
53003AsotinWashingtonUS2020-04-11 22:45:3346.18894415-117.20228514000Asotin, Washington, US2020-04-113
53005BentonWashingtonUS2020-04-11 22:45:3346.23946995-119.512083400000012442500Benton, Washington, US2020-04-113
53007ChelanWashingtonUS2020-04-11 22:45:3347.87046092-120.617395653500Chelan, Washington, US2020-04-113
53009ClallamWashingtonUS2020-04-11 22:45:3348.04754642-123.9226319000000111000Clallam, Washington, US2020-04-113
53011ClarkWashingtonUS2020-04-11 22:45:3345.77568046-122.482920400000012201400Clark, Washington, US2020-04-113
53013ColumbiaWashingtonUS2020-04-11 22:45:3346.29442881-117.90519831000Columbia, Washington, US2020-04-113
53015CowlitzWashingtonUS2020-04-11 22:45:3346.19074721-122.678223123000Cowlitz, Washington, US2020-04-113
53017DouglasWashingtonUS2020-04-11 22:45:3347.7361335-119.6929370000000116000Douglas, Washington, US2020-04-113
53019FerryWashingtonUS2020-04-11 22:45:3348.471143100000006-118.51540411000Ferry, Washington, US2020-04-113
53021FranklinWashingtonUS2020-04-11 22:45:3346.53351425-118.9018212107200Franklin, Washington, US2020-04-113
53025GrantWashingtonUS2020-04-11 22:45:3347.20753737-119.4500749110200Grant, Washington, US2020-04-113
53027Grays HarborWashingtonUS2020-04-11 22:45:3347.14003705-123.78205678000Grays Harbor, Washington, US2020-04-113
53029IslandWashingtonUS2020-04-11 22:45:3348.14713305-122.5216407157700Island, Washington, US2020-04-113
53031JeffersonWashingtonUS2020-04-11 22:45:3347.75003077-123.5609704000000228000Jefferson, Washington, US2020-04-113
53033KingWashingtonUS2020-04-11 22:45:3347.49137892-121.8346131426228400King, Washington, US2020-04-113
53035KitsapWashingtonUS2020-04-11 22:45:3347.63479026-122.6436061128100Kitsap, Washington, US2020-04-113
53037KittitasWashingtonUS2020-04-11 22:45:3347.12521214-120.6794389999999915100Kittitas, Washington, US2020-04-113
53039KlickitatWashingtonUS2020-04-11 22:45:3345.87325839-120.791359812100Klickitat, Washington, US2020-04-113
53041LewisWashingtonUS2020-04-11 22:45:3346.57756476-122.3925769000000118200Lewis, Washington, US2020-04-113
53043LincolnWashingtonUS2020-04-11 22:45:3347.57705241-118.419438800000012000Lincoln, Washington, US2020-04-113
53045MasonWashingtonUS2020-04-11 22:45:3347.35007586-123.186368518000Mason, Washington, US2020-04-113
53047OkanoganWashingtonUS2020-04-11 22:45:3348.54855019-119.7387224000000114000Okanogan, Washington, US2020-04-113
53049PacificWashingtonUS2020-04-11 22:45:3346.55418369-123.72857161000Pacific, Washington, US2020-04-113
53051Pend OreilleWashingtonUS2020-04-11 22:45:3348.53234021-117.27418041000Pend Oreille, Washington, US2020-04-113
53053PierceWashingtonUS2020-04-11 22:45:3347.03892768-122.140595799999999221900Pierce, Washington, US2020-04-113
53055San JuanWashingtonUS2020-04-11 22:45:3348.60182783-122.9674513000San Juan, Washington, US2020-04-113
53057SkagitWashingtonUS2020-04-11 22:45:3348.48171488-121.766131185600Skagit, Washington, US2020-04-113
53059SkamaniaWashingtonUS2020-04-11 22:45:3346.02408726-121.916440299999993000Skamania, Washington, US2020-04-113
53061SnohomishWashingtonUS2020-04-11 22:45:3348.04615983-121.717070318356800Snohomish, Washington, US2020-04-113
53063SpokaneWashingtonUS2020-04-11 22:45:3347.62113146-117.40464942511400Spokane, Washington, US2020-04-113
53065StevensWashingtonUS2020-04-11 22:45:3348.40035475-117.85427016000Stevens, Washington, US2020-04-113
53067ThurstonWashingtonUS2020-04-11 22:45:3346.9291895-122.8290655999999981100Thurston, Washington, US2020-04-113
53069WahkiakumWashingtonUS2020-04-11 22:45:3346.29180039999999-123.42508312000Wahkiakum, Washington, US2020-04-113
53071Walla WallaWashingtonUS2020-04-11 22:45:3346.23040051-118.477553920000Walla Walla, Washington, US2020-04-113
53073WhatcomWashingtonUS2020-04-11 22:45:3348.82227976-121.749001799999992612300Whatcom, Washington, US2020-04-113
53075WhitmanWashingtonUS2020-04-11 22:45:3346.90022523-117.5241763000000112000Whitman, Washington, US2020-04-113
53077YakimaWashingtonUS2020-04-11 22:45:3346.45738486-120.738012599999995112000Yakima, Washington, US2020-04-113
90053UnassignedWashingtonUS2020-04-11 22:45:33nullnull848000Unassigned, Washington, US2020-04-113
"]}}],"execution_count":17},{"cell_type":"markdown","source":["## Schema Evolution?\nWe could potentially use `new_data.write.option(\"merge\", \"true\").mode(\"append\").save(path)` to merge the schemas but `new_data` also contains data that has to be updated in the original table. One approach could be that you:\n* Run the `merge` as one operation\n* Run the `schema evolution` as another operation\n\nOr, we could do this as a single operation by **[Automatic Schema Evolution](https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution)**"],"metadata":{}},{"cell_type":"code","source":["# Automatic Schema Evolution\nspark.sql(\"SET spark.databricks.delta.schema.autoMerge.enabled = true\")"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Out[24]: DataFrame[key: string, value: string]
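With this conf set, schema evolution and the upsert happen in one atomic `MERGE`. Without it, the same result would take two operations, roughly as follows (a sketch under the notebook's `DELTA_PATH`/`new_data` names; the commented Spark calls are not executed here):

```python
# Two-step alternative without autoMerge (sketch):
#   1) Evolve the table schema by appending an empty DataFrame that carries it:
#      (new_data.limit(0).write.format("delta")
#           .option("mergeSchema", "true").mode("append").save(DELTA_PATH))
#   2) Run a plain merge against the now-wider table.
# Either way, the table grows from its original 10 columns to 14:
added_cols = ["FIPS", "Admin2", "Active", "Combined_Key"]
print(10 + len(added_cols))  # 14
```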
"]}}],"execution_count":19},{"cell_type":"code","source":["from delta.tables import *\ndeltaTable = DeltaTable.forPath(spark, DELTA_PATH)\n\n# Schema Evolution with a Merge Operation\ndeltaTable.alias(\"t\").merge(\n new_data.alias(\"s\"),\n \"s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level\"\n).whenMatchedUpdateAll( \n).whenNotMatchedInsertAll(\n).execute()"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":20},{"cell_type":"markdown","source":["## Review the Delta Lake Table"],"metadata":{}},{"cell_type":"code","source":["# Load the data\ndf = spark.read.format(\"delta\").load(DELTA_PATH)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":22},{"cell_type":"code","source":["print(\"Number of rows: %s\" % df.count())"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
Number of rows: 40\n
"]}}],"execution_count":23},{"cell_type":"markdown","source":["Recall this is expected as the `old_data` contained 1 row, `new_data` contained 40 rows,
\nbut one of the rows contained a simulated row to update the values previously inserted into the Delta Table by `old_data`."],"metadata":{}},{"cell_type":"code","source":["display(df.sort(col(\"FIPS\")))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
Province_StateCountry_RegionLast_UpdateConfirmedDeathsRecoveredLatitudeLongitudeprocess_datelevelFIPSAdmin2ActiveCombined_Key
WashingtonUS2020-04-27T19:00:00.000+0000179394047.4009-121.49052020-03-21253
WashingtonUS2020-04-11T22:45:33.000+0000300046.98299757-118.560173400000012020-04-11353001Adams0Adams, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000040046.18894415-117.20228512020-04-11353003Asotin0Asotin, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000024425046.23946995-119.512083400000012020-04-11353005Benton0Benton, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000535047.87046092-120.61739562020-04-11353007Chelan0Chelan, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000110048.04754642-123.922631900000012020-04-11353009Clallam0Clallam, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000022014045.77568046-122.482920400000012020-04-11353011Clark0Clark, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000010046.29442881-117.90519832020-04-11353013Columbia0Columbia, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000230046.19074721-122.67822312020-04-11353015Cowlitz0Cowlitz, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000160047.7361335-119.692937000000012020-04-11353017Douglas0Douglas, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000010048.471143100000006-118.51540412020-04-11353019Ferry0Ferry, Washington, US
WashingtonUS2020-04-11T22:45:33.000+00001072046.53351425-118.90182122020-04-11353021Franklin0Franklin, Washington, US
WashingtonUS2020-04-11T22:45:33.000+00001102047.20753737-119.45007492020-04-11353025Grant0Grant, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000080047.14003705-123.78205672020-04-11353027Grays Harbor0Grays Harbor, Washington, US
WashingtonUS2020-04-11T22:45:33.000+00001577048.14713305-122.52164072020-04-11353029Island0Island, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000280047.75003077-123.560970400000022020-04-11353031Jefferson0Jefferson, Washington, US
WashingtonUS2020-04-11T22:45:33.000+00004262284047.49137892-121.83461312020-04-11353033King0King, Washington, US
WashingtonUS2020-04-11T22:45:33.000+00001281047.63479026-122.64360612020-04-11353035Kitsap0Kitsap, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000151047.12521214-120.679438999999992020-04-11353037Kittitas0Kittitas, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000121045.87325839-120.79135982020-04-11353039Klickitat0Klickitat, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000182046.57756476-122.392576900000012020-04-11353041Lewis0Lewis, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000020047.57705241-118.419438800000012020-04-11353043Lincoln0Lincoln, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000180047.35007586-123.18636852020-04-11353045Mason0Mason, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000140048.54855019-119.738722400000012020-04-11353047Okanogan0Okanogan, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000010046.55418369-123.72857162020-04-11353049Pacific0Pacific, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000010048.53234021-117.27418042020-04-11353051Pend Oreille0Pend Oreille, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000092219047.03892768-122.140595799999992020-04-11353053Pierce0Pierce, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000130048.60182783-122.967452020-04-11353055San Juan0San Juan, Washington, US
WashingtonUS2020-04-11T22:45:33.000+00001856048.48171488-121.7661312020-04-11353057Skagit0Skagit, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000030046.02408726-121.916440299999992020-04-11353059Skamania0Skamania, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000183568048.04615983-121.71707032020-04-11353061Snohomish0Snohomish, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000025114047.62113146-117.40464942020-04-11353063Spokane0Spokane, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000060048.40035475-117.85427012020-04-11353065Stevens0Stevens, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000811046.9291895-122.829065599999992020-04-11353067Thurston0Thurston, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000020046.29180039999999-123.42508312020-04-11353069Wahkiakum0Wahkiakum, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000200046.23040051-118.47755392020-04-11353071Walla Walla0Walla Walla, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000026123048.82227976-121.749001799999992020-04-11353073Whatcom0Whatcom, Washington, US
WashingtonUS2020-04-11T22:45:33.000+0000120046.90022523-117.524176300000012020-04-11353075Whitman0Whitman, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000051120046.45738486-120.738012599999992020-04-11353077Yakima0Yakima, Washington, US
WashingtonUS2020-04-11T22:45:33.000+000084800nullnull2020-04-11390053Unassigned0Unassigned, Washington, US
"]}}],"execution_count":25},{"cell_type":"markdown","source":["As expected, there are 40 rows with the `level = 2` containing an updated `Last_Update` value thus\n* The Delta Lake table schema evolved from 10 columns to 14 columns\n* A single row value was updated \n\nAll of this occured in a single atomic operation as noted in the history below."],"metadata":{}},{"cell_type":"code","source":["display(deltaTable.history())"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
versiontimestampuserIduserNameoperationoperationParametersjobnotebookclusterIdreadVersionisolationLevelisBlindAppendoperationMetrics
12020-04-30T04:53:22.000+0000100802denny.lee@databricks.comMERGEMap(predicate -> (((s.`process_date` = t.`process_date`) AND (s.`province_state` = t.`province_state`)) AND ((s.`country_region` = t.`country_region`) AND (s.`level` = CAST(t.`level` AS BIGINT)))))nullList(25384224)0430-041257-sided2640WriteSerializablefalseMap(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 3, numTargetRowsInserted -> 39, numTargetRowsUpdated -> 1, numOutputRows -> 40, numSourceRows -> 40, numTargetFilesRemoved -> 1)
02020-04-30T04:45:00.000+0000100802denny.lee@databricks.comWRITEMap(mode -> ErrorIfExists, partitionBy -> [])nullList(25384224)0430-041257-sided264nullWriteSerializabletrueMap(numFiles -> 1, numOutputBytes -> 2723, numOutputRows -> 1)
"]}}],"execution_count":27},{"cell_type":"markdown","source":["## Review the Operational Metrics\nNote the `operationMetrics` column for this example:\n* `numTargetRowsInserted: 39` were the number of rows added (with the new schema)\n* `numTargetRowsUpdated: 1` were the number of rows updated (with the old schema)\n\n**Important:** Don't forget to review the SQL tab of the SQL UI to better understand the internals (it should look similar to the animated GIF below)\n\n![](https://raw.githubusercontent.com/databricks/tech-talks/master/images/schema-evolution_merge-operation-spark-ui-sql-tab-5.gif)"],"metadata":{}},{"cell_type":"markdown","source":["## Review the Transaction Log\nLet's take a quick look at the transaction log"],"metadata":{}},{"cell_type":"code","source":["%sh\nls -lsgA /dbfs/tmp/dennylee/COVID/df_jhu/_delta_log"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
total 9\n1 -rwxrwxrwx 1 root 89 Apr 30 04:45 00000000000000000000.crc\n3 -rwxrwxrwx 1 root 2382 Apr 30 04:45 00000000000000000000.json\n1 -rwxrwxrwx 1 root 90 Apr 30 04:53 00000000000000000001.crc\n5 -rwxrwxrwx 1 root 4697 Apr 30 04:53 00000000000000000001.json\n0 -rwxrwxrwx 1 root 0 Apr 30 04:45 .s3-optimization-0\n0 -rwxrwxrwx 1 root 0 Apr 30 04:45 .s3-optimization-1\n0 -rwxrwxrwx 1 root 0 Apr 30 04:45 .s3-optimization-2\n
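Each numbered `.json` file in `_delta_log` holds one commit; its `commitInfo` record carries the `operationMetrics` we saw in the history. As a pure-Python illustration of reading those metrics (the JSON fragment below is a hand-copied subset of the MERGE commit's metrics shown earlier, not re-read from the log):

```python
import json

# A hand-copied subset of the MERGE commit's commitInfo record;
# note Delta stores operationMetrics values as strings.
commit = json.loads("""
{"operation": "MERGE",
 "operationMetrics": {"numTargetRowsInserted": "39",
                      "numTargetRowsUpdated": "1",
                      "numOutputRows": "40"}}
""")
metrics = {k: int(v) for k, v in commit["operationMetrics"].items()}

# Inserts plus updates account for every output row of this MERGE
print(metrics["numTargetRowsInserted"] + metrics["numTargetRowsUpdated"])  # 40
```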
"]}}],"execution_count":30},{"cell_type":"code","source":["tl_1 = spark.read.json(\"/tmp/dennylee/COVID/df_jhu/_delta_log/00000000000000000001.json\")"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["\n
"]}}],"execution_count":31},{"cell_type":"code","source":["# Commit Information: Note the operations metrics\ndisplay(tl_1.select(\"commitInfo\").where(\"commitInfo is not null\"))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
commitInfo
List(0430-041257-sided264, false, WriteSerializable, List(25384224), MERGE, List(40, 40, 3, 1, 0, 0, 39, 1), List((((s.`process_date` = t.`process_date`) AND (s.`province_state` = t.`province_state`)) AND ((s.`country_region` = t.`country_region`) AND (s.`level` = CAST(t.`level` AS BIGINT))))), 0, 1588222401301, 100802, denny.lee@databricks.com)
"]}}],"execution_count":32},{"cell_type":"code","source":["# Add Information:\n# Notice the two rows under `stats`: one noting the 39 records inserted and one noting the 1 record updated\ndisplay(tl_1.select(\"add\").where(\"add is not null\"))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["
add
List(true, 1588222398000, part-00000-74632cb5-6b7e-4f2f-81b1-7238403c31d9-c000.snappy.parquet, 1463, {\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}})
List(true, 1588222401000, part-00147-b6f679c9-8f22-4673-8bd8-b0442c34fdf1-c000.snappy.parquet, 5746, {\"numRecords\":39,\"minValues\":{\"Province_State\":\"Washington\",\"Country_Region\":\"US\",\"Last_Update\":\"2020-04-11T22:45:33.000Z\",\"Confirmed\":1,\"Deaths\":0,\"Recovered\":0,\"Latitude\":45.77568046,\"Longitude\":-123.92263190000001,\"process_date\":\"2020-04-11\",\"level\":3,\"FIPS\":53001,\"Admin2\":\"Adams\",\"Active\":\"0\",\"Combined_Key\":\"Adams, Washington, US\"},\"maxValues\":{\"Province_State\":\"Washington\",\"Country_Region\":\"US\",\"Last_Update\":\"2020-04-11T22:45:33.000Z\",\"Confirmed\":4262,\"Deaths\":284,\"Recovered\":0,\"Latitude\":48.82227976,\"Longitude\":-117.2022851,\"process_date\":\"2020-04-11\",\"level\":3,\"FIPS\":90053,\"Admin2\":\"Yakima\",\"Active\":\"0\",\"Combined_Key\":\"Yakima, Washington, US\"},\"nullCount\":{\"Province_State\":0,\"Country_Region\":0,\"Last_Update\":0,\"Confirmed\":0,\"Deaths\":0,\"Recovered\":0,\"Latitude\":1,\"Longitude\":1,\"process_date\":0,\"level\":0,\"FIPS\":0,\"Admin2\":0,\"Active\":0,\"Combined_Key\":0}})
List(true, 1588222401000, part-00183-46079500-157e-4369-9256-1a72d4b32d04-c000.snappy.parquet, 3534, {\"numRecords\":1,\"minValues\":{\"Province_State\":\"Washington\",\"Country_Region\":\"US\",\"Last_Update\":\"2020-04-27T19:00:00.000Z\",\"Confirmed\":1793,\"Deaths\":94,\"Recovered\":0,\"Latitude\":47.4009,\"Longitude\":-121.4905,\"process_date\":\"2020-03-21\",\"level\":2,\"FIPS\":53,\"Admin2\":\"\",\"Active\":\"\",\"Combined_Key\":\"\"},\"maxValues\":{\"Province_State\":\"Washington\",\"Country_Region\":\"US\",\"Last_Update\":\"2020-04-27T19:00:00.000Z\",\"Confirmed\":1793,\"Deaths\":94,\"Recovered\":0,\"Latitude\":47.4009,\"Longitude\":-121.4905,\"process_date\":\"2020-03-21\",\"level\":2,\"FIPS\":53,\"Admin2\":\"\",\"Active\":\"\",\"Combined_Key\":\"\"},\"nullCount\":{\"Province_State\":0,\"Country_Region\":0,\"Last_Update\":0,\"Confirmed\":0,\"Deaths\":0,\"Recovered\":0,\"Latitude\":0,\"Longitude\":0,\"process_date\":0,\"level\":0,\"FIPS\":0,\"Admin2\":0,\"Active\":0,\"Combined_Key\":0}})
"]}}],"execution_count":33},{"cell_type":"markdown","source":["#Join the community!\n\n\n* [Delta Lake on GitHub](https://github.com/delta-io/delta)\n* [Delta Lake Slack Channel](https://delta-users.slack.com/) ([Registration Link](https://join.slack.com/t/delta-users/shared_invite/enQtNTY1NDg0ODcxOTI1LWJkZGU3ZmQ3MjkzNmY2ZDM0NjNlYjE4MWIzYjg2OWM1OTBmMWIxZTllMjg3ZmJkNjIwZmE1ZTZkMmQ0OTk5ZjA))\n* [Public Mailing List](https://groups.google.com/forum/#!forum/delta-users)"],"metadata":{}}],"metadata":{"name":"Schema Evolution in Merge Operations","notebookId":6697717},"nbformat":4,"nbformat_minor":0}