{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Solution-1: Introduction to PySpark RDD\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import and initialize SparkContext" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.appName(\"solution-1\").getOrCreate()\n", "sc = spark.sparkContext" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Parallelized Collections" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.rdd.RDD" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data = [1, 2, 3, 4, 5]\n", "distData = sc.parallelize(data)\n", "\n", "type(distData)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: Read external data file\n", "### Use SparkContext's textFile function to read in a text file" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.rdd.RDD" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "textFilePath = './emails.txt'\n", "\n", "emails = sc.textFile(textFilePath)\n", "type(emails)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generate a list of data, from 1 to 10" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]\n" ] } ], "source": [ "data = list(range(1,11))\n", "print(data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelize the data with 2 partitions" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "numbers = sc.parallelize(data,2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Print RDD" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ParallelCollectionRDD[3] at parallelize at PythonRDD.scala:194\n", "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]\n" ] } ], "source": [ "print(numbers)\n", "\n", "print(numbers.collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get only even numbers, and collect them" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "ename": "Py4JJavaError", "evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 3, localhost, executor driver): org.apache.spark.SparkException: \nError from python worker:\n Traceback (most recent call last):\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 185, in _run_module_as_main\n mod_name, mod_spec, code = _get_module_details(mod_name, _Error)\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 111, in _get_module_details\n __import__(pkg_name)\n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/__init__.py\", line 46, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py\", line 31, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/accumulators.py\", line 97, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 146, in \n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 127, in _make_cell_set_template_code\n TypeError: an integer is required (got type bytes)\nPYTHONPATH was:\n /opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/jars/spark-core_2.11-2.3.2.jar\njava.io.EOFException\n\tat java.io.DataInputStream.readInt(DataInputStream.java:392)\n\tat org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:177)\n\tat org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:100)\n\tat org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:74)\n\tat org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)\n\tat org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:86)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:944)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.SparkException: \nError from python worker:\n Traceback (most recent call last):\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 185, in _run_module_as_main\n mod_name, mod_spec, code = _get_module_details(mod_name, _Error)\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 111, in _get_module_details\n __import__(pkg_name)\n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/__init__.py\", line 46, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py\", line 31, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/accumulators.py\", line 97, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 146, in \n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 127, in _make_cell_set_template_code\n TypeError: an integer is required (got type bytes)\nPYTHONPATH was:\n /opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/jars/spark-core_2.11-2.3.2.jar\njava.io.EOFException\n\tat java.io.DataInputStream.readInt(DataInputStream.java:392)\n\tat org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:177)\n\tat org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:100)\n\tat org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:74)\n\tat org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)\n\tat org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:86)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mnumbers\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfilter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mx\u001b[0m \u001b[0;34m%\u001b[0m \u001b[0;36m2\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 812\u001b[0m \"\"\"\n\u001b[1;32m 813\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 814\u001b[0;31m \u001b[0msock_info\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 815\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msock_info\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 816\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 326\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 330\u001b[0m raise Py4JError(\n", "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 3, localhost, executor driver): org.apache.spark.SparkException: \nError from python worker:\n Traceback (most recent call last):\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 185, in _run_module_as_main\n mod_name, mod_spec, code = _get_module_details(mod_name, _Error)\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 111, in _get_module_details\n __import__(pkg_name)\n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/__init__.py\", line 46, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py\", line 31, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/accumulators.py\", line 97, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 146, in \n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 127, in _make_cell_set_template_code\n TypeError: an integer is required (got type bytes)\nPYTHONPATH was:\n /opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/jars/spark-core_2.11-2.3.2.jar\njava.io.EOFException\n\tat java.io.DataInputStream.readInt(DataInputStream.java:392)\n\tat org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:177)\n\tat org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:100)\n\tat org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:74)\n\tat org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)\n\tat org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:86)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:944)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.SparkException: \nError from python worker:\n Traceback (most recent call last):\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 185, in _run_module_as_main\n mod_name, mod_spec, code = _get_module_details(mod_name, _Error)\n File \"/opt/miniconda3/lib/python3.8/runpy.py\", line 111, in _get_module_details\n __import__(pkg_name)\n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/__init__.py\", line 46, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py\", line 31, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/accumulators.py\", line 97, in \n File \"\", line 991, in _find_and_load\n File \"\", line 975, in _find_and_load_unlocked\n File \"\", line 655, in _load_unlocked\n File \"\", line 618, in _load_backward_compatible\n File \"\", line 259, in load_module\n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 146, in \n File \"/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/cloudpickle.py\", line 127, in _make_cell_set_template_code\n TypeError: an integer is required (got type bytes)\nPYTHONPATH was:\n /opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.7-src.zip:/opt/miniconda3/envs/mmtf-workshop-2018/lib/python3.7/site-packages/pyspark/jars/spark-core_2.11-2.3.2.jar\njava.io.EOFException\n\tat java.io.DataInputStream.readInt(DataInputStream.java:392)\n\tat org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:177)\n\tat org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:100)\n\tat org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:74)\n\tat org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)\n\tat org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:86)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:288)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n" ] } ], "source": [ "numbers.filter(lambda x: x % 2 == 0).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: find emails with hotmail domain" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "emails.filter(lambda e: '@hotmail' in e).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Square all the numbers in the list using the map operation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "numbers.map(lambda x: x*x).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use flatMap to apply a function that returns a list and flatten the result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "m = numbers.map(lambda x: [x**2, x**3]).collect()\n", "\n", "fm = numbers.flatMap(lambda x: [x**2, x**3]).collect()\n", "\n", "print(m)\n", "print(fm)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: separate username and domain from all emails\n", "\n", "### eg: marshuang80@gmail.com -> [marshuang80, gmail.com]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Hint use the pyhton split() function\n", "username_domain = emails.map(lambda x: x.split('@'))\n", "username_domain.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Key-Value Pairs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "username_domain.keys().collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "username_domain.values().collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reduce by key" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = [\"a\", \"b\", \"a\", \"a\", \"b\", \"b\", \"b\", \"b\"]\n", "rdd = sc.parallelize(data)\n", "\n", "pairRDD = rdd.map(lambda x: (x, 1))\n", "\n", "pairRDD.reduceByKey(lambda x,y: x+y).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TODO: count the number of domains with the same username" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# do another mapping operation to make all domains in a list\n", "username_domain = username_domain.map(lambda x: (x[0],1))\n", "print(\"** Results from mapping values to list\")\n", "print(username_domain.top(3))\n", "\n", "print(\"\\n** Results from reduceByKey ** \")\n", "username_domain.reduceByKey(lambda val1, val2: val1 + val2).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Very important to stop Spark" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sc.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 2 }