\n",
"Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file\n",
"except in compliance with the License. You may obtain a copy of the License at\n",
"http://www.apache.org/licenses/LICENSE-2.0\n",
"Unless required by applicable law or agreed to in writing, software distributed under the\n",
"License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either\n",
"express or implied. See the License for the specific language governing permissions and\n",
"limitations under the License.\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Analyze Clickstream Events\n",
"\n",
"This notebook uses the [Scala](https://www.scala-lang.org/) programming language\n",
"to interact with IBM Db2 Event Store. It demonstrates how to:\n",
"\n",
"* Connect to Event Store\n",
"* Analyze clickstream data to gain insight into customer interests\n",
"* Visualize the information with interactive Brunel charts"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connect to IBM Db2 Event Store\n",
"\n",
"### Determine the IP address of your host\n",
"\n",
"Obtain the IP address of the host that you want to connect to by running the appropriate command for your operating system:\n",
"\n",
"* On Mac, run: `ifconfig`\n",
"* On Windows, run: `ipconfig`\n",
"* On Linux, run: `hostname -i`\n",
"\n",
"Edit the `Host` value in the next cell (`val Host = \"XXX.XXX.XXX.XXX\"`) to provide the IP address."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Set your host IP address\n",
"val Host = \"XXX.XXX.XXX.XXX\"\n",
"\n",
"// Port will be 1100 for version 1.1.2 or later (5555 for version 1.1.1)\n",
"val Port = \"1100\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Add Brunel integration\n",
"Use the `%AddJar` line magic to download the Brunel integration for Apache Toree (Scala) and add it to the kernel (`-f` forces a fresh download)."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting download from https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar\n",
"Finished download of spark-kernel-brunel-all-2.3.jar\n"
]
}
],
"source": [
"%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar -f"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import Scala packages\n",
"\n",
"Import packages for Scala, Spark, and IBM Db2 Event Store."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import sys.process._\n",
"import java.io.File\n",
"import scala.concurrent.{Await, Future}\n",
"import scala.concurrent.duration.Duration\n",
"import org.apache.log4j.{Level, LogManager, Logger}\n",
"import org.apache.spark._\n",
"import org.apache.spark.sql.expressions.Window\n",
"import org.apache.spark.sql.functions._\n",
"import org.apache.spark.sql.ibm.event.EventSession\n",
"import org.apache.spark.sql.Row\n",
"import org.apache.spark.sql.types._\n",
"import com.ibm.event.catalog.TableSchema\n",
"import com.ibm.event.common.ConfigurationReader\n",
"import com.ibm.event.example.DataGenerator\n",
"import com.ibm.event.oltp.EventContext\n",
"import com.ibm.event.oltp.InsertResult"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connect to Event Store"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ConfigurationReader.setConnectionEndpoints(Host + \":\" + Port)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load data from the Event Store table into a DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+---------+----------+------------+-----------------+------+--------------------+-------+\n",
"| eventId|eventType| timestamp| ipaddress| sessionId|userId| pageUrl|browser|\n",
"+-----------+---------+----------+------------+-----------------+------+--------------------+-------+\n",
"|20170522901| pageView|1496311260|169.34.56.78|y20170522a4499u21|ceaton| /www.cybershop.com| Chrome|\n",
"|20170522902| pageView|1496311320|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome|\n",
"|20170522903| pageView|1496311440|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome|\n",
"|20170522904| pageView|1496311500|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome|\n",
"|20170522905| pageView|1496311560|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome|\n",
"+-----------+---------+----------+------------+-----------------+------+--------------------+-------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"val sqlContext = new EventSession(spark.sparkContext, \"TESTDB\")\n",
"import sqlContext.implicits._\n",
"\n",
"val table = sqlContext.loadEventTable(\"ClickStreamTable\")\n",
"table.createOrReplaceTempView(\"ClickStreamTable\")\n",
"\n",
"val clickStreamDF = sqlContext.sql(\"select * from ClickStreamTable\")\n",
"clickStreamDF.show(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare the clickstream data\n",
"\n",
"Use Spark SQL and Spark functions to build DataFrames with aggregated web metrics.\n",
"\n",
"### Calculate time spent on web pages"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+---------+----------+------------+-----------------+------+--------------------+-------+----+\n",
"| eventId|eventType| timestamp| ipaddress| sessionId|userId| pageUrl|browser|time|\n",
"+-----------+---------+----------+------------+-----------------+------+--------------------+-------+----+\n",
"|20170522901| pageView|1496311260|169.34.56.78|y20170522a4499u21|ceaton| /www.cybershop.com| Chrome| 60|\n",
"|20170522902| pageView|1496311320|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 120|\n",
"|20170522903| pageView|1496311440|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 60|\n",
"|20170522904| pageView|1496311500|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 60|\n",
"|20170522905| pageView|1496311560|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 60|\n",
"+-----------+---------+----------+------------+-----------------+------+--------------------+-------+----+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"val timestamp = clickStreamDF(\"timestamp\")\n",
"val next_timestamp = lead(timestamp, 1).over(Window.orderBy(timestamp))\n",
"\n",
"// Calculate time spent on each page (seconds until the next event)\n",
"val clickStreamWithTimeDF = clickStreamDF.withColumn(\n",
" \"time\", next_timestamp.cast(LongType) - timestamp.cast(LongType))\n",
"clickStreamWithTimeDF.show(5)"
]
},
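{
"cell_type": "markdown",
"metadata": {},
"source": [
"`Window.orderBy(timestamp)` has no `partitionBy`, so `lead` treats every event in the table as one sequence. Each event's `time` is therefore measured to the next event in the whole table, which may belong to a different session (for example, the last page of a session). Partitioning the window, for example with `Window.partitionBy(\"sessionId\").orderBy(\"timestamp\")`, would restrict the calculation to each session. The plain-Scala sketch below (no Spark required) illustrates that per-session logic. The `Click` case class and the `timeOnPage` function are hypothetical. The sample rows are illustrative and reuse the first timestamps shown above.\n",
"\n",
"```scala\n",
"// Hypothetical record mirroring a subset of the ClickStreamTable columns\n",
"case class Click(sessionId: String, pageUrl: String, timestamp: Long)\n",
"\n",
"// Seconds until the next event in the same session; None for the last event,\n",
"// analogous to the null that lead(...) returns at the end of a window partition\n",
"def timeOnPage(clicks: Seq[Click]): Seq[(Click, Option[Long])] =\n",
"  clicks.groupBy(_.sessionId).values.toSeq.flatMap { session =>\n",
"    val sorted = session.sortBy(_.timestamp)\n",
"    val nextTs = sorted.drop(1).map(c => Option(c.timestamp)) :+ None\n",
"    sorted.zip(nextTs).map { case (c, next) => (c, next.map(_ - c.timestamp)) }\n",
"  }\n",
"\n",
"val sample = Seq(\n",
"  Click(\"y20170522a4499u21\", \"/www.cybershop.com\", 1496311260L),\n",
"  Click(\"y20170522a4499u21\", \"/estore?product_line=smartphones&action=catalog\", 1496311320L),\n",
"  Click(\"y20170522a4499u21\", \"/estore?product_line=smartphones&action=catalog\", 1496311440L)\n",
")\n",
"timeOnPage(sample).foreach { case (c, t) => println(s\"${c.pageUrl} -> $t\") }\n",
"```"
]
},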
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate aggregated page hits and time spent"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------------------------------------------------------------------------+---------+----------+\n",
"|pageURL |page_hits|total_time|\n",
"+-----------------------------------------------------------------------------+---------+----------+\n",
"|/www.cybershop.com |9 |600 |\n",
"|/estore?product_line=smartphones&action=catalog |13 |1260 |\n",
"|/estore?product_line=smartphones&product=A-phone&action=details |7 |540 |\n",
"|/estore?product_line=smartphones&product=A-phone&feature=color&action=details|5 |660 |\n",
"|/estore?product_line=smartphones&product=S-phone&action=details |5 |600 |\n",
"+-----------------------------------------------------------------------------+---------+----------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"// sqlContext is the EventSession created when ClickStreamTable was loaded\n",
"clickStreamWithTimeDF.createOrReplaceTempView(\"tempData\")\n",
"val clickStreamWithDateTimeDF = sqlContext.sql(\n",
" \"select eventId, eventType, cast(from_unixtime(timestamp) as date) as `date`, \" +\n",
" \"ipaddress, sessionId, userId, pageUrl, browser, time \" +\n",
" \"from tempData\")\n",
"// clickStreamWithDateTimeDF.show(5)\n",
"\n",
"clickStreamWithDateTimeDF.createOrReplaceTempView(\"ClickData\")\n",
"val clicksDF = sqlContext.sql(\n",
" \"select pageURL, count(*) as page_hits, sum(time) as total_time \" +\n",
" \"from ClickData where eventType='pageView' group by pageURL\")\n",
"clicksDF.show(5, false)"
]
},
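{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `group by pageURL` query above counts every row with `count(*)`. By contrast, `sum(time)` skips null values, such as the null `time` that `lead` produces for the final event. The following minimal sketch shows the same semantics with Scala collections. The `(pageUrl, time)` pairs are hypothetical, and `None` models a SQL null.\n",
"\n",
"```scala\n",
"// Hypothetical (pageUrl, seconds on page) pairs; None models a null time\n",
"val visits: Seq[(String, Option[Long])] = Seq(\n",
"  (\"/www.cybershop.com\", Some(60L)),\n",
"  (\"/estore?product_line=smartphones&action=catalog\", Some(120L)),\n",
"  (\"/www.cybershop.com\", Some(60L)),\n",
"  (\"/estore?product_line=smartphones&action=catalog\", None)\n",
")\n",
"\n",
"// pageUrl -> (page_hits, total_time): count every row, sum only the non-null times\n",
"val pageMetrics: Map[String, (Int, Long)] =\n",
"  visits.groupBy(_._1).map { case (url, rows) =>\n",
"    (url, (rows.size, rows.flatMap(_._2).sum))\n",
"  }\n",
"\n",
"pageMetrics.foreach { case (url, (hits, total)) =>\n",
"  println(s\"$url page_hits=$hits total_time=$total\")\n",
"}\n",
"```"
]
},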
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---------+----------+\n",
"| pageURL|page_hits|total_time|\n",
"+--------------------+---------+----------+\n",
"| /www.cybershop.com| 9| 600|\n",
"|/estore?product_l...| 13| 1260|\n",
"|/estore?product_l...| 7| 540|\n",
"|/estore?product_l...| 5| 660|\n",
"|/estore?product_l...| 5| 600|\n",
"+--------------------+---------+----------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"clicksDF.createOrReplaceTempView(\"WebMetricsData\")\n",
"val webMetricsDF = sqlContext.sql(\"select * from WebMetricsData\")\n",
"webMetricsDF.show(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate aggregated web metrics by product line, product, and feature"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+\n",
"|product_line|\n",
"+------------+\n",
"| videogames|\n",
"| videogames|\n",
"| smartphones|\n",
"| smartphones|\n",
"| smartphones|\n",
"+------------+\n",
"only showing top 5 rows\n",
"\n",
"+------------+-------+-------+---------+---------+----------+\n",
"|product_line| action|product| feature|page_hits|total_time|\n",
"+------------+-------+-------+---------+---------+----------+\n",
"| videogames|details| W-game| | 1| 120|\n",
"| videogames|catalog| | | 6| 4680|\n",
"| smartphones|details|A-phone|processor| 2| 120|\n",
"| smartphones|details|A-phone| | 7| 540|\n",
"| smartphones|details|A-phone| color| 5| 660|\n",
"+------------+-------+-------+---------+---------+----------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"clicksDF.createOrReplaceTempView(\"WebMetricsDataTest\")\n",
"val metricsQuery = \"\"\"\n",
" select\n",
" parse_URL(pageURL,'QUERY','product_line') as product_line, \n",
" Coalesce(parse_URL(pageURL,'QUERY','action'),'') as action,\n",
" Coalesce(parse_URL(pageURL,'QUERY','product'),'') as product, \n",
" Coalesce(parse_URL(pageURL,'QUERY','feature'),'') as feature, page_hits, total_time\n",
" from WebMetricsData\"\"\"\n",
"val metricsQuery3 = \"\"\"\n",
" select parse_URL(pageURL,'QUERY','product_line') as product_line\n",
" from WebMetricsDataTest\"\"\"\n",
"val webMetricsDF3 = sqlContext.sql(metricsQuery3).filter($\"product_line\".isNotNull).sort($\"product_line\".desc)\n",
"webMetricsDF3.show(5)\n",
"val webMetricsDF = sqlContext.sql(metricsQuery).filter($\"product_line\".isNotNull).sort($\"product_line\".desc)\n",
"webMetricsDF.show(5)"
]
},
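{
"cell_type": "markdown",
"metadata": {},
"source": [
"Spark's `parse_url(url, 'QUERY', key)` returns the value of `key` from the URL's query string, or null when the key is absent. For this reason, the query wraps `action`, `product`, and `feature` in `coalesce(..., '')` and filters out rows whose `product_line` is null. The sketch below is a rough plain-Scala analogue of `coalesce(parse_url(url, 'QUERY', key), '')`. `queryParam` is a hypothetical helper that assumes simple `key=value` pairs separated by `&` and does no URL decoding.\n",
"\n",
"```scala\n",
"import java.net.URI\n",
"\n",
"// First value of `key` in the URL's query string, or \"\" when it is missing\n",
"def queryParam(url: String, key: String): String = {\n",
"  val query = Option(new URI(url).getRawQuery).getOrElse(\"\")\n",
"  query.split(\"&\").iterator\n",
"    .map(_.split(\"=\", 2))\n",
"    .collectFirst { case Array(k, v) if k == key => v }\n",
"    .getOrElse(\"\")\n",
"}\n",
"\n",
"val url = \"/estore?product_line=smartphones&product=A-phone&feature=color&action=details\"\n",
"for (k <- Seq(\"product_line\", \"action\", \"product\", \"feature\"))\n",
"  println(s\"$k = '${queryParam(url, k)}'\")\n",
"```"
]
},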
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate aggregated web metrics for all product lines"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+---------+----------+\n",
"| product_line|page_hits|total_time|\n",
"+--------------+---------+----------+\n",
"| smartphones| 58| 6360|\n",
"| computers| 16| 3720|\n",
"| videogames| 7| 4800|\n",
"| appliances| 5| 2220|\n",
"| hometheater| 4| 840|\n",
"| headphones| 4| 960|\n",
"|carelectronics| 2| 420|\n",
"| cameras| 2| 360|\n",
"+--------------+---------+----------+\n",
"\n"
]
}
],
"source": [
"val productlineMetrics = webMetricsDF.\n",
"  groupBy(\"product_line\").\n",
"  agg(sum(\"page_hits\").as(\"page_hits\"), sum(\"total_time\").as(\"total_time\")).\n",
"  sort($\"page_hits\".desc)\n",
"\n",
"productlineMetrics.show()"
]
},
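{
"cell_type": "markdown",
"metadata": {},
"source": [
"The product-line roll-up above is a second level of aggregation. Per-URL `page_hits` and `total_time` are summed into one row per `product_line`, and the rows are sorted by hits. A minimal plain-Scala sketch of that roll-up follows. The tuple layout is an assumption. The four rows are copied from the per-URL metrics printed earlier: two `smartphones` URLs and two `videogames` URLs.\n",
"\n",
"```scala\n",
"// Per-URL rows as (product_line, page_hits, total_time)\n",
"val perUrl: Seq[(String, Long, Long)] = Seq(\n",
"  (\"smartphones\", 13L, 1260L),\n",
"  (\"smartphones\", 7L, 540L),\n",
"  (\"videogames\", 6L, 4680L),\n",
"  (\"videogames\", 1L, 120L)\n",
")\n",
"\n",
"// One row per product line with summed hits and time, most-visited first\n",
"val grouped = perUrl.groupBy(_._1).toSeq\n",
"val byLine = grouped.map { case (line, rows) => (line, rows.map(_._2).sum, rows.map(_._3).sum) }.sortBy(-_._2)\n",
"\n",
"byLine.foreach(println)\n",
"```"
]
},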
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [
{
"data": {
"text/html": [
"\n",
" \n",
" \n",
" \n",
"