# Structured Streaming

Many real-world applications benefit from processing data as a stream. Spark provides *Structured Streaming*, a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. With *Structured Streaming*, you express a streaming computation the same way you would express a batch computation on static data, and Spark runs it incrementally as new data arrives. Moreover, *Structured Streaming* provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing, which makes streaming applications easier to write.

## Network Word Count with Spark Streaming

In this simple example, we count the words in text data received over a TCP socket. The code follows the `network_wordcount.py` example that ships with Spark; you can download the full code [here](https://github.com/apache/spark/blob/v2.1.0/examples/src/main/python/streaming/network_wordcount.py). A word count written with the DataFrame-based *Structured Streaming* API appears in the [Structured Streaming Programming Guide](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

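The code below uses the DStream-based Spark Streaming API, whose entry point is *StreamingContext*. (*Structured Streaming* itself is built on DataFrames and uses *SparkSession* as its entry point.) We first import *StreamingContext* and create a *SparkContext*.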

In [1]:
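from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# local[*] uses all available cores; the socket receiver permanently occupies
# one of them, so at least two cores are needed for any processing to happen
sc = SparkContext('local[*]', 'NetworkWordCount')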


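Then we create a *StreamingContext* with a batch interval of 5 seconds: the incoming data is divided into 5-second batches, and each batch is processed as a small batch computation.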

In [2]:
# Create a StreamingContext on top of sc with a batch interval of 5 seconds
ssc = StreamingContext(sc, 5)
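To make the batch interval concrete, here is a plain-Python sketch (not Spark code) of how timestamped lines could be grouped into 5-second micro-batches. The function name `split_into_batches` and the `(timestamp, line)` input format are illustrative assumptions; in Spark, batching is driven by the receiver and the streaming clock rather than by timestamps carried in the data.

```python
BATCH_INTERVAL = 5  # seconds, matching StreamingContext(sc, 5)

def split_into_batches(records, interval=BATCH_INTERVAL):
    """Group (timestamp_in_seconds, line) pairs into fixed batches.

    Hypothetical helper: a record with timestamp t lands in the batch
    covering [k * interval, (k + 1) * interval), keyed by the batch end time.
    Intervals that receive no records do not appear in the result.
    """
    batches = {}
    for ts, line in records:
        batch_end = (int(ts // interval) + 1) * interval
        batches.setdefault(batch_end, []).append(line)
    # Return the batches in time order
    return dict(sorted(batches.items()))

records = [(1.2, "apache spark"), (3.9, "hello world"), (7.5, "apache hadoop")]
print(split_into_batches(records))
# {5: ['apache spark', 'hello world'], 10: ['apache hadoop']}
```

Unlike this sketch, Spark Streaming still runs a batch for every interval even when no data arrived, which is why the output later in this section prints `Time:` headers for batches that contain no words.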

The following cell creates a *DStream* that represents the stream of text received from a data server at localhost:9999, splits each line into words, and counts the words in each batch. The last line prints the first ten elements of each RDD generated in this DStream to the console.

In [3]:
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
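For readers new to the RDD operators, the sketch below reproduces in plain Python what the `flatMap` → `map` → `reduceByKey` pipeline computes for the lines of a single batch. It is an illustration only: `count_words_in_batch` is a hypothetical helper, not part of PySpark, and it runs on one machine without any of Spark's partitioning.

```python
from operator import add

def count_words_in_batch(lines):
    """Plain-Python analogue of flatMap -> map -> reduceByKey for one batch."""
    # flatMap(lambda line: line.split(" ")): one flat list of words
    words = [word for line in lines for word in line.split(" ")]
    # map(lambda word: (word, 1))
    pairs = [(word, 1) for word in words]
    # reduceByKey(lambda x, y: x + y): combine the values of equal keys
    counts = {}
    for word, value in pairs:
        counts[word] = add(counts[word], value) if word in counts else value
    return counts

print(count_words_in_batch(["apache spark"]))
# {'apache': 1, 'spark': 1}
```

Note that `split(" ")` splits on every single space, so a line such as `"apache  spark"` (two spaces) produces an empty-string "word"; the Spark code above behaves the same way, and `line.split()` with no argument would skip such empty tokens.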

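Nothing is processed until *start()* is called. *awaitTermination(timeout)* then blocks until the streaming computation stops or until *timeout* seconds have elapsed, whichever comes first. With a timeout of 20 seconds, the call returns after 20 seconds while the streaming computation keeps running in the background; *ssc.stop(stopSparkContext=False)* stops it while keeping the SparkContext alive.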

In [None]:
# Start the computation
ssc.start()

# Block for up to 20 seconds; the computation keeps running afterwards
ssc.awaitTermination(20)

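Once started, the streaming computation runs in the background and tries to connect to localhost:9999. To provide data on your machine, run *Netcat* as a data server by typing the following in a separate terminal. It is simplest to start Netcat before calling *start()*, so that the receiver finds the server immediately.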

In [None]:
# TERMINAL: # Running Netcat
$ nc -lk 9999
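If `nc` is not available (for example on Windows), a small Python script can play the data server's role. The sketch below is a stand-in that is not part of the original example: the helper `start_line_server` is hypothetical, and it sends a fixed list of lines to the first client that connects and then shuts down, whereas `nc -lk 9999` keeps listening and forwards whatever you type.

```python
import socket
import threading
import time

def start_line_server(lines, host="127.0.0.1", port=0, delay=0.0):
    """Serve `lines` to the first client that connects, then shut down.

    port=0 lets the OS pick a free port; pass port=9999 to match
    ssc.socketTextStream("localhost", 9999). `delay` (seconds) spaces the
    lines out so they can fall into different batches.
    Returns the bound port and the serving thread.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def serve():
        try:
            conn, _ = server.accept()
            with conn:
                for line in lines:
                    # socketTextStream reads its input line by line
                    conn.sendall((line + "\n").encode("utf-8"))
                    time.sleep(delay)
        finally:
            server.close()

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return bound_port, thread
```

For example, calling `start_line_server(["apache spark", "apache hadoop"], port=9999, delay=6)` before `ssc.start()` sends the two lines more than one 5-second batch interval apart. Because the server exits after the last line, expect the receiver to log errors when it tries to reconnect.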

Then type lines of words into the terminal running the Netcat server. The word counts for each batch are printed wherever the word count program runs (here, in the notebook).

In [None]:
# TERMINAL: # Running Netcat
$ nc -lk 9999
apache spark
apache hadoop

If everything works, you should see output similar to the following:

In [4]:
# Start the computation
ssc.start()

# Block for up to 20 seconds; the computation keeps running afterwards
ssc.awaitTermination(20)

-------------------------------------------
Time: 2017-04-26 15:02:35
-------------------------------------------

-------------------------------------------
Time: 2017-04-26 15:02:40
-------------------------------------------
(u'apache', 1)
(u'spark', 1)

-------------------------------------------
Time: 2017-04-26 15:02:45
-------------------------------------------

-------------------------------------------
Time: 2017-04-26 15:02:50
-------------------------------------------
(u'apache', 1)
(u'hadoop', 1)

-------------------------------------------
Time: 2017-04-26 15:02:55
-------------------------------------------

-------------------------------------------
Time: 2017-04-26 15:03:00
-------------------------------------------
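In the output above, each batch is counted independently: `apache` is reported once in the 15:02:40 batch and once again in the 15:02:50 batch, never as 2. A running count needs state carried from one batch to the next. In the DStream API this is done with `updateStateByKey` (which requires a checkpoint directory set via `ssc.checkpoint(...)`), and in *Structured Streaming* a `groupBy("word").count()` query written in complete output mode reports cumulative counts. The plain-Python sketch below only illustrates the idea of carrying state across batches; `running_word_counts` is a hypothetical helper, not a Spark function.

```python
def running_word_counts(batches):
    """Yield cumulative word counts after each batch.

    `batches` is a sequence of batches, each a list of text lines.
    The `state` dict persists across batches, so counts accumulate
    instead of restarting at zero for every batch.
    """
    state = {}
    for lines in batches:
        for line in lines:
            for word in line.split(" "):
                state[word] = state.get(word, 0) + 1
        # Yield a copy so that later batches do not modify earlier snapshots
        yield dict(state)

# Batches shaped like the session above: data, no data, data
for snapshot in running_word_counts([["apache spark"], [], ["apache hadoop"]]):
    print(snapshot)
# {'apache': 1, 'spark': 1}
# {'apache': 1, 'spark': 1}
# {'apache': 2, 'spark': 1, 'hadoop': 1}
```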



For more information about *Structured Streaming*, see the [Structured Streaming Programming Guide](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).