{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### New to Plotly?\n",
"Plotly's Python library is free and open source! [Get started](https://plotly.com/python/getting-started/) by downloading the client and [reading the primer](https://plotly.com/python/getting-started/).\n",
" You can set up Plotly to work in [online](https://plotly.com/python/getting-started/#initialization-for-online-plotting) or [offline](https://plotly.com/python/getting-started/#initialization-for-offline-plotting) mode, or in [jupyter notebooks](https://plotly.com/python/getting-started/#start-plotting-online).\n",
" We also have a quick-reference [cheatsheet](https://images.plot.ly/plotly-documentation/images/python_cheat_sheet.pdf) (new!) to help you get started!\n",
"#### Version Check\n",
"Plotly's python package is updated frequently. Run `pip install plotly --upgrade` to use the latest version."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'2.0.1'"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import plotly\n",
"plotly.__version__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### About Apache Spark\n",
"[Apache Spark](https://spark.apache.org/)'s meteoric rise has been incredible. It is one of the fastest growing open source projects and is a perfect fit for the graphing tools that [Plotly](https://plotly.com/) provides. Plotly's ability to graph and share images from [Spark DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html) quickly and easily make it a great tool for any data scientist and [Chart Studio Enterprise](https://plotly.com/product/enterprise/) make it easy to securely host and share those Plotly graphs.\n",
"\n",
"This notebook will go over the details of getting set up with IPython Notebooks for graphing Spark data with Plotly.\n",
"\n",
"#### Create a Profile\n",
"First you'll have to create an ipython profile for pyspark, you can do this locally or you can do it on the cluster that you're running Spark.\n",
"\n",
"Start off by creating a new ipython profile. (Spark should have ipython install but you may need to install ipython notebook yourself).\n",
"\n",
"```sh\n",
"ipython profile create pyspark\n",
"```\n",
"\n",
"Next you'll have to edit some configurations. Spark/Hadoop have plenty of ports that they open up so you'll have to change the below file to avoid any conflicts that might come up. \n",
"\n",
"```sh\n",
"~/.ipython/profile_pyspark/ipython_notebook_config.py\n",
"```\n",
"\n",
"If you're not running Spark locally, you'll have to add some other configurations. [Cloudera's blog](http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/) has a great post about some of the other things you can add, like passwords.\n",
"\n",
"IPython's documentation also has some excellent recommendations for settings that you can find on [the \"Securing a Notebook Server\" post on ipython.org.](http://ipython.org/ipython-doc/3/notebook/public_server.html#running-a-notebook-server)\n",
"\n",
"You'll likely want to set a port, and an IP address to be able to access the notebook.\n",
"\n",
"Next you'll need to set a couple of environmental variables. You can do this at the command line or you can set it up in your computer's/master node's bash_rc/bash_profile files.\n",
"\n",
"\n",
"```sh\n",
"export SPARK_HOME=\"$HOME/Downloads/spark-1.3.1\"\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Setup\n",
"Now we'll need to add a file to make sure that we boot up with the Spark Context. Basically when we start the IPython Notebook, we need to be bring in the Spark Context. We need to set up a startup script that runs everytime we start a notebook from this profile. \n",
"\n",
"Setting startup scripts are actually extremely easy - you just put them in the IPython Notebook directory under the \"startup\" folder. You can learn more about IPython configurations on the [IPython site](http://ipython.org/ipython-doc/1/config/overview.html).\n",
"\n",
"We'll create a file called `pyspark_setup.py`\n",
"\n",
"in it we'll put\n",
"\n",
"```py\n",
"import os\n",
"import sys\n",
" \n",
"spark_home = os.environ.get('SPARK_HOME', None)\n",
" \n",
"# check if it exists\n",
"if not spark_home:\n",
" raise ValueError('SPARK_HOME environment variable is not set')\n",
" \n",
"# check if it is a directory\n",
"if not os.path.isdir(spark_home):\n",
" raise ValueError('SPARK_HOME environment variable is not a directory')\n",
" \n",
"#check if we can find the python sub-directory\n",
"if not os.path.isdir(os.path.join(spark_home, 'python')):\n",
" raise ValueError('SPARK_HOME directory does not contain python')\n",
" \n",
"sys.path.insert(0, os.path.join(spark_home, 'python'))\n",
" \n",
"#check if we can find the py4j zip file\n",
"if not os.path.exists(os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')):\n",
" raise ValueError('Could not find the py4j library - \\\n",
" maybe your version number is different?(Looking for 0.8.2.1)')\n",
" \n",
"sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))\n",
" \n",
"with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as f:\n",
" code = compile(f.read(), os.path.join(spark_home, 'python/pyspark/shell.py'), 'exec')\n",
" exec(code)\n",
"```\n",
"\n",
"And now we're all set! When we start up an ipython notebook, we'll have the Spark Context available in our IPython notebooks. This is one time set up! So now we're ready to run things normally! We just have to start a specific pyspark profile.\n",
"\n",
"`ipython notebook --profile=pyspark`\n",
"\n",
"We can test for the Spark Context's existence with `print sc`."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"from __future__ import print_function #python 3 support\n",
"print(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Spark Tools\n",
"Now that we've got the SparkContext, let's pull in some other useful Spark tools that we'll need. We'll be using pandas for some downstream analysis as well as Plotly for our graphing.\n",
"\n",
"We'll also need the SQLContext to be able to do some nice Spark SQL transformations."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)\n",
"\n",
"import plotly.plotly as py\n",
"import plotly.graph_objs as go\n",
"import pandas as pd\n",
"import requests\n",
"requests.packages.urllib3.disable_warnings()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The data we'll be working with is a sample of the [open bike rental data.](http://www.bayareabikeshare.com/datachallenge) Essentially people can rent bikes and ride them from one station to another. This data provides that information. [You can snag the sample I am using in JSON format here.](https://github.com/anabranch/Interactive-Graphs-with-Plotly/raw/master/btd2.json).\n",
"\n",
"Now we can import it."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"btd = sqlContext.jsonFile(\"btd2.json\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can see that it's a DataFrame by printing its type."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"print(type(btd))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now **RDD** is the base abstraction of Apache Spark, it's the Resilient Distributed Dataset. It is an immutable, partitioned collection of elements that can be operated on in a distributed manner. The DataFrame builds on that but is also immutable - meaning you've got to think in terms of transformations - not just manipulations.\n",
"\n",
"Because we've got a json file, we've loaded it up as a DataFrame - a new introduction in Spark 1.3. The DataFrame interface which is similar to pandas style DataFrames except for that immutability described above."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can print the schema easily, which gives us the layout of the data. Everything that I'm describing can be [found in the Pyspark SQL documentation.](https://spark.apache.org/docs/latest/api/python/pyspark.sql.htm)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Bike #: string (nullable = true)\n",
" |-- Duration: string (nullable = true)\n",
" |-- End Date: string (nullable = true)\n",
" |-- End Station: string (nullable = true)\n",
" |-- End Terminal: string (nullable = true)\n",
" |-- Start Date: string (nullable = true)\n",
" |-- Start Station: string (nullable = true)\n",
" |-- Start Terminal: string (nullable = true)\n",
" |-- Subscription Type: string (nullable = true)\n",
" |-- Trip ID: string (nullable = true)\n",
" |-- Zip Code: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"btd.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can grab a couple, to see what the layout looks like."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(Bike #=u'520', Duration=u'63', End Date=u'8/29/13 14:14', End Station=u'South Van Ness at Market', End Terminal=u'66', Start Date=u'8/29/13 14:13', Start Station=u'South Van Ness at Market', Start Terminal=u'66', Subscription Type=u'Subscriber', Trip ID=u'4576', Zip Code=u'94127'),\n",
" Row(Bike #=u'661', Duration=u'70', End Date=u'8/29/13 14:43', End Station=u'San Jose City Hall', End Terminal=u'10', Start Date=u'8/29/13 14:42', Start Station=u'San Jose City Hall', Start Terminal=u'10', Subscription Type=u'Subscriber', Trip ID=u'4607', Zip Code=u'95138'),\n",
" Row(Bike #=u'48', Duration=u'71', End Date=u'8/29/13 10:17', End Station=u'Mountain View City Hall', End Terminal=u'27', Start Date=u'8/29/13 10:16', Start Station=u'Mountain View City Hall', Start Terminal=u'27', Subscription Type=u'Subscriber', Trip ID=u'4130', Zip Code=u'97214')]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"btd.take(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now one thing I'd like to look at is the duration distribution - can we see how common certain ride times are?\n",
"\n",
"To answer that we'll get the durations and the way we'll be doing it is through the Spark SQL Interface. To do so we'll register it as a table."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"sqlCtx.registerDataFrameAsTable(btd, \"bay_area_bike\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now as you may have noted above, the durations are in seconds. Let's start off by looking at all rides under 2 hours."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"7200"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"60 * 60 * 2 # 2 hours in seconds"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df2 = sqlCtx.sql(\"SELECT Duration as d1 from bay_area_bike where Duration < 7200\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We've created a new DataFrame from the transformation and query - now we're ready to plot it. One of the great things about plotly is that you can throw very large datasets at it and it will do just fine. It's certainly a much more scalable solution than matplotlib.\n",
"\n",
"Below I create a histogram of the data."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"data = [go.Histogram(x=df2.toPandas()['d1'])]"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/bill_chambers/.virtualenvs/plotly-notebook/lib/python2.7/site-packages/plotly/plotly/plotly.py:187: UserWarning:\n",
"\n",
"Woah there! Look at all those points! Due to browser limitations, Plotly has a hard time graphing more than 500k data points for line charts, or 40k points for other types of charts. Here are some suggestions:\n",
"(1) Trying using the image API to return an image instead of a graph URL\n",
"(2) Use matplotlib\n",
"(3) See if you can create your visualization with fewer data points\n",
"\n",
"If the visualization you're using aggregates points (e.g., box plot, histogram, etc.) you can disregard this warning.\n",
"\n"
]
},
{
"data": {
"text/html": [
""
],
"text/plain": [
""
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"py.iplot(data, filename=\"spark/less_2_hour_rides\")"
]
},
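{
"cell_type": "markdown",
"metadata": {},
"source": [
"If the point-count warning above is a concern, one option (suggestion 3 in the warning) is to aggregate before plotting, so that only bin counts are sent rather than one value per ride. Here is a minimal sketch using NumPy; the helper name `bin_durations` and the 60-second bin width are assumptions for illustration, not part of the original workflow.\n",
"\n",
"```py\n",
"import numpy as np\n",
"\n",
"\n",
"def bin_durations(durations, bin_width=60, max_duration=7200):\n",
"    # Durations arrive as strings in this dataset, so cast them to numbers first.\n",
"    values = np.asarray(durations, dtype=float)\n",
"    edges = np.arange(0, max_duration + bin_width, bin_width)\n",
"    counts, edges = np.histogram(values, bins=edges)\n",
"    # Use the midpoint of each bin as its x position.\n",
"    centers = (edges[:-1] + edges[1:]) / 2.0\n",
"    return centers, counts\n",
"```\n",
"\n",
"The result could then be drawn with `go.Bar(x=centers, y=counts)` in place of `go.Histogram`."
]
},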
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That was simple and we can see that plotly was able to handle the data without issue. We can see that big uptick in rides that last less than ~30 minutes (2000 seconds) - so let's look at that distribution."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df3 = sqlCtx.sql(\"SELECT Duration as d1 from bay_area_bike where Duration < 2000\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A great thing about Apache Spark is that you can sample easily from large datasets, you just set the amount you would like to sample and you're all set. Plotly converts those samples into beautifully overlayed histograms. This is a great way to eyeball different distributions."
]
},
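{
"cell_type": "markdown",
"metadata": {},
"source": [
"The three positional arguments to `DataFrame.sample` are `withReplacement`, `fraction`, and `seed`. Without replacement, each row is kept independently with probability `fraction`, so the sample size is approximate rather than exact, and fixing the seed makes the draw repeatable. The pure-Python sketch below only illustrates that idea; it is not Spark's implementation, and `bernoulli_sample` is a hypothetical name.\n",
"\n",
"```py\n",
"import random\n",
"\n",
"\n",
"def bernoulli_sample(rows, fraction, seed):\n",
"    # Keep each row independently with probability `fraction`.\n",
"    rng = random.Random(seed)\n",
"    return [row for row in rows if rng.random() < fraction]\n",
"\n",
"\n",
"durations = list(range(1000))\n",
"first = bernoulli_sample(durations, 0.05, seed=20)\n",
"second = bernoulli_sample(durations, 0.05, seed=20)\n",
"# The size lands near 50 rather than exactly 50; the same seed gives the same rows.\n",
"print('%d rows, repeatable: %s' % (len(first), first == second))\n",
"```"
]
},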
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
""
],
"text/plain": [
""
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"s1 = df2.sample(False, 0.05, 20)\n",
"s2 = df3.sample(False, 0.05, 2500)\n",
"\n",
"data = [\n",
" go.Histogram(x=s1.toPandas()['d1'], name=\"Large Sample\"),\n",
" go.Histogram(x=s2.toPandas()['d1'], name=\"Small Sample\")\n",
" ]\n",
"\n",
"py.iplot(data, filename=\"spark/sample_rides\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What's really powerful about Plotly is sharing this data is simple. I can take the above graph and change the styling or bins visually. A common workflow is to make a rough sketch of the graph in code, then make a more refined version with notes to share with management like the one below. Plotly's online interface allows you to edit graphs in other languages as well."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
""
],
"text/plain": [
""
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import plotly.tools as tls\n",
"tls.embed(\"https://plotly.com/~bill_chambers/101\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's check out bike rentals from individual stations. We can do a groupby with Spark DataFrames just as we might in Pandas. We've also seen at this point how easy it is to convert a Spark DataFrame to a pandas DataFrame."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"34 San Francisco Caltrain (Townsend at 4th)\n",
"47 Harry Bridges Plaza (Ferry Building)\n",
"0 Embarcadero at Sansome\n",
"Name: Start Station, dtype: object"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dep_stations = btd.groupBy(btd['Start Station']).count().toPandas().sort('count', ascending=False)\n",
"dep_stations['Start Station'][:3] # top 3 stations"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"we'll add a handy function to help us convert all of these into appropriate count data. We're just using pandas resampling function to turn this into day count data."
]
},
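{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the resampling step concrete, here is a small self-contained sketch on three made-up ride timestamps (not taken from the dataset). It uses the `.resample('D').sum()` spelling from newer pandas releases, which is an assumption about your installed version; days with no rides show up with a count of 0.\n",
"\n",
"```py\n",
"import pandas as pd\n",
"\n",
"# Three hypothetical start times in the same format as the 'Start Date' column\n",
"rides = pd.DataFrame({'Start Date': ['8/29/13 10:16', '8/29/13 14:13', '8/31/13 09:00']})\n",
"rides['counts'] = 1\n",
"rides['Start Date'] = pd.to_datetime(rides['Start Date'], format='%m/%d/%y %H:%M')\n",
"\n",
"# One row per calendar day, summing the per-ride counts\n",
"daily = rides.set_index('Start Date').resample('D').sum()\n",
"print(daily)\n",
"```"
]
},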
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"scrolled": false
},
"outputs": [
{
"data": {
"text/html": [
""
],
"text/plain": [
""
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def transform_df(df):\n",
" df['counts'] = 1\n",
" df['Start Date'] = df['Start Date'].apply(pd.to_datetime)\n",
" return df.set_index('Start Date').resample('D', how='sum')\n",
"\n",
"pop_stations = [] # being popular stations - we could easily extend this to more stations\n",
"for station in dep_stations['Start Station'][:3]:\n",
" temp = transform_df(btd.where(btd['Start Station'] == station).select(\"Start Date\").toPandas())\n",
" pop_stations.append(\n",
" go.Scatter(\n",
" x=temp.index,\n",
" y=temp.counts,\n",
" name=station\n",
" )\n",
" )\n",
" \n",
"data = pop_stations\n",
"py.iplot(data, filename=\"spark/over_time\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"Interestingly we can see similar patterns for the Embarcadero and Ferry Buildings. We also get a consistent break between work weeks and work days. There also seems to be an interesting pattern between fall and winter usage for the downtown stations that doesn't seem to affect the Caltrain station."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### References\n",
"You can learn more about Chart Studio Enterprise and collaboration tools with the links below:\n",
"- [Collaborations and Language Support](https://plotly.com/ipython-notebooks/collaboration/)\n",
"- [Network Graphing](https://plotly.com/ipython-notebooks/network-graphs/)\n",
"- [Maps with Plotly](https://plotly.com/ipython-notebooks/basemap-maps/)\n",
"- [Chart Studio Enterprise](https://plotly.com/product/enterprise/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
}
},
"nbformat": 4,
"nbformat_minor": 1
}