{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* [RDD API](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb)\n",
"* [GitHub](https://github.com/jkthompson/pyspark-pictures)\n",
"* [related blog post](http://data-frack.blogspot.com/2015/01/visual-mnemonics-for-pyspark-api.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Click on a picture to view pyspark docs"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pyspark version:1.6.1\n",
"Ipython version:4.2.0\n"
]
}
],
"source": [
"# versions: report the Spark version (from the session's sc) and the IPython version\n",
"import IPython\n",
"for label, version in (('pyspark', sc.version), ('Ipython', IPython.__version__)):\n",
"    print(label + ' version:' + str(version))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-------------------+\n",
"| avg(amt)|\n",
"+-------------------+\n",
"|0.20000000000000004|\n",
"+-------------------+\n",
"\n"
]
}
],
"source": [
"# agg: aggregate over the whole DataFrame, here the average of amt\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.agg({'amt': 'avg'})\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+\n",
"| to|\n",
"+-----+\n",
"| Bob|\n",
"|Carol|\n",
"| Dave|\n",
"+-----+\n",
"\n"
]
}
],
"source": [
"# alias: name the DataFrame so its columns can be referenced as transactions.to etc.\n",
"from pyspark.sql.functions import col\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.alias('transactions')\n",
"x.show()\n",
"y.show()  # same rows as x; only the name differs\n",
"y.select(col('transactions.to')).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3\n",
"3\n"
]
}
],
"source": [
"# cache: keep x around after it is first computed\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"x.cache()\n",
"first_count = x.count()   # first action materializes x in the cache\n",
"second_count = x.count()  # later actions reuse the cached data instead of recomputing\n",
"print(first_count)\n",
"print(second_count)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2\n",
"1\n"
]
}
],
"source": [
"# coalesce: merge the 2 partitions of x into a single partition\n",
"x_rdd = sc.parallelize(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)], 2)\n",
"x = sqlContext.createDataFrame(x_rdd, ['from', 'to', 'amt'])\n",
"y = x.coalesce(numPartitions=1)\n",
"for frame in (x, y):\n",
"    print(frame.rdd.getNumPartitions())  # 2, then 1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]\n"
]
}
],
"source": [
"# collect: pull every row back to the driver as a list of Row objects\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.collect()\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"['from', 'to', 'amt']\n"
]
}
],
"source": [
"# columns: the column names, as a plain Python list on the driver\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.columns\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+-----+\n",
"| from| to|amt| fee|\n",
"+-----+-----+---+-----+\n",
"|Alice| Bob|0.1|0.001|\n",
"| Bob|Carol|0.2| 0.02|\n",
"|Carol| Dave|0.3| 0.02|\n",
"+-----+-----+---+-----+\n",
"\n",
"0.866025403784\n"
]
}
],
"source": [
"# corr: correlation coefficient between the numeric columns amt and fee\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1, 0.001), ('Bob', 'Carol', 0.2, 0.02), ('Carol', 'Dave', 0.3, 0.02)],\n",
"    ['from', 'to', 'amt', 'fee'])\n",
"y = x.corr(col1='amt', col2='fee')\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"3\n"
]
}
],
"source": [
"# count: number of rows in the DataFrame\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"x.show()\n",
"print(x.count())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+-----+\n",
"| from| to|amt| fee|\n",
"+-----+-----+---+-----+\n",
"|Alice| Bob|0.1|0.001|\n",
"| Bob|Carol|0.2| 0.02|\n",
"|Carol| Dave|0.3| 0.02|\n",
"+-----+-----+---+-----+\n",
"\n",
"0.00095\n"
]
}
],
"source": [
"# cov: covariance of the numeric columns amt and fee\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1, 0.001), ('Bob', 'Carol', 0.2, 0.02), ('Carol', 'Dave', 0.3, 0.02)],\n",
"    ['from', 'to', 'amt', 'fee'])\n",
"y = x.cov(col1='amt', col2='fee')\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-------+----+-----+---+\n",
"|from_to|Dave|Carol|Bob|\n",
"+-------+----+-----+---+\n",
"| Bob| 0| 1| 0|\n",
"| Alice| 0| 0| 1|\n",
"| Carol| 1| 0| 0|\n",
"+-------+----+-----+---+\n",
"\n"
]
}
],
"source": [
"# crosstab: contingency table counting each (from, to) pair\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.crosstab(col1='from', col2='to')\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"|Alice|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n",
"\n",
"+-----+-----+-------------------+\n",
"| from| to| sum(amt)|\n",
"+-----+-----+-------------------+\n",
"|Alice|Carol| 0.2|\n",
"|Alice| Bob| 0.1|\n",
"|Alice| null|0.30000000000000004|\n",
"| null|Carol| 0.2|\n",
"| null| Bob| 0.1|\n",
"| null| null|0.30000000000000004|\n",
"+-----+-----+-------------------+\n",
"\n",
"+-----+-----+--------+\n",
"| from| to|max(amt)|\n",
"+-----+-----+--------+\n",
"|Alice|Carol| 0.2|\n",
"|Alice| Bob| 0.1|\n",
"|Alice| null| 0.2|\n",
"| null|Carol| 0.2|\n",
"| null| Bob| 0.1|\n",
"| null| null| 0.2|\n",
"+-----+-----+--------+\n",
"\n"
]
}
],
"source": [
"# cube: group by every combination of from and to; null marks a rolled-up (all values) level\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Alice', 'Carol', 0.2)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.cube('from', 'to')\n",
"x.show()\n",
"print(y)  # a GroupedData object; the aggregations below apply to all numerical columns\n",
"for aggregate in (y.sum, y.max):\n",
"    aggregate().show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-------+-------------------+\n",
"|summary| amt|\n",
"+-------+-------------------+\n",
"| count| 3|\n",
"| mean|0.20000000000000004|\n",
"| stddev|0.09999999999999998|\n",
"| min| 0.1|\n",
"| max| 0.3|\n",
"+-------+-------------------+\n",
"\n"
]
}
],
"source": [
"# describe: summary statistics (count, mean, stddev, min, max) of the numeric columns\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"x.show()\n",
"x.describe().show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"| Bob|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"|Alice| Bob|0.1|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# distinct: drop rows that are exact duplicates (the repeated Bob -> Carol row)\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3), ('Bob', 'Carol', 0.2)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.distinct()\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+\n",
"| from| to|\n",
"+-----+-----+\n",
"|Alice| Bob|\n",
"| Bob|Carol|\n",
"|Carol| Dave|\n",
"+-----+-----+\n",
"\n"
]
}
],
"source": [
"# drop: a new DataFrame without the amt column\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.drop('amt')\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"| Bob|Carol|0.3|\n",
"| Bob|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"|Alice| Bob|0.1|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# dropDuplicates / drop_duplicates: keep one row per distinct (from, to) combination\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Bob', 'Carol', 0.3), ('Bob', 'Carol', 0.2)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.dropDuplicates(subset=['from', 'to'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+----+\n",
"| from| to| amt|\n",
"+-----+-----+----+\n",
"| null| Bob| 0.1|\n",
"| Bob|Carol|null|\n",
"|Carol| null| 0.3|\n",
"| Bob|Carol| 0.2|\n",
"+-----+-----+----+\n",
"\n",
"+----+-----+----+\n",
"|from| to| amt|\n",
"+----+-----+----+\n",
"| Bob|Carol|null|\n",
"| Bob|Carol| 0.2|\n",
"+----+-----+----+\n",
"\n"
]
}
],
"source": [
"# dropna: remove rows with a null in from or to (a null amt alone does not drop the row)\n",
"x = sqlContext.createDataFrame(\n",
"    [(None, 'Bob', 0.1), ('Bob', 'Carol', None), ('Carol', None, 0.3), ('Bob', 'Carol', 0.2)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.dropna(how='any', subset=['from', 'to'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[('from', 'string'), ('to', 'string'), ('amt', 'double')]\n"
]
}
],
"source": [
"# dtypes: list of (column name, type name) pairs\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.dtypes\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"== Parsed Logical Plan ==\n",
"'Aggregate ['avg(amt#296) AS avg(amt)#297]\n",
"+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1\n",
"\n",
"== Analyzed Logical Plan ==\n",
"avg(amt): double\n",
"Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]\n",
"+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1\n",
"\n",
"== Optimized Logical Plan ==\n",
"Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]\n",
"+- Project [amt#296]\n",
" +- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1\n",
"\n",
"== Physical Plan ==\n",
"TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Final,isDistinct=false)], output=[avg(amt)#297])\n",
"+- TungstenExchange SinglePartition, None\n",
" +- TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Partial,isDistinct=false)], output=[sum#301,count#302L])\n",
" +- Project [amt#296]\n",
" +- Scan ExistingRDD[from#294,to#295,amt#296]\n"
]
}
],
"source": [
"# explain: print the parsed, analyzed, optimized and physical plans of an aggregation\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"x.show()\n",
"x.agg({'amt': 'avg'}).explain(extended=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+----+\n",
"| from| to| amt|\n",
"+-----+-----+----+\n",
"| null| Bob| 0.1|\n",
"| Bob|Carol|null|\n",
"|Carol| null| 0.3|\n",
"+-----+-----+----+\n",
"\n",
"+-------+-------+----+\n",
"| from| to| amt|\n",
"+-------+-------+----+\n",
"|unknown| Bob| 0.1|\n",
"| Bob| Carol|null|\n",
"| Carol|unknown| 0.3|\n",
"+-------+-------+----+\n",
"\n"
]
}
],
"source": [
"# fillna: replace nulls in from and to with 'unknown' (the null amt is left untouched)\n",
"x = sqlContext.createDataFrame(\n",
"    [(None, 'Bob', 0.1), ('Bob', 'Carol', None), ('Carol', None, 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.fillna(value='unknown', subset=['from', 'to'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# filter: keep only the rows matching a SQL expression string\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.filter('amt > 0.1')\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"Row(from=u'Alice', to=u'Bob', amt=0.1)\n"
]
}
],
"source": [
"# first: the first row, returned as a single Row object\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.first()\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"PythonRDD[227] at RDD at PythonRDD.scala:43\n"
]
},
{
"data": {
"text/plain": [
"[u'Alice', 0.1, u'Bob', 0.2, u'Carol', 0.3]"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# flatMap\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.flatMap(lambda row: (row[0], row[2]))  # each row contributes its from and amt values\n",
"print(y)  # implicit conversion to RDD\n",
"y.collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"None\n",
"Row(from=u'Carol', to=u'Dave', amt=0.3)\n",
"Row(from=u'Bob', to=u'Carol', amt=0.2)\n",
"Row(from=u'Alice', to=u'Bob', amt=0.1)\n",
"\n"
]
}
],
"source": [
"# foreach\n",
"from __future__ import print_function\n",
"\n",
"# setup\n",
"fn = './foreachExampleDataFrames.txt'\n",
"open(fn, 'w').close()  # clear the file\n",
"def fappend(el, f):\n",
"    '''Append el, printed as one line, to file f.\n",
"\n",
"    The with-block closes the handle, so each write is flushed right away\n",
"    instead of relying on garbage collection to close the file.\n",
"    '''\n",
"    with open(f, 'a+') as fh:\n",
"        print(el, file=fh)\n",
"\n",
"# example\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.foreach(lambda row: fappend(row, fn))  # writes into foreachExampleDataFrames.txt\n",
"x.show()  # original dataframe\n",
"print(y)  # foreach returns 'None'\n",
"# print the contents of the file\n",
"with open(fn, 'r') as foreachExample:\n",
"    print(foreachExample.read())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"None\n",
"[]\n",
"[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]\n",
"\n"
]
}
],
"source": [
"# foreachPartition\n",
"from __future__ import print_function\n",
"\n",
"# setup\n",
"fn = './foreachPartitionExampleDataFrames.txt'\n",
"open(fn, 'w').close()  # clear the file\n",
"def fappend(partition, f):\n",
"    '''Append all elements of partition to file f as one printed list (one line per partition).\n",
"\n",
"    The with-block closes the handle, so the write is flushed right away.\n",
"    '''\n",
"    with open(f, 'a+') as fh:\n",
"        print(list(partition), file=fh)\n",
"\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"x = x.repartition(2)  # force 2 partitions\n",
"y = x.foreachPartition(lambda partition: fappend(partition, fn))  # writes into foreachPartitionExampleDataFrames.txt\n",
"\n",
"x.show()  # original dataframe\n",
"print(y)  # foreachPartition returns 'None'\n",
"# print the contents of the file\n",
"with open(fn, 'r') as foreachExample:\n",
"    print(foreachExample.read())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.1|\n",
"|Alice| Dave|0.1|\n",
"|Alice| Bob|0.1|\n",
"|Alice| Bob|0.5|\n",
"|Carol| Bob|0.1|\n",
"+-----+-----+---+\n",
"\n",
"+--------------+-------------+\n",
"|from_freqItems|amt_freqItems|\n",
"+--------------+-------------+\n",
"| [Alice]| [0.1]|\n",
"+--------------+-------------+\n",
"\n"
]
}
],
"source": [
"# freqItems: approximate frequent values of from and amt at support=0.8; the result can\n",
"# include values below the threshold (Alice appears in only 3 of the 5 rows)\n",
"x = sqlContext.createDataFrame([('Bob', 'Carol', 0.1),\n",
"                                ('Alice', 'Dave', 0.1),\n",
"                                ('Alice', 'Bob', 0.1),\n",
"                                ('Alice', 'Bob', 0.5),\n",
"                                ('Carol', 'Bob', 0.1)],\n",
"                               ['from', 'to', 'amt'])\n",
"y = x.freqItems(cols=['from', 'amt'], support=0.8)\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"|Alice|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"\n"
]
}
],
"source": [
"# groupBy: group rows by from; the result is a GroupedData object awaiting an aggregation\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Alice', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.groupBy('from')\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"|Alice|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-------------------+\n",
"| from| avg(amt)|\n",
"+-----+-------------------+\n",
"|Carol| 0.3|\n",
"|Alice|0.15000000000000002|\n",
"+-----+-------------------+\n",
"\n"
]
}
],
"source": [
"# groupBy(col1).avg(col2): average amt for each distinct from value\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Alice', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.groupBy('from').avg('amt')\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]\n"
]
}
],
"source": [
"# head: the first n rows (here 2) as a list of Row objects\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.head(2)\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Alice|0.2|\n",
"|Carol| Dave|0.1|\n",
"+-----+-----+---+\n",
"\n",
"+-----+---+---+\n",
"| from| to|amt|\n",
"+-----+---+---+\n",
"|Alice|Bob|0.1|\n",
"+-----+---+---+\n",
"\n"
]
}
],
"source": [
"# intersect: rows that appear in both x and y\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Alice', 0.2), ('Carol', 'Dave', 0.1)],\n",
"    ['from', 'to', 'amt'])\n",
"z = x.intersect(y)\n",
"for frame in (x, y, z):\n",
"    frame.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"False\n"
]
}
],
"source": [
"# isLocal: whether this DataFrame can be collected locally (False for this one)\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.isLocal()\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+---+\n",
"| name|age|\n",
"+-----+---+\n",
"|Alice| 20|\n",
"| Bob| 40|\n",
"| Dave| 80|\n",
"+-----+---+\n",
"\n",
"+-----+----+---+---+\n",
"| from| to|amt|age|\n",
"+-----+----+---+---+\n",
"|Carol|Dave|0.3| 80|\n",
"|Alice| Bob|0.1| 40|\n",
"+-----+----+---+---+\n",
"\n"
]
}
],
"source": [
"# join: inner join of transactions (x) with people (y) where x.to == y.name\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = sqlContext.createDataFrame([('Alice', 20), ('Bob', 40), ('Dave', 80)], ['name', 'age'])\n",
"z = x.join(y, x.to == y.name, 'inner').select('from', 'to', 'amt', 'age')\n",
"for frame in (x, y, z):\n",
"    frame.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# limit: a new DataFrame with at most 2 rows\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.limit(2)\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[1.1, 1.2, 1.3]\n"
]
}
],
"source": [
"# map: apply a function to every Row; the result is an RDD, not a DataFrame\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.map(lambda row: row.amt + 1)\n",
"x.show()\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)], []]\n",
"[0.6000000000000001, 0]\n",
"[[0.6000000000000001], [0]]\n"
]
}
],
"source": [
"# mapPartitions\n",
"def amt_sum(partition):\n",
"    '''Yield one value per partition: the sum of the amt field of its rows (0 if empty).'''\n",
"    yield sum(el.amt for el in partition)\n",
"\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"x = x.repartition(2)  # force 2 partitions\n",
"y = x.mapPartitions(amt_sum)  # amt_sum already takes the partition iterator; no lambda wrapper needed\n",
"x.show()\n",
"print(x.rdd.glom().collect())  # glom gathers each partition's rows into one list per partition\n",
"print(y.collect())\n",
"print(y.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+----+\n",
"| from| to| amt|\n",
"+-----+-----+----+\n",
"| null| Bob| 0.1|\n",
"| Bob|Carol|null|\n",
"|Carol| null| 0.3|\n",
"| Bob|Carol| 0.2|\n",
"+-----+-----+----+\n",
"\n",
"\n",
"+----+-----+---+\n",
"|from| to|amt|\n",
"+----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"+----+-----+---+\n",
"\n",
"+-------+-------+---+\n",
"| from| to|amt|\n",
"+-------+-------+---+\n",
"|unknown| Bob|0.1|\n",
"| Bob| Carol|0.0|\n",
"| Carol|unknown|0.3|\n",
"| Bob| Carol|0.2|\n",
"+-------+-------+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| null| Bob|0.1|\n",
"| Bob|Carol|0.0|\n",
"|Carol| null|0.3|\n",
"| Bob|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# na: an object for handling missing values; it supports drop, fill and replace\n",
"x = sqlContext.createDataFrame(\n",
"    [(None, 'Bob', 0.1), ('Bob', 'Carol', None), ('Carol', None, 0.3), ('Bob', 'Carol', 0.2)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.na\n",
"x.show()\n",
"print(y)\n",
"y.drop().show()  # drop rows containing any null\n",
"y.fill({'from': 'unknown', 'to': 'unknown', 'amt': 0}).show()  # per-column fill values\n",
"y.fill(0).show()  # a numeric fill value leaves the nulls in the string columns in place"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Carol| Dave|0.3|\n",
"| Bob|Carol|0.2|\n",
"|Alice| Bob|0.1|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# orderBy: sort by from in descending order\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"y = x.orderBy(['from'], ascending=[False])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
},
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 38,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# persist\n",
"from pyspark import StorageLevel  # explicit import so the cell does not depend on the name already existing in the session\n",
"x = sqlContext.createDataFrame(\n",
"    [('Alice', 'Bob', 0.1), ('Bob', 'Carol', 0.2), ('Carol', 'Dave', 0.3)],\n",
"    ['from', 'to', 'amt'])\n",
"# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)\n",
"x.persist(storageLevel=StorageLevel(True, True, False, True, 1))\n",
"x.show()\n",
"x.is_cached"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"root\n",
" |-- from: string (nullable = true)\n",
" |-- to: string (nullable = true)\n",
" |-- amt: double (nullable = true)\n",
"\n"
]
}
],
"source": [
"# printSchema\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"x.show()\n",
"x.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+----+---+---+\n",
"|from| to|amt|\n",
"+----+---+---+\n",
"+----+---+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# randomSplit\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.randomSplit([0.5,0.5])\n",
"x.show()\n",
"y[0].show()\n",
"y[1].show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]\n"
]
}
],
"source": [
"# rdd\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.rdd\n",
"x.show()\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# registerTempTable\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"x.registerTempTable(name=\"TRANSACTIONS\")\n",
"y = sqlContext.sql('SELECT * FROM TRANSACTIONS WHERE amt > 0.1')\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"4\n",
"3\n"
]
}
],
"source": [
"# repartition\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.repartition(3)\n",
"print(x.rdd.getNumPartitions())\n",
"print(y.rdd.getNumPartitions())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol|David|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# replace\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.replace('Dave','David',['from','to'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"\n",
"+-----+-----+------------------+\n",
"| from| to| sum(amt)|\n",
"+-----+-----+------------------+\n",
"|Alice| Bob| 0.1|\n",
"| Bob|Carol| 0.2|\n",
"|Alice| null| 0.1|\n",
"|Carol| Dave| 0.3|\n",
"| Bob| null| 0.2|\n",
"|Carol| null| 0.3|\n",
"| null| null|0.6000000000000001|\n",
"+-----+-----+------------------+\n",
"\n",
"+-----+-----+--------+\n",
"| from| to|max(amt)|\n",
"+-----+-----+--------+\n",
"|Alice| Bob| 0.1|\n",
"| Bob|Carol| 0.2|\n",
"|Alice| null| 0.1|\n",
"|Carol| Dave| 0.3|\n",
"| Bob| null| 0.2|\n",
"|Carol| null| 0.3|\n",
"| null| null| 0.3|\n",
"+-----+-----+--------+\n",
"\n"
]
}
],
"source": [
"# rollup\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.rollup(['from','to'])\n",
"x.show()\n",
"print(y) # y is a grouped data object, aggregations will be applied to all numerical columns\n",
"y.sum().show()\n",
"y.max().show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+----+-----+---+\n",
"|from| to|amt|\n",
"+----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"+----+-----+---+\n",
"\n"
]
}
],
"source": [
"# sample\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.sample(False,0.5)\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"|Alice|Carol|0.2|\n",
"|Alice|Alice|0.3|\n",
"|Alice| Dave|0.4|\n",
"| Bob| Bob|0.5|\n",
"| Bob|Carol|0.6|\n",
"+-----+-----+---+\n",
"\n",
"+----+-----+---+\n",
"|from| to|amt|\n",
"+----+-----+---+\n",
"| Bob| Bob|0.5|\n",
"| Bob|Carol|0.6|\n",
"+----+-----+---+\n",
"\n"
]
}
],
"source": [
"# sampleBy\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Alice\",\"Carol\",0.2),(\"Alice\",\"Alice\",0.3), \\\n",
" ('Alice',\"Dave\",0.4),(\"Bob\",\"Bob\",0.5),(\"Bob\",\"Carol\",0.6)], \\\n",
" ['from','to','amt'])\n",
"y = x.sampleBy(col='from',fractions={'Alice':0.1,'Bob':0.9})\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"StructType(List(StructField(from,StringType,true),StructField(to,StringType,true),StructField(amt,DoubleType,true)))\n"
]
}
],
"source": [
"# schema\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.schema\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+---+\n",
"| from|amt|\n",
"+-----+---+\n",
"|Alice|0.1|\n",
"| Bob|0.2|\n",
"|Carol|0.3|\n",
"+-----+---+\n",
"\n"
]
}
],
"source": [
"# select\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.select(['from','amt'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+----------------+----------+\n",
"|substr(from,1,1)|(amt + 10)|\n",
"+----------------+----------+\n",
"| A| 10.1|\n",
"| B| 10.2|\n",
"| C| 10.3|\n",
"+----------------+----------+\n",
"\n"
]
}
],
"source": [
"# selectExpr\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.selectExpr(['substr(from,1,1)','amt+10'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# show\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"x.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol|Alice|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Carol|Alice|0.3|\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# sort\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Alice\",0.3)], ['from','to','amt'])\n",
"y = x.sort(['to'])\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+----+\n",
"| from| to|amt|p_id|\n",
"+-----+-----+---+----+\n",
"|Alice| Bob|0.1| 1|\n",
"| Bob|Carol|0.2| 2|\n",
"|Carol|Alice|0.3| 2|\n",
"+-----+-----+---+----+\n",
"\n",
"+-----+-----+---+----+\n",
"| from| to|amt|p_id|\n",
"+-----+-----+---+----+\n",
"|Alice| Bob|0.1| 1|\n",
"|Carol|Alice|0.3| 2|\n",
"| Bob|Carol|0.2| 2|\n",
"+-----+-----+---+----+\n",
"\n",
"[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2), Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2)]]\n",
"[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2), Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2)]]\n"
]
}
],
"source": [
"# sortWithinPartitions\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1,1),(\"Bob\",\"Carol\",0.2,2),(\"Carol\",\"Alice\",0.3,2)], \\\n",
" ['from','to','amt','p_id']).repartition(2,'p_id')\n",
"y = x.sortWithinPartitions(['to'])\n",
"x.show()\n",
"y.show()\n",
"print(x.rdd.glom().collect()) # glom() coalesces the elements of each partition into a list\n",
"print(y.rdd.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+-----+\n",
"| from| to|amt| fee|\n",
"+-----+-----+---+-----+\n",
"|Alice| Bob|0.1|0.001|\n",
"| Bob|Carol|0.2| 0.02|\n",
"|Carol| Dave|0.3| 0.02|\n",
"+-----+-----+---+-----+\n",
"\n",
"\n",
"0.866025403784\n"
]
}
],
"source": [
"# stat\n",
"x = sqlContext.createDataFrame([(\"Alice\",\"Bob\",0.1,0.001),(\"Bob\",\"Carol\",0.2,0.02),(\"Carol\",\"Dave\",0.3,0.02)], ['from','to','amt','fee'])\n",
"y = x.stat\n",
"x.show()\n",
"print(y)\n",
"print(y.corr(col1=\"amt\",col2=\"fee\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.1|\n",
"+-----+-----+---+\n",
"\n",
"+-----+----+---+\n",
"| from| to|amt|\n",
"+-----+----+---+\n",
"|Carol|Dave|0.3|\n",
"+-----+----+---+\n",
"\n"
]
}
],
"source": [
"# subtract\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.1)], ['from','to','amt'])\n",
"z = x.subtract(y)\n",
"x.show()\n",
"y.show()\n",
"z.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]\n"
]
}
],
"source": [
"# take\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.take(num=2)\n",
"x.show()\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+------+-----+---+\n",
"|seller|buyer|amt|\n",
"+------+-----+---+\n",
"| Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"| Carol| Dave|0.3|\n",
"+------+-----+---+\n",
"\n"
]
}
],
"source": [
"# toDF\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.toDF(\"seller\",\"buyer\",\"amt\")\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol|Alice|0.3|\n",
"+-----+-----+---+\n",
"\n",
"[u'{\"from\":\"Alice\",\"to\":\"Bob\",\"amt\":0.1}', u'{\"from\":\"Bob\",\"to\":\"Carol\",\"amt\":0.2}', u'{\"from\":\"Carol\",\"to\":\"Alice\",\"amt\":0.3}']\n"
]
}
],
"source": [
"# toJSON\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Alice\",0.3)], ['from','to','amt'])\n",
"y = x.toJSON()\n",
"x.show()\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"<class 'pandas.core.frame.DataFrame'>\n"
]
},
{
"data": {
"text/html": [
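"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>from</th>\n",
"      <th>to</th>\n",
"      <th>amt</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>Alice</td>\n",
"      <td>Bob</td>\n",
"      <td>0.1</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>Bob</td>\n",
"      <td>Carol</td>\n",
"      <td>0.2</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>Carol</td>\n",
"      <td>Dave</td>\n",
"      <td>0.3</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"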
],
"text/plain": [
" from to amt\n",
"0 Alice Bob 0.1\n",
"1 Bob Carol 0.2\n",
"2 Carol Dave 0.3"
]
},
"execution_count": 59,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# toPandas\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.toPandas()\n",
"x.show()\n",
"print(type(y))\n",
"y"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.1|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.1|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# unionAll\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2)], ['from','to','amt'])\n",
"y = sqlContext.createDataFrame([(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.1)], ['from','to','amt'])\n",
"z = x.unionAll(y)\n",
"x.show()\n",
"y.show()\n",
"z.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"True\n",
"False\n"
]
}
],
"source": [
"# unpersist\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"x.cache()\n",
"x.count()\n",
"x.show()\n",
"print(x.is_cached)\n",
"x.unpersist()\n",
"print(x.is_cached)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n"
]
}
],
"source": [
"# where (filter)\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.where(\"amt > 0.1\")\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+----+\n",
"| from| to| amt|\n",
"+-----+-----+----+\n",
"|Alice| Bob| 0.1|\n",
"| Bob|Carol|null|\n",
"|Carol| Dave| 0.3|\n",
"+-----+-----+----+\n",
"\n",
"+-----+-----+----+-----+\n",
"| from| to| amt| conf|\n",
"+-----+-----+----+-----+\n",
"|Alice| Bob| 0.1| true|\n",
"| Bob|Carol|null|false|\n",
"|Carol| Dave| 0.3| true|\n",
"+-----+-----+----+-----+\n",
"\n"
]
}
],
"source": [
"# withColumn\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",None),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.withColumn('conf',x.amt.isNotNull())\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+-----+-----+------+\n",
"| from| to|amount|\n",
"+-----+-----+------+\n",
"|Alice| Bob| 0.1|\n",
"| Bob|Carol| 0.2|\n",
"|Carol| Dave| 0.3|\n",
"+-----+-----+------+\n",
"\n"
]
}
],
"source": [
"# withColumnRenamed\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.withColumnRenamed('amt','amount')\n",
"x.show()\n",
"y.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---+\n",
"| from| to|amt|\n",
"+-----+-----+---+\n",
"|Alice| Bob|0.1|\n",
"| Bob|Carol|0.2|\n",
"|Carol| Dave|0.3|\n",
"+-----+-----+---+\n",
"\n",
"+---+-----+-----+\n",
"|amt| from| to|\n",
"+---+-----+-----+\n",
"|0.1|Alice| Bob|\n",
"|0.2| Bob|Carol|\n",
"|0.3|Carol| Dave|\n",
"+---+-----+-----+\n",
"\n"
]
}
],
"source": [
"# write\n",
"import json\n",
"x = sqlContext.createDataFrame([('Alice',\"Bob\",0.1),(\"Bob\",\"Carol\",0.2),(\"Carol\",\"Dave\",0.3)], ['from','to','amt'])\n",
"y = x.write.mode('overwrite').json('./dataframeWriteExample.json')\n",
"x.show()\n",
"# read the dataframe back in from file\n",
"sqlContext.read.json('./dataframeWriteExample.json').show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}