{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Hive PySpark Example\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# PySpark With Hive\n", "\n", "In this notebook we'll cover how you can read/write to Hive using SparkSQL, this notebook assumes that you have enabled the service \"Hive\" in your project" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a SparkSession with Hive Enabled\n", "\n", "sparkmagic automatically creates a spark session in the cluster for us with Hive enabled" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
"<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>0</td><td>application_1540813611542_0002</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td><td>✔</td></tr></table>"
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "" ] } ], "source": [ "spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Select Hive Database\n", "\n", "Using the spark session you can interact with Hive through the `sql` method on the sparkSession, or through auxillary methods likes `.select()` and `.where()`. \n", "\n", "Each project that have enabled Hive will automatically have a Hive database created for them, this is the only Hive database that you can access unless someone have shared their database with you." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from hops import hdfs as hopsfs\n", "PROJECT_NAME = hopsfs.project_name()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "'test'" ] } ], "source": [ "PROJECT_NAME" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[]" ] } ], "source": [ "spark.sql(\"use \" + PROJECT_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Tables\n", "\n", "Tables can be created either by issuing a `CREATE TABLE` statement or by using the `saveAsTable()` method on an existing dataframe. When using `saveAsTable` spark will infer the schema from the dataframe and do the `CREATE TABLE` for you. " ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+---------+-----------+\n", "|database|tableName|isTemporary|\n", "+--------+---------+-----------+\n", "+--------+---------+-----------+" ] } ], "source": [ "spark.sql(\"show tables\").show()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[]" ] } ], "source": [ "spark.sql(\"CREATE TABLE MAGIC_MATRIX (position int, value float) STORED AS ORC\")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+------------+-----------+\n", "|database| tableName|isTemporary|\n", "+--------+------------+-----------+\n", "| test|magic_matrix| false|\n", "+--------+------------+-----------+" ] } ], "source": [ "spark.sql(\"show tables\").show()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import *\n", "schema = StructType([StructField('SquaredValue', IntegerType(), True)])" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SQLContext\n", "sqlContext = SQLContext(spark.sparkContext)\n", "rddValues = spark.sparkContext.parallelize(list(range(0,100))).map(lambda x: [x*x])\n", "dfValues = sqlContext.createDataFrame(rddValues,schema)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+\n", "|SquaredValue|\n", "+------------+\n", "| 0|\n", "| 1|\n", "| 4|\n", "| 9|\n", "| 16|\n", "+------------+\n", "only showing top 5 rows" ] } ], "source": [ "dfValues.show(5)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ 
"dfValues.write.format(\"ORC\").mode(\"overwrite\").saveAsTable(\"SquaredValues\")" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------+-----------+\n", "|database| tableName|isTemporary|\n", "+--------+-------------+-----------+\n", "| test| magic_matrix| false|\n", "| test|squaredvalues| false|\n", "+--------+-------------+-----------+" ] } ], "source": [ "spark.sql(\"show tables\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Insert Values\n", "\n", "Values can be inserted with plain SQL or by using `saveAsTable` / `insertInto`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simple Insert" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[]" ] } ], "source": [ "spark.sql(\"INSERT INTO TABLE magic_matrix VALUES (1, 99), (2, 100)\")" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-----+\n", "|position|value|\n", "+--------+-----+\n", "| 1| 99.0|\n", "| 2|100.0|\n", "+--------+-----+" ] } ], "source": [ "spark.sql(\"SELECT * FROM magic_matrix\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Insert with saveAsTable" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "rddValues2 = spark.sparkContext.parallelize(list(range(100,200))).map(lambda x: [x*x])\n", "dfValues2 = sqlContext.createDataFrame(rddValues2,schema)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 100|\n", "+--------+" ] } ], "source": [ "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "dfValues2.write.format(\"ORC\").mode(\"append\").saveAsTable(\"squaredvalues\")" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 200|\n", "+--------+" ] } ], "source": [ "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Insert with insertInto" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "dfValues2.write.mode(\"append\").insertInto(\"squaredvalues\")" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 300|\n", "+--------+" ] } ], "source": [ "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also use overwrite mode:" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "dfValues2.write.format(\"ORC\").mode(\"overwrite\").saveAsTable(\"squaredvalues\")" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 100|\n", "+--------+" ] } ], "source": [ "spark.sql(\"REFRESH TABLE squaredvalues\")\n", "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "### Insert by using TempTable" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "rddValues3 = spark.sparkContext.parallelize(list(range(200,300))).map(lambda x: [x*x])\n", "dfValues3 = sqlContext.createDataFrame(rddValues3,schema)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 200|\n", "+--------+" ] } ], "source": [ "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "dfValues3.registerTempTable(\"temptable\")" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[]" ] } ], "source": [ "sqlContext.sql(\"insert into table squaredvalues select * from temptable\")" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 300|\n", "+--------+" ] } ], "source": [ "spark.sql(\"SELECT COUNT(*) FROM squaredvalues\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Queries" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+\n", "|SquaredValue|\n", "+------------+\n", "| 40000|\n", "| 40401|\n", "| 40804|\n", "| 41209|\n", "| 41616|\n", "| 42025|\n", "| 42436|\n", "| 42849|\n", "| 43264|\n", "| 43681|\n", "| 44100|\n", "| 44521|\n", "| 44944|\n", "| 45369|\n", "| 45796|\n", "| 46225|\n", "| 46656|\n", "| 47089|\n", "| 47524|\n", "| 47961|\n", "+------------+\n", "only showing top 20 rows" ] } ], "source": [ "spark.sql(\"SELECT * FROM squaredvalues WHERE squaredvalue > 380 \").show()" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-----+\n", "|position|value|\n", "+--------+-----+\n", "| 2|100.0|\n", "+--------+-----+" ] } ], "source": [ "spark.sql(\"SELECT * FROM magic_matrix WHERE position = 2 \").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Drop Tables" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+-------------+-----------+\n", "| database| tableName|isTemporary|\n", "+--------------+-------------+-----------+\n", "|sparksqlonhive| magic_matrix| false|\n", "|sparksqlonhive|squaredvalues| false|\n", "| | temptable| true|\n", "+--------------+-------------+-----------+" ] } ], "source": [ "spark.sql(\"SHOW TABLES\").show()" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[]" ] } ], "source": [ "spark.sql(\"DROP TABLE magic_matrix\")" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+-------------+-----------+\n", "| database| tableName|isTemporary|\n", "+--------------+-------------+-----------+\n", "|sparksqlonhive|squaredvalues| false|\n", "| | temptable| true|\n", "+--------------+-------------+-----------+" ] } ], "source": [ "spark.sql(\"SHOW TABLES\").show()" ] }, { "cell_type": "code", 
"execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DataFrame[]" ] } ], "source": [ "spark.sql(\"DROP TABLE squaredvalues\")" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+---------+-----------+\n", "|database|tableName|isTemporary|\n", "+--------+---------+-----------+\n", "| |temptable| true|\n", "+--------+---------+-----------+" ] } ], "source": [ "spark.sql(\"SHOW TABLES\").show()" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "spark.catalog.dropTempView(\"temptable\")" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+---------+-----------+\n", "|database|tableName|isTemporary|\n", "+--------+---------+-----------+\n", "+--------+---------+-----------+" ] } ], "source": [ "spark.sql(\"SHOW TABLES\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }