{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# DataFrame cơ bản\n", "Tập dữ liệu phân tán biểu diễn dưới dạng dòng và cột như CSDL." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# SQLContext: Tạo DataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Từ danh sách tuples" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(_1=u'Alice', _2=1)]\n", "[Row(name=u'Alice', age=1)]\n" ] } ], "source": [ "l = [(\"Alice\", 1)]\n", "print sqlContext.createDataFrame(l).collect()\n", "print sqlContext.createDataFrame(l, [\"name\", \"age\"]).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Từ RDDs" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(_1=u'Alice', _2=1)]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize(l)\n", "sqlContext.createDataFrame(rdd).collect()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', age=1)]" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = sqlContext.createDataFrame(rdd, [\"name\", \"age\"])\n", "df.collect()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(a=u'Alice', b=1)]\n", "[Row(value=1)]\n" ] } ], "source": [ "print sqlContext.createDataFrame(rdd, \"a: string, b: int\").collect()\n", "rdd = sc.parallelize(l)\n", "rdd = rdd.map(lambda row: row[1])\n", "print sqlContext.createDataFrame(rdd, \"int\").collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Từ Row" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', age=12)]" ] }, 
"execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import Row\n", "rdd = sc.parallelize([(\"Alice\", 12)])\n", "Person = Row(\"name\", \"age\")\n", "person = rdd.map(lambda r: Person(*r))\n", "df2 = sqlContext.createDataFrame(person)\n", "df2.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Từ Schema" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', age=12)]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql.types import *\n", "schema = StructType([\n", " StructField(\"name\", StringType(), True),\n", " StructField(\"age\", IntegerType(), True)\n", " ])\n", "df3 = sqlContext.createDataFrame(rdd, schema)\n", "df3.collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Từ pandas" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'Alice', age=1)]\n", "[Row(0=1, 1=2)]\n" ] } ], "source": [ "import pandas\n", "print sqlContext.createDataFrame(df.toPandas()).collect()\n", "print sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Chuyển đổi định dạng" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(f1=u'Alice', f2=1)]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.toDF(\"f1\", \"f2\").collect()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "u'{\"name\":\"Alice\",\"age\":1}'" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.toJSON().first()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
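<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n", "    <tr style=\"text-align: right;\">\n", "      <th></th>\n", "      <th>name</th>\n", "      <th>age</th>\n", "    </tr>\n", "  </thead>\n", "  <tbody>\n", "    <tr>\n", "      <th>0</th>\n", "      <td>Alice</td>\n", "      <td>1</td>\n", "    </tr>\n", "  </tbody>\n", "</table>\n", "</div>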
" ], "text/plain": [ " name age\n", "0 Alice 1" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tạo temp table" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[u'table1', u'table2']\n" ] } ], "source": [ "sqlContext.registerDataFrameAsTable(df, \"table1\")\n", "sqlContext.registerDataFrameAsTable(df2, \"table2\")\n", "print sqlContext.tableNames()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[database: string, tableName: string, isTemporary: boolean]\n", "Row(database=u'', tableName=u'table1', isTemporary=True)\n" ] } ], "source": [ "df3 = sqlContext.tables()\n", "print df3\n", "print df3.filter(\"tableName = 'table1'\").first()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "sqlContext.dropTempTable(\"table1\")\n", "sqlContext.dropTempTable(\"table2\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tạo hàm UDF: User Defined Function" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(stringLengthString(test)=u'4')]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sqlContext.registerFunction(\"stringLengthString\", lambda x: len(x))\n", "sqlContext.sql(\"SELECT stringLengthString('test')\").collect()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(stringLengthInt(test)=4)]" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql.types import IntegerType\n", "sqlContext.registerFunction(\"stringLengthInt\", lambda x: len(x), IntegerType())\n", 
"sqlContext.sql(\"SELECT stringLengthInt('test')\").collect()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(stringLengthInt(test)=4)]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sqlContext.udf.register(\"stringLengthInt\", lambda x: len(x), IntegerType())\n", "sqlContext.sql(\"SELECT stringLengthInt('test')\").collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Thao tác với DataFrame" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "l = [(\"Alice\", 2, 12), (\"Bob\", 5, 25)]\n", "rdd = sc.parallelize(l)\n", "df = sqlContext.createDataFrame(rdd, \"name: string, age: int, height: int\")\n", "df.collect()\n", "\n", "df.createTempView(\"people\")\n", "df2 = sqlContext.sql(\"select * from people\")" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "10" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.repartition(10).rdd.getNumPartitions()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+------+\n", "| name|age|height|\n", "+-----+---+------+\n", "| Bob| 5| 25|\n", "| Bob| 5| 25|\n", "|Alice| 2| 12|\n", "|Alice| 2| 12|\n", "+-----+---+------+\n", "\n" ] } ], "source": [ "data = df.union(df).repartition(\"age\")\n", "data.show()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+------+\n", "| name|age|height|\n", "+-----+---+------+\n", "|Alice| 2| 12|\n", "| Bob| 5| 25|\n", "|Alice| 2| 12|\n", "| Bob| 5| 25|\n", "+-----+---+------+\n", "\n" ] } ], "source": [ "data = data.repartition(7, \"age\")\n", "data.show()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, 
"outputs": [ { "data": { "text/plain": [ "7" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.rdd.getNumPartitions()" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+------+\n", "| name|age|height|\n", "+-----+---+------+\n", "| Bob| 5| 25|\n", "| Bob| 5| 25|\n", "|Alice| 2| 12|\n", "|Alice| 2| 12|\n", "+-----+---+------+\n", "\n" ] } ], "source": [ "data = data.repartition(\"name\", \"age\")\n", "data.show()" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', age=2, height=12, age2=4),\n", " Row(name=u'Bob', age=5, height=25, age2=7)]" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# withColumn(colName, col)\n", "# Returns a new DataFrame by adding a column or replacing the existing column that has the same name.\n", "df.withColumn(\"age2\", df.age + 2).collect()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', age2=2, height=12), Row(name=u'Bob', age2=5, height=25)]" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.withColumnRenamed(\"age\", \"age2\").collect()" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(ages=u'2'), Row(ages=u'5')]\n", "[Row(ages=u'2'), Row(ages=u'5')]\n" ] } ], "source": [ "print df.select(df.age.cast(\"string\").alias(\"ages\")).collect()\n", "print df.select(df.age.cast(StringType()).alias(\"ages\")).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tổng hợp dữ liệu" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(max(age)=5)]" ] }, "execution_count": 26, 
"metadata": {}, "output_type": "execute_result" } ], "source": [ "df.agg({\"age\": \"max\"}).collect()" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(min(age)=2)]" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import functions as F\n", "df.agg(F.min(df.age)).collect()" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "gdf = df.groupBy(df.name)\n", "sorted(gdf.agg({\"*\": \"count\"}).collect())" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import functions as F\n", "sorted(gdf.agg(F.min(df.age)).collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Alias" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Bob', name=u'Bob', age=5),\n", " Row(name=u'Alice', name=u'Alice', age=2)]" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql.functions import *\n", "df_as1 = df.alias(\"df_as1\")\n", "df_as2 = df.alias(\"df_as2\")\n", "joined_df = df_as1.join(df_as2, col(\"df_as1.name\") == col(\"df_as2.name\"), \"inner\")\n", "joined_df.select(\"df_as1.name\", \"df_as2.name\", \"df_as2.age\").collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Xem thống kê" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: string (nullable = 
true)\n", " |-- age: integer (nullable = true)\n", " |-- height: integer (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true),StructField(height,IntegerType,true)))" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.schema" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "StorageLevel(False, False, False, False, 1)" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.storageLevel" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "2" ] }, "execution_count": 34, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.count()" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(sum(age)=7)]\n", "[Row(sum(age)=7, sum(height)=37)]\n" ] } ], "source": [ "print df.groupBy().sum(\"age\").collect()\n", "print df.groupBy().sum(\"age\", \"height\").collect()" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(avg(age)=3.5)]" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.groupBy().avg(\"age\").collect()" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(avg(age)=3.5, avg(height)=18.5)]" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.groupBy().avg(\"age\", \"height\").collect()" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['name', 'age', 'height']" ] }, "execution_count": 38, 
"metadata": {}, "output_type": "execute_result" } ], "source": [ "df.columns" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Column\n", "Column\n", "Column<(age + 1)>\n" ] } ], "source": [ "print df.name\n", "print df[\"name\"]\n", "print df.age + 1" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----+-----+\n", "| name| age|count|\n", "+-----+----+-----+\n", "| null|null| 2|\n", "| null| 2| 1|\n", "| null| 5| 1|\n", "|Alice|null| 1|\n", "|Alice| 2| 1|\n", "| Bob|null| 1|\n", "| Bob| 5| 1|\n", "+-----+----+-----+\n", "\n" ] } ], "source": [ "# cube(*col): Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them.\n", "df.cube(\"name\", df.age).count().orderBy(\"name\", \"age\").show()" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------------+\n", "|summary| age|\n", "+-------+------------------+\n", "| count| 2|\n", "| mean| 3.5|\n", "| stddev|2.1213203435596424|\n", "| min| 2|\n", "| max| 5|\n", "+-------+------------------+\n", "\n" ] } ], "source": [ "df.describe([\"age\"]).show()" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-----+------------------+-----------------+\n", "|summary| name| age| height|\n", "+-------+-----+------------------+-----------------+\n", "| count| 2| 2| 2|\n", "| mean| null| 3.5| 18.5|\n", "| stddev| null|2.1213203435596424|9.192388155425117|\n", "| min|Alice| 2| 12|\n", "| max| Bob| 5| 25|\n", "+-------+-----+------------------+-----------------+\n", "\n" ] } ], "source": [ "df.describe().show()" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "data": { 
"text/plain": [ "2" ] }, "execution_count": 43, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.distinct().count()" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('name', 'string'), ('age', 'int'), ('height', 'int')]" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Physical Plan ==\n", "Scan ExistingRDD[name#81,age#82,height#83]\n" ] } ], "source": [ "df.explain()" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Parsed Logical Plan ==\n", "LogicalRDD [name#81, age#82, height#83]\n", "\n", "== Analyzed Logical Plan ==\n", "name: string, age: int, height: int\n", "LogicalRDD [name#81, age#82, height#83]\n", "\n", "== Optimized Logical Plan ==\n", "LogicalRDD [name#81, age#82, height#83]\n", "\n", "== Physical Plan ==\n", "Scan ExistingRDD[name#81,age#82,height#83]\n" ] } ], "source": [ "df.explain(True)" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(avg(age)=3.5, avg(height)=18.5)]" ] }, "execution_count": 47, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.groupBy().avg().collect()" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Bob', avg(age)=5.0), Row(name=u'Alice', avg(age)=2.0)]" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.groupBy(\"name\").agg({\"age\": \"mean\"}).collect()" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Bob', avg(age)=5.0, avg(height)=25.0),\n", " Row(name=u'Alice', avg(age)=2.0, 
avg(height)=12.0)]" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.groupBy(df.name).avg().collect()" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.groupBy([\"name\", df.age]).count().collect()" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(max(age)=5)]\n", "[Row(max(age)=5, max(height)=25)]\n" ] } ], "source": [ "print df.groupBy().max(\"age\").collect()\n", "print df.groupBy().max(\"age\", \"height\").collect()" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(avg(age)=3.5)]\n", "[Row(avg(age)=3.5, avg(height)=18.5)]\n" ] } ], "source": [ "print df.groupBy().mean(\"age\").collect()\n", "print df.groupBy().mean(\"age\", \"height\").collect()" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 5| 80|Alice|\n", "| 10| 80|Alice|\n", "+---+------+-----+\n", "\n" ] } ], "source": [ "from pyspark.sql import Row\n", "df = sc.parallelize([\n", " Row(name=\"Alice\", age=5, height=80),\n", " Row(name=\"Alice\", age=5, height=80),\n", " Row(name=\"Alice\", age=10, height=80)\n", " ]).toDF()\n", "df.dropDuplicates().show()" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "|age|height| name|\n", "+---+------+-----+\n", "| 5| 80|Alice|\n", "+---+------+-----+\n", "\n" ] } ], "source": [ "df.dropDuplicates([\"name\", \"height\"]).show()" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "## Join" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(age=5, name=u'Alice'), Row(age=5, name=u'Alice'), Row(age=10, name=u'Alice')]\n", "[Row(name=u'Alice', height=12), Row(name=u'Bob', height=25)]\n" ] }, { "data": { "text/plain": [ "[Row(age=5, name=u'Alice', height=12),\n", " Row(age=5, name=u'Alice', height=25),\n", " Row(age=5, name=u'Alice', height=12),\n", " Row(age=5, name=u'Alice', height=25),\n", " Row(age=10, name=u'Alice', height=12),\n", " Row(age=10, name=u'Alice', height=25)]" ] }, "execution_count": 55, "metadata": {}, "output_type": "execute_result" } ], "source": [ "print df.select(\"age\", \"name\").collect()\n", "print df2.select(\"name\", \"height\").collect()\n", "df.crossJoin(df2.select(\"height\")).select(\"age\", \"name\", df2.height).collect()" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(height=80, name=u'Alice'),\n", " Row(height=80, name=u'Alice'),\n", " Row(height=80, name=u'Alice')]" ] }, "execution_count": 56, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.drop(\"age\").collect()" ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(height=80, name=u'Alice'),\n", " Row(height=80, name=u'Alice'),\n", " Row(height=80, name=u'Alice')]" ] }, "execution_count": 57, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.drop(df.age).collect()" ] }, { "cell_type": "code", "execution_count": 58, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(height=80, name=u'Alice', age=2, height=12),\n", " Row(height=80, name=u'Alice', age=2, height=12),\n", " Row(height=80, name=u'Alice', age=2, height=12)]" ] }, "execution_count": 58, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.join(df2, df.name == 
df2.name, \"inner\").drop(df.name).drop(df.age).collect()" ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice'), Row(name=u'Alice'), Row(name=u'Alice')]" ] }, "execution_count": 59, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.join(df2, \"name\", \"inner\").drop(\"age\", \"height\").collect()" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=None, height=25),\n", " Row(name=u'Alice', height=12),\n", " Row(name=u'Alice', height=12),\n", " Row(name=u'Alice', height=12)]" ] }, "execution_count": 60, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()" ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Bob', height=None),\n", " Row(name=u'Alice', height=80),\n", " Row(name=u'Alice', height=80),\n", " Row(name=u'Alice', height=80)]" ] }, "execution_count": 61, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.join(df2, 'name', 'outer').select('name', df.height).collect()" ] }, { "cell_type": "code", "execution_count": 62, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=None, age=2),\n", " Row(name=None, age=5),\n", " Row(name=u'Alice', age=None),\n", " Row(name=u'Alice', age=None),\n", " Row(name=u'Alice', age=None)]" ] }, "execution_count": 62, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cond = [df.name == df2.name, df.age == df2.age]\n", "df.join(df2, cond, 'outer').select(df.name, df2.age).collect()" ] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', height=12),\n", " Row(name=u'Alice', height=12),\n", " Row(name=u'Alice', height=12)]" ] }, "execution_count": 63, "metadata": {}, "output_type": 
"execute_result" } ], "source": [ "df.join(df2, 'name').select(df.name, df2.height).collect()" ] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 64, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.join(df2, ['name', 'age']).select(df.name, df.age).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Filter" ] }, { "cell_type": "code", "execution_count": 65, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'Bob', age=5, height=25)]\n", "[Row(name=u'Bob', age=5, height=25)]\n", "[Row(name=u'Alice', age=2, height=12)]\n" ] } ], "source": [ "l = [(\"Alice\", 2, 12), (\"Bob\", 5, 25)]\n", "rdd = sc.parallelize(l)\n", "df = sqlContext.createDataFrame(rdd, \"name: string, age: int, height: int\")\n", "\n", "print df.filter(df.age > 3).collect()\n", "print df.filter(\"age > 3\").collect()\n", "print df.where(\"age=2\").collect()" ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Row(name=u'Alice', age=2, height=12)" ] }, "execution_count": 66, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.first()" ] }, { "cell_type": "code", "execution_count": 67, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Row(name=u'Alice', age=2, height=12)" ] }, "execution_count": 67, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": 68, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'Alice', age=2, height=12)]\n", "[]\n" ] } ], "source": [ "print df.limit(1).collect()\n", "print df.limit(0).collect()" ] }, { "cell_type": "code", "execution_count": 69, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]\n", 
"[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]\n", "[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]\n", "[Row(name=u'Alice', age=2, height=12), Row(name=u'Bob', age=5, height=25)]\n", "[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]\n", "[Row(name=u'Bob', age=5, height=25), Row(name=u'Alice', age=2, height=12)]\n" ] } ], "source": [ "# orderBy\n", "print df.sort(df.age.desc()).collect()\n", "print df.sort(\"age\", ascending=False).collect()\n", "print df.orderBy(df.age.desc()).collect()\n", "\n", "from pyspark.sql.functions import *\n", "print df.sort(asc(\"age\")).collect()\n", "print df.sort(desc(\"age\"), \"name\").collect()\n", "print df.orderBy([\"age\", \"name\"], ascending=[0, 1]).collect()" ] }, { "cell_type": "code", "execution_count": 70, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'Alice', age=2, height=12)]\n" ] }, { "data": { "text/plain": [ "[]" ] }, "execution_count": 70, "metadata": {}, "output_type": "execute_result" } ], "source": [ "print df.filter(df.name.endswith(\"ice\")).collect()\n", "df.filter(df.name.endswith(\"ice$\")).collect()" ] }, { "cell_type": "code", "execution_count": 71, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|r.b|\n", "+---+\n", "| b|\n", "+---+\n", "\n", "+---+\n", "|r.a|\n", "+---+\n", "| 1|\n", "+---+\n", "\n" ] } ], "source": [ "# get subfield RDD > RDD, gets a field by name in a StructField.\n", "from pyspark.sql import Row\n", "df1 = sc.parallelize([Row(r=Row(a=1, b=\"b\"))]).toDF()\n", "df1.select(df1.r.getField(\"b\")).show()\n", "df1.select(df1.r.getField(\"a\")).show()" ] }, { "cell_type": "code", "execution_count": 72, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+------+\n", "|l[0]|d[key]|\n", "+----+------+\n", "| 1| value|\n", "+----+------+\n", "\n", "+----+------+\n", 
"|l[0]|d[key]|\n", "+----+------+\n", "| 1| value|\n", "+----+------+\n", "\n" ] } ], "source": [ "# RDD contains list and dictionary\n", "df1 = sc.parallelize([([1, 2], {\"key\": \"value\"})]).toDF([\"l\", \"d\"])\n", "df1.select(df1.l.getItem(0), df1.d.getItem(\"key\")).show()\n", "df1.select(df1.l[0], df1.d[\"key\"]).show()" ] }, { "cell_type": "code", "execution_count": 73, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(height=80, name=u'Tom')]\n", "[Row(height=None, name=u'Alice')]\n" ] } ], "source": [ "from pyspark.sql import Row\n", "df1 = sc.parallelize([Row(name=u\"Tom\", height=80), Row(name=u\"Alice\", height=None)]).toDF()\n", "print df1.filter(df1.height.isNotNull()).collect()\n", "print df1.filter(df1.height.isNull()).collect()" ] }, { "cell_type": "code", "execution_count": 74, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[Row(name=u'Bob', age=5, height=25)]\n", "[Row(name=u'Alice', age=2, height=12)]\n" ] } ], "source": [ "print df[df.name.isin(\"Bob\", \"Mike\")].collect()\n", "print df[df.age.isin(1, 2, 3)].collect()" ] }, { "cell_type": "code", "execution_count": 75, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(name=u'Alice', age=2, height=12)]" ] }, "execution_count": 75, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.filter(df.name.like(\"Al%\")).collect()" ] }, { "cell_type": "code", "execution_count": 76, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-------------------------------------+\n", "| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|\n", "+-----+-------------------------------------+\n", "|Alice| 0|\n", "| Bob| 1|\n", "+-----+-------------------------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql import functions as F\n", "df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Làm việc 
với Sample" ] }, { "cell_type": "code", "execution_count": 77, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+---+------+\n", "|name|age|height|\n", "+----+---+------+\n", "| A| 2| 12|\n", "| B| 5| 25|\n", "+----+---+------+\n", "\n" ] } ], "source": [ "df.na.replace([\"Alice\", \"Bob\"], [\"A\", \"B\"], \"name\").show()" ] }, { "cell_type": "code", "execution_count": 78, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----+-----+\n", "| name| age|count|\n", "+-----+----+-----+\n", "| null|null| 2|\n", "|Alice|null| 1|\n", "|Alice| 2| 1|\n", "| Bob|null| 1|\n", "| Bob| 5| 1|\n", "+-----+----+-----+\n", "\n" ] } ], "source": [ "df.rollup(\"name\", df.age).count().orderBy(\"name\", \"age\").show()" ] }, { "cell_type": "code", "execution_count": 79, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "2" ] }, "execution_count": 79, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# sample(withReplacement, fraction, seed=None)\n", "df.sample(False, 0.5, 42).count()" ] }, { "cell_type": "code", "execution_count": 80, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+\n", "|key|count|\n", "+---+-----+\n", "| 0| 5|\n", "| 1| 9|\n", "+---+-----+\n", "\n" ] } ], "source": [ "# sampleBy(col, fractions, seed=None)\n", "dataset = sqlContext.range(0, 100).select((col(\"id\") % 3).alias(\"key\"))\n", "sampled = dataset.sampleBy(\"key\", fractions={0: 0.1, 1: 0.2}, seed=0)\n", "sampled.groupBy(\"key\").count().orderBy(\"key\").show()" ] }, { "cell_type": "code", "execution_count": 81, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]" ] }, "execution_count": 81, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.selectExpr(\"age * 2\", \"abs(age)\").collect()" ] }, { "cell_type": "code", "execution_count": 82, "metadata": {}, "outputs": [ { 
"name": "stdout", "output_type": "stream", "text": [ "+----+---+------+\n", "|name|age|height|\n", "+----+---+------+\n", "| Ali| 2| 12|\n", "| Bob| 5| 25|\n", "+----+---+------+\n", "\n" ] } ], "source": [ "# show(n=20, truncate=True)\n", "# truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right.\n", "df.show(truncate=3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Làm việc với Row" ] }, { "cell_type": "code", "execution_count": 83, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(age=11, name='Alice')\n", "Alice 11\n", "Alice 11\n", "True\n", "False\n" ] } ], "source": [ "row = Row(name=\"Alice\", age=11)\n", "print row\n", "print row[\"name\"], row[\"age\"]\n", "print row.name, row.age\n", "print \"name\" in row\n", "print \"wrong_key\" in row" ] }, { "cell_type": "code", "execution_count": 84, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Row(name='Alice', age=11)\n" ] } ], "source": [ "# Row also can be used to create another Row like class, then it could be used to create Row objects\n", "Person = Row(\"name\", \"age\")\n", "print Person\n", "print Person(\"Alice\", 11)" ] }, { "cell_type": "code", "execution_count": 85, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'age': 11, 'name': 'Alice'}\n", "{'value': Row(age=2, name='a'), 'key': 1}\n", "{'value': {'age': 2, 'name': 'a'}, 'key': 1}\n" ] } ], "source": [ "# asDict(recursive=False)\n", "print Row(name=\"Alice\", age=11).asDict()\n", "row = Row(key=1, value=Row(name=\"a\", age=2))\n", "print row.asDict()\n", "print row.asDict(True)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", 
"mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" }, "name": "PySparkSQL", "notebookId": 2217893066036922 }, "nbformat": 4, "nbformat_minor": 1 }