# Hurtownie Danych i Big Data
## Laboratorium 14 - Opracowanie i uruchomienie własnych programów realizujących obliczenia zgodnie z paradygmatem MapReduce

Po zapoznaniu się z treścią notatnika będziesz wiedział jak wykorzystać moduły Spark SQL i MLlib do sprawniejszej pracy z danymi.

### Zawartość notatnika
1. Przykład użycia Spark SQL
2. Przykład użycia algorytmu regresji liniowej w Spark MLlib

## Apache Spark SQL
Moduł [Apache Spark SQL](https://data-flair.training/blogs/apache-spark-sql-tutorial/) umożliwia reprezentację danych w postaci relacyjnej. Jest nakładką na tzw. _Core API_ udostępniające metody do operowania na poziomie RDD w rozproszonym środowisku. Użytkownik może przetwarzać dane w bardziej intuicyjnej formie i przykładać mniej uwagi do technicznych detali. Operacje są mapowane do metod z _Core API_ jednak są już odpowiednio zoptymalizowane i bardzo często wykonują się bardziej wydajnie. 

Aby korzystać z możliwości modułu Spark SQL dane muszą być przedstawione formie tzw. `DataFrame` (inna nazwa to po prostu `SchemaRDD` - czyli RDD z elementami typu `Row` ze znanymi typami pól).

### Dlaczego?
- `DataFrame` to wyższy poziom abstrakcji niż RDD (udostępnia własne DSL oraz możliwość formułowania zapytań SQL),
- bardziej zaawansowana optymalizacja (wykorzystanie *query planner* optymalizującego zapytanie przed przekazaniem do wykonania),
- dane posiadają zdefiniowaną strukturę (schema),
- dużo czytelniejsze niż szereg niskopoziomowych operacji na RDD,
- umożliwia skupienia się na biznesowym problemie bardziej niż na szczegółach implementacyjnych

> Korzystając z języków Java oraz Scala uzyskujemy dostęp do trzeciego sposobu reprezentacji danych - `Dataset`. Rozszerza on możliwości `DataFrame` wprowadzając kontrolę typów każdej kolumny na poziomie kompilacji kodu oraz jeszcze lepsze mechanizmy optymalizacji wykonywania zapytań.

Zainteresowani mogą obejrzeć [wideo](https://www.youtube.com/watch?v=pZQsDloGB4w) przedstawiające porównanie wszystkich 3 sposobów (`RDD`, `DataFrame`, `Dataset`) reprezentacji danych.

### Przykład
Zdefiniujmy zadanie którego celem jest analiza "*średniego wieku użytkownia w zależności od płci.*"

**Dane wejściowe**: Zmienna `people` przechowuje początkowy zbiór danych w formacie JSON<br/>
**Dane wyjściowe**: Średnia wieku dla każdej płci

In [0]:
people = [
  {"name": "Bob", "age": 12, "sex": "M"},
  {"name": "Alice", "age": 25, "sex": "F"},
  {"name": "Jack", "age": 18, "sex": "M"},
  {"name": "Susan", "age": 21, "sex": "F"},
  {"name": "Cristina", "age": 87, "sex": "F"}
]

### Wariant 1 - wykorzystanie RDD
Zadanie można zrealizować w tradycyjny sposób wykorzystując podstawowe operacje RDD.

In [0]:
# Stworzenie RDD z tablicy `people`
people_rdd = spark.sparkContext.parallelize(people)

# Transformacja danych do formatu <klucz, wartość>, jako <płeć, wiek>
ages_rdd = people_rdd.map(lambda row: (row['sex'], row['age']))

# Grupowanie wierszy dla każdej płci. Wynik docelowy: <'M', [age1, age2,...]>, <'F', [age1, age2,...]>
groupedBySex_rdd = ages_rdd.groupByKey()

# Obliczenie średniej wieku dla każdej płci.
# Użyto funkcji `mapValues()` która wykonuje funkcję dla wszystkich elementów danego klucza 
avg_rdd = groupedBySex_rdd.mapValues(lambda ages: sum(ages) / float(len(ages)))

# Uruchomienie obliczeń i przesłanie wyników z powrotem do sterownika
result = avg_rdd.collect()

# Wyświetlenie wyników
for sex, age in result:
  print("Średni wiek dla {} to: {:.2f} lat".format(sex, age))

### Wariant 2 - Użycie `DataFrame`
Jest kilka problemów związanych z operacjami bezpośrednio z RDD:
- konieczność wnikliwego wczytania się w kod i zrozumienia poszczególnych operacji aby poznać intencje autora,
- konieczność zrozumienia mechanizmu planowania zapytania w celu optymalizacji jego wykonania,
- konieczność ręcznego zadbania o optymalne partycjonowanie danych

Lepszym wariantem jest jednak opakowanie danych w określoną strukturę korzystając z modułu Spark SQL. To podejście jest aktualnie forsowane jako zalecane przez twórców Apache Spark.

Poniżej przedstawiono trzy warianty tworzenia `DataFrames`:
1. import danych wejściowych słownikowych (typu `dict`). Ten sposób jest najprostszy ale zostaje wycofywany z nowszych wersji, ponieważ Spark ma często problem z prawidłowym rozpoznaniem typów obiektów umieszonych w słowniku,
2. zmapowanie elementów do klasy `Row`,
3. dla danych wejściowych przedstawionych w formacie JSON (popularna operacja)

#### Użycie metody `createDataFrame()`
Metoda `createDataFrame()` umożliwia utworzenie obiektu `DataFrame` z danych pochodzących z obiektów RDD, Pythonowych list czy ramek danych Pandas (popularna biblioteka używana w przetwarzaniu danych).

> Z dokumentacji [link](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame):
>
> `SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)`

W przykładzie niżej jako argument przekazujemy zmieną z tablicą obiektów.

In [0]:
people_df = spark.createDataFrame(people)

# Podgląd utworzonej ramki danych
people_df.show()

Aby sprawdzić czy typy danych zostały poprawnie rozpoznane można skorzystać z metody `printSchema()`.

In [0]:
people_df.printSchema()

Jak widać schema danych została poprawnie rozpoznana. Wiek jest interpretowany jako liczba (typu `long`), a reszta pól to dane tekstowe. 

Przy tworzeniu ramki danych otrzymaliśmy błąd ponieważ nie zawsze daje się w tak prosty sposób rozpoznać typy danych i powinniśmy to zrobić w bardziej przejrzysty sposób. Np. zadeklarować schemę jawnie.

W przykładzie niżej zmieniliśmy typ danych wiekowych z `long` na `int`.

In [0]:
from pyspark.sql.types import *

# Deklaracja schemy
# Trzeci parametr w konstruktorze `StructField` określa czy pola w komórce mogą być puste
schema = StructType([
  StructField("name", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("age", IntegerType(), False)])

# Utworzenie ramki z jawnie podaną schemą
people_df = spark.createDataFrame(people, schema)

people_df.printSchema()

people_df.show()

Zamiast tworzyć osobny obiekt reprezentujący schemę w prostych przypadkach możemy zadeklarować ją _inline_ (szybsze dla prostych danych).

In [0]:
people_df = spark.createDataFrame(people, "name: string, sex: string, age: int")

people_df.printSchema()

people_df.show()

#### Mapowanie RDD do typu `Row`
Innym sposobem jest przekształcenie poszczególnych elementów RDD do obiektów typu `Row`. Jest to niskopoziomowa operacja wykonywana automatycznie przez framework podczas innych sposobów ładowania danych. Wykorzystanie tego sposobu nie niesie ze sobą większych korzyści oprócz zaznajomienia się z wewnętrznym sposobem reprezentacji danych.

In [0]:
from pyspark.sql import Row

# Rzutujemy każdy obiekt do typu Row
peopleRow_rdd = people_rdd.map(lambda el: Row(**el))

# Tworzymy obiekt RDD
people_df = spark.createDataFrame(peopleRow_rdd)

# Sprawdzenie schemy
people_df.printSchema()

# I zawartości
people_df.show()

#### Zewnętrzne źródła danych
Metoda `read()` umożliwia odczyt danych z zewnętrznych źródeł - takich jak pliki JSON, Hive, Parquet czy [innych](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources).

Poniższy przykład konwertuje tablicę `people` do poprawnego formatu JSON, tworzy RDD na następnie inicjalizuje `DataFrame` korzystając z metody `spark.read(...)`.

In [0]:
import json

# Konwersja tablicy z elementami do obiektów JSON używając wbudowanej funkcji `json.dumps()`
people_json = [json.dumps(item) for item in people]

# Tworzenie RDD z elementami formatu JSON
people_json_rdd = sc.parallelize(people_json)

# Tworzenie Dataframe z użyciem stworzonego RDD
people_df = spark.read.json(people_json_rdd)

# Prezentacja schemy
people_df.printSchema()

# Kilka wierszy do weryfikacji
people_df.show()

### Apache Spark SQL DSL
Poniżej zaprezentowano przykładowe operacje DSL na obiektach typu `DataFrame`. Pełen opis API można znaleźć [tutaj](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

In [0]:
# Wybór jednej kolumny - metoda `select`
people_df \
  .select("name") \
  .show()

In [0]:
# Wyłuskane dwie kolumny, jedna dynamicznie zmodyfikowana
people_df \
  .select(people_df['name'], people_df['age']+1) \
  .show()

In [0]:
# Filtrowanie danych
people_df \
  .filter(people_df['age'] > 21) \
  .show()

In [0]:
# Funkcja `count`
people_df \
  .groupBy("sex") \
  .count() \
  .show()

Przedstawione na początku notatnika zadanie także można zrealizować wykorzystując DSL (1 linia kodu):

In [0]:
people_df.groupBy("sex").mean("age").show()

Oczywiście istnieje także możliwość otrzymana z powrotem obiektu RDD

In [0]:
people_df \
  .groupBy("sex") \
  .mean("age") \
  .rdd \
  .collectAsMap()

Istnieje także możliwość korzystania bezpośrednio ze składni SQL. W tym celu na samym początku należy zadeklarować w pamięci *widok* danych o określonej nazwie.

> Widok tabeli SQL zostanie usunięty z pamięci w momencie zatrzymania klastra.

In [0]:
# Rejestracja tymczasowego widoku z ramką `people_df` w pamięci jako tablica `people`
people_df.createOrReplaceTempView("people")

In [0]:
# Przykład użycia składni SQL
spark.sql("SELECT * FROM people").show()
spark.sql("SELECT * FROM people WHERE SEX = 'M'").show()

Poniżej przedstawiono zapytanie SQL realizujące pierwotne zadanie. Jest ono czytelniejsze i prawdopodobnie wydajniejsze niż pisanie szeregu przekształceń RDD.

In [0]:
spark.sql("SELECT sex, AVG(age) FROM people GROUP BY sex").show()

Notatniki Zeppelin umożliwiają nawet skorzystanie z interpretera SQL renderującego dane w przyjazny sposób - komórka powinna być poprzedzona poleceniem `%sql`.

> Wykorzystaj rezultat poniższego zapytania, aby **zwizualizować** odpowiedź (kliknięcie w ikonę histogramu znajdującą się pod tabelą z wynikami).

In [0]:
%sql
SELECT * FROM people;

### User Defined Functions (UDF)
Spark SQL posiada możliwość możliwość wywoływania **funkcji użytkownika**, tzw "_user defined functions (UDFs)_". Mają możliwość wykonania obliczeń dla danego pola. Ponieważ funkcje te tworzone są przez programistów ich działanie traktowane jest jako "czarna skrzynka" i nie podlega zaawansowanej optymalizacji.

Przykład poniżej demonstruje tworzenie UDF konwertującego tekst do wielkich liter.

In [0]:
# UDF jest zwykłą funkcją
def upper(s):
  return s.upper()

Aby móc wywołaś funkcję korzystając z DSL Apache Spark należy najpierw opakować ją funkcją `udf`:

In [0]:
from pyspark.sql.functions import udf
upper_udf = udf(upper, StringType())

# zapytanie DSL
people_df \
  .withColumn('uppercased_name', upper_udf('name')) \
  .show()

Jeśli chcemy wywołać ją poprzez natywne zapytanie SQL należy ją także uprzednio zarejestrować w kontekście.

In [0]:
# rejestracja funkcji w kontekście SQL
sqlContext.udf.register("upper_udf", upper, StringType())

In [0]:
%sql
SELECT
  name,
  upper_udf(name) AS `uppercased_name`
FROM people

### Rzutowanie typów danych
Bardzo często okazuje  się, że typu danych w ramkach nie są dopasowane do rodzaju operacji. W takiej sytuacji należy zrzutować je na poprawny typ (funkcja `cast`). 

In [0]:
people_df \
  .withColumn("age_float", people_df['age'].cast(FloatType())) \
  .show()

## Spark MLlib
Biblioteka [MLlib](http://spark.apache.org/docs/latest/ml-guide.html) upraszcza pracę z algorytmami uczenia maszynowego na dużych zbiorach danych.

Podstawowe funkcjonalności to:
- gotowe implementacje wielu algorytmów w rozproszonej wersji (klasyfikacja, regresja, klastrowanie, rekomendacje),
- tworzenie nowych cech (transformacje, redukcje wymiarów, ...),
- możliwość definiowania tzw *pipelines* - procesów grupujących kolejne operacje konieczne do realizacji danego zadania,
- możliwość zapisu gotowego modelu w pliku tekstowym a następnie zaimportowanie go (w dowolnym interpreterze Apache Spark),
- dostępne metody pomocnicze w operacjach na danych (np. dla algebry liniowej, funkcje statystyczne itp).

In [0]:
# instalacja zależności
!pip install mlflow

### Zadanie
*Celem jest stworzenie modelu regresji liniowej korzystając z danych poniżej*.

**Plan działania jest następujący**:
1. Reprezentacja danych w formacie nadającym się do analizy w module MLlib,
2. Uruchomienie algorytmu regresji liniowej dla wszystkich danych,
3. Podział na dwa zbiory danych - treningowy i testowy,
4. Wyszukiwanie najlepszej kombinacji hiperparametrów z użyciem walidacji krzyżowej (ang. *cross-validation*)

#### Dane
W poniższym zadaniu wykorzystamy przykładowe dane z problemu ["Kwartetu Anscombe'a"](https://www.wikiwand.com/pl/Kwartet_Anscombe%27a) (skopiowane [stąd](https://gist.github.com/endolith/3299951)).

In [0]:
# Dane wejściowe (kwartet Anscombe'a)
x = [10.0, 8.0,  13.0,  9.0,  11.0, 14.0, 6.0,  4.0,  12.0,  7.0,  5.0]
y = [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15,  6.42, 5.73]

W celu wizualizacji wykorzystamy popularną bibliotekę `matplotlib`. Jak widać dane są umieszczone prawie idealnie na linii prostej (z pominięciem jednego odchylenia).

In [0]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots()

ax.scatter(x, y, c="r")
ax.set_title("Anscombe's Quarter")
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.grid(True)

display(fig)

#### 1. Reprezentacja danych w formacie używanym w MLLib
Dane powinny zostać zapisane w formacie RDD kompatybilnym z biblioteką MLlib.

W przypadku Pythona mogą one być zarówno:
- zwykłymi listami/krotkami,
- listami z biblioteki Numpy. 

Więcej informacji [tutaj](http://spark.apache.org/docs/latest/mllib-data-types.html#data-types-rdd-based-api).

In [0]:
data = []

# Konsersja danych do krotki typu <cecha, wynik>
for feature, label in zip(x, y):
  data.append((feature, label))

# Tworzenie RDD
rdd = spark.sparkContext.parallelize(data)

# Rzut okiem na rezultat
rdd.collect()

Taką kolekcję należy reprezentować jako `DataFrame`, gdzie w pierwszej kolumnie umieszczona jest informacja o informacji jaką chcemy przewidzieć (*target label*), a następnie umieszamy wektor cech (klasa `Vectors` umożliwia stworzenie normalnego wektora jak i takiego działającego dla rzadkich wartości - *sparse*).

> W tym obrazowym scenariuszu dostępna jest jedna cecha. W rzeczywistych scenariuszach obiekt `Vectors` będzie reprezentował wektor wielu elementów (np. wiek, waga, lokalizacja, itp).

In [0]:
from pyspark.ml.linalg import Vectors

# Definicja funkcji konwertującą krotkę do formatu akceptowalnego przez MLlib
def convert(row):
  return (float(row[1]), Vectors.dense(row[0]))

# Konwersja na RDD
features_rdd = rdd.map(convert)

# Stworzenie DataFrame, jawne dodanie nazw kolumn
dataset_df = spark.createDataFrame(features_rdd, ["label", "features"])

# Wyświetlenie pierwszych dwóch elementów
dataset_df.show(2)

#### 2. Uruchomienie algorytmu
Mają dane zapisane w dogodnej postaci możemy zadeklarować obiekt algorytmu, ustawić przykładowe parametry a następnie dopasować jego współczynniki od danych.

In [0]:
from pyspark.ml.regression import LinearRegression

# Deklaracja obiektu reprezentującego algorytm liniowej regresji (ze stałymi parametrami)
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Trenowanie modelu
lr_model = lr.fit(dataset_df)

Po trenowaniu mamy dostęp do kilku zmiennych określających wynik procesu.

In [0]:
# Obtained model
print("MODEL")
print("Coefficients: [{}]".format(lr_model.coefficients))
print("Intercept: [{}]".format(lr_model.intercept))

# Training stats
print("\nTRAINING STATS")
print("numIterations: {}".format(lr_model.summary.totalIterations))
print("objectiveHistory: {}".format(lr_model.summary.objectiveHistory))
print("RMSE: {}".format(lr_model.summary.rootMeanSquaredError))
print("r2: {}".format(lr_model.summary.r2))
lr_model.summary.residuals.show()

#### 3. Podział na dwa zbiory - treningowy i testowy
Niestety otrzymane wyżej wyniki nie mogą uznać za wiarygodne - algorytm został przetestowany na wcześniej widzianych danych (występuje tu zjawisko nadmiernego dopasowania - tzw. "_overfitting_").

Dobrą praktyką jest podział danych na dwa zbiory:
- treningowy (używany do obliczenia najlepszych współczynników modelu),
- testowy (dane nie wykorzystane w procesie uczenia które przysłużą się do ocenienia wydajności modelu)

W tym przypadku zbiór testowy zawierał będzie 30% wszystkich rekordów.

In [0]:
# Losowy podział danych na zbiór treningowy i testowy
train_df, test_df = dataset_df.randomSplit([0.7, 0.3], seed=12345)

In [0]:
print("Ilość elementów w zbiorze treningowym: {}".format(train_df.count()))
print("Ilość elementów w zbiorze testowym: {}".format(test_df.count()))

#### 4. Wyszukiwanie najlepszej kombinacji hiper-parametrów z użyciem walidacji krzyżowej (ang. *cross-validation*)
Bardzo rzadko zdarza się, że znamy dokładne współczynniki algorytmu przetwarzającego dane.

Aby poradzić sobie z tym problemem zdefiniujemy zakres możliwych wartości dla parametrów które chcemy dostroić. Następnie każdna możliwa kombinacja zostanie wykonana i przetestowana. W MLlib w celu określenie przestrzeni parametrów stworzono klasę `ParamGridBuilder`. 

W tym przypadku będziemy modyfikować dwa parametry modelu - `regParam` i `elasticNetParam`.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder

# Deklaracja parametrów i możliwych wartości które zostaną sprawdzone
paramGrid = ParamGridBuilder() \
  .addGrid(lr.regParam, [0.1, 0.01]) \
  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
  .build()

Kolejnym krokiem jest uruchomienie [walidacji krzyżowej](https://www.wikiwand.com/en/Cross-validation_(statistics).

W MLlib możemy ustalić następujące parametry:
- `estimator` - testowany model,
- `estimatorParamsMap` - obiekt opisujący dozwolone wartości parametrów do przetestowania,
- `evaluator` - obiekt określający metrykę wyboru najlepszego modelu
- `numFolds` - na ile podzbiorów zostaną podzielone dane treningowe

In [0]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Deklaracja strategii walidacji krzyżowej
crossval = CrossValidator(
  estimator=lr,
  estimatorParamMaps=paramGrid,
  evaluator=RegressionEvaluator(),
  numFolds=2
)

Wytrenujmy model korzystając z przygotowanego wcześniej zbioru danych treningowych:

In [0]:
cv_model = crossval.fit(train_df)

Warto zwrócić uwagę na ilość zadań uruchomionych w tle. Framework sprawdził i ocenił każdą kombinację parametrów.

Tak uzyskany najlepszy model może być użyty dla przewidzenia danych ze zbioru testowego

In [0]:
predicted_df = cv_model.transform(test_df)

Wyświetlmy otrzymane predykcje w celu porównania ich z prawdziwymi wartościami

In [0]:
predicted_df \
  .withColumnRenamed('label', 'true_value') \
  .select("true_value", "prediction") \
  .show()

## Ćwiczenia
1. Utworzyć dwa obiekty `DataFrame`. Jeden powinien zawierać dane o produktach, drugi o sprzedażach.
2. Zarejestrować w pamięci oba widoki
3. Należy napisać zapytanie SQL wyświetlające nazwę i średnią sprzedanych produktów (zaprezentowane powinny być tylko produkty znajdujace się w tablicy `products`).
4. Należy napisać zapytanie SQL wyświetlające nazwę produktu i powiązany z nim sumaryczny dochód (zaprezentowane powinny być tylko produkty znajdujace się w tablicy `products`)
5. Należy napisać własną funkcję UDF która zaokrągli cenę produktu do pełnej liczby całkowitej.

In [0]:
# Lista produktów
products = [
  {"id": 1, "name": "ProductA", "price": 9.99},
  {"id": 2, "name": "ProductB", "price": 12.99},
  {"id": 3, "name": "ProductC", "price": 69.99}
]

# Lista danych sprzedażowych
sales = [
  {"product_id": 1, "amount": 1},
  {"product_id": 2, "amount": 5},
  {"product_id": 3, "amount": 5},
  {"product_id": 1, "amount": 1},
  {"product_id": 2, "amount": 2},
  {"product_id": 2, "amount": 1},
  {"product_id": 3, "amount": 1},
  {"product_id": 1, "amount": 10},
  {"product_id": 4, "amount": 5},
  {"product_id": 2, "amount": 2},
  {"product_id": 4, "amount": 7},
  {"product_id": 2, "amount": 3},
  {"product_id": 1, "amount": 1},
  {"product_id": 3, "amount": 7},
  {"product_id": 1, "amount": 3},
  {"product_id": 3, "amount": 1},
  {"product_id": 3, "amount": 8},
  {"product_id": 1, "amount": 10},
]