# RDD Transformations und Actions

In dieser Lektion werden wie tiefer in Spark und Python eintaucehn. Bitte schaue das Video für ausführliche Erklärungen.

## Wichtige Begriffe

Schauen wir uns schnell die wichtigen Begriffe an:

* RDD - Resilient Distributed Dataset
* Transformation - Spark Operation, die ein RDD erzeugt
* Action - Spark Operation, die ein lokales Objekt erzeugt
* Spark Job - Sequenz von Transformations auf Daten mit finaler Action

## Ein RDD erstellen

Es gibt zwei übliche Wege um ein RDD zu erstellen:

Methode |Ergebnis
---------- |-------
`sc.parallelize(array)` |RDD aus Elementen eines Arrays (oder Liste) erstellen
`sc.textFile(path/to/file)` |RDD aus Zeilen einer Datei erstellen

## RDD Transformations

Wir können Transformations nutzen, um ein Set von Anweisungen zu erstellen, die wir auf das RDD anwenden wollen.

Transformation Beispiel |Ergennis
---------- |-------
`filter(lambda x: x % 2 == 0)` |Ungerade Elemente ausschließen
`map(lambda x: x * 2)` |Jedes RDD Element mit `2` multiplizieren
`map(lambda x: x.split())` |Jeden String in Worte trennen
`flatMap(lambda x: x.split())` |Jeden String in Worte trennen und Sequenz ebnen
`sample(withReplacement=True,0.25)` |Ein Sample mit 25% der Elemente mit Ersetzen
`union(rdd)` |`rdd` an existierendes RDD anhängen
`distinct()` |Duplikate im RDD entfernen
`sortBy(lambda x: x, ascending=False)` |Elemente in abseitegender Reihenfolge ordnen

## RDD Actions

Sobald wir unseren "Plan" an Transformations geschrieben haben können wir als nächstes eine Action auf das Ergebnis anwenden. Einige der üblichen Actions in der Übersicht:


Action |Ergebnis
---------- |-------
`collect()` |RDD in eine Liste im Speicher umwandeln
`take(3)` |Erste `3` Elemente des RDD
`top(3)` |Top `3` Elemente des RDD
`takeSample(withReplacement=True,3)` |Ein Sample mit `3` Elementen mit Ersetzen
`sum()` |Summe der Elemente (setzt numerische Werte voraus)
`mean()` |Durchschnitt der Elemente (setzt numerische Werte voraus)
`stdev()` |Standardabweichung der Elemente (setzt numerische Werte voraus)

## Beispiele

Der beste Weg all das zu verstehen ist es sich einige Beispiele anzuschauen. Wir werden erst einmal gemächlich einsteigen und mit einer einfachen Textdatei arbeiten. Danach fahren wir mit etwas realitätsnäheren Daten wie Kunden- und Verkaufsdaten fort.

### Ein RDD aus einer Textdatei erstellen:
#### Textdatei erstellen

In [1]:
%%writefile beispiel2.txt
erste
zweite zeile
die dritte zeile
dann eine vierte zeile

Writing beispiel2.txt


Jetzt können wir einige Transformations und Actions darauf anwenden:

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
# RDD erstellen
sc.textFile('beispiel2.txt')

beispiel2.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# Referenz für RDD erstellen
text_rdd = sc.textFile('beispiel2.txt')

In [6]:
# Map eine Funktion (oder Lambda Expression) zu jeder Zeile
# Dann "collect" das Ergebnis
text_rdd.map(lambda line: line.split()).collect()

[['erste'],
 ['zweite', 'zeile'],
 ['die', 'dritte', 'zeile'],
 ['dann', 'eine', 'vierte', 'zeile']]

## Map vs flatMap

In [7]:
# Alles als geebnet ausgeben
text_rdd.flatMap(lambda line: line.split()).collect()

['erste',
 'zweite',
 'zeile',
 'die',
 'dritte',
 'zeile',
 'dann',
 'eine',
 'vierte',
 'zeile']

## RDDs und Key Value Pairs

Jetzt wo wir mit RDDs gearbeitet haben und damit, wie wir Werte aggregieren, können wir uns *Key Value Pairs* anschauen. Dazu erstellen wir einige Fake Daten in einer Textdatei.

Diese Daten repräsentiert einige Services, die an Kunden eines SAAS Anbieters verkauft wurden.

In [8]:
%%writefile services.txt
#EventId Timestamp Customer State ServiceID Amount
201 10/13/2017 100 NY 131 100.00
204 10/18/2017 700 TX 129 450.00
202 10/15/2017 203 CA 121 200.00
206 10/19/2017 202 CA 131 500.00
203 10/17/2017 101 NY 173 750.00
205 10/19/2017 202 TX 121 200.00

Writing services.txt


In [9]:
services = sc.textFile('services.txt')

In [10]:
services.take(2)

['#EventId Timestamp Customer State ServiceID Amount',
 '201 10/13/2017 100 NY 131 100.00']

In [11]:
services.map(lambda x: x.split())

PythonRDD[9] at RDD at PythonRDD.scala:48

In [12]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

Lasst uns den ersten Hash-Tag entfernen!

In [13]:
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()

['EventId Timestamp Customer State ServiceID Amount',
 '201 10/13/2017 100 NY 131 100.00',
 '204 10/18/2017 700 TX 129 450.00',
 '202 10/15/2017 203 CA 121 200.00',
 '206 10/19/2017 202 CA 131 500.00',
 '203 10/17/2017 101 NY 173 750.00',
 '205 10/19/2017 202 TX 121 200.00']

In [14]:
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

## Key Value Pairs für Operationen nutzen

Wir können als nächstes Methoden verwenden, die Lambda Expressions mit `ByKey` Argumenten kombinieren. Diese `ByKey` Methoden nehmen an, dass die Daten in Key-Value Format vorliegen.

Als Beispiel können wir die Sales Daten pro Staat ausgeben:

In [15]:
# Von zuvor
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())

In [16]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [17]:
# Üben wir nun einzelne Felder auszuwählen
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [18]:
# Weiter mit reduceByKey
# Dabei gehen wir davon aus, dass der erste Wert der Key ist
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
 .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
 .collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

Wir können unsere Analyse damit fortsetzen, den Output zu sortieren:

In [19]:
# State und Amount nehmen
# Addieren
# ('State','Amount') loswerden
# Nach Amount Wert sortieren
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

**Denkt daran, *unpacking* für die Leserlichkeit zu verwenden. Zum Beispiel:**

In [20]:
x = ['ID','State','Amount']

In [21]:
def funk1(lst):
 return lst[-1]

In [22]:
def funk2(id_st_amt):
 # Unpack
 (Id,st,amt) = id_st_amt
 return amt

In [23]:
funk1(x)

'Amount'

In [24]:
funk2(x)

'Amount'

# Gut gemacht!