## Schema Evolution in Merge Operations
This notebook provides an example of how to perform schema evolution in merge operations for the [2019 Novel Coronavirus COVID-19 (2019-nCoV) Data Repository by Johns Hopkins CSSE](https://github.com/CSSEGISandData/COVID-19) dataset. This primer example creates a Delta Lake table (instead of native Parquet) to track changes to this dataset in support of the [Johns Hopkins COVID-19 Data Analysis Sample Notebook](https://github.com/databricks/tech-talks/blob/master/samples/JHU%20COVID-19%20Analysis.html).

The data is updated in the `/databricks-datasets/COVID/CSSEGISandData/` location regularly so you can access the data directly.

In [2]:
# PySpark
from pyspark.sql.functions import input_file_name, lit, col
from pyspark.sql.types import IntegerType, StringType

## Create Different DataFrames for Different Schemas
As noted in the [Johns Hopkins COVID-19 Analysis](https://github.com/databricks/tech-talks/blob/master/samples/JHU%20COVID-19%20Analysis.html) notebook, as of this writing, there are three different schemas for this dataset; for this example, we will focus on the last two schema changes.

| id | Schema String List | Date Range | 
| -- | ------------------ | ---------- |
| 1 | `Province/State Country/Region Last Update Confirmed Deaths Recovered Latitude Longitude` | 03-01-2020 to 03-21-2020 |
| 2 | `FIPS Admin2 Province_State Country_Region Last_Update Lat Long_ Confirmed Deaths Recovered Active Combined_Key` | 03-22-2020 to current |

The following example:
* Shows how to run merge operations with schema evolution on representative files of the two schemas
* Focuses on Washington State data only

In [4]:
# File paths
#   Two representative files of the two different schemas
file_1 = '/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/03-21-2020.csv'
file_2 = '/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/04-11-2020.csv'

# Schema 1: Representing datasets between 2020-03-01 and 2020-03-21 (8 columns originally)
#   Renaming some of the columns for better standardization
old_data = (spark.read
          .option("inferSchema", True)
          .option("header", True)
          .csv(file_1)
          .withColumnRenamed("Last Update", "Last_Update")
          .withColumnRenamed("Province/State", "Province_State")
          .withColumnRenamed("Country/Region", "Country_Region")
          .withColumn("process_date", lit('2020-03-21'))   # Date determined by the filename, manually entered in this example
          .withColumn("level", lit(2))                     # Specify the level: 1 - Country, 2 - Province/State, 3 - County
          .where("Province_State == 'Washington'"))        # Filter by only Washington State (expected output: 1 row)

# Schema 2: Latest schema representing data from 2020-03-22 onwards (12 columns originally)
#   Renaming some of the columns for better standardization
new_data = (spark.read
          .option("inferSchema", True)
          .option("header", True)
          .csv(file_2)
          .withColumnRenamed("Lat", "Latitude")
          .withColumnRenamed("Long_", "Longitude")
          .withColumn("process_date", lit('2020-04-11'))    # Date determined by the filename, manually entered in this example
          .withColumn("level", lit(3))                      # Specify the level: 1 - Country, 2 - Province/State, 3 - County
          .where("Province_State == 'Washington'"))         # Filter by only Washington State (expected output: 39 rows)

# Notes: Expand each DataFrame below to review the schema
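The `process_date` values above are typed in by hand from the file names. As a minimal sketch, assuming every daily report keeps the `MM-DD-YYYY.csv` naming seen in `file_1` and `file_2`, the date could instead be derived from the path. The helper name `process_date_from_path` is hypothetical and not part of the original notebook.

```python
from datetime import datetime
from pathlib import PurePosixPath


def process_date_from_path(path: str) -> str:
    """Turn a daily-report path such as '.../03-21-2020.csv' into '2020-03-21'."""
    stem = PurePosixPath(path).stem  # file name without the '.csv' suffix
    return datetime.strptime(stem, "%m-%d-%Y").strftime("%Y-%m-%d")


# Same path as file_1 in the cell above
example_path = (
    "/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/"
    "csse_covid_19_daily_reports/03-21-2020.csv"
)
derived_date = process_date_from_path(example_path)
print(derived_date)  # 2020-03-21
```

The result could then be passed to `lit(...)` in place of the hard-coded string.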

In [5]:
# Old Data Schema
old_data.printSchema()

In [6]:
# New Data Schema
new_data.printSchema()

The differences between these two schemas are:
* Columns that were renamed: `Province/State -> Province_State`, `Country/Region -> Country_Region`, `Last Update -> Last_Update`, `Latitude -> Lat`, `Longitude -> Long_`. To resolve this, we standardized the column names in both DataFrames
* Columns that were added:
  * `FIPS`: The Federal Information Processing Standard Publication 6-4 (FIPS 6-4) was a five-digit Federal Information Processing Standards code which uniquely identified counties and county equivalents in the United States, certain U.S. possessions, and certain freely associated states ([source](https://en.wikipedia.org/wiki/FIPS_county_code)) that is commonly used for US topological maps. This code has been supplanted with the [INCITS 31 – 2009](https://en.wikipedia.org/wiki/International_Committee_for_Information_Technology_Standards) codes.
  * `Admin2`: Contains more granular region name, e.g. within the United States this would be the county name.
  * `Combined_Key`: Comma concatenation of `Admin2`, `Province_State`, `Country_Region`.
  * `Active`: Active COVID-19 cases
  
We also added the following columns:
* `process_date`: The date of the confirmed cases (when the tests were processed) which is not in the data itself but within the file name
* `level`: Describes the level of granularity of the data: `old_data` is at the state/province level (`level = 2`) while `new_data` is at the county level (`level = 3`)
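The schema differences listed above can be checked with plain Python lists, without reading any data. This is only a sketch: the column lists are copied from the schema table earlier in this notebook, and the `renames` mapping mirrors the `withColumnRenamed` calls; the names `renames` and `standardize` are introduced here for illustration.

```python
# Column names as they appear in the two raw CSV schemas
old_columns = ["Province/State", "Country/Region", "Last Update", "Confirmed",
               "Deaths", "Recovered", "Latitude", "Longitude"]
new_columns = ["FIPS", "Admin2", "Province_State", "Country_Region", "Last_Update",
               "Lat", "Long_", "Confirmed", "Deaths", "Recovered", "Active",
               "Combined_Key"]

# Same renames as the withColumnRenamed calls in this notebook
renames = {
    "Province/State": "Province_State",
    "Country/Region": "Country_Region",
    "Last Update": "Last_Update",
    "Lat": "Latitude",
    "Long_": "Longitude",
}


def standardize(columns):
    """Apply the rename mapping, leaving other names untouched."""
    return [renames.get(name, name) for name in columns]


old_std = standardize(old_columns)
new_std = standardize(new_columns)

added = [name for name in new_std if name not in old_std]
dropped = [name for name in old_std if name not in new_std]
print(added)    # ['FIPS', 'Admin2', 'Active', 'Combined_Key']
print(dropped)  # []
```

After standardization nothing is dropped, so the new schema is a pure superset of the old one, which is the case schema evolution handles.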

### Create File Path for Delta Lake Table
Remove the following file path if it already exists, then recreate it for our Delta Lake table.

In [9]:
%sh
rm -fR /dbfs/tmp/dennylee/COVID/df_jhu/ && mkdir -p /dbfs/tmp/dennylee/COVID/df_jhu/ && ls -lsgA /dbfs/tmp/dennylee/COVID/df_jhu/

In [10]:
# Create our initial Delta Lake table
DELTA_PATH = "/tmp/dennylee/COVID/df_jhu/"
old_data.write.format("delta").save(DELTA_PATH)
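Note that the `%sh` cell uses `/dbfs/tmp/...` while the Spark write uses `/tmp/...`. Both refer to the same DBFS location: shell commands on the driver see DBFS through a local mount under `/dbfs`. The helper below is a hypothetical illustration of that mapping, assuming the standard Databricks mount point; it is not part of the original notebook.

```python
def to_local_fuse_path(dbfs_path: str) -> str:
    """Map a DBFS path as used by Spark to the path seen by %sh on the driver."""
    if dbfs_path.startswith("dbfs:"):
        dbfs_path = dbfs_path[len("dbfs:"):]  # drop an optional scheme prefix
    return "/dbfs/" + dbfs_path.lstrip("/")


local_path = to_local_fuse_path("/tmp/dennylee/COVID/df_jhu/")
print(local_path)  # /dbfs/tmp/dennylee/COVID/df_jhu/
```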

## Simulate an Updated Entry
In this example scenario, on April 27th, 2020 we needed to update the `Last_Update` column for the March 21st, 2020 data, which was stored with the older schema (`old_data`).

But this **update** entry is included in `new_data`, which uses the newer schema, so it carries:
* An updated `Last_Update` value
* The FIPS code for Washington State (`53`)

In [12]:
# Simulate an Updated Entry
items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)]
cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level']
simulated_update = spark.createDataFrame(items, cols)

In [13]:
# Add this updated entry into the new_data
new_data = new_data.union(simulated_update)
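`DataFrame.union` matches columns by position, not by name, so the `cols` list of the simulated entry has to be in the same order as the columns of `new_data`. A small pure-Python check of that assumption is sketched below; the column lists are copied from this notebook and the helper `first_mismatch` is hypothetical. In PySpark, `unionByName` is an alternative that matches columns by name instead.

```python
# Column order of new_data after the renames and the two added columns
new_data_columns = ["FIPS", "Admin2", "Province_State", "Country_Region",
                    "Last_Update", "Latitude", "Longitude", "Confirmed", "Deaths",
                    "Recovered", "Active", "Combined_Key", "process_date", "level"]

# Column order used for the simulated update (the `cols` list above)
simulated_columns = ["FIPS", "Admin2", "Province_State", "Country_Region",
                     "Last_Update", "Latitude", "Longitude", "Confirmed", "Deaths",
                     "Recovered", "Active", "Combined_Key", "process_date", "level"]


def first_mismatch(left, right):
    """Return the first position where two column lists differ, or None."""
    for position, (left_name, right_name) in enumerate(zip(left, right)):
        if left_name != right_name:
            return position
    if len(left) != len(right):
        return min(len(left), len(right))
    return None


mismatch = first_mismatch(new_data_columns, simulated_columns)
print(mismatch)  # None, so a positional union lines the columns up correctly
```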

## Review the Data
Let's review the data for our **Schema Evolution in Merge Operations** example:
1. `old_data` contains the original schema 
2. `new_data` contains a new schema which includes the columns: `FIPS`, `Admin2`, `Active`, `Combined_Key`
3. `new_data` also contains our simulated update to the row originally inserted with the old schema (`old_data`); it carries an updated `Last_Update` value and a `FIPS` value

In [15]:
print("Number of rows for old_data: %s, new_data: %s " % (old_data.count(), new_data.count()))

In [16]:
# In the old data, there is a single row with Washington state with 10 columns
display(old_data.where(col("Province_State") == "Washington"))

Province_State,Country_Region,Last_Update,Confirmed,Deaths,Recovered,Latitude,Longitude,process_date,level
Washington,US,2020-03-21T22:43:04.000+0000,1793,94,0,47.4009,-121.4905,2020-03-21,2


In [17]:
# In the second DataFrame:
# - There are multiple rows for Washington State
# - There are now 14 columns
# - There is an additional row (FIPS = 53) for which we've updated the `Last_Update` value
# - Note this row has level = 2 (the other rows have level = 3)
display(new_data.where(col("Province_State") == "Washington").sort(col("FIPS")))

FIPS,Admin2,Province_State,Country_Region,Last_Update,Latitude,Longitude,Confirmed,Deaths,Recovered,Active,Combined_Key,process_date,level
53,,Washington,US,2020-04-27T19:00:00,47.4009,-121.4905,1793,94,0,,,2020-03-21,2
53001,Adams,Washington,US,2020-04-11 22:45:33,46.98299757,-118.5601734,30,0,0,0.0,"Adams, Washington, US",2020-04-11,3
53003,Asotin,Washington,US,2020-04-11 22:45:33,46.18894415,-117.2022851,4,0,0,0.0,"Asotin, Washington, US",2020-04-11,3
53005,Benton,Washington,US,2020-04-11 22:45:33,46.23946995,-119.5120834,244,25,0,0.0,"Benton, Washington, US",2020-04-11,3
53007,Chelan,Washington,US,2020-04-11 22:45:33,47.87046092,-120.6173956,53,5,0,0.0,"Chelan, Washington, US",2020-04-11,3
53009,Clallam,Washington,US,2020-04-11 22:45:33,48.04754642,-123.9226319,11,0,0,0.0,"Clallam, Washington, US",2020-04-11,3
53011,Clark,Washington,US,2020-04-11 22:45:33,45.77568046,-122.4829204,220,14,0,0.0,"Clark, Washington, US",2020-04-11,3
53013,Columbia,Washington,US,2020-04-11 22:45:33,46.29442881,-117.9051983,1,0,0,0.0,"Columbia, Washington, US",2020-04-11,3
53015,Cowlitz,Washington,US,2020-04-11 22:45:33,46.19074721,-122.6782231,23,0,0,0.0,"Cowlitz, Washington, US",2020-04-11,3
53017,Douglas,Washington,US,2020-04-11 22:45:33,47.7361335,-119.692937,16,0,0,0.0,"Douglas, Washington, US",2020-04-11,3


## Schema Evolution?
We could potentially use `new_data.write.format("delta").option("mergeSchema", "true").mode("append").save(path)` to merge the schemas, but an append cannot update existing rows, and `new_data` also contains data that has to be updated in the original table. One approach would be to:
* Run the `merge` as one operation
* Run the `schema evolution` as another operation

Or, we could do this as a single operation by using **[Automatic Schema Evolution](https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution)**.

In [19]:
# Automatic Schema Evolution
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")

In [20]:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, DELTA_PATH)

# Schema Evolution with a Merge Operation
deltaTable.alias("t").merge(
  new_data.alias("s"),
  "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level"
).whenMatchedUpdateAll(  
).whenNotMatchedInsertAll(
).execute()
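To build intuition for what the merge above does, here is a toy model in plain Python over lists of dicts. It is a sketch for illustration only and not how Delta Lake implements `MERGE`: the function `toy_merge` and the three sample rows are hypothetical, and the toy does not reproduce Delta's checks (for example, Delta raises an error when several source rows match the same target row). Source rows are matched only against rows that were already in the target, which is why county rows sharing the same key values are all inserted.

```python
def toy_merge(target_rows, source_rows, key_columns):
    """Upsert source into target, evolving the schema to the union of columns."""
    # Evolved schema: existing target columns first, then source-only columns
    columns = []
    for row in target_rows + source_rows:
        for name in row:
            if name not in columns:
                columns.append(name)

    def key_of(row):
        return tuple(row[name] for name in key_columns)

    result = [dict(row) for row in target_rows]
    # Index only the original target rows; inserted rows are never match candidates
    index = {key_of(row): i for i, row in enumerate(result)}

    updated = inserted = 0
    for row in source_rows:
        i = index.get(key_of(row))
        if i is not None:
            result[i].update(row)     # matched: take all source values
            updated += 1
        else:
            result.append(dict(row))  # not matched: insert as a new row
            inserted += 1

    # Columns missing from a row become None (null)
    result = [{name: row.get(name) for name in columns} for row in result]
    return result, columns, updated, inserted


target = [{"Province_State": "Washington", "Country_Region": "US",
           "Last_Update": "2020-03-21T22:43:04", "process_date": "2020-03-21",
           "level": 2}]
source = [
    {"FIPS": 53, "Province_State": "Washington", "Country_Region": "US",
     "Last_Update": "2020-04-27T19:00:00", "process_date": "2020-03-21", "level": 2},
    {"FIPS": 53001, "Province_State": "Washington", "Country_Region": "US",
     "Last_Update": "2020-04-11 22:45:33", "process_date": "2020-04-11", "level": 3},
    {"FIPS": 53003, "Province_State": "Washington", "Country_Region": "US",
     "Last_Update": "2020-04-11 22:45:33", "process_date": "2020-04-11", "level": 3},
]
keys = ["process_date", "Province_State", "Country_Region", "level"]

rows, columns, updated, inserted = toy_merge(target, source, keys)
print(len(rows), updated, inserted)  # 3 1 2
print(columns[-1])                   # FIPS
```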

## Review the Delta Lake Table

In [22]:
# Load the data
df = spark.read.format("delta").load(DELTA_PATH)

In [23]:
print("Number of rows: %s" % df.count())

This count is expected: `old_data` contained 1 row and `new_data` contained 40 rows, <br/>
but one of the `new_data` rows was the simulated update to the row previously inserted into the Delta table by `old_data`, so the table holds 40 rows rather than 41.

In [25]:
display(df.sort(col("FIPS")))

Province_State,Country_Region,Last_Update,Confirmed,Deaths,Recovered,Latitude,Longitude,process_date,level,FIPS,Admin2,Active,Combined_Key
Washington,US,2020-04-27T19:00:00.000+0000,1793,94,0,47.4009,-121.4905,2020-03-21,2,53,,,
Washington,US,2020-04-11T22:45:33.000+0000,30,0,0,46.98299757,-118.5601734,2020-04-11,3,53001,Adams,0.0,"Adams, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,4,0,0,46.18894415,-117.2022851,2020-04-11,3,53003,Asotin,0.0,"Asotin, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,244,25,0,46.23946995,-119.5120834,2020-04-11,3,53005,Benton,0.0,"Benton, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,53,5,0,47.87046092,-120.6173956,2020-04-11,3,53007,Chelan,0.0,"Chelan, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,11,0,0,48.04754642,-123.9226319,2020-04-11,3,53009,Clallam,0.0,"Clallam, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,220,14,0,45.77568046,-122.4829204,2020-04-11,3,53011,Clark,0.0,"Clark, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,1,0,0,46.29442881,-117.9051983,2020-04-11,3,53013,Columbia,0.0,"Columbia, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,23,0,0,46.19074721,-122.6782231,2020-04-11,3,53015,Cowlitz,0.0,"Cowlitz, Washington, US"
Washington,US,2020-04-11T22:45:33.000+0000,16,0,0,47.7361335,-119.692937,2020-04-11,3,53017,Douglas,0.0,"Douglas, Washington, US"


As expected, there are 40 rows, and the `level = 2` row contains the updated `Last_Update` value. Thus:
* The Delta Lake table schema evolved from 10 columns to 14 columns
* A single row was updated

All of this occurred in a single atomic operation, as noted in the history below.

In [27]:
display(deltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
1,2020-04-30T04:53:22.000+0000,100802,denny.lee@databricks.com,MERGE,Map(predicate -> (((s.`process_date` = t.`process_date`) AND (s.`province_state` = t.`province_state`)) AND ((s.`country_region` = t.`country_region`) AND (s.`level` = CAST(t.`level` AS BIGINT))))),,List(25384224),0430-041257-sided264,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 3, numTargetRowsInserted -> 39, numTargetRowsUpdated -> 1, numOutputRows -> 40, numSourceRows -> 40, numTargetFilesRemoved -> 1)"
0,2020-04-30T04:45:00.000+0000,100802,denny.lee@databricks.com,WRITE,"Map(mode -> ErrorIfExists, partitionBy -> [])",,List(25384224),0430-041257-sided264,,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 2723, numOutputRows -> 1)"


## Review the Operational Metrics
Note the `operationMetrics` column for this example:
* `numTargetRowsInserted: 39` is the number of rows added (with the new schema)
* `numTargetRowsUpdated: 1` is the number of rows updated (originally written with the old schema)
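The metrics can be cross-checked against each other. The sketch below copies the values from the `operationMetrics` column of version 1 in the history output above; they are stored here as strings and cast with `int()`, on the assumption that the metrics map may return string values. For this run, the inserted, updated, and copied rows add up to the number of output rows.

```python
# Values copied from the operationMetrics of the MERGE (version 1) shown above
operation_metrics = {
    "numTargetRowsCopied": "0",
    "numTargetRowsDeleted": "0",
    "numTargetFilesAdded": "3",
    "numTargetRowsInserted": "39",
    "numTargetRowsUpdated": "1",
    "numOutputRows": "40",
    "numSourceRows": "40",
    "numTargetFilesRemoved": "1",
}

metrics = {name: int(value) for name, value in operation_metrics.items()}

rows_written = (metrics["numTargetRowsInserted"]
                + metrics["numTargetRowsUpdated"]
                + metrics["numTargetRowsCopied"])
print(rows_written)  # 40
print(rows_written == metrics["numOutputRows"])  # True
```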

**Important:** Don't forget to review the SQL tab of the Spark UI to better understand the internals (it should look similar to the animated GIF below).

![](https://raw.githubusercontent.com/databricks/tech-talks/master/images/schema-evolution_merge-operation-spark-ui-sql-tab-5.gif)

## Review the Transaction Log
Let's take a quick look at the transaction log

In [30]:
%sh
ls -lsgA /dbfs/tmp/dennylee/COVID/df_jhu/_delta_log

In [31]:
tl_1 = spark.read.json("/tmp/dennylee/COVID/df_jhu/_delta_log/00000000000000000001.json")

In [32]:
# Commit Information: Note the operations metrics
display(tl_1.select("commitInfo").where("commitInfo is not null"))

commitInfo
"List(0430-041257-sided264, false, WriteSerializable, List(25384224), MERGE, List(40, 40, 3, 1, 0, 0, 39, 1), List((((s.`process_date` = t.`process_date`) AND (s.`province_state` = t.`province_state`)) AND ((s.`country_region` = t.`country_region`) AND (s.`level` = CAST(t.`level` AS BIGINT))))), 0, 1588222401301, 100802, denny.lee@databricks.com)"


In [33]:
# Add Information:
#  Notice the `stats` of the three added files: one with 39 records (the inserts),
#  one with 1 record (the update), and one with 0 records
display(tl_1.select("add").where("add is not null"))

add
"List(true, 1588222398000, part-00000-74632cb5-6b7e-4f2f-81b1-7238403c31d9-c000.snappy.parquet, 1463, {""numRecords"":0,""minValues"":{},""maxValues"":{},""nullCount"":{}})"
"List(true, 1588222401000, part-00147-b6f679c9-8f22-4673-8bd8-b0442c34fdf1-c000.snappy.parquet, 5746, {""numRecords"":39,""minValues"":{""Province_State"":""Washington"",""Country_Region"":""US"",""Last_Update"":""2020-04-11T22:45:33.000Z"",""Confirmed"":1,""Deaths"":0,""Recovered"":0,""Latitude"":45.77568046,""Longitude"":-123.92263190000001,""process_date"":""2020-04-11"",""level"":3,""FIPS"":53001,""Admin2"":""Adams"",""Active"":""0"",""Combined_Key"":""Adams, Washington, US""},""maxValues"":{""Province_State"":""Washington"",""Country_Region"":""US"",""Last_Update"":""2020-04-11T22:45:33.000Z"",""Confirmed"":4262,""Deaths"":284,""Recovered"":0,""Latitude"":48.82227976,""Longitude"":-117.2022851,""process_date"":""2020-04-11"",""level"":3,""FIPS"":90053,""Admin2"":""Yakima"",""Active"":""0"",""Combined_Key"":""Yakima, Washington, US""},""nullCount"":{""Province_State"":0,""Country_Region"":0,""Last_Update"":0,""Confirmed"":0,""Deaths"":0,""Recovered"":0,""Latitude"":1,""Longitude"":1,""process_date"":0,""level"":0,""FIPS"":0,""Admin2"":0,""Active"":0,""Combined_Key"":0}})"
"List(true, 1588222401000, part-00183-46079500-157e-4369-9256-1a72d4b32d04-c000.snappy.parquet, 3534, {""numRecords"":1,""minValues"":{""Province_State"":""Washington"",""Country_Region"":""US"",""Last_Update"":""2020-04-27T19:00:00.000Z"",""Confirmed"":1793,""Deaths"":94,""Recovered"":0,""Latitude"":47.4009,""Longitude"":-121.4905,""process_date"":""2020-03-21"",""level"":2,""FIPS"":53,""Admin2"":"""",""Active"":"""",""Combined_Key"":""""},""maxValues"":{""Province_State"":""Washington"",""Country_Region"":""US"",""Last_Update"":""2020-04-27T19:00:00.000Z"",""Confirmed"":1793,""Deaths"":94,""Recovered"":0,""Latitude"":47.4009,""Longitude"":-121.4905,""process_date"":""2020-03-21"",""level"":2,""FIPS"":53,""Admin2"":"""",""Active"":"""",""Combined_Key"":""""},""nullCount"":{""Province_State"":0,""Country_Region"":0,""Last_Update"":0,""Confirmed"":0,""Deaths"":0,""Recovered"":0,""Latitude"":0,""Longitude"":0,""process_date"":0,""level"":0,""FIPS"":0,""Admin2"":0,""Active"":0,""Combined_Key"":0}})"

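Each commit file in `_delta_log` is newline-delimited JSON in which every line holds one action (such as `commitInfo` or `add`), and the `stats` field of an `add` action is itself a JSON-encoded string. As a sketch of how the record counts above could be read without Spark, the example below parses a few hand-built lines with the standard `json` module. The shortened file names and the helpers `make_add_line` and `records_per_added_file` are hypothetical; only the `numRecords` values mirror the output above.

```python
import json


def make_add_line(path, num_records):
    """Build a simplified 'add' action line; real entries carry more fields."""
    stats = json.dumps({"numRecords": num_records})  # stats is a JSON string
    return json.dumps({"add": {"path": path, "stats": stats}})


# Simplified stand-in for the lines of 00000000000000000001.json
log_lines = [
    json.dumps({"commitInfo": {"operation": "MERGE"}}),
    make_add_line("part-00000.snappy.parquet", 0),
    make_add_line("part-00147.snappy.parquet", 39),
    make_add_line("part-00183.snappy.parquet", 1),
]


def records_per_added_file(lines):
    """Return {file path: numRecords} for every add action that has stats."""
    counts = {}
    for line in lines:
        action = json.loads(line)
        add = action.get("add")
        if add and add.get("stats"):
            counts[add["path"]] = json.loads(add["stats"])["numRecords"]
    return counts


counts = records_per_added_file(log_lines)
total_records = sum(counts.values())
print(total_records)  # 40
```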

# Join the community!

* [Delta Lake on GitHub](https://github.com/delta-io/delta)
* [Delta Lake Slack Channel](https://delta-users.slack.com/) ([Registration Link](https://join.slack.com/t/delta-users/shared_invite/enQtNTY1NDg0ODcxOTI1LWJkZGU3ZmQ3MjkzNmY2ZDM0NjNlYjE4MWIzYjg2OWM1OTBmMWIxZTllMjg3ZmJkNjIwZmE1ZTZkMmQ0OTk5ZjA))
* [Public Mailing List](https://groups.google.com/forum/#!forum/delta-users)