{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36mcoursier._\n", "\n", "\u001b[39m" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import coursier._\n", "\n", "interp.repositories() ++= Seq(MavenRepository(\"https://repo1.maven.org/maven2\"))\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\u001b[39m" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import $ivy.`org.apache.spark::spark-sql:2.3.1`\n", "import $ivy.`sh.almond::almond-spark:0.4.0`\n", "import $ivy.`com.github.julien-truffaut::monocle-core:1.5.0`\n", "import $ivy.`com.github.julien-truffaut::monocle-macro:1.5.0`\n", "\n", "import $ivy.`org.hablapps::spark-optics:0.1.0`\n", "\n", "import org.apache.log4j.{Level, Logger}\n", "Logger.getLogger(\"org\").setLevel(Level.OFF)\n", "\n", "import org.apache.spark.sql.functions._\n", "import org.apache.spark.sql._" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Loading spark-stubs\n", "Getting spark JARs\n", "Creating SparkSession\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties\n" ] }, { "data": { "text/html": [ "Spark UI" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\u001b[36msparkSession\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@34c7457e" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val sparkSession = NotebookSparkSession.builder().master(\"local\").appName(\"jupiter\").getOrCreate()\n", "sparkSession.sparkContext.setLogLevel(\"ERROR\")" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "defined \u001b[32mclass\u001b[39m \u001b[36mStreet\u001b[39m\n", "defined \u001b[32mclass\u001b[39m \u001b[36mAddress\u001b[39m\n", "defined \u001b[32mclass\u001b[39m \u001b[36mCompany\u001b[39m\n", "defined \u001b[32mclass\u001b[39m \u001b[36mEmployee\u001b[39m" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)\n", "case class Street(number: Int, name: String)\n", "case class Address(city: String, street: Street)\n", "case class Company(name: String, address: Address)\n", "case class Employee(name: String, company: Company)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Spark lenses*\n", "Spark has a columnar format; the columns can be of any basic SQL type: integers, floats, strings, timestamps, dates. Spark also allows us to use complex structures, such as structs, which would be a product in an ADT. Arrays and maps are also considered complex types.\n", "In our case we are going to focus on structs only, and to make it easier, first we are going to create a case class that will be our default structure.\n", "Due to issues with creating case classes in Jupyter, we already have them precompiled in the project. 
And they follow the following code:\n", "case class Street(number: Int, name: String)\n", "case class Address(city: String, street: Street)\n", "case class Company(name: String, address: Address)\n", "case class Employee(name: String, company: Company)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36memployee\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"high street\"\u001b[39m)))\n", ")" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "//import org.habla.sparklens.{Employee,Company,Address,Street}\n", "val employee = Employee(\"john\", Company(\"awesome inc\", Address(\"london\", Street(23, \"high street\"))))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36msparkSession.implicits._\n", "\u001b[39m\n", "\u001b[36mdf\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import sparkSession.implicits._\n", "val df = List(employee).toDS.toDF" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+--------------------+\n", "|name| company|\n", "+----+--------------------+\n", "|john|[awesome inc, [lo...|\n", "+----+--------------------+\n", "\n", "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = true)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = true)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = 
true)\n", " | | | |-- number: integer (nullable = false)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] } ], "source": [ "df.show\n", "df.printSchema" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see, now we have a dataframe representation of the employee:\n", "The name is a string element, and the company is a struct that also has complex types inside.\n", "Due to the SQL-oriented API of the Dataset API, it's hard to modify a single element while keeping the structure the same, even for the first-level data." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------------------+\n", "| name| company|\n", "+-------+--------------------+\n", "|john!!!|[awesome inc, [lo...|\n", "+-------+--------------------+\n", "\n", "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = true)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = true)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = true)\n", " | | | |-- number: integer (nullable = false)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[36memployeeNameChanged\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val employeeNameChanged = df.select(concat(df(\"name\"),lit(\"!!!\")).as(\"name\"),df(\"company\"))\n", "employeeNameChanged.show\n", "employeeNameChanged.printSchema" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And for the structs? Let's try to change the name of the company." 
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+--------------------+\n", "|name| company|\n", "+----+--------------------+\n", "|john|[awesome inc!!!, ...|\n", "+----+--------------------+\n", "\n", "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = false)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = true)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = true)\n", " | | | |-- number: integer (nullable = false)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[36mcompanyNameChanged\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val companyNameChanged = df.select(\n", " df(\"name\"),\n", " struct(\n", " concat(df(\"company.name\"),lit(\"!!!\")).as(\"name\"),\n", " df(\"company.address\")\n", " ).as(\"company\")\n", ")\n", "companyNameChanged.show\n", "companyNameChanged.printSchema" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "OMG!! 😱😱😱\n", "We have to keep track of the name of the transformed element, and also for all the parents!!!\n", "But if we had our case classes, how come we do this?" 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mres9_0\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john!!!\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"high street\"\u001b[39m)))\n", ")\n", "\u001b[36mres9_1\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc!!!\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"high street\"\u001b[39m)))\n", ")" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "employee.copy(name = employee.name+\"!!!\")\n", "employee.copy(company = employee.company.copy(name = employee.company.name+\"!!!\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Well, it's shorter, but it's still a pain in the back. 
Luckily we have optics :D" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36mmonocle.Lens\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36mmonocle.macros.GenLens\n", "\n", "\u001b[39m\n", "\u001b[36mcompany\u001b[39m: \u001b[32mLens\u001b[39m[\u001b[32mEmployee\u001b[39m, \u001b[32mCompany\u001b[39m] = ammonite.$sess.cmd10$Helper$$anon$1@44dfd41d\n", "\u001b[36maddress\u001b[39m: \u001b[32mLens\u001b[39m[\u001b[32mCompany\u001b[39m, \u001b[32mAddress\u001b[39m] = ammonite.$sess.cmd10$Helper$$anon$2@295d28eb\n", "\u001b[36mstreet\u001b[39m: \u001b[32mLens\u001b[39m[\u001b[32mAddress\u001b[39m, \u001b[32mStreet\u001b[39m] = ammonite.$sess.cmd10$Helper$$anon$3@7f95e953\n", "\u001b[36mstreetName\u001b[39m: \u001b[32mLens\u001b[39m[\u001b[32mStreet\u001b[39m, \u001b[32mString\u001b[39m] = ammonite.$sess.cmd10$Helper$$anon$4@69d4a9e5\n", "\u001b[36memployeeStreet\u001b[39m: \u001b[32mmonocle\u001b[39m.\u001b[32mPLens\u001b[39m[\u001b[32mEmployee\u001b[39m, \u001b[32mEmployee\u001b[39m, \u001b[32mString\u001b[39m, \u001b[32mString\u001b[39m] = monocle.PLens$$anon$1@5391f7fe" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import monocle.Lens\n", "import monocle.macros.GenLens\n", "\n", "val company : Lens[Employee, Company] = GenLens[Employee](_.company)\n", "val address : Lens[Company , Address] = GenLens[Company](_.address)\n", "val street : Lens[Address , Street] = GenLens[Address](_.street)\n", "val streetName: Lens[Street , String] = GenLens[Street](_.name)\n", "\n", "val employeeStreet = company composeLens address composeLens street composeLens streetName" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mstreetChanger\u001b[39m: \u001b[32mEmployee\u001b[39m => \u001b[32mEmployee\u001b[39m = \n", "\u001b[36mres11_1\u001b[39m: 
\u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"high street!!!\"\u001b[39m)))\n", ")" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val streetChanger:Employee => Employee = employeeStreet.modify(_ + \"!!!\")\n", "streetChanger(employee)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That easy? Wish there was something like this in spark..." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = false)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = false)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = false)\n", " | | | |-- number: integer (nullable = true)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36morg.hablapps.sparkOptics.Lens\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36morg.hablapps.sparkOptics.syntax._\n", "\u001b[39m\n", "\u001b[36mlens\u001b[39m: \u001b[32mLens\u001b[39m = Lens(company.address.street.name)\n", "\u001b[36mtransformedDF\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]\n", "\u001b[36mres14_5\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"high street!!!\"\u001b[39m)))\n", ")" ] }, "execution_count": 15, "metadata": {}, "output_type": 
"execute_result" } ], "source": [ "import org.hablapps.sparkOptics.Lens\n", "import org.hablapps.sparkOptics.syntax._\n", "val lens = Lens(\"company.address.street.name\")(df.schema)\n", "val transformedDF = df.select(lens.modify(concat(_,lit(\"!!!\"))):_*)\n", "transformedDF.printSchema\n", "transformedDF.as[Employee].head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Hold on, explain me that. Start from the begin, make it like monocle.\n", "Ok, lets create our first lens." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36morg.hablapps.sparkOptics.ProtoLens.ProtoLens\n", "\n", "\u001b[39m\n", "\u001b[36mcompanyProtoLens\u001b[39m: \u001b[32mtypes\u001b[39m.\u001b[32mStructType\u001b[39m => \u001b[32mLens\u001b[39m = \n", "\u001b[36mcompanyLens\u001b[39m: \u001b[32mLens\u001b[39m = Lens(company)" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import org.hablapps.sparkOptics.ProtoLens.ProtoLens\n", "\n", "val companyProtoLens: ProtoLens = Lens(\"company\")\n", "//the name of the column, similar to the \"_.company\" of \"GenLens[Employee](_.company),\n", "//this is the element that we will focus in the structure\n", "val companyLens: Lens = companyProtoLens(df.schema) \n", "//providing the schema, it's similar to the \"Employee\" of \"GenLens[Employee](_.company)\n", "//this is the context of the lens" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First difference with monocle and sparkOptics, monocle, due to the hard typed languaje of scala,\n", "it returns compiling errors if you try to do a `GenLens[Employee](_.unknownField)`. \n", "But spark sql is a dynamic typed, but lenses helps you to make your transformations a little bit more safe. 
" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36mscala.util.Try\n", "\u001b[39m\n", "\u001b[36munknownFieldLens\u001b[39m: \u001b[32mtypes\u001b[39m.\u001b[32mStructType\u001b[39m => \u001b[32mLens\u001b[39m = \n", "\u001b[36mres16_2\u001b[39m: \u001b[32mTry\u001b[39m[\u001b[32mLens\u001b[39m] = \u001b[33mFailure\u001b[39m(\n", " java.lang.AssertionError: assertion failed: the column unknownField not found in [name,company]\n", ")" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import scala.util.Try\n", "val unknownFieldLens:ProtoLens = Lens(\"unknownField\")\n", "Try{unknownFieldLens(df.schema)}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It's not a compile error, but it's something! You can create a ProtoLens (a lens only with the column name defined)\n", "and when you try to generate a Lens, it gives you an error, you can't create invalid lenses!\n", "But let's see how we can compose new lenses." 
] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = false)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = false)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = false)\n", " | | | |-- number: integer (nullable = true)\n", " | | | |-- name: string (nullable = false)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types.StructType\n", "\u001b[39m\n", "\u001b[36mcompanyL\u001b[39m: \u001b[32mLens\u001b[39m = Lens(company)\n", "\u001b[36mcompanySchema\u001b[39m: \u001b[32mStructType\u001b[39m = \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"address\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"city\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"street\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"number\"\u001b[39m, IntegerType, false, {}),\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {})\n", " ),\n", " true,\n", " {}\n", " )\n", " ),\n", " true,\n", " {}\n", " )\n", ")\n", "\u001b[36maddressL\u001b[39m: \u001b[32mLens\u001b[39m = Lens(address)\n", "\u001b[36maddressSchema\u001b[39m: \u001b[32mStructType\u001b[39m = \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"city\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"street\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"number\"\u001b[39m, IntegerType, false, {}),\n", " 
\u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {})\n", " ),\n", " true,\n", " {}\n", " )\n", ")\n", "\u001b[36mstreetL\u001b[39m: \u001b[32mLens\u001b[39m = Lens(street)\n", "\u001b[36mstreetSchema\u001b[39m: \u001b[32mStructType\u001b[39m = \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"number\"\u001b[39m, IntegerType, false, {}),\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {})\n", ")\n", "\u001b[36mstreetNameL\u001b[39m: \u001b[32mLens\u001b[39m = Lens(name)\n", "\u001b[36memployeeCompanyStreetName\u001b[39m: \u001b[32mLens\u001b[39m = Lens(company.address.street.name)\n", "\u001b[36mmodifiedDF\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]\n", "\u001b[36mres17_11\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"new street name\"\u001b[39m)))\n", ")" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import org.apache.spark.sql.types.StructType\n", "val companyL: Lens = Lens(\"company\")(df.schema)\n", "val companySchema = df.schema.fields.find(_.name == \"company\").get.dataType.asInstanceOf[StructType]\n", "val addressL = Lens(\"address\")(companySchema)\n", "val addressSchema = companySchema.fields.find(_.name == \"address\").get.dataType.asInstanceOf[StructType]\n", "val streetL = Lens(\"street\")(addressSchema)\n", "val streetSchema = addressSchema.fields.find(_.name == \"street\").get.dataType.asInstanceOf[StructType]\n", "val streetNameL = Lens(\"name\")(streetSchema)\n", "val employeeCompanyStreetName = companyL composeLens addressL composeLens streetL composeLens streetNameL\n", "val modifiedDF = 
df.select(employeeCompanyStreetName.set(lit(\"new street name\")):_*)\n", "modifiedDF.printSchema\n", "modifiedDF.as[Employee].head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Too much code still, passing all the time the schema of each element...\n", "In spark the schemas are recursive, they not only have the schema of the level, also of all the sub elements.\n", "So we can take advance of the ProtoLenses." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = false)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = false)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = false)\n", " | | | |-- number: integer (nullable = true)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[36mshorterLens\u001b[39m: \u001b[32mLens\u001b[39m = Lens(company.address.street.name)\n", "\u001b[36mmodifiedDF\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]\n", "\u001b[36mres18_3\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"HIGH STREET\"\u001b[39m)))\n", ")" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val shorterLens = \n", "Lens(\"company\")(df.schema) composeProtoLens Lens(\"address\") composeProtoLens Lens(\"street\") composeProtoLens Lens(\"name\") \n", "val modifiedDF = df.select(shorterLens.modify(upper):_*)\n", "modifiedDF.printSchema\n", "modifiedDF.as[Employee].head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have created first a 
Lens, and then compose them with ProtoLenses, in the composition the lens\n", "will extract the schema of the selected element for you, checking if it exist.\n", "Still too much code? You can compose with a syntax closer to spark." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = false)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = false)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = false)\n", " | | | |-- number: integer (nullable = true)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[36mflashLens\u001b[39m: \u001b[32mLens\u001b[39m = Lens(company.address.street.name)\n", "\u001b[36mmodifiedDF\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]\n", "\u001b[36mres19_3\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"HIGH STREET\"\u001b[39m)))\n", ")" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val flashLens = Lens(\"company.address.street.name\")(df.schema)\n", "val modifiedDF = df.select(flashLens.modify(upper):_*)\n", "modifiedDF.printSchema\n", "modifiedDF.as[Employee].head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Whant to see how much code whould have been that example?" 
] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = true)\n", " |-- company: struct (nullable = false)\n", " | |-- name: string (nullable = true)\n", " | |-- address: struct (nullable = false)\n", " | | |-- city: string (nullable = true)\n", " | | |-- street: struct (nullable = false)\n", " | | | |-- number: integer (nullable = true)\n", " | | | |-- name: string (nullable = true)\n", "\n" ] }, { "data": { "text/plain": [ "\u001b[36mmDF\u001b[39m: \u001b[32mDataFrame\u001b[39m = [name: string, company: struct>>]\n", "\u001b[36mlongCodeEmployee\u001b[39m: \u001b[32mEmployee\u001b[39m = \u001b[33mEmployee\u001b[39m(\n", " \u001b[32m\"john\"\u001b[39m,\n", " \u001b[33mCompany\u001b[39m(\u001b[32m\"awesome inc\"\u001b[39m, \u001b[33mAddress\u001b[39m(\u001b[32m\"london\"\u001b[39m, \u001b[33mStreet\u001b[39m(\u001b[32m23\u001b[39m, \u001b[32m\"HIGH STREET\"\u001b[39m)))\n", ")\n", "\u001b[36mres20_3\u001b[39m: \u001b[32mBoolean\u001b[39m = true" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val mDF = df.select(df(\"name\"),struct(\n", " df(\"company.name\").as(\"name\"),\n", " struct(\n", " df(\"company.address.city\").as(\"city\"),\n", " struct(\n", " df(\"company.address.street.number\").as(\"number\"),\n", " upper(df(\"company.address.street.name\")).as(\"name\")\n", " ).as(\"street\")\n", " ).as(\"address\")\n", ").as(\"company\"))\n", "mDF.printSchema\n", "val longCodeEmployee = mDF.as[Employee].head\n", "longCodeEmployee == modifiedDF.as[Employee].head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is only for a 4 levels depth structure, and each level only 2 elements, imagine for a larger structure. 😱" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Why use this utilities? 
Why not datasets?\n", "Datasets it's a great api, but it has the problem that it can only work with well defined case classes, and can't work with interfaces.\n", "So when you need to abstract yourself, you only have the dataframe api.\n", "Using protolens, you can interact with common elements of different dataframes, making simple, reusable and clear code.\n", "All your topics from kafka share common metadata fields? create lenses for them." ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mres21\u001b[39m: \u001b[32mStructType\u001b[39m = \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"company\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"address\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"city\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"street\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\u001b[33mStructField\u001b[39m(\u001b[32m\"number\"\u001b[39m, IntegerType, true, {})),\n", " false,\n", " {}\n", " )\n", " ),\n", " false,\n", " {}\n", " )\n", " ),\n", " false,\n", " {}\n", " )\n", ")" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.select(flashLens.prune(Vector.empty):_*).schema" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mres22\u001b[39m: \u001b[32mStructType\u001b[39m = \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"company\"\u001b[39m,\n", " 
\u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"address\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"city\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"street\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"number\"\u001b[39m, IntegerType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"newName\"\u001b[39m, StringType, true, {})\n", " ),\n", " false,\n", " {}\n", " )\n", " ),\n", " false,\n", " {}\n", " )\n", " ),\n", " false,\n", " {}\n", " )\n", ")" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.select(flashLens.rename(\"newName\"):_*).schema" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mres33\u001b[39m: \u001b[32mString\u001b[39m = \u001b[32m\"high streethigh street\"\u001b[39m" ] }, "execution_count": 34, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flashLens.modifyDF(c => concat(c,c))(df).select(flashLens.get).as[String].head" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mres35\u001b[39m: \u001b[32mStructType\u001b[39m = \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"company\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"address\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"city\"\u001b[39m, StringType, true, {}),\n", 
" \u001b[33mStructField\u001b[39m(\n", " \u001b[32m\"street\"\u001b[39m,\n", " \u001b[33mStructType\u001b[39m(\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"number\"\u001b[39m, IntegerType, true, {}),\n", " \u001b[33mStructField\u001b[39m(\u001b[32m\"name\"\u001b[39m, StringType, true, {})\n", " ),\n", " false,\n", " {}\n", " )\n", " ),\n", " false,\n", " {}\n", " )\n", " ),\n", " false,\n", " {}\n", " )\n", ")" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "flashLens.modifyDF(c => concat(c,c))(df).schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Scala", "language": "scala", "name": "scala" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "text/x-scala", "name": "scala", "nbconvert_exporter": "script", "version": "2.11.12" } }, "nbformat": 4, "nbformat_minor": 2 }