Plotly demo with Spark

https://plot.ly/python/apache-spark/



# Check pyspark is loaded correctly

In [2]:
import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)  # parenthesized form runs under both Python 2 and Python 3

/home/takanori/mybin/spark-2.0.0-bin-hadoop2.7
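
If `SPARK_HOME` is not set, the cell above simply prints `None`. A slightly stricter check could look like the sketch below. `find_spark_home` is a hypothetical helper name chosen for this example; it is not part of PySpark.

```python
import os

def find_spark_home(environ=None):
    """Return the SPARK_HOME path, or raise if it is unset or not a directory.

    Hypothetical helper for illustration; not part of PySpark.
    """
    environ = os.environ if environ is None else environ
    spark_home = environ.get('SPARK_HOME')
    if not spark_home:
        raise EnvironmentError('SPARK_HOME is not set')
    if not os.path.isdir(spark_home):
        raise EnvironmentError('SPARK_HOME is not a directory: %s' % spark_home)
    return spark_home
```

Passing a plain dict as `environ` makes the check easy to try without touching the real environment.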


In [3]:
from __future__ import print_function  # Python 3 style print()
print(sc)

<pyspark.context.SparkContext object at 0x7f6ca3274d50>


Spark Context object loaded nicely :).

How about `SQLContext`?

In [8]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
print(sqlContext)

<pyspark.sql.context.SQLContext object at 0x7f6c91426090>


# Download bike data

In [13]:
%%bash
wget https://raw.githubusercontent.com/anabranch/Interactive-Graphs-with-Plotly/master/btd2.json

--2016-09-22 15:49:51--  https://raw.githubusercontent.com/anabranch/Interactive-Graphs-with-Plotly/master/btd2.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.20.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.20.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 41125386 (39M) [text/plain]
Saving to: ‘btd2.json’


In [14]:
btd = sqlContext.read.json('btd2.json')

In [15]:
print(type(btd))

<class 'pyspark.sql.dataframe.DataFrame'>


In [16]:
btd.printSchema()

root
 |-- Bike #: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- Subscription Type: string (nullable = true)
 |-- Trip ID: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [17]:
btd.take(3)

[Row(Bike #=u'520', Duration=u'63', End Date=u'8/29/13 14:14', End Station=u'South Van Ness at Market', End Terminal=u'66', Start Date=u'8/29/13 14:13', Start Station=u'South Van Ness at Market', Start Terminal=u'66', Subscription Type=u'Subscriber', Trip ID=u'4576', Zip Code=u'94127'),
 Row(Bike #=u'661', Duration=u'70', End Date=u'8/29/13 14:43', End Station=u'San Jose City Hall', End Terminal=u'10', Start Date=u'8/29/13 14:42', Start Station=u'San Jose City Hall', Start Terminal=u'10', Subscription Type=u'Subscriber', Trip ID=u'4607', Zip Code=u'95138'),
 Row(Bike #=u'48', Duration=u'71', End Date=u'8/29/13 10:17', End Station=u'Mountain View City Hall', End Terminal=u'27', Start Date=u'8/29/13 10:16', Start Station=u'Mountain View City Hall', Start Terminal=u'27', Subscription Type=u'Subscriber', Trip ID=u'4130', Zip Code=u'97214')]

# Register the DataFrame as a table to use SQL commands

In [21]:
sqlContext.registerDataFrameAsTable(btd, "bay_area_bike")

In [22]:
# Now we can run SQL commands against the table named bay_area_bike
df2 = sqlContext.sql("SELECT Duration as d1 from bay_area_bike where Duration < 7200")

In [23]:
df2.printSchema()

root
 |-- d1: string (nullable = true)



# Now let's visualize!

In [24]:
from plotly.graph_objs import Data, Histogram  # trace classes used below

data = Data([Histogram(x=df2.toPandas()['d1'])])



In [25]:
import plotly.plotly as py  # Plotly's online plotting module

py.iplot(data, filename="spark/less_2_hour_rides")


Woah there! Look at all those points! Due to browser limitations, the Plotly SVG drawing functions have a hard time graphing more than 500k data points for line charts, or 40k points for other types of charts. Here are some suggestions:
(1) Use the `plotly.graph_objs.Scattergl` trace object to generate a WebGl graph.
(2) Trying using the image API to return an image instead of a graph URL
(3) Use matplotlib
(4) See if you can create your visualization with fewer data points
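
Suggestion (4) is often the easiest one to apply to a histogram: bin the values yourself and send only the bin counts to the browser. Here is a minimal sketch in plain Python, assuming fixed-width bins. The name `bin_counts`, the sample values and the 600-second width are illustrative choices, not anything Plotly prescribes.

```python
from collections import Counter

def bin_counts(values, width):
    """Count values per fixed-width bin; returns sorted (bin_start, count) pairs."""
    counts = Counter((int(v) // width) * width for v in values)
    return sorted(counts.items())

# Durations are stored as strings in this dataset, so int() converts them first.
durations = ['63', '70', '71', '650', '1300', '1250']
print(bin_counts(durations, 600))  # [(0, 3), (600, 1), (1200, 2)]
```

The resulting pairs could then be drawn as a bar chart with one bar per bin, so the number of plotted points equals the number of bins rather than the number of rides.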






That was simple to set up, although the warning above shows that this many points is more than Plotly will draw directly. There is a big uptick in rides that last less than ~30 minutes (2000 seconds), so let's look at that distribution.


In [26]:
df3 = sqlContext.sql("SELECT Duration as d1 from bay_area_bike where Duration < 2000")





A great thing about Apache Spark is that you can sample easily from large datasets: you set the fraction you would like to sample and you're all set.


In [27]:
s1 = df2.sample(False, 0.05, 20)
s2 = df3.sample(False, 0.05, 2500)
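
The three arguments to `sample` are `withReplacement`, `fraction` and `seed`. Without replacement, the fraction is an expected proportion rather than an exact row count. The sketch below illustrates that idea in plain Python; it is not Spark's implementation, and `bernoulli_sample` is a made-up name for this example.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    return [row for row in rows if rng.random() < fraction]

rows = list(range(10000))
sample_a = bernoulli_sample(rows, 0.05, 20)
sample_b = bernoulli_sample(rows, 0.05, 20)
print(len(sample_a))         # close to 500, but not necessarily exactly 500
print(sample_a == sample_b)  # True: same seed, same sample
```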

In [28]:
data = Data([
        Histogram(x=s1.toPandas()['d1'], name="Large Sample"),
        Histogram(x=s2.toPandas()['d1'], name="Small Sample")
    ])


Plotly converts those samples into beautifully overlaid histograms. This is a great way to eyeball different distributions.

In [29]:
py.iplot(data, filename="spark/sample_rides")

What's really powerful about Plotly is how simple it makes sharing this data. I can take the above graph and change the styling or bins visually. A common workflow is to make a rough sketch of the graph in code, then make a more refined version with notes to share with management, like the one below. Plotly's online interface also allows you to edit graphs in other languages.

In [30]:
import plotly.tools as tls
tls.embed("https://plot.ly/~bill_chambers/101")

# PySpark DataFrame to pandas DataFrame

Now let's check out bike rentals from individual stations. We can do a groupby with Spark DataFrames just as we might in pandas. We've also seen by this point how easy it is to convert a Spark DataFrame to a pandas DataFrame with `toPandas()`.

In [31]:
dep_stations = btd.groupBy(btd['Start Station']).count().toPandas().sort_values('count', ascending=False)
dep_stations



| | Start Station | count |
|---|---|---|
| 64 | San Francisco Caltrain (Townsend at 4th) | 9838 |
| 50 | Harry Bridges Plaza (Ferry Building) | 7343 |
| 54 | Embarcadero at Sansome | 6545 |
| 6 | Market at Sansome | 5922 |
| 27 | Temporary Transbay Terminal (Howard at Beale) | 5113 |
| 26 | Market at 4th | 5030 |
| 66 | 2nd at Townsend | 4987 |
| 58 | San Francisco Caltrain 2 (330 Townsend) | 4976 |
| 28 | Steuart at Market | 4913 |
| 14 | Townsend at 7th | 4493 |
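
For comparison, the same group, count and sort pattern can be written in pandas alone. This is a small sketch on a made-up frame; the rows below are invented for illustration and are not taken from the bike data.

```python
import pandas as pd

# Toy trips table; the rows are invented for this example.
trips = pd.DataFrame({
    'Start Station': ['Market at 4th', '2nd at Townsend', 'Market at 4th',
                      'Market at 4th', '2nd at Townsend', 'Townsend at 7th'],
})

# Count rows per station, then order the stations from busiest to quietest.
station_counts = (
    trips.groupby('Start Station')
         .size()
         .reset_index(name='count')
         .sort_values('count', ascending=False)
)
print(station_counts)
```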




Now that we've got a better sense of which stations might be interesting to look at, let's graph the number of trips leaving from the top three stations over time.


In [32]:
dep_stations['Start Station'][:3] # top 3 stations

64    San Francisco Caltrain (Townsend at 4th)
50        Harry Bridges Plaza (Ferry Building)
54                      Embarcadero at Sansome
Name: Start Station, dtype: object

In [33]:
import pandas as pd

def transform_df(df):
    """Count trips per calendar day from a frame with a 'Start Date' column."""
    df['counts'] = 1
    df['Start Date'] = pd.to_datetime(df['Start Date'])
    return df.set_index('Start Date').resample('D').sum()

In [34]:
from plotly.graph_objs import Scatter

pop_stations = []  # traces for the most popular stations; easy to extend to more
for station in dep_stations['Start Station'][:3]:
    # Pull only the start dates for this station, then aggregate to daily counts
    temp = transform_df(
        btd.where(btd['Start Station'] == station).select("Start Date").toPandas()
    )
    pop_stations.append(
        Scatter(
            x=temp.index,
            y=temp.counts,
            name=station
        )
    )
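
The daily aggregation in `transform_df` can be tried on a tiny frame to see what the `.resample(...).sum()` form returns. This is a sketch with three invented timestamps in the dataset's date format; the explicit `format` string is an assumption about that format.

```python
import pandas as pd

# Three made-up trips: two on one day, one two days later.
toy = pd.DataFrame({'Start Date': ['8/29/13 14:13', '8/29/13 14:42', '8/31/13 10:16']})
toy['counts'] = 1
toy['Start Date'] = pd.to_datetime(toy['Start Date'], format='%m/%d/%y %H:%M')

# One row per calendar day between the first and last trip.
daily = toy.set_index('Start Date').resample('D').sum()
print(daily)
```

Days without any trips still appear in the resampled index, which keeps the time axis continuous when the counts are plotted.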



In [35]:


data = Data(pop_stations)
py.iplot(data, filename="spark/over_time")

