{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Producing and Consuming Messages to/from Kafka and plotting, using python producer and spark consumer\n", "\n", "To run this notebook you must already have created a Kafka topic " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Imports\n", "\n", "We use utility functions from the hops library to make Kafka configuration simple\n", "\n", "Dependencies: \n", "\n", "- hops-py-util\n", "- confluent-kafka" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|---|---|---|---|---|---|
34 | application_1538483294796_0037 | pyspark | idle | Link | Link | ✔ |
\n", "Sometimes there is a delay before the spark job starts writing to the sink,
\n", "\n", "before going on to the next step in this notebook, go to your HDFS `OUTPUT_PATH` \n", "and verify that the csv output is not empty.
\n", "\n", "If it is empty, re-run the query above
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read the Data from the Sink" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "schema = StructType([\n", " StructField(\"value\", FloatType(), True),\n", " StructField(\"timestamp\", TimestampType(), True)])\n", "\n", "df1 = spark.read \\\n", " .format(\"csv\") \\\n", " .option(\"header\", \"false\") \\\n", " .schema(schema) \\\n", " .load(OUTPUT_PATH)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- value: float (nullable = true)\n", " |-- timestamp: timestamp (nullable = true)" ] } ], "source": [ "df1.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize the DataFrame using SparkMagic\n", "\n", "This visualization currenly only works in Python 2.*
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This command copies the spark dataframe from the cluster \n", "to the local machine and converts it to a pandas dataframe named \"df1\". \n", "This pandas dataframe is available in all cells started with the sparkmagic: %%local and can be used for \n", "visualizations and plotting." ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "%%spark -o df1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Below is sparkmagics default plotting" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "bf3b24a36bee4e8ca5d9ea46c1389c10", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VkJveChjaGlsZHJlbj0oSEJveChjaGlsZHJlbj0oSFRNTCh2YWx1ZT11J1R5cGU6JyksIEJ1dHRvbihkZXNjcmlwdGlvbj11J1RhYmxlJywgbGF5b3V0PUxheW91dCh3aWR0aD11JzcwcHgnKSzigKY=\n" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "14d51278b25c4bc1b9e631c942bc63dd", "version_major": 2, "version_minor": 0 }, "text/plain": [ "Output()" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%local\n", "df1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install matplotlib on the local machine in case it is not already installed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "pip install --user matplotlib" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "