{"cells":[{"cell_type":"markdown","source":["# Hurtownie Danych i Big Data\n## Laboratorium 14 - Opracowanie i uruchomienie własnych programów realizujących obliczenia zgodnie z paradygmatem MapReduce\n\nPo zapoznaniu się z treścią notatnika będziesz wiedział jak wykorzystać moduły Spark SQL i MLlib do sprawniejszej pracy z danymi.\n\n### Zawartość notatnika\n1. Przykład użycia Spark SQL\n2. Przykład użycia algorytmu regresji liniowej w Spark MLlib"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"49b77a82-4697-4512-b84c-5f502ca85485"}}},{"cell_type":"markdown","source":["## Apache Spark SQL\nModuł [Apache Spark SQL](https://data-flair.training/blogs/apache-spark-sql-tutorial/) umożliwia reprezentację danych w postaci relacyjnej. Jest nakładką na tzw. _Core API_ udostępniające metody do operowania na poziomie RDD w rozproszonym środowisku. Użytkownik może przetwarzać dane w bardziej intuicyjnej formie i przykładać mniej uwagi do technicznych detali. Operacje są mapowane do metod z _Core API_ jednak są już odpowiednio zoptymalizowane i bardzo często wykonują się bardziej wydajnie. \n\nAby korzystać z możliwości modułu Spark SQL dane muszą być przedstawione formie tzw. `DataFrame` (inna nazwa to po prostu `SchemaRDD` - czyli RDD z elementami typu `Row` ze znanymi typami pól).\n\n### Dlaczego?\n- `DataFrame` to wyższy poziom abstrakcji niż RDD (udostępnia własne DSL oraz możliwość formułowania zapytań SQL),\n- bardziej zaawansowana optymalizacja (wykorzystanie *query planner* optymalizującego zapytanie przed przekazaniem do wykonania),\n- dane posiadają zdefiniowaną strukturę (schema),\n- dużo czytelniejsze niż szereg niskopoziomowych operacji na RDD,\n- umożliwia skupienia się na biznesowym problemie bardziej niż na szczegółach implementacyjnych\n\n> Korzystając z języków Java oraz Scala uzyskujemy dostęp do trzeciego sposobu reprezentacji danych - `Dataset`. Rozszerza on możliwości `DataFrame` wprowadzając kontrolę typów każdej kolumny na poziomie kompilacji kodu oraz jeszcze lepsze mechanizmy optymalizacji wykonywania zapytań.\n\nZainteresowani mogą obejrzeć [wideo](https://www.youtube.com/watch?v=pZQsDloGB4w) przedstawiające porównanie wszystkich 3 sposobów (`RDD`, `DataFrame`, `Dataset`) reprezentacji danych.\n\n### Przykład\nZdefiniujmy zadanie którego celem jest analiza \"*średniego wieku użytkownia w zależności od płci.*\"\n\n**Dane wejściowe**: Zmienna `people` przechowuje początkowy zbiór danych w formacie JSON
\n**Dane wyjściowe**: Średnia wieku dla każdej płci"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"dd36b638-af66-4451-bf4d-993a91a746cf"}}},{"cell_type":"code","source":["people = [\n {\"name\": \"Bob\", \"age\": 12, \"sex\": \"M\"},\n {\"name\": \"Alice\", \"age\": 25, \"sex\": \"F\"},\n {\"name\": \"Jack\", \"age\": 18, \"sex\": \"M\"},\n {\"name\": \"Susan\", \"age\": 21, \"sex\": \"F\"},\n {\"name\": \"Cristina\", \"age\": 87, \"sex\": \"F\"}\n]"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"d728a3bf-b703-46cd-9ea1-74aa4f664457"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["### Wariant 1 - wykorzystanie RDD\nZadanie można zrealizować w tradycyjny sposób wykorzystując podstawowe operacje RDD."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"7bf8da51-94a2-4904-bf70-85b42ac84574"}}},{"cell_type":"code","source":["# Stworzenie RDD z tablicy `people`\npeople_rdd = spark.sparkContext.parallelize(people)\n\n# Transformacja danych do formatu , jako \nages_rdd = people_rdd.map(lambda row: (row['sex'], row['age']))\n\n# Grupowanie wierszy dla każdej płci. Wynik docelowy: <'M', [age1, age2,...]>, <'F', [age1, age2,...]>\ngroupedBySex_rdd = ages_rdd.groupByKey()\n\n# Obliczenie średniej wieku dla każdej płci.\n# Użyto funkcji `mapValues()` która wykonuje funkcję dla wszystkich elementów danego klucza \navg_rdd = groupedBySex_rdd.mapValues(lambda ages: sum(ages) / float(len(ages)))\n\n# Uruchomienie obliczeń i przesłanie wyników z powrotem do sterownika\nresult = avg_rdd.collect()\n\n# Wyświetlenie wyników\nfor sex, age in result:\n print(\"Średni wiek dla {} to: {:.2f} lat\".format(sex, age))"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"5ce23ae4-d052-4802-bcbf-dde8e9040c06"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["### Wariant 2 - Użycie `DataFrame`\nJest kilka problemów związanych z operacjami bezpośrednio z RDD:\n- konieczność wnikliwego wczytania się w kod i zrozumienia poszczególnych operacji aby poznać intencje autora,\n- konieczność zrozumienia mechanizmu planowania zapytania w celu optymalizacji jego wykonania,\n- konieczność ręcznego zadbania o optymalne partycjonowanie danych\n\nLepszym wariantem jest jednak opakowanie danych w określoną strukturę korzystając z modułu Spark SQL. To podejście jest aktualnie forsowane jako zalecane przez twórców Apache Spark.\n\nPoniżej przedstawiono trzy warianty tworzenia `DataFrames`:\n1. import danych wejściowych słownikowych (typu `dict`). Ten sposób jest najprostszy ale zostaje wycofywany z nowszych wersji, ponieważ Spark ma często problem z prawidłowym rozpoznaniem typów obiektów umieszonych w słowniku,\n2. zmapowanie elementów do klasy `Row`,\n3. dla danych wejściowych przedstawionych w formacie JSON (popularna operacja)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"bcbe7258-19a9-4c12-a6c6-e3ead0155721"}}},{"cell_type":"markdown","source":["#### Użycie metody `createDataFrame()`\nMetoda `createDataFrame()` umożliwia utworzenie obiektu `DataFrame` z danych pochodzących z obiektów RDD, Pythonowych list czy ramek danych Pandas (popularna biblioteka używana w przetwarzaniu danych).\n\n> Z dokumentacji [link](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame):\n>\n> `SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)`\n\nW przykładzie niżej jako argument przekazujemy zmieną z tablicą obiektów."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"37cb11f1-1ab9-4e5e-8df6-ef45b1786ecd"}}},{"cell_type":"code","source":["people_df = spark.createDataFrame(people)\n\n# Podgląd utworzonej ramki danych\npeople_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"ad258d5a-b6ee-4cfb-b293-a7456e71d409"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Aby sprawdzić czy typy danych zostały poprawnie rozpoznane można skorzystać z metody `printSchema()`."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"e0d907b6-9cbc-4973-b523-c260ce74f9ad"}}},{"cell_type":"code","source":["people_df.printSchema()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b21b90e0-5d2b-4ef0-af74-1da23aa85f12"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Jak widać schema danych została poprawnie rozpoznana. Wiek jest interpretowany jako liczba (typu `long`), a reszta pól to dane tekstowe. \n\nPrzy tworzeniu ramki danych otrzymaliśmy błąd ponieważ nie zawsze daje się w tak prosty sposób rozpoznać typy danych i powinniśmy to zrobić w bardziej przejrzysty sposób. Np. zadeklarować schemę jawnie.\n\nW przykładzie niżej zmieniliśmy typ danych wiekowych z `long` na `int`."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"af70f611-661f-4ed7-9696-3a0ef943b97a"}}},{"cell_type":"code","source":["from pyspark.sql.types import *\n\n# Deklaracja schemy\n# Trzeci parametr w konstruktorze `StructField` określa czy pola w komórce mogą być puste\nschema = StructType([\n StructField(\"name\", StringType(), False),\n StructField(\"sex\", StringType(), False),\n StructField(\"age\", IntegerType(), False)])\n\n# Utworzenie ramki z jawnie podaną schemą\npeople_df = spark.createDataFrame(people, schema)\n\npeople_df.printSchema()\n\npeople_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"2f3532d0-33d0-485e-9aed-30f258528fa6"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Zamiast tworzyć osobny obiekt reprezentujący schemę w prostych przypadkach możemy zadeklarować ją _inline_ (szybsze dla prostych danych)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"41310dbb-8210-4e59-b7da-09356cf00525"}}},{"cell_type":"code","source":["people_df = spark.createDataFrame(people, \"name: string, sex: string, age: int\")\n\npeople_df.printSchema()\n\npeople_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"e546e1e6-ae55-4947-bd39-30d197210698"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["#### Mapowanie RDD do typu `Row`\nInnym sposobem jest przekształcenie poszczególnych elementów RDD do obiektów typu `Row`. Jest to niskopoziomowa operacja wykonywana automatycznie przez framework podczas innych sposobów ładowania danych. Wykorzystanie tego sposobu nie niesie ze sobą większych korzyści oprócz zaznajomienia się z wewnętrznym sposobem reprezentacji danych."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"1c7823da-ca26-4c05-b3dd-45a6ff64fc7b"}}},{"cell_type":"code","source":["from pyspark.sql import Row\n\n# Rzutujemy każdy obiekt do typu Row\npeopleRow_rdd = people_rdd.map(lambda el: Row(**el))\n\n# Tworzymy obiekt RDD\npeople_df = spark.createDataFrame(peopleRow_rdd)\n\n# Sprawdzenie schemy\npeople_df.printSchema()\n\n# I zawartości\npeople_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"ccc20643-cf5d-4d06-8864-d63843504142"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["#### Zewnętrzne źródła danych\nMetoda `read()` umożliwia odczyt danych z zewnętrznych źródeł - takich jak pliki JSON, Hive, Parquet czy [innych](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources).\n\nPoniższy przykład konwertuje tablicę `people` do poprawnego formatu JSON, tworzy RDD na następnie inicjalizuje `DataFrame` korzystając z metody `spark.read(...)`."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"2033a351-57c0-42b7-b6e6-9ff23c536fdd"}}},{"cell_type":"code","source":["import json\n\n# Konwersja tablicy z elementami do obiektów JSON używając wbudowanej funkcji `json.dumps()`\npeople_json = [json.dumps(item) for item in people]\n\n# Tworzenie RDD z elementami formatu JSON\npeople_json_rdd = sc.parallelize(people_json)\n\n# Tworzenie Dataframe z użyciem stworzonego RDD\npeople_df = spark.read.json(people_json_rdd)\n\n# Prezentacja schemy\npeople_df.printSchema()\n\n# Kilka wierszy do weryfikacji\npeople_df.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"eb02be67-8e0a-4e6d-9b51-cb937c49e011"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["### Apache Spark SQL DSL\nPoniżej zaprezentowano przykładowe operacje DSL na obiektach typu `DataFrame`. Pełen opis API można znaleźć [tutaj](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"60c6fd0f-7b7d-49fd-92cc-11130d2acb87"}}},{"cell_type":"code","source":["# Wybór jednej kolumny - metoda `select`\npeople_df \\\n .select(\"name\") \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"a8e822ad-3251-4a2c-ac16-5dafeaef7b13"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["# Wyłuskane dwie kolumny, jedna dynamicznie zmodyfikowana\npeople_df \\\n .select(people_df['name'], people_df['age']+1) \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b8949674-a4fc-46b8-b886-bc919975cc1f"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["# Filtrowanie danych\npeople_df \\\n .filter(people_df['age'] > 21) \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"415004b0-31fd-4ec8-8f6c-440961d3d84c"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["# Funkcja `count`\npeople_df \\\n .groupBy(\"sex\") \\\n .count() \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"64d494b5-abfe-493d-8f17-84d43495d52b"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Przedstawione na początku notatnika zadanie także można zrealizować wykorzystując DSL (1 linia kodu):"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c9cf10ce-b1b8-4ad9-b7c4-1b25e5cc9d94"}}},{"cell_type":"code","source":["people_df.groupBy(\"sex\").mean(\"age\").show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b16e2d49-89f5-4f14-8aca-ac0ce12c38f1"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Oczywiście istnieje także możliwość otrzymana z powrotem obiektu RDD"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"dd63933c-40c4-43e3-a317-07b99b276693"}}},{"cell_type":"code","source":["people_df \\\n .groupBy(\"sex\") \\\n .mean(\"age\") \\\n .rdd \\\n .collectAsMap()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"6d4c1133-ebc9-4d09-b72b-3d5c79dec3e0"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Istnieje także możliwość korzystania bezpośrednio ze składni SQL. W tym celu na samym początku należy zadeklarować w pamięci *widok* danych o określonej nazwie.\n\n> Widok tabeli SQL zostanie usunięty z pamięci w momencie zatrzymania klastra."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"6b735966-a44d-4db6-b8d7-6e42be47bec1"}}},{"cell_type":"code","source":["# Rejestracja tymczasowego widoku z ramką `people_df` w pamięci jako tablica `people`\npeople_df.createOrReplaceTempView(\"people\")"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"078b56aa-6b64-4c28-9ca7-ce0b293bf9d8"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["# Przykład użycia składni SQL\nspark.sql(\"SELECT * FROM people\").show()\nspark.sql(\"SELECT * FROM people WHERE SEX = 'M'\").show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"7776edd0-15c8-493d-8ad5-554df31a28da"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Poniżej przedstawiono zapytanie SQL realizujące pierwotne zadanie. Jest ono czytelniejsze i prawdopodobnie wydajniejsze niż pisanie szeregu przekształceń RDD."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"7519cc80-4377-49f0-92c3-b92fee70496f"}}},{"cell_type":"code","source":["spark.sql(\"SELECT sex, AVG(age) FROM people GROUP BY sex\").show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"549c327d-6d19-4c91-bdde-1a02634545d1"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Notatniki Zeppelin umożliwiają nawet skorzystanie z interpretera SQL renderującego dane w przyjazny sposób - komórka powinna być poprzedzona poleceniem `%sql`.\n\n> Wykorzystaj rezultat poniższego zapytania, aby **zwizualizować** odpowiedź (kliknięcie w ikonę histogramu znajdującą się pod tabelą z wynikami)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"310bb06f-8bf5-4f3e-8212-04ec2c5bcdfb"}}},{"cell_type":"code","source":["%sql\nSELECT * FROM people;"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"1052e7b5-55da-4655-9231-438296b86fc7"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["### User Defined Functions (UDF)\nSpark SQL posiada możliwość możliwość wywoływania **funkcji użytkownika**, tzw \"_user defined functions (UDFs)_\". Mają możliwość wykonania obliczeń dla danego pola. Ponieważ funkcje te tworzone są przez programistów ich działanie traktowane jest jako \"czarna skrzynka\" i nie podlega zaawansowanej optymalizacji.\n\nPrzykład poniżej demonstruje tworzenie UDF konwertującego tekst do wielkich liter."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"4970c8d3-6e82-43fe-a4f2-307cbfb1265f"}}},{"cell_type":"code","source":["# UDF jest zwykłą funkcją\ndef upper(s):\n return s.upper()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b3d9c175-7567-46cd-8ad0-3c3ff9063639"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Aby móc wywołaś funkcję korzystając z DSL Apache Spark należy najpierw opakować ją funkcją `udf`:"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"ef304165-3ac7-4918-9aaf-0b5fa64ee48c"}}},{"cell_type":"code","source":["from pyspark.sql.functions import udf\nupper_udf = udf(upper, StringType())\n\n# zapytanie DSL\npeople_df \\\n .withColumn('uppercased_name', upper_udf('name')) \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c32eaf26-cc2e-4143-8ce7-16bc326cb057"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Jeśli chcemy wywołać ją poprzez natywne zapytanie SQL należy ją także uprzednio zarejestrować w kontekście."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"f3887b06-5453-4a16-9b1c-0bc71878d125"}}},{"cell_type":"code","source":["# rejestracja funkcji w kontekście SQL\nsqlContext.udf.register(\"upper_udf\", upper, StringType())"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"74050308-b3fb-4b50-9f26-3b860e77c9b2"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["%sql\nSELECT\n name,\n upper_udf(name) AS `uppercased_name`\nFROM people"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"9ef99cfe-f27b-4097-9cca-1c844502cea8"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["### Rzutowanie typów danych\nBardzo często okazuje się, że typu danych w ramkach nie są dopasowane do rodzaju operacji. W takiej sytuacji należy zrzutować je na poprawny typ (funkcja `cast`). "],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"585df739-db6b-4102-b61b-e834fbda6898"}}},{"cell_type":"code","source":["people_df \\\n .withColumn(\"age_float\", people_df['age'].cast(FloatType())) \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"f5ed6ddf-a8fa-4eae-9288-2b578fd8a592"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["## Spark MLlib\nBiblioteka [MLlib](http://spark.apache.org/docs/latest/ml-guide.html) upraszcza pracę z algorytmami uczenia maszynowego na dużych zbiorach danych.\n\nPodstawowe funkcjonalności to:\n- gotowe implementacje wielu algorytmów w rozproszonej wersji (klasyfikacja, regresja, klastrowanie, rekomendacje),\n- tworzenie nowych cech (transformacje, redukcje wymiarów, ...),\n- możliwość definiowania tzw *pipelines* - procesów grupujących kolejne operacje konieczne do realizacji danego zadania,\n- możliwość zapisu gotowego modelu w pliku tekstowym a następnie zaimportowanie go (w dowolnym interpreterze Apache Spark),\n- dostępne metody pomocnicze w operacjach na danych (np. dla algebry liniowej, funkcje statystyczne itp)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"06707c56-fc2c-4100-a19a-f22270cc06d9"}}},{"cell_type":"code","source":["# instalacja zależności\n!pip install mlflow"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"86fcd471-0d66-4e3f-a73d-d028c48b4e2c"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["### Zadanie\n*Celem jest stworzenie modelu regresji liniowej korzystając z danych poniżej*.\n\n**Plan działania jest następujący**:\n1. Reprezentacja danych w formacie nadającym się do analizy w module MLlib,\n2. Uruchomienie algorytmu regresji liniowej dla wszystkich danych,\n3. Podział na dwa zbiory danych - treningowy i testowy,\n4. Wyszukiwanie najlepszej kombinacji hiperparametrów z użyciem walidacji krzyżowej (ang. *cross-validation*)\n\n#### Dane\nW poniższym zadaniu wykorzystamy przykładowe dane z problemu [\"Kwartetu Anscombe'a\"](https://www.wikiwand.com/pl/Kwartet_Anscombe%27a) (skopiowane [stąd](https://gist.github.com/endolith/3299951))."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"3abdccd8-98d4-44b1-a171-355849d96b59"}}},{"cell_type":"code","source":["# Dane wejściowe (kwartet Anscombe'a)\nx = [10.0, 8.0, 13.0, 9.0, 11.0, 14.0, 6.0, 4.0, 12.0, 7.0, 5.0]\ny = [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73]"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"aa72c724-10bf-4e4b-baf7-3bbbee784ec5"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["W celu wizualizacji wykorzystamy popularną bibliotekę `matplotlib`. Jak widać dane są umieszczone prawie idealnie na linii prostej (z pominięciem jednego odchylenia)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c9a5569a-fea1-408e-8e0b-d3fcc3e1424c"}}},{"cell_type":"code","source":["import matplotlib.pyplot as plt\n\nfig, ax = plt.subplots()\n\nax.scatter(x, y, c=\"r\")\nax.set_title(\"Anscombe's Quarter\")\nax.set_xlabel(\"X\")\nax.set_ylabel(\"Y\")\nax.grid(True)\n\ndisplay(fig)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"be4523bd-a5ec-4abd-b03f-275079b2b4fc"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["#### 1. Reprezentacja danych w formacie używanym w MLLib\nDane powinny zostać zapisane w formacie RDD kompatybilnym z biblioteką MLlib.\n\nW przypadku Pythona mogą one być zarówno:\n- zwykłymi listami/krotkami,\n- listami z biblioteki Numpy. \n\nWięcej informacji [tutaj](http://spark.apache.org/docs/latest/mllib-data-types.html#data-types-rdd-based-api)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"66bf1740-5fdd-4ae8-b470-b1d45fab0991"}}},{"cell_type":"code","source":["data = []\n\n# Konsersja danych do krotki typu \nfor feature, label in zip(x, y):\n data.append((feature, label))\n\n# Tworzenie RDD\nrdd = spark.sparkContext.parallelize(data)\n\n# Rzut okiem na rezultat\nrdd.collect()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"f6ac2293-37c9-4d37-83bd-228caa060d3a"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Taką kolekcję należy reprezentować jako `DataFrame`, gdzie w pierwszej kolumnie umieszczona jest informacja o informacji jaką chcemy przewidzieć (*target label*), a następnie umieszamy wektor cech (klasa `Vectors` umożliwia stworzenie normalnego wektora jak i takiego działającego dla rzadkich wartości - *sparse*).\n\n> W tym obrazowym scenariuszu dostępna jest jedna cecha. W rzeczywistych scenariuszach obiekt `Vectors` będzie reprezentował wektor wielu elementów (np. wiek, waga, lokalizacja, itp)."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"5bd04490-9097-41fe-b341-755a0314ede5"}}},{"cell_type":"code","source":["from pyspark.ml.linalg import Vectors\n\n# Definicja funkcji konwertującą krotkę do formatu akceptowalnego przez MLlib\ndef convert(row):\n return (float(row[1]), Vectors.dense(row[0]))\n\n# Konwersja na RDD\nfeatures_rdd = rdd.map(convert)\n\n# Stworzenie DataFrame, jawne dodanie nazw kolumn\ndataset_df = spark.createDataFrame(features_rdd, [\"label\", \"features\"])\n\n# Wyświetlenie pierwszych dwóch elementów\ndataset_df.show(2)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"099ef486-c358-457b-9c33-6feec2fc49eb"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["#### 2. Uruchomienie algorytmu\nMają dane zapisane w dogodnej postaci możemy zadeklarować obiekt algorytmu, ustawić przykładowe parametry a następnie dopasować jego współczynniki od danych."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"ef046be0-6d7f-4a07-84f4-e13f7e58026a"}}},{"cell_type":"code","source":["from pyspark.ml.regression import LinearRegression\n\n# Deklaracja obiektu reprezentującego algorytm liniowej regresji (ze stałymi parametrami)\nlr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)\n\n# Trenowanie modelu\nlr_model = lr.fit(dataset_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c279214a-817a-41a5-8167-bf5ce2a22513"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Po trenowaniu mamy dostęp do kilku zmiennych określających wynik procesu."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"5156eb86-11fb-4a4e-8d95-0f44c04545ae"}}},{"cell_type":"code","source":["# Obtained model\nprint(\"MODEL\")\nprint(\"Coefficients: [{}]\".format(lr_model.coefficients))\nprint(\"Intercept: [{}]\".format(lr_model.intercept))\n\n# Training stats\nprint(\"\\nTRAINING STATS\")\nprint(\"numIterations: {}\".format(lr_model.summary.totalIterations))\nprint(\"objectiveHistory: {}\".format(lr_model.summary.objectiveHistory))\nprint(\"RMSE: {}\".format(lr_model.summary.rootMeanSquaredError))\nprint(\"r2: {}\".format(lr_model.summary.r2))\nlr_model.summary.residuals.show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"dbd9cb5f-4489-4f24-b8e2-7d1fbb985ebb"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["#### 3. Podział na dwa zbiory - treningowy i testowy\nNiestety otrzymane wyżej wyniki nie mogą uznać za wiarygodne - algorytm został przetestowany na wcześniej widzianych danych (występuje tu zjawisko nadmiernego dopasowania - tzw. \"_overfitting_\").\n\nDobrą praktyką jest podział danych na dwa zbiory:\n- treningowy (używany do obliczenia najlepszych współczynników modelu),\n- testowy (dane nie wykorzystane w procesie uczenia które przysłużą się do ocenienia wydajności modelu)\n\nW tym przypadku zbiór testowy zawierał będzie 30% wszystkich rekordów."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"dad37581-8182-47eb-b601-ba69f7f7ded9"}}},{"cell_type":"code","source":["# Losowy podział danych na zbiór treningowy i testowy\ntrain_df, test_df = dataset_df.randomSplit([0.7, 0.3], seed=12345)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"e9a6e30d-de45-45af-80b2-71f22e3a363b"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"code","source":["print(\"Ilość elementów w zbiorze treningowym: {}\".format(train_df.count()))\nprint(\"Ilość elementów w zbiorze testowym: {}\".format(test_df.count()))"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b15e0649-513a-4a36-add4-df380dd483f8"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["#### 4. Wyszukiwanie najlepszej kombinacji hiper-parametrów z użyciem walidacji krzyżowej (ang. *cross-validation*)\nBardzo rzadko zdarza się, że znamy dokładne współczynniki algorytmu przetwarzającego dane.\n\nAby poradzić sobie z tym problemem zdefiniujemy zakres możliwych wartości dla parametrów które chcemy dostroić. Następnie każdna możliwa kombinacja zostanie wykonana i przetestowana. W MLlib w celu określenie przestrzeni parametrów stworzono klasę `ParamGridBuilder`. \n\nW tym przypadku będziemy modyfikować dwa parametry modelu - `regParam` i `elasticNetParam`."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"c65e32c8-d656-489a-a70d-53eba33b60f6"}}},{"cell_type":"code","source":["from pyspark.ml.tuning import ParamGridBuilder\n\n# Deklaracja parametrów i możliwych wartości które zostaną sprawdzone\nparamGrid = ParamGridBuilder() \\\n .addGrid(lr.regParam, [0.1, 0.01]) \\\n .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \\\n .build()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"e889d6c4-e140-44c4-a558-1cf966d7fb72"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Kolejnym krokiem jest uruchomienie [walidacji krzyżowej](https://www.wikiwand.com/en/Cross-validation_(statistics).\n\nW MLlib możemy ustalić następujące parametry:\n- `estimator` - testowany model,\n- `estimatorParamsMap` - obiekt opisujący dozwolone wartości parametrów do przetestowania,\n- `evaluator` - obiekt określający metrykę wyboru najlepszego modelu\n- `numFolds` - na ile podzbiorów zostaną podzielone dane treningowe"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"b7f2feab-2c23-4a71-8711-7e11e14fa236"}}},{"cell_type":"code","source":["from pyspark.ml.tuning import CrossValidator\nfrom pyspark.ml.evaluation import RegressionEvaluator\n\n# Deklaracja strategii walidacji krzyżowej\ncrossval = CrossValidator(\n estimator=lr,\n estimatorParamMaps=paramGrid,\n evaluator=RegressionEvaluator(),\n numFolds=2\n)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"89fa9762-42e7-40e5-abdc-8b3298bc877b"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Wytrenujmy model korzystając z przygotowanego wcześniej zbioru danych treningowych:"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"0e6fb67b-723e-4d8f-80f2-a13cddd8c8bf"}}},{"cell_type":"code","source":["cv_model = crossval.fit(train_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"dffdedb4-7005-4c48-ab41-3fdfded03508"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Warto zwrócić uwagę na ilość zadań uruchomionych w tle. Framework sprawdził i ocenił każdą kombinację parametrów.\n\nTak uzyskany najlepszy model może być użyty dla przewidzenia danych ze zbioru testowego"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"ac16b2c5-7037-4521-8a31-402729b4ee13"}}},{"cell_type":"code","source":["predicted_df = cv_model.transform(test_df)"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"ad97a5a7-cd14-44c2-bc56-799824764a6c"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["Wyświetlmy otrzymane predykcje w celu porównania ich z prawdziwymi wartościami"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"d1f1cf6f-f413-4fd7-a655-2d1be52883d7"}}},{"cell_type":"code","source":["predicted_df \\\n .withColumnRenamed('label', 'true_value') \\\n .select(\"true_value\", \"prediction\") \\\n .show()"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"5db266f7-dce8-4c9a-9341-60e4fdc050d0"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0},{"cell_type":"markdown","source":["## Ćwiczenia\n1. Utworzyć dwa obiekty `DataFrame`. Jeden powinien zawierać dane o produktach, drugi o sprzedażach.\n2. Zarejestrować w pamięci oba widoki\n3. Należy napisać zapytanie SQL wyświetlające nazwę i średnią sprzedanych produktów (zaprezentowane powinny być tylko produkty znajdujace się w tablicy `products`).\n4. Należy napisać zapytanie SQL wyświetlające nazwę produktu i powiązany z nim sumaryczny dochód (zaprezentowane powinny być tylko produkty znajdujace się w tablicy `products`)\n5. Należy napisać własną funkcję UDF która zaokrągli cenę produktu do pełnej liczby całkowitej."],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"0a422ebe-faff-4f70-b640-6b4d31c592ec"}}},{"cell_type":"code","source":["# Lista produktów\nproducts = [\n {\"id\": 1, \"name\": \"ProductA\", \"price\": 9.99},\n {\"id\": 2, \"name\": \"ProductB\", \"price\": 12.99},\n {\"id\": 3, \"name\": \"ProductC\", \"price\": 69.99}\n]\n\n# Lista danych sprzedażowych\nsales = [\n {\"product_id\": 1, \"amount\": 1},\n {\"product_id\": 2, \"amount\": 5},\n {\"product_id\": 3, \"amount\": 5},\n {\"product_id\": 1, \"amount\": 1},\n {\"product_id\": 2, \"amount\": 2},\n {\"product_id\": 2, \"amount\": 1},\n {\"product_id\": 3, \"amount\": 1},\n {\"product_id\": 1, \"amount\": 10},\n {\"product_id\": 4, \"amount\": 5},\n {\"product_id\": 2, \"amount\": 2},\n {\"product_id\": 4, \"amount\": 7},\n {\"product_id\": 2, \"amount\": 3},\n {\"product_id\": 1, \"amount\": 1},\n {\"product_id\": 3, \"amount\": 7},\n {\"product_id\": 1, \"amount\": 3},\n {\"product_id\": 3, \"amount\": 1},\n {\"product_id\": 3, \"amount\": 8},\n {\"product_id\": 1, \"amount\": 10},\n]"],"metadata":{"application/vnd.databricks.v1+cell":{"title":"","showTitle":false,"inputWidgets":{},"nuid":"195613c9-2e0d-4c59-96de-629ca35f21f8"}},"outputs":[{"output_type":"display_data","metadata":{"application/vnd.databricks.v1+output":{"type":"ipynbError","data":"","errorSummary":"","arguments":{}}},"output_type":"display_data","data":{"text/html":[""]}}],"execution_count":0}],"metadata":{"name":"lab10","notebookId":3948931357712241,"application/vnd.databricks.v1+notebook":{"notebookName":"lab14","dashboards":[],"notebookMetadata":{"pythonIndentUnit":2,"experimentId":"2852568710761466"},"language":"python","widgets":{},"notebookOrigID":2852568710761466}},"nbformat":4,"nbformat_minor":0}