{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Structured Streaming" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Many real-world applications benefit from the ability to process data in a streaming manner. Spark provides *Structured Streaming*, which is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. With *Structured Streaming*, you can process streaming data in real time as if in batch mode. Moreover, *Structured Streaming* enables fast, scalable, fault-tolerant, end-to-end exactly-once stream processing, which makes it easier for users to write streaming applications." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Word Count using Structured Streaming" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this simple example, we show how to count the words in a stream of text data. This example was originally posted in the [Structured Streaming Programming Guide](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html). You can download the full code [here](https://github.com/apache/spark/blob/v2.1.0/examples/src/main/python/streaming/network_wordcount.py)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We first need to import the *StreamingContext*, which is the entry point for the DStream-based streaming API that this example uses." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark.streaming import StreamingContext\n", "from pyspark import SparkContext\n", "from bigdl.util.common import *\n", "\n", "sc = SparkContext('local[*]')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we create a local *StreamingContext* with a batch interval of 5 seconds." 
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Create a local StreamingContext with a batch interval of 5 seconds\n", "ssc = StreamingContext(sc, 5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following lines first create a *DStream* connected to localhost on port 9999, which represents the stream of data received from the data server. They then split each line into words and count the words in each batch. The last line prints the first ten elements of each RDD generated in this DStream to the console." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Create a DStream that will connect to hostname:port, like localhost:9999\n", "lines = ssc.socketTextStream(\"localhost\", 9999)\n", "# Split each line into words\n", "words = lines.flatMap(lambda line: line.split(\" \"))\n", "\n", "# Count each word in each batch\n", "pairs = words.map(lambda word: (word, 1))\n", "wordCounts = pairs.reduceByKey(lambda x, y: x + y)\n", "\n", "# Print the first ten elements of each RDD generated in this DStream to the console\n", "wordCounts.pprint()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To start the computation, we call the function *start()*. The function *awaitTermination()* then waits for the execution to stop; its argument is the maximum time to wait, in seconds." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Start the computation\n", "ssc.start()\n", "\n", "# Wait for the computation to terminate \n", "ssc.awaitTermination(20) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the program starts, the streaming computation runs in the background. To run the code on your own machine, you can start *Netcat* as a data server by typing the following in a terminal." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# TERMINAL: # Running Netcat\n", "$ nc -lk 9999" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then type words line by line in the terminal running the Netcat server. The counts will be printed on the terminal that runs the word count program." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# TERMINAL: # Running Netcat\n", "$ nc -lk 9999\n", "apache spark\n", "apache hadoop" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If everything works, you will see output like the following on the screen." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-------------------------------------------\n", "Time: 2017-04-26 15:02:35\n", "-------------------------------------------\n", "\n", "-------------------------------------------\n", "Time: 2017-04-26 15:02:40\n", "-------------------------------------------\n", "(u'apache', 1)\n", "(u'spark', 1)\n", "\n", "-------------------------------------------\n", "Time: 2017-04-26 15:02:45\n", "-------------------------------------------\n", "\n", "-------------------------------------------\n", "Time: 2017-04-26 15:02:50\n", "-------------------------------------------\n", "(u'apache', 1)\n", "(u'hadoop', 1)\n", "\n", "-------------------------------------------\n", "Time: 2017-04-26 15:02:55\n", "-------------------------------------------\n", "\n", "-------------------------------------------\n", "Time: 2017-04-26 15:03:00\n", "-------------------------------------------\n", "\n" ] } ], "source": [ "# Start the computation\n", "ssc.start()\n", "\n", "# Wait for the computation to terminate \n", "ssc.awaitTermination(20) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more information about *Structured Streaming*, 
you can refer to this [site](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.12" } }, "nbformat": 4, "nbformat_minor": 2 }