# Doric

<img src="images/dorict.png" alt="drawing" width="200"/>

## Spark setup

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.1.1`
import $ivy.`org.typelevel::cats-core:2.3.0`
import $ivy.`com.lihaoyi::sourcecode:0.2.6`

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.{functions => f}

> There is no problem in combining conventional Spark column expressions and doric columns. However, to avoid name clashes, we will use the prefix `f` for the former ones.

In [4]:
val spark = org.apache.spark.sql.SparkSession.builder().appName("test").master("local").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

spark: SparkSession = org.apache.spark.sql.SparkSession@7c5776d5

In [5]:
import spark.implicits._

In [7]:
val userDF = List(
("Foo", "Madrid", 35),
("Bar", "New York", 40),
("John", "Paris", 30)
).toDF("name_user", "city_user", "age_user")

userDF: DataFrame = [name_user: string, city_user: string ... 1 more field]

In [8]:
def userS(colName: String): Column =
  f.col(colName + "_user")

defined [32mfunction[39m [36muserS[39m

In [9]:
val userW = userS("name1") //wrong column :S
scala.util.Try(userDF.select(userW)).fold(_.printStackTrace, identity)

org.apache.spark.sql.AnalysisException: cannot resolve '`name1_user`' given input columns: [age_user, city_user, name_user];
'Project ['name1_user]
+- Project [_1#3 AS name_user#10, _2#4 AS city_user#11, _3#5 AS age_user#12]
   +- LocalRelation [_1#3, _2#4, _3#5]

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:341)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104

	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)


userW: Column = name1_user
res8_1: Any = ()

## Doric setup

In [None]:
import $ivy.`org.hablapps::doric:0.0.1`

Specific _doric_ imports:

In [None]:
import doric._

## Some compelling reasons for using `doric`

Here is a list of use cases for doric:
* Case 1: Get rid of malformed column expressions at compile time 
* Case 2: Avoid implicit type castings
* Case 3: Run DataFrames only when it is safe to do so
* Case 4: Get all errors at once
* Case 5: Modularize your business logic 

Let's start!

## Case 1: Get rid of malformed column expressions at compile time

We can't mix apples and oranges, and Spark knows that. For instance, Spark complains if we try to multiply integers by booleans:

In [None]:
val df = List(1,2,3).toDF.select($"value" * f.lit(true))

But it complains too late, with an exception raised at runtime. If we delay the creation of the DataFrame, the error disappears ...

In [None]:
def df = List(1,2,3).toDF.select($"value" * f.lit(true))

... momentarily, until we eventually invoke that code:

In [None]:
df

Using doric, there is no need to wait so long: errors will be reported at compile-time!

In [None]:
// This doesn't compile
def df = List(1,2,3).toDF.select(col[Int]("value") * lit(true))

Changes in column expressions are minimal: just annotate column references with the intended type, i.e. `col[Int]("value")`, instead of a plain `col("value")`. If you are not used to generic parameters, the aliases `colInt`, `colString`, etc., are also available. We will use these aliases in the rest of this notebook.

> Naturally, this only works if you, the programmer, know the intended type of the column at compile-time. In a purely dynamic setting, doric is useless. Note, however, that you don't need to know the whole row type in advance, as you do with `Dataset`s. In this way, doric sits between a fully static setting and a purely dynamic one. It offers type-safety at a minimum cost, without compromising performance.
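
The idea behind typed columns can be sketched in plain Scala, without Spark. The snippet below is a minimal illustration and not doric's actual implementation: `TypedCol`, `typedLit` and `times` are hypothetical names introduced only for this example. It shows how a type parameter on the column lets the compiler reject the product of an integer column and a boolean literal.

```scala
// Hypothetical stand-in for a typed column; not doric's real DoricColumn.
final case class TypedCol[T](expr: String)

// Hypothetical literal constructor: the column type follows the value type.
def typedLit[T](value: T): TypedCol[T] = TypedCol[T](value.toString)

// Multiplication is only defined when both operands share the same numeric type.
def times[T: Numeric](a: TypedCol[T], b: TypedCol[T]): TypedCol[T] =
  TypedCol[T](s"(${a.expr} * ${b.expr})")

// Int * Int: accepted by the compiler.
val ok: TypedCol[Int] = times(TypedCol[Int]("value"), typedLit(2))

// Int * Boolean: uncommenting the next line yields a compile-time type mismatch.
// val ko = times(TypedCol[Int]("value"), typedLit(true))
```

No DataFrame is involved in the check: the mismatch is found by the type checker alone, which is why the error shows up before anything runs.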

We can also use doric columns within the context of a `withColumn` expression, or, in general, wherever we use plain columns: `join`, `filter`, etc.:

In [None]:
List(1,2,3).toDF.withColumn("other", colInt("value") * lit(1))
List(1,2,3).toDF.filter(colInt("value") > lit(3))

Join expressions are explained in a separate [notebook](joins.ipynb) in more detail.

## Case 2: Explicitly avoid implicit type castings

Implicit type conversions in Spark are pervasive. For instance, the following code won't cause Spark to complain at all:

In [None]:
val df0 = spark.range(1,10).withColumn("x", f.concat(f.col("id"), f.lit("jander"))) 

which means that an implicit conversion from integer to string is in effect:

In [None]:
df0.select(f.col("x")).show

Assuming that you are certain that your column holds values of type `bigint`, the same code in doric won't compile:

In [None]:
val df = spark.range(1,10).toDF.withColumn("x", concat(colLong("id"), "jander".lit))

> Note that the Spark type `bigint` corresponds to the Scala type `Long`. The correspondence between Spark and Scala types in doric is the same as the one established in `Dataset`s by `Encoder` instances.

Still, doric will allow you to perform that operation provided that you explicitly enact the conversion:

In [None]:
val df = spark.range(1,10).toDF.withColumn("x", concat(colLong("id").cast[String], "jander".lit))
df.show

Let's also consider the following example:

In [None]:
val dfEq = List((1, "1"), (1, " 1"), (1, " 1 ")).toDF("int", "str")
dfEq.withColumn("eq", f.col("int") === f.col("str"))

What would you expect to be the result? Well, it all depends on the implicit conversion that Spark chooses to apply, if at all: 1) it may return false for the new column, given that the types of both input columns differ, thus choosing to apply no conversion; 2) it may convert the integer column into a string column; 3) it may convert strings to integers. Let's see what happens:

In [None]:
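// Show the DataFrame including the new "eq" column
dfEq.withColumn("eq", f.col("int") === f.col("str")).show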

Option 3 wins, but you can only learn this by trial and error. With doric, you can depart from all this magic and explicitly cast types, if you so desire:

In [None]:
// Option 1, no castings: compile error

In [None]:
dfEq.withColumn("eq", colInt("int") === colString("str")).show

In [None]:
// Option 2, casting from int to string

In [None]:
dfEq.withColumn("eq", colInt("int").cast[String] === colString("str")).show

In [None]:
// Option 3, casting from string to int, not safe!

In [None]:
dfEq.withColumn("eq", colInt("int") === colString("str").unsafeCast[Int]).show

Note that we can't simply `cast` a string to an integer, since this conversion is partial. If the programmer insists on doing this unsafe casting, doric will force her to explicitly acknowledge this fact using the conversion function `unsafeCast`.

> It's all about being explicit in order to enhance readability and avoid unexpected behaviours at runtime. Doric is a coding accelerator!
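
The distinction between total and partial conversions can be sketched with two type classes in plain Scala. This is only an illustration under assumed names: `SafeCast`, `UnsafeCast` and the two helper functions below are hypothetical and doric's real definitions may differ. A safe cast exists only for conversions that always succeed, while a partial conversion must be requested through a differently named function.

```scala
import scala.util.Try

// Hypothetical type classes, for illustration only.
trait SafeCast[From, To] { def apply(value: From): To }
trait UnsafeCast[From, To] { def apply(value: From): Option[To] }

// Int => String always succeeds, so it is offered as a safe cast.
implicit val intToString: SafeCast[Int, String] =
  new SafeCast[Int, String] { def apply(value: Int): String = value.toString }

// String => Int may fail, so it is only offered as an unsafe cast.
implicit val stringToInt: UnsafeCast[String, Int] =
  new UnsafeCast[String, Int] {
    def apply(value: String): Option[Int] = Try(value.toInt).toOption
  }

def cast[From, To](value: From)(implicit ev: SafeCast[From, To]): To = ev(value)

def unsafeCast[From, To](value: From)(implicit ev: UnsafeCast[From, To]): Option[To] =
  ev(value)

val asString: String = cast[Int, String](1)                    // total conversion
val parsed: Option[Int] = unsafeCast[String, Int]("1")         // Some(1)
val notParsed: Option[Int] = unsafeCast[String, Int]("three")  // None

// cast[String, Int]("1") would not compile: there is no SafeCast[String, Int].
```

In this sketch the `Option` result plays the role of the missing value that a failed conversion produces, and the name `unsafeCast` is what makes the programmer's decision visible in the code.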

## Case 3: Don't let your DataFrame run if it shouldn't

Let's suppose that your DataFrame contains a reference to a non-existing column. No problem, Spark will detect that and will complain with an exception at runtime:

In [None]:
List(1,2,3).toDF.select(f.col("id")+1)

Now, let's assume that the column exists but its type is not what we expected. Spark won't be able to detect that, since type expectations are not encoded in plain columns. Thus, the following code will compile and execute without errors:

In [None]:
val df = List("1","2","three").toDF.select(f.col("value") + 1)

and we will be able to run the DataFrame:

In [None]:
df.show

obtaining `null` values and garbage results, in general.

Using doric we can prevent the creation of the DataFrame, since column expressions are typed: 

In [None]:
val df = List("1","2","three").toDF.select(colInt("value") + 1.lit)

If the column doesn't exist, it will complain with a similar message to that given by Spark:

In [None]:
val df = List("1","2","three").toDF.select(colInt("id") + 1.lit)

But note that the location of the error is also included. This will prove immensely useful, as we will see later on!
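
Checking a column reference before running anything can be sketched in plain Scala as follows. The schema model and the `checkColumn` function are hypothetical and only meant to illustrate the two failure modes discussed above (missing column, unexpected type); they are not part of doric or Spark.

```scala
// A toy schema: column name -> type name. Hypothetical model, not Spark's StructType.
val schema: Map[String, String] = Map("value" -> "string")

// Validates a typed column reference against the schema before any data is processed.
def checkColumn(name: String, expected: String): Either[String, String] =
  schema.get(name) match {
    case None =>
      Left(s"Cannot resolve column name '$name' among (${schema.keys.mkString(", ")})")
    case Some(actual) if actual != expected =>
      Left(s"The column with name '$name' is of type $actual and it was expected to be $expected")
    case Some(_) =>
      Right(name)
  }

val wrongType = checkColumn("value", "int")  // Left: the column exists but holds strings
val missing = checkColumn("id", "int")       // Left: the column does not exist
val fine = checkColumn("value", "string")    // Right("value")
```

Because the expected type travels with the column reference, the type mismatch is caught at the same point as the missing column, instead of surfacing later as `null` values.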

## Case 4: Get all errors at once!

Given the following DataFrame: 

In [None]:
val dfadd = List((1,2),(3,4)).toDF("int1", "int2")

let's try to add both columns as follows:

In [None]:
dfadd.withColumn("add", f.col("Int_1") + f.col("Int_2"))

Rightly, Spark complains because column "Int_1" doesn't exist. Let's fix that problem:

In [None]:
dfadd.withColumn("add", f.col("int1") + f.col("Int_2"))

Ooops, another error. Fortunately, this is the last one:

In [None]:
dfadd.withColumn("add", f.col("int1") + f.col("int2"))

But why didn't Spark give us all errors at once? Well, a plain fail-fast strategy for error reporting is simpler. Unlike Spark, doric doesn't stop at the first error: it keeps accumulating errors and reports all of them together:

In [None]:
dfadd.withColumn("add", colInt("int_1") + colInt("int_2"))
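
The difference between failing fast and accumulating errors can be sketched in plain Scala. The names below (`Checked`, `resolve`, `both`) are hypothetical and this is not doric's implementation. The notebook loads cats-core, whose `Validated` type offers this kind of accumulation, but whether doric relies on it internally is an assumption here.

```scala
// A result that carries either a list of error messages or a value.
type Checked[A] = Either[List[String], A]

// Resolves a column name against the available columns.
def resolve(name: String, available: Set[String]): Checked[String] =
  if (available.contains(name)) Right(name)
  else Left(List(s"Cannot resolve column name '$name'"))

// Combines two results, keeping the errors of both sides instead of stopping at the first.
def both[A, B](a: Checked[A], b: Checked[B]): Checked[(A, B)] =
  (a, b) match {
    case (Right(x), Right(y)) => Right((x, y))
    case (Left(e1), Left(e2)) => Left(e1 ++ e2)
    case (Left(e1), _)        => Left(e1)
    case (_, Left(e2))        => Left(e2)
  }

val columns = Set("int1", "int2")

// Both references are wrong, and both errors are reported together.
val result: Checked[(String, String)] =
  both(resolve("int_1", columns), resolve("int_2", columns))
```

A fail-fast combination, such as a `for` comprehension over `Either`, would have returned only the first of the two messages.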

## Case 5: Modularity FTW!

Let's pretend that our business logic is very complex, and modularised in different functions. For instance:

In [None]:
val col1: Column = f.col("int_1")
val col2: Column = f.col("int2")
val addColumns: Column = col1 + col2

There is an error when referring to the first column and Spark reports it:

In [None]:
dfadd.withColumn("add", addColumns)

But Spark does not give a clue about the exact source of the error. It marks the error in the `withColumn` method, but the actual problem is elsewhere, in the expression `col1`. You have no choice but to dive into the code and perform a brute-force search.

Using doric, we can modularise our code without remorse:

In [None]:
val col1: DoricColumn[Int] = colInt("int_1")
val col2: DoricColumn[Int] = colString("int2").unsafeCast[Int]
val addColumns: DoricColumn[Int] = col1 + col2

When we attempt to compose the DataFrame: 

In [None]:
dfadd.withColumn("add", addColumns)

we will get not only the errors, but the exact location of the culprit: 

```
habla.doric.DoricMultiError: Found 2 errors in withColumn
	Cannot resolve column name "int_1" among (int1, int2)
		located at . (cmd83.sc:1)
	The column with name 'int2' is of type IntegerType and it was expected to be StringType
		located at . (cmd83.sc:2)
```

As you can see, errors are reported with the source file (`cmd83.sc`) and line numbers (`1` and `2`, respectively) where they are located. If you are using an IDE, you will additionally get a hyperlink to the error. Isn't that nice? :)