---
title: "Hive PySpark Example"
date: 2021-02-24
type: technical_note
draft: false
---

# PySpark With Hive

In this notebook we'll cover how to read from and write to Hive tables using Spark SQL. It assumes that you have enabled the "Hive" service in your project.

## Create a SparkSession with Hive Enabled

The sparkmagic kernel automatically creates a Spark session with Hive enabled in the cluster, so evaluating `spark` in the first cell starts the application and returns that session.

In [1]:
```python
spark
```

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 0 | application_1540813611542_0002 | pyspark | idle | Link | Link | ✔ |

```
SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f183f464860>
```

## Select Hive Database

Through the Spark session you can interact with Hive either by passing SQL statements to its `sql` method or through DataFrame methods such as `.select()` and `.where()` on tables loaded with `spark.table()`.

Each project that has enabled Hive automatically gets its own Hive database, named after the project. This is the only Hive database you can access unless another project has shared its database with you.

In [2]:
```python
from hops import hdfs as hopsfs
PROJECT_NAME = hopsfs.project_name()
```

In [3]:
```python
PROJECT_NAME
```

```
'test'
```

In [4]:
```python
spark.sql("use " + PROJECT_NAME)
```

```
DataFrame[]
```

## Create Tables

Tables can be created either by issuing a `CREATE TABLE` statement or by calling `saveAsTable()` on a DataFrame's writer (`df.write`). With `saveAsTable`, Spark infers the table schema from the DataFrame and issues the `CREATE TABLE` for you.

In [5]:
```python
spark.sql("show tables").show()
```

```
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
```

In [6]:
```python
spark.sql("CREATE TABLE MAGIC_MATRIX (position int, value float) STORED AS ORC")
```

```
DataFrame[]
```

In [7]:
```python
spark.sql("show tables").show()
```

```
+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|    test|magic_matrix|      false|
+--------+------------+-----------+
```

In [8]:
```python
from pyspark.sql.types import StructType, StructField, IntegerType
# One nullable integer column named SquaredValue
schema = StructType([StructField('SquaredValue', IntegerType(), True)])
```

In [9]:
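```python
from pyspark.sql import SQLContext
# SQLContext is the pre-2.0 entry point; later cells also use it.
# spark.createDataFrame() does the same job on a SparkSession.
sqlContext = SQLContext(spark.sparkContext)
# RDD of single-element rows holding the squares of 0..99
rddValues = spark.sparkContext.parallelize(list(range(0,100))).map(lambda x: [x*x])
dfValues = sqlContext.createDataFrame(rddValues,schema)
```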

In [10]:
```python
dfValues.show(5)
```

```
+------------+
|SquaredValue|
+------------+
|           0|
|           1|
|           4|
|           9|
|          16|
+------------+
only showing top 5 rows
```

In [11]:
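```python
# Spark creates the table from dfValues' schema; Hive lower-cases the name to squaredvalues
dfValues.write.format("ORC").mode("overwrite").saveAsTable("SquaredValues")
```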

In [12]:
```python
spark.sql("show tables").show()
```

```
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test| magic_matrix|      false|
|    test|squaredvalues|      false|
+--------+-------------+-----------+
```

## Insert Values

Values can be inserted with plain SQL (`INSERT INTO`) or from a DataFrame, either with `saveAsTable` in `append` mode or with `insertInto`.

### Simple Insert

In [13]:
```python
spark.sql("INSERT INTO TABLE magic_matrix VALUES (1, 99), (2, 100)")
```

```
DataFrame[]
```

In [14]:
```python
spark.sql("SELECT * FROM magic_matrix").show()
```

```
+--------+-----+
|position|value|
+--------+-----+
|       1| 99.0|
|       2|100.0|
+--------+-----+
```

### Insert with saveAsTable

In [15]:
```python
rddValues2 = spark.sparkContext.parallelize(list(range(100,200))).map(lambda x: [x*x])
dfValues2 = sqlContext.createDataFrame(rddValues2,schema)
```

In [16]:
```python
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
```

```
+--------+
|count(1)|
+--------+
|     100|
+--------+
```

In [17]:
```python
dfValues2.write.format("ORC").mode("append").saveAsTable("squaredvalues")
```

In [18]:
```python
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
```

```
+--------+
|count(1)|
+--------+
|     200|
+--------+
```

### Insert with insertInto

In [19]:
```python
dfValues2.write.mode("append").insertInto("squaredvalues")
```

In [20]:
```python
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
```

```
+--------+
|count(1)|
+--------+
|     300|
+--------+
```

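You can also write in `overwrite` mode, which replaces the table's contents. `REFRESH TABLE` invalidates Spark's cached data and metadata for the table, so the following query reads the new files: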

In [24]:
```python
dfValues2.write.format("ORC").mode("overwrite").saveAsTable("squaredvalues")
```

In [27]:
```python
spark.sql("REFRESH TABLE squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
```

```
+--------+
|count(1)|
+--------+
|     100|
+--------+
```

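### Insert using a temporary view

Register the DataFrame as a temporary view, then insert its rows with an `INSERT INTO ... SELECT` statement. The cells in this subsection were executed before the `insertInto` and overwrite cells above (note the execution counts), which is why the table holds 200 rows at the start.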

In [19]:
```python
rddValues3 = spark.sparkContext.parallelize(list(range(200,300))).map(lambda x: [x*x])
dfValues3 = sqlContext.createDataFrame(rddValues3,schema)
```

In [20]:
```python
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
```

```
+--------+
|count(1)|
+--------+
|     200|
+--------+
```

In [21]:
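```python
# registerTempTable() is deprecated since Spark 2.0; createOrReplaceTempView() replaces it
dfValues3.createOrReplaceTempView("temptable")
```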

In [22]:
```python
spark.sql("insert into table squaredvalues select * from temptable")
```

```
DataFrame[]
```

In [23]:
```python
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
```

```
+--------+
|count(1)|
+--------+
|     300|
+--------+
```

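## Queries

Reading data is a regular Spark SQL `SELECT`. The first query has no `ORDER BY`, so Spark returns rows in no particular order, and the rows shown are not necessarily the smallest values above 380.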

In [24]:
```python
spark.sql("SELECT * FROM squaredvalues WHERE squaredvalue > 380").show()
```

```
+------------+
|SquaredValue|
+------------+
|       40000|
|       40401|
|       40804|
|       41209|
|       41616|
|       42025|
|       42436|
|       42849|
|       43264|
|       43681|
|       44100|
|       44521|
|       44944|
|       45369|
|       45796|
|       46225|
|       46656|
|       47089|
|       47524|
|       47961|
+------------+
only showing top 20 rows
```

In [25]:
```python
spark.sql("SELECT * FROM magic_matrix WHERE position = 2").show()
```

```
+--------+-----+
|position|value|
+--------+-----+
|       2|100.0|
+--------+-----+
```

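## Drop Tables

`DROP TABLE` removes a table from the project database; the temporary view is removed separately with `spark.catalog.dropTempView()`. The outputs in this section come from a project named `sparksqlonhive`, which is why the `database` column no longer reads `test`.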

In [26]:
```python
spark.sql("SHOW TABLES").show()
```

```
+--------------+-------------+-----------+
|      database|    tableName|isTemporary|
+--------------+-------------+-----------+
|sparksqlonhive| magic_matrix|      false|
|sparksqlonhive|squaredvalues|      false|
|              |    temptable|       true|
+--------------+-------------+-----------+
```

In [27]:
```python
spark.sql("DROP TABLE magic_matrix")
```

```
DataFrame[]
```

In [28]:
```python
spark.sql("SHOW TABLES").show()
```

```
+--------------+-------------+-----------+
|      database|    tableName|isTemporary|
+--------------+-------------+-----------+
|sparksqlonhive|squaredvalues|      false|
|              |    temptable|       true|
+--------------+-------------+-----------+
```

In [29]:
```python
spark.sql("DROP TABLE squaredvalues")
```

```
DataFrame[]
```

In [30]:
```python
spark.sql("SHOW TABLES").show()
```

```
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |temptable|       true|
+--------+---------+-----------+
```

In [31]:
```python
spark.catalog.dropTempView("temptable")
```

In [32]:
```python
spark.sql("SHOW TABLES").show()
```

```
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
```