{ "metadata" : { "name" : "Spark Example", "user_save_timestamp" : "1970-01-01T00:00:00.000Z", "auto_save_timestamp" : "1970-01-01T00:00:00.000Z", "language_info" : { "name" : "scala", "file_extension" : "scala", "codemirror_mode" : "text/x-scala" }, "trusted" : true, "customLocalRepo" : null, "customRepos" : null, "customDeps" : null, "customImports" : null, "customArgs" : null, "customSparkConf" : null }, "cells" : [ { "metadata" : { }, "cell_type" : "markdown", "source" : "Pointers to directories and files." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "import sys.process._\nval remoteFile = \"http://med-at-scale.s3.amazonaws.com/spark-training/dj.csv\"\nvar dataDir = \"/tmp\"\nval localFile = s\"${dataDir}/dow.csv\"", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "We download this file" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "s\"wget $remoteFile -O $localFile\" !!", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "sparkContext.getConf.toDebugString", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "val lines = sparkContext.textFile(localFile)", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "lines.take(4)", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "The case classes we need to define the schema and parsers" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "object model extends Serializable {\n\n object MyDate {\n val df = new java.text.SimpleDateFormat(\"yyyy-MM-dd\")\n def parse(field: String): Option[MyDate] = {\n try {\n val ts = df.parse(field).getTime\n field.split(\"-\").map(_.toInt).toList match {\n case year :: month :: day :: _ => Some(MyDate(year, month, day, ts))\n case _ => None\n }\n } catch {\n case ex: Throwable => \n Console.err.println(s\"$ex: datefield = $field\")\n None\n }\n }\n }\n case class MyDate(year: Int, month: Int, day: Int, timestamp: Long)\n \n\n object Quote {\n def parse(line: String): Option[Quote] = {\n val fields = line.trim.split(\"\"\"\\s*,\\s*\"\"\")\n try {\n MyDate.parse(fields(1)).map { date => Quote(fields(0), date, fields(2).toDouble)} \n } catch {\n case ex: NumberFormatException =>\n Console.err.println(s\"$ex: line = $line\")\n None\n case ex: IndexOutOfBoundsException =>\n Console.err.println(s\"$ex: line = $line\")\n None\n }\n }\n }\n case class Quote(stock:String, date:MyDate, price:Double)\n\n}\nimport model._", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Parsing the file and convert to Quote objects" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "val quotes = lines.map(Quote.parse).collect{case Some(q) => q}", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Import SQLContext (wrapper around SparkContext to give access to sparkSQL functions)\nCreate the SQLContext\nLoad some implicit functions " }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "import 
{ "metadata" : { }, "cell_type" : "markdown", "source" : "Import SQLContext (a wrapper around SparkContext that gives access to Spark SQL functionality), create the SQLContext, and load its implicit conversions." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "import org.apache.spark.sql.SQLContext\nval sqlContext = new SQLContext(sparkContext)\nimport sqlContext.implicits._", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Now we create a DataFrame from the RDD; the schema is derived from the case class definition, including its nested structure." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "val quotesdf = sqlContext.createDataFrame(quotes)", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Print the DataFrame schema." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "quotesdf.printSchema", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Collect the distinct trading timestamps, sort them, and pair each timestamp with the one that follows it." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "// distinct trading timestamps, sorted ascending\nval ts = quotesdf.select(\"date.timestamp\").map(_.getAs[Long](0)).distinct.collect.toList.sorted\n// pair each timestamp with the one that follows it\nval withNextTs = ts.sliding(2, 1).map(x => (x(0) -> x(1))).toList", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : true }, "cell_type" : "code", "source" : "quotesdf.count", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "import org.apache.spark.sql.{Dataset, Row}\n\n// Wrap the computation in a Serializable scope so the closure only\n// captures the broadcast value, not the whole notebook context.\nval scoped = new Serializable {\n\n  @transient val wnt = withNextTs\n\n  val bc = sparkContext.broadcast(wnt)\n\n  // Emit each quote twice: at the previous trading timestamp with a positive\n  // close, and at its own timestamp with a negative close, so that summing\n  // per (symbol, ts) yields the close difference with the next trading day.\n  // Edge timestamps that lack a neighbour emit a single row.\n  val updatedQuotes: Dataset[(String, Long, Double)] =\n    quotesdf\n      .withColumn(\"ts\", $\"date.timestamp\")\n      .flatMap { row: Row =>\n        val ts = row.getAs[Long](3)\n        val wnt = bc.value\n        wnt.find(_._2 == ts) match {\n          case None => List.empty[(String, Long, Double)]\n          case Some((previousTs, _)) =>\n            val thisUpdated = (row.getAs[String](0), ts, -1 * row.getAs[Double](2))\n            val newRow = (row.getAs[String](0), previousTs, row.getAs[Double](2))\n\n            if (!wnt.exists(_._1 == ts))\n              List(newRow)\n            else if (!wnt.exists(_._2 == previousTs))\n              List(thisUpdated)\n            else\n              List(newRow, thisUpdated)\n        }\n      }\n  val df = updatedQuotes.toDF(\"symbol\", \"ts\", \"close\")\n}\nval duplicatedWithPreviousTs = scoped.df\n\nimport org.apache.spark.sql.functions._\nval diffQuotes = duplicatedWithPreviousTs.groupBy(\"symbol\", \"ts\").agg(sum(\"close\").as(\"diff_close\"))", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : ":markdown \nWe have ${diffQuotes.count} elements.", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Write the data to Parquet and JSON." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "diffQuotes.write.parquet(s\"$dataDir/dow.parquet\")", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "diffQuotes.write.json(s\"$dataDir/quotes.json\")", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Filter rows" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "val ibm = diffQuotes.filter($\"symbol\" === \"IBM\" && $\"diff_close\" < -10).orderBy($\"diff_close\".asc)", "outputs" : [ ] },
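{ "metadata" : { }, "cell_type" : "markdown", "source" : "As an aside, a minimal sketch of the alternative form: the same predicate can be passed to `filter` as a single SQL expression string instead of `Column` expressions. The name `ibmExpr` is just illustrative." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "// A minimal sketch: the same filter expressed as an SQL condition string.\nval ibmExpr = diffQuotes.filter(\"symbol = 'IBM' AND diff_close < -10\").orderBy($\"diff_close\".asc)", "outputs" : [ ] },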
: "val ko = diffQuotes.filter($\"symbol\" === \"KO\" && $\"diff_close\" < -10)", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "quotesdf.filter($\"stock\" === \"KO\").agg(max(\"price\"), min(\"price\"))", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Dataframes can be cached too" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "ibm.cache()", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Create a grouping" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "val bySymbol = diffQuotes.groupBy(\"symbol\")", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Apply some aggregation on the grouping" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "bySymbol.count.orderBy($\"count\".desc)", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "Apply several aggregations on the grouping" }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "import org.apache.spark.sql.functions._\nbySymbol.agg(\n count(\"diff_close\").as(\"count\"), \n min(\"diff_close\").as(\"min\"), \n max(\"diff_close\").as(\"max\"), \n mean(\"diff_close\").as(\"avg.\")\n)", "outputs" : [ ] }, { "metadata" : { }, "cell_type" : "markdown", "source" : "### Now work with SQL..." }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "val data = sqlContext.read.parquet(s\"$dataDir/dow.parquet\")", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "data.registerTempTable(\"quote\")\ndata.cache()\n()", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "sqlContext.sql(\"\"\"\n SELECT s.symbol, s.ts, s.diff_close \n FROM quote s \n WHERE symbol = 'IBM' ORDER BY s.ts ASC\n\"\"\")", "outputs" : [ ] }, { "metadata" : { "trusted" : true, "input_collapsed" : false, "collapsed" : false }, "cell_type" : "code", "source" : "sqlContext.sql(\"\"\"\n SELECT q.symbol AS symbol, count(*) as count \n FROM quote q GROUP BY q.symbol \n ORDER BY count DESC\n\"\"\")", "outputs" : [ ] } ], "nbformat" : 4 }