{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# RDD Transformations und Actions\n", "\n", "In dieser Lektion werden wie tiefer in Spark und Python eintaucehn. Bitte schaue das Video für ausführliche Erklärungen.\n", "\n", "## Wichtige Begriffe\n", "\n", "Schauen wir uns schnell die wichtigen Begriffe an:\n", "\n", "* RDD - Resilient Distributed Dataset\n", "* Transformation - Spark Operation, die ein RDD erzeugt\n", "* Action - Spark Operation, die ein lokales Objekt erzeugt\n", "* Spark Job - Sequenz von Transformations auf Daten mit finaler Action\n", "\n", "## Ein RDD erstellen\n", "\n", "Es gibt zwei übliche Wege um ein RDD zu erstellen:\n", "\n", "Methode |Ergebnis\n", "---------- |-------\n", "`sc.parallelize(array)` |RDD aus Elementen eines Arrays (oder Liste) erstellen\n", "`sc.textFile(path/to/file)` |RDD aus Zeilen einer Datei erstellen" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Transformations\n", "\n", "Wir können Transformations nutzen, um ein Set von Anweisungen zu erstellen, die wir auf das RDD anwenden wollen.\n", "\n", "Transformation Beispiel |Ergennis\n", "---------- |-------\n", "`filter(lambda x: x % 2 == 0)` |Ungerade Elemente ausschließen\n", "`map(lambda x: x * 2)` |Jedes RDD Element mit `2` multiplizieren\n", "`map(lambda x: x.split())` |Jeden String in Worte trennen\n", "`flatMap(lambda x: x.split())` |Jeden String in Worte trennen und Sequenz ebnen\n", "`sample(withReplacement=True,0.25)` |Ein Sample mit 25% der Elemente mit Ersetzen\n", "`union(rdd)` |`rdd` an existierendes RDD anhängen\n", "`distinct()` |Duplikate im RDD entfernen\n", "`sortBy(lambda x: x, ascending=False)` |Elemente in abseitegender Reihenfolge ordnen" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Actions\n", "\n", "Sobald wir unseren \"Plan\" an Transformations geschrieben haben können wir als nächstes eine Action auf das Ergebnis anwenden. Einige der üblichen Actions in der Übersicht:\n", "\n", "\n", "Action |Ergebnis\n", "---------- |-------\n", "`collect()` |RDD in eine Liste im Speicher umwandeln\n", "`take(3)` |Erste `3` Elemente des RDD\n", "`top(3)` |Top `3` Elemente des RDD\n", "`takeSample(withReplacement=True,3)` |Ein Sample mit `3` Elementen mit Ersetzen\n", "`sum()` |Summe der Elemente (setzt numerische Werte voraus)\n", "`mean()` |Durchschnitt der Elemente (setzt numerische Werte voraus)\n", "`stdev()` |Standardabweichung der Elemente (setzt numerische Werte voraus)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Beispiele\n", "\n", "Der beste Weg all das zu verstehen ist es sich einige Beispiele anzuschauen. Wir werden erst einmal gemächlich einsteigen und mit einer einfachen Textdatei arbeiten. Danach fahren wir mit etwas realitätsnäheren Daten wie Kunden- und Verkaufsdaten fort.\n", "\n", "### Ein RDD aus einer Textdatei erstellen:\n", "#### Textdatei erstellen" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Writing beispiel2.txt\n" ] } ], "source": [ "%%writefile beispiel2.txt\n", "erste\n", "zweite zeile\n", "die dritte zeile\n", "dann eine vierte zeile" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Jetzt können wir einige Transformations und Actions darauf anwenden:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from pyspark import SparkContext" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "beispiel2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# RDD erstellen\n", "sc.textFile('beispiel2.txt')" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Referenz für RDD erstellen\n", "text_rdd = sc.textFile('beispiel2.txt')" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['erste'],\n", " ['zweite', 'zeile'],\n", " ['die', 'dritte', 'zeile'],\n", " ['dann', 'eine', 'vierte', 'zeile']]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Map eine Funktion (oder Lambda Expression) zu jeder Zeile\n", "# Dann \"collect\" das Ergebnis\n", "text_rdd.map(lambda line: line.split()).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Map vs flatMap" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['erste',\n", " 'zweite',\n", " 'zeile',\n", " 'die',\n", " 'dritte',\n", " 'zeile',\n", " 'dann',\n", " 'eine',\n", " 'vierte',\n", " 'zeile']" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Alles als geebnet ausgeben\n", "text_rdd.flatMap(lambda line: line.split()).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDDs und Key Value Pairs\n", "\n", "Jetzt wo wir mit RDDs gearbeitet haben und damit, wie wir Werte aggregieren, können wir uns *Key Value Pairs* anschauen. Dazu erstellen wir einige Fake Daten in einer Textdatei.\n", "\n", "Diese Daten repräsentiert einige Services, die an Kunden eines SAAS Anbieters verkauft wurden." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Writing services.txt\n" ] } ], "source": [ "%%writefile services.txt\n", "#EventId Timestamp Customer State ServiceID Amount\n", "201 10/13/2017 100 NY 131 100.00\n", "204 10/18/2017 700 TX 129 450.00\n", "202 10/15/2017 203 CA 121 200.00\n", "206 10/19/2017 202 CA 131 500.00\n", "203 10/17/2017 101 NY 173 750.00\n", "205 10/19/2017 202 TX 121 200.00" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "services = sc.textFile('services.txt')" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['#EventId Timestamp Customer State ServiceID Amount',\n", " '201 10/13/2017 100 NY 131 100.00']" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "services.take(2)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "PythonRDD[9] at RDD at PythonRDD.scala:48" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "services.map(lambda x: x.split())" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],\n", " ['201', '10/13/2017', '100', 'NY', '131', '100.00'],\n", " ['204', '10/18/2017', '700', 'TX', '129', '450.00']]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "services.map(lambda x: x.split()).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lasst uns den ersten Hash-Tag entfernen!" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['EventId Timestamp Customer State ServiceID Amount',\n", " '201 10/13/2017 100 NY 131 100.00',\n", " '204 10/18/2017 700 TX 129 450.00',\n", " '202 10/15/2017 203 CA 121 200.00',\n", " '206 10/19/2017 202 CA 131 500.00',\n", " '203 10/17/2017 101 NY 173 750.00',\n", " '205 10/19/2017 202 TX 121 200.00']" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "services.map(lambda x: x[1:] if x[0]=='#' else x).collect()" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],\n", " ['201', '10/13/2017', '100', 'NY', '131', '100.00'],\n", " ['204', '10/18/2017', '700', 'TX', '129', '450.00'],\n", " ['202', '10/15/2017', '203', 'CA', '121', '200.00'],\n", " ['206', '10/19/2017', '202', 'CA', '131', '500.00'],\n", " ['203', '10/17/2017', '101', 'NY', '173', '750.00'],\n", " ['205', '10/19/2017', '202', 'TX', '121', '200.00']]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Key Value Pairs für Operationen nutzen\n", "\n", "Wir können als nächstes Methoden verwenden, die Lambda Expressions mit `ByKey` Argumenten kombinieren. Diese `ByKey` Methoden nehmen an, dass die Daten in Key-Value Format vorliegen.\n", "\n", "Als Beispiel können wir die Sales Daten pro Staat ausgeben:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "# Von zuvor\n", "cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],\n", " ['201', '10/13/2017', '100', 'NY', '131', '100.00'],\n", " ['204', '10/18/2017', '700', 'TX', '129', '450.00'],\n", " ['202', '10/15/2017', '203', 'CA', '121', '200.00'],\n", " ['206', '10/19/2017', '202', 'CA', '131', '500.00'],\n", " ['203', '10/17/2017', '101', 'NY', '173', '750.00'],\n", " ['205', '10/19/2017', '202', 'TX', '121', '200.00']]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cleanServ.collect()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('State', 'Amount'),\n", " ('NY', '100.00'),\n", " ('TX', '450.00'),\n", " ('CA', '200.00'),\n", " ('CA', '500.00'),\n", " ('NY', '750.00'),\n", " ('TX', '200.00')]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Üben wir nun einzelne Felder auszuwählen\n", "cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('State', 'Amount'),\n", " ('NY', '100.00750.00'),\n", " ('TX', '450.00200.00'),\n", " ('CA', '200.00500.00')]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Weiter mit reduceByKey\n", "# Dabei gehen wir davon aus, dass der erste Wert der Key ist\n", "cleanServ.map(lambda lst: (lst[3],lst[-1]))\\\n", " .reduceByKey(lambda amt1,amt2 : amt1+amt2)\\\n", " .collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wir können unsere Analyse damit fortsetzen, den Output zu sortieren:" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# State und Amount nehmen\n", "# Addieren\n", "# ('State','Amount') loswerden\n", "# Nach Amount Wert sortieren\n", "cleanServ.map(lambda lst: (lst[3],lst[-1]))\\\n", ".reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\\\n", ".filter(lambda x: not x[0]=='State')\\\n", ".sortBy(lambda stateAmount: stateAmount[1], ascending=False)\\\n", ".collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Denkt daran, *unpacking* für die Leserlichkeit zu verwenden. Zum Beispiel:**" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "x = ['ID','State','Amount']" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "def funk1(lst):\n", " return lst[-1]" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "def funk2(id_st_amt):\n", " # Unpack\n", " (Id,st,amt) = id_st_amt\n", " return amt" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'Amount'" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "funk1(x)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'Amount'" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "funk2(x)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Gut gemacht!" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.4" } }, "nbformat": 4, "nbformat_minor": 2 }