{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Basic Spark Example\n", "\n", "Almond comes with a Spark integration module called *almond-spark*, which allows you to connect to a Spark cluster and\n", "to run Spark calculations interactively from a Jupyter notebook.\n", "\n", "It is based on [ammonite-spark](https://github.com/alexarchambault/ammonite-spark), adding Jupyter specific features\n", "such as progress bars and cancellation for running Spark computations.\n", "\n", "*ammonite-spark* handles loading Spark in a clever way, and does not rely on a specific Spark distribution.\n", "Because of that, you can use it with any Spark 2.x version.\n", "The only limitation is that the Scala version of Spark and the running Almond kernel must match, so make sure your\n", "kernel uses the same Scala version as your Spark cluster.\n", "Spark 2.0.x - 2.3.x requires Scala 2.11. Spark 2.4.x supports both Scala 2.11 and 2.12.\n", "\n", "For more information, see the [README](https://github.com/alexarchambault/ammonite-spark/blob/master/README.md) of ammonite-spark.\n", "\n", "To use it, just import Spark 2.x, the *almond-spark* dependency will be added automatically." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ // Or use any other 2.x version here\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\n", "\u001b[39m\n", "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\u001b[39m" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import $ivy.`org.apache.spark::spark-sql:2.4.3` // Or use any other 2.x version here\n", "// import $ivy.`sh.almond::almond-spark:_` // Added automatically on importing Spark\n", "\n", "import org.apache.spark.sql._" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Usually you want to disable logging in order to avoid polluting your cell outputs." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n", "\u001b[39m" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import org.apache.log4j.{Level, Logger}\n", "Logger.getLogger(\"org\").setLevel(Level.OFF)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then create a `SparkSession` using the `NotebookSparkSessionBuilder` provided by *almond-spark*.\n", "\n", "## Running in local mode\n", "This will run Spark in the same JVM as your kernel." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Loading spark-stubs\n", "Getting spark JARs\n", "Creating SparkSession\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n" ] }, { "data": { "text/html": [ "Spark UI" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@6d754fd1" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val spark = {\n", " NotebookSparkSession.builder()\n", " .master(\"local[*]\")\n", " .getOrCreate()\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When running this, you should see that the cell output contains a link to the Spark UI.\n", "\n", "Note the use of `NotebookSparkSession.builder()`, instead of `SparkSession.builder()` that you would use when e.g. writing a Spark job.\n", "\n", "The builder returned by `NotebookSparkSession.builder()` extends the one of `SparkSession.builder()`,\n", "so that you can call `.appName(\"foo\")`, `.config(\"key\", \"value\")`, etc. on it.\n", "\n", "## Connecting to a Real Cluster\n", "\n", "Of course you can also connect to a real Spark cluster. *ammonite-spark* currently supports standalone and *yarn* clusters. Mesos and Kubernetes aren't supported yet. See the *ammonite-spark* [README](https://github.com/alexarchambault/ammonite-spark/blob/master/README.md) for details.\n", "\n", "### Using with a Standalone Cluster\n", "\n", "Simply set the master to `spark://…` when building the session, e.g." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "// val spark = {\n", "// NotebookSparkSession.builder()\n", "// .master(\"spark://localhost:7077\")\n", "// .config(\"spark.executor.instances\", \"4\")\n", "// .config(\"spark.executor.memory\", \"2g\")\n", "// .getOrCreate()\n", "// }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make sure the version of Spark used to start the master and executors matches the one loaded in the notebook session\n", "(via e.g. ``import $ivy.`org.apache.spark::spark-sql:X.Y.Z` ``), and that the machine running the kernel can access / is\n", "accessible from all nodes of the standalone cluster.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using with a YARN Cluster\n", "\n", "Set the master to `\"yarn\"` when building the session, e.g." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "// val spark = {\n", "// NotebookSparkSession.builder()\n", "// .master(\"yarn\")\n", "// .config(\"spark.executor.instances\", \"4\")\n", "// .config(\"spark.executor.memory\", \"2g\")\n", "// .getOrCreate()\n", "// }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ensure the configuration directory of the cluster is set in `HADOOP_CONF_DIR` or `YARN_CONF_DIR` in the environment, or is available at `/etc/hadoop/conf`. This directory should contain files like `core-site.xml`, `hdfs-site.xml`, … Ensure also that the machine you run Ammonite on can indeed act as the driver (it should have access to and be accessible from the YARN nodes, etc.)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have a `SparkSession`, we can get a `SparkContext` from it run Spark calculations." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def sc = spark.sparkContext" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And then create an `RDD` and run some calculations." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[36mrdd\u001b[39m: \u001b[32morg\u001b[39m.\u001b[32mapache\u001b[39m.\u001b[32mspark\u001b[39m.\u001b[32mrdd\u001b[39m.\u001b[32mRDD\u001b[39m[\u001b[32mInt\u001b[39m] = ParallelCollectionRDD[0] at parallelize at cmd4.sc:1" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val rdd = sc.parallelize(1 to 100000000, 100)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "\n", " " ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", " sum at cmd5.sc:1\n", "
\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", "
\n", " 100 / 100\n", "
\n", "
\n", "
\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\u001b[36mn\u001b[39m: \u001b[32mDouble\u001b[39m = \u001b[32m5.00000015E15\u001b[39m" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val n = rdd.map(_ + 1).sum()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When you execute a Spark action like `sum` you should see a progress bar, showing the progress of the running Spark job. If you're using the Jupyter classic UI, you can also click on *(kill)* to cancel the job." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", " map at cmd6.sc:1\n", "
\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", "
\n", " 100 / 100\n", "
\n", "
\n", "
\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", " collect at cmd6.sc:1\n", "
\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", "
\n", " 100 / 100\n", "
\n", "
\n", "
\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "\u001b[36mn\u001b[39m: \u001b[32mArray\u001b[39m[(\u001b[32mInt\u001b[39m, \u001b[32mInt\u001b[39m)] = \u001b[33mArray\u001b[39m(\n", " (\u001b[32m0\u001b[39m, \u001b[32m1432236160\u001b[39m),\n", " (\u001b[32m1\u001b[39m, \u001b[32m1342236160\u001b[39m),\n", " (\u001b[32m2\u001b[39m, \u001b[32m1352236160\u001b[39m),\n", " (\u001b[32m3\u001b[39m, \u001b[32m1362236160\u001b[39m),\n", " (\u001b[32m4\u001b[39m, \u001b[32m1372236160\u001b[39m),\n", " (\u001b[32m5\u001b[39m, \u001b[32m1382236160\u001b[39m),\n", " (\u001b[32m6\u001b[39m, \u001b[32m1392236160\u001b[39m),\n", " (\u001b[32m7\u001b[39m, \u001b[32m1402236160\u001b[39m),\n", " (\u001b[32m8\u001b[39m, \u001b[32m1412236160\u001b[39m),\n", " (\u001b[32m9\u001b[39m, \u001b[32m1422236160\u001b[39m)\n", ")" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "val n = rdd.map(n => (n % 10, n)).reduceByKey(_ + _).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Syncing Dependencies\n", "\n", "If extra dependencies are loaded, via ``import $ivy.`…` `` after the `SparkSession` has been created, you should call `NotebookSparkSession.sync()` for the newly added JARs to be passed to the Spark executors." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$ \n", "\n", "\u001b[39m\n", "\u001b[36mres7_1\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@6d754fd1" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import $ivy.`org.typelevel::cats-core:1.6.0`\n", "\n", "NotebookSparkSession.sync() // cats should be available on workers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Datasets and Dataframes\n", "\n", "If you try to create a `Dataset` or a `Dataframe` from some data structure containing a case class and you're getting an `org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class ...` when calling `.toDS`/`.toDF`, try the following workaround:\n", "\n", "Add `org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)` in the same cell where you define case classes involved." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n", "\n", "\u001b[39m\n", "defined \u001b[32mclass\u001b[39m \u001b[36mPerson\u001b[39m\n", "\u001b[36mds\u001b[39m: \u001b[32mDataset\u001b[39m[\u001b[32mPerson\u001b[39m] = [id: string, value: int]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import spark.implicits._\n", "\n", "org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);\n", "\n", "case class Person(id: String, value: Int)\n", "\n", "val ds = List(Person(\"Alice\", 42), Person(\"Bob\", 43), Person(\"Charlie\", 44)).toDS" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This workaround won't be neccessary anymore in future Spark versions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Rich Display of Datasets and Dataframes\n", "\n", "As of now, *almond-spark* doesn't include native rich display capabilities for Datasets and Dataframes. So by default, we only have ascii rendering of tables." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-----+\n", "| id|value|\n", "+-------+-----+\n", "| Alice| 42|\n", "| Bob| 43|\n", "|Charlie| 44|\n", "+-------+-----+\n", "\n" ] } ], "source": [ "ds.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It's not too hard to add your own displayer though. Here's an example:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "// based on a snippet by Ivan Zaitsev\n", "// https://github.com/almond-sh/almond/issues/180#issuecomment-364711999\n", "implicit class RichDF(val df: DataFrame) {\n", " def showHTML(limit:Int = 20, truncate: Int = 20) = {\n", " import xml.Utility.escape\n", " val data = df.take(limit)\n", " val header = df.schema.fieldNames.toSeq\n", " val rows: Seq[Seq[String]] = data.map { row =>\n", " row.toSeq.map { cell =>\n", " val str = cell match {\n", " case null => \"null\"\n", " case binary: Array[Byte] => binary.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n", " case array: Array[_] => array.mkString(\"[\", \", \", \"]\")\n", " case seq: Seq[_] => seq.mkString(\"[\", \", \", \"]\")\n", " case _ => cell.toString\n", " }\n", " if (truncate > 0 && str.length > truncate) {\n", " // do not show ellipses for strings shorter than 4 characters.\n", " if (truncate < 4) str.substring(0, truncate)\n", " else str.substring(0, truncate - 3) + \"...\"\n", " } else {\n", " str\n", " }\n", " }: Seq[String]\n", " }\n", "\n", " publish.html(s\"\"\"\n", " \n", " \n", " ${header.map(h => s\"\").mkString}\n", " \n", " ${rows.map { row =>\n", " s\"${row.map { c => s\"\" }.mkString}\"\n", " }.mkString\n", " }\n", "
${escape(h)}
${escape(c)}
\"\"\")\n", " }\n", "}" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", " \n", "
idvalue
Alice42
Bob43
Charlie44
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "ds.toDF.showHTML()" ] } ], "metadata": { "kernelspec": { "display_name": "Scala (2.12)", "language": "scala", "name": "scala212" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "text/x-scala", "name": "scala", "nbconvert_exporter": "script", "version": "2.12.10" } }, "nbformat": 4, "nbformat_minor": 2 }