{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Clickstream Analysis using Apache Spark and Apache Kafka (or Message Hub)\n", "\n", "[Message Hub: Apache Kafka as a Service](https://developer.ibm.com/messaging/2016/03/14/message-hub-apache-kafka-as-a-service/) is well integrated into the IBM Data Science Experience.\n", "\n", "\n", "Before running the notebook, you need to set up a [Message Hub](https://developer.ibm.com/messaging/message-hub/) service.\n", "\n", "**Note:** Message Hub is a paid service.\n", "\n", "* To create a Message Hub service, go to the `Data services-> Services` tab on the dashboard and select the option to create a Message Hub service, following the on-screen instructions. After creating the service instance, select it and create a topic named `clicks`, using the defaults or your own settings.\n", "\n", "* Once the service is running, add it to the current notebook. First, create a connection for the Message Hub service instance: go to the `Data services-> connections` tab on the dashboard and create a new connection, filling in the details, referring to the service instance created above in the Service instance section, and selecting the topic `clicks`. Then go to the `Assets` tab on the project dashboard and click `+New data asset`. Locate the Message Hub connection you created under the connections tab and click `Apply`.\n", "\n", "* Once the service is added to the notebook, the credentials to access it can be inserted automatically. Follow the comments in the next cell for instructions on inserting the credentials. Once the credentials are inserted, the notebook is ready to run.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Credentials Section" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Waiting for a Spark session to start..." 
] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "2.1.0" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "// In a project, create a connection to a Message Hub topic. Then, in a notebook in that project, use 'Insert to Code'\n", "// to paste the connection info into this code cell, and rename the variable to credentials_1.\n", "\n", "// It should look like this:\n", "// @hidden_cell\n", "/*var credentials_1 = scala.collection.mutable.HashMap[String, String](\n", " \"instance_id\"->\"***\",\n", " \"mqlight_lookup_url\"->\"***\",\n", " \"api_key\"->\"***\",\n", " \"kafka_admin_url\"->\"***\",\n", " \"kafka_rest_url\"->\"***\",\n", " \"kafka_brokers_sasl\"->\"ip:port\",\n", " \"user\"->\"***\",\n", " \"password\"->\"\"\"****\"\"\",\n", " \"topic\"->\"test\"\n", ")*/\n", "// @hidden_cell\n", "\n", "\n", "spark.version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Specify the schema of the incoming Wikipedia clickstream and the parse method:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import scala.util.Try\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "case class Click(prev: String, curr: String, link: String, n: Long)\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "// Parse one tab-separated record (prev, curr, link, n); returns None if the record is malformed.\n", "def parseVal(x: Array[Byte]): Option[Click] = {\n", " val split: Array[String] = new Predef.String(x).split(\"\\\\t\")\n", " if (split.length == 4) {\n", " Try(Click(split(0), split(1), split(2), split(3).toLong)).toOption\n", " } else\n", " None\n", " }" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": true }, "outputs": [], "source": [ "val user = credentials_1(\"user\")\n", "val pass = credentials_1(\"password\")\n", "val topic = credentials_1(\"topic\")\n", "val saslConfig =\n", " 
s\"\"\"org.apache.kafka.common.security.plain.PlainLoginModule required\n", " |debug=true\n", " |username=\"$user\"\n", " |password=\"$pass\";\"\"\".stripMargin\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up Structured Streaming to read from Kafka:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "scrolled": true }, "outputs": [], "source": [ "val records = spark.readStream.format(\"kafka\").option(\"kafka.sasl.jaas.config\", saslConfig).option(\"subscribe\", topic).option(\"kafka.sasl.mechanism\", \"PLAIN\").option(\"kafka.security.protocol\", \"SASL_SSL\").option(\"failOnDataLoss\", \"false\").option(\"kafka.bootstrap.servers\", credentials_1(\"kafka_brokers_sasl\")).load()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Process the records: parse each message, sum the click counts `n` per current page `curr`, and sort the totals in descending order:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "val messages = records.select(\"value\").as[Array[Byte]].flatMap(x => parseVal(x)).groupBy(\"curr\").agg(Map(\"n\" -> \"sum\")).sort($\"sum(n)\".desc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Output to the console and start streaming:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-------------------------------------------\n", "Batch: 0\n", "-------------------------------------------\n", "+----+------+\n", "|curr|sum(n)|\n", "+----+------+\n", "+----+------+\n", "\n", "-------------------------------------------\n", "Batch: 7\n", "-------------------------------------------\n", "+---------+------+\n", "|curr |sum(n)|\n", "+---------+------+\n", "|Aristotle|108441|\n", "+---------+------+\n", "\n", "-------------------------------------------\n", "Batch: 8\n", "-------------------------------------------\n", "+--------------------------------------------+------+\n", "|curr |sum(n)|\n", "+--------------------------------------------+------+\n", "|Aristotle |108441|\n", 
"|Monday_Night_Massacre |49366 |\n", "|Fin_(Syd_album) |762 |\n", "|################# |313 |\n", "|Binghamton_Devils |280 |\n", "|Fueler |254 |\n", "|Alpine_skiing_at_the_2017_Winter_Universiade|193 |\n", "|Mudyug_concentration_camp |166 |\n", "|Hannah_Montana_(song) |165 |\n", "|Flight_of_Fancy |152 |\n", "|Ouedraogo |141 |\n", "|##################################### |120 |\n", "|Pierre_Thomas |116 |\n", "|Mohammad_Aghazadeh_Khorasani |112 |\n", "|Nic_Radford |94 |\n", "|Lion_(Taiwanese_band) |92 |\n", "|Sid_Roberson |81 |\n", "|Institute_of_Political_Studies_in_Belgrade |65 |\n", "|David_Branson_Smith |61 |\n", "|Legacy_(2017_film) |50 |\n", "+--------------------------------------------+------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "val query = messages.writeStream.outputMode(\"complete\").option(\"truncate\", \"false\").format(\"console\").start()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Scala 2.11 with Spark 2.1", "language": "scala", "name": "scala-spark21" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala", "version": "2.11.8" } }, "nbformat": 4, "nbformat_minor": 1 }