Pointers to directories and files.

In [ ]:
import sys.process._
// Remote CSV file that the notebook downloads.
val remoteFile = "http://med-at-scale.s3.amazonaws.com/spark-training/dj.csv"
// Local working directory. Immutable on purpose: localFile below is derived from it
// exactly once, so reassigning it later would leave localFile pointing at the old place.
val dataDir = "/tmp"
// Where the downloaded CSV is stored locally.
val localFile = s"${dataDir}/dow.csv"

We download this file

In [ ]:
// Run wget and return its standard output (throws if the exit code is non-zero).
// The Seq form hands every argument to the process verbatim, so a URL or path that
// contains whitespace is not split into several arguments.
Seq("wget", remoteFile, "-O", localFile).!!

In [ ]:
// Render the configuration of the notebook's SparkContext as a debug string.
sparkContext.getConf.toDebugString

In [ ]:
// Read the downloaded CSV as a collection of text lines.
val lines = sparkContext.textFile(localFile)

In [ ]:
// Peek at the first four lines of the file.
lines.take(4)

The case classes we need to define the schema and parsers

In [ ]:
/** Schema (case classes) and line parsers for the quotes CSV. */
object model extends Serializable {

  /** Parser for date fields written as `yyyy-MM-dd`. */
  object MyDate {
    import scala.util.control.NonFatal

    /**
     * Date format used by [[parse]].
     *
     * A fresh instance is returned on every access because `SimpleDateFormat` is not
     * thread-safe and this object may be used by several tasks running concurrently in
     * one JVM. Lenient parsing is switched off so that impossible dates (e.g. 2014-02-31)
     * are rejected instead of being rolled over to another day, which would make the
     * timestamp disagree with the year/month/day fields.
     */
    def df: java.text.SimpleDateFormat = {
      val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
      format.setLenient(false)
      format
    }

    /**
     * Parses a `yyyy-MM-dd` field.
     *
     * @param field the raw date field
     * @return the parsed date, or `None` (after logging to stderr) when the field is not
     *         a valid date
     */
    def parse(field: String): Option[MyDate] = {
      try {
        val ts = df.parse(field).getTime
        field.split("-").map(_.toInt).toList match {
          case year :: month :: day :: _ => Some(MyDate(year, month, day, ts))
          case _ => None
        }
      } catch {
        // NonFatal lets OutOfMemoryError, InterruptedException, etc. propagate.
        case NonFatal(ex) =>
          Console.err.println(s"$ex: datefield = $field")
          None
      }
    }
  }

  /** A calendar date plus the epoch-millisecond timestamp obtained when parsing it. */
  case class MyDate(year: Int, month: Int, day: Int, timestamp: Long)


  /** Parser for CSV lines of the form `stock, date, price`. */
  object Quote {

    /**
     * Parses one CSV line.
     *
     * @param line a comma-separated line; whitespace around the separators is ignored
     * @return the quote, or `None` when the date is invalid, the price is not a number
     *         or the line has fewer than three fields (the last two cases are logged to
     *         stderr here, the first one by `MyDate.parse`)
     */
    def parse(line: String): Option[Quote] = {
      val fields = line.trim.split("""\s*,\s*""")
      try {
        MyDate.parse(fields(1)).map { date => Quote(fields(0), date, fields(2).toDouble) }
      } catch {
        case ex @ (_: NumberFormatException | _: IndexOutOfBoundsException) =>
          Console.err.println(s"$ex: line = $line")
          None
      }
    }
  }

  /** One price observation of a stock on a given date. */
  case class Quote(stock:String, date:MyDate, price:Double)

}
import model._

Parse the file and convert each line to a Quote object

In [ ]:
// Parse every line and keep only the ones that yield a Quote.
val quotes = lines.flatMap(line => Quote.parse(line).toSeq)

Import SQLContext (wrapper around SparkContext to give access to sparkSQL functions)
Create the SQLContext
Load some implicit functions 

In [ ]:
import org.apache.spark.sql.SQLContext
// SQLContext built on top of the notebook's SparkContext.
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._ 

Now we create a dataframe from the RDD; the schema is built from the case class definition, including the nested structure

In [ ]:
// DataFrame built from the parsed quotes.
val quotesdf = sqlContext.createDataFrame(quotes)

Print the Dataframe schema

In [ ]:
// Side-effecting call (prints the schema), hence the explicit parentheses.
quotesdf.printSchema()

We collect the distinct quote timestamps and pair each one with the next one (the column names are changed further down, by creating a new dataframe)

In [ ]:
// Distinct quote timestamps, collected locally and sorted ascending.
val ts = quotesdf.select("date.timestamp").map(_.getAs[Long](0)).distinct.collect.toList.sorted
// Pair every timestamp with its successor as (previous, next). Zipping the list with
// itself shifted by one yields no pair at all when there are fewer than two timestamps,
// whereas indexing into sliding(2, 1) windows throws on a single-element list.
val withNextTs = ts.zip(ts.drop(1))

In [ ]:
// Number of rows in the quotes DataFrame.
quotesdf.count()

In [ ]:
// Wrapper object holding the broadcast variable and the datasets derived from it.
val scoped = new Serializable {

  // Local copy of the (previous, next) timestamp pairs; marked @transient so it is
  // left out when this object is serialized.
  @transient val wnt = withNextTs

  // The pairs are shipped to the tasks through a broadcast variable.
  val bc = sparkContext.broadcast(wnt)

  // For each quote, emit up to two (symbol, ts, close) rows:
  //   - the price itself, re-keyed to the previous timestamp
  //   - the negated price, keyed to the quote's own timestamp
  // Quotes whose timestamp has no predecessor emit nothing.
  val updatedQuotes:Dataset[(String, Long, Double)] =
      quotesdf
      .withColumn("ts", $"date.timestamp")
      .flatMap { row: Row =>
        // Index 3 is the "ts" column appended just above.
        val ts = row.getAs[Long](3)
        val pairs = bc.value
        pairs.find(_._2 == ts) match {
          case None =>
            List.empty[(String, Long, Double)]
          case Some((previousTs, _)) =>
            val symbol = row.getAs[String](0)
            val negatedAtOwnTs = (symbol, ts, -1 * row.getAs[Double](2))
            val priceAtPreviousTs = (symbol, previousTs, row.getAs[Double](2))

            if (!pairs.exists(_._1 == ts)) {
              // ts has no successor
              List(priceAtPreviousTs)
            } else if (!pairs.exists(_._2 == previousTs)) {
              // previousTs has no predecessor
              List(negatedAtOwnTs)
            } else {
              List(priceAtPreviousTs, negatedAtOwnTs)
            }
        }
      }

  // Same data as a DataFrame with named columns.
  val df = updatedQuotes.toDF("symbol", "ts", "close")
}
// DataFrame with columns (symbol, ts, close) built inside `scoped`.
val duplicatedWithPreviousTs = scoped.df

import org.apache.spark.sql.functions._
// Sum the close values per (symbol, ts) and expose the result as diff_close.
val diffQuotes = duplicatedWithPreviousTs.groupBy("symbol", "ts").agg(sum("close").as("diff_close"))

In [ ]:
:markdown 
We have ${diffQuotes.count} elements

Write data to parquet and json

In [ ]:
// Overwrite the output of a previous run so that this cell can be executed again;
// with the default save mode the write fails when the target already exists.
diffQuotes.write.mode("overwrite").parquet(s"$dataDir/dow.parquet")

In [ ]:
// Overwrite the output of a previous run so that this cell can be executed again;
// with the default save mode the write fails when the target already exists.
diffQuotes.write.mode("overwrite").json(s"$dataDir/quotes.json")

Filter rows

In [ ]:
// IBM rows with diff_close below -10, sorted by diff_close ascending.
val ibm = diffQuotes
  .where($"symbol" === "IBM" && $"diff_close" < -10)
  .sort($"diff_close".asc)

In [ ]:
// KO rows with diff_close below -10.
val ko = diffQuotes.where($"symbol" === "KO" && $"diff_close" < -10)

In [ ]:
// Highest and lowest price among the KO quotes.
quotesdf
  .where($"stock" === "KO")
  .agg(max("price"), min("price"))

Dataframes can be cached too

In [ ]:
// Mark the IBM DataFrame for caching.
ibm.cache()

Create a grouping

In [ ]:
// Grouping of the diff quotes by symbol; aggregations are applied to it afterwards.
val bySymbol = diffQuotes.groupBy("symbol")

Apply some aggregation on the grouping

In [ ]:
// Row count per symbol, largest first.
bySymbol
  .count()
  .sort($"count".desc)

Apply several aggregations on the grouping

In [ ]:
import org.apache.spark.sql.functions._
// Count, minimum, maximum and mean of diff_close per symbol.
// The mean is aliased "avg" without a trailing dot: a dot in a column name is read as
// a nested-field separator when the column is referenced later, forcing backtick quoting.
bySymbol.agg(
  count("diff_close").as("count"),
  min("diff_close").as("min"),
  max("diff_close").as("max"),
  mean("diff_close").as("avg")
)

### Now work with SQL...

In [ ]:
// Read back the Parquet data written under dataDir.
val data = sqlContext.read.parquet(s"$dataDir/dow.parquet")

In [ ]:
// Expose the DataFrame to SQL queries under the name "quote".
// createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
data.createOrReplaceTempView("quote")
data.cache()
// Evaluate to Unit so the cell produces no result value.
()

In [ ]:
// All IBM rows of the "quote" table, ordered by timestamp ascending.
sqlContext.sql("""
  SELECT s.symbol, s.ts, s.diff_close 
  FROM quote s 
  WHERE symbol = 'IBM' ORDER BY s.ts ASC
""")

In [ ]:
// Number of rows per symbol in the "quote" table, largest count first.
sqlContext.sql("""
 SELECT q.symbol AS symbol, count(*) as count 
 FROM quote q GROUP BY q.symbol 
 ORDER BY count DESC
""")