phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames, and enables persisting DataFrames back to Phoenix.

## Reading Phoenix Tables

Given a Phoenix table with the following DDL and DML:

```sql
CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
```

### Load as a DataFrame using the DataSourceV2 API

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load data from TABLE1
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show
```

## Saving to Phoenix

### Save DataFrames to Phoenix using DataSourceV2

The `save` method on DataFrame allows passing in a data source type. You can use `phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl` parameter to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame's schema field names, and must match the Phoenix column names.

The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
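The column-name requirement can trip up DataFrames whose fields were named in lower or mixed case, since Phoenix normalizes unquoted column names to upper case. The following is a minimal sketch, not part of phoenix-spark: `withPhoenixColumnNames` is a hypothetical helper that upper-cases every field name before the write, and it assumes the target table was created with unquoted column names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper (not part of phoenix-spark): upper-cases every column name
// so that the DataFrame's fields match unquoted Phoenix column names.
def withPhoenixColumnNames(df: DataFrame): DataFrame =
  df.toDF(df.columns.map(_.toUpperCase): _*)

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()
import spark.implicits._

// A DataFrame with lower-case field names, as might come from another source
val lowerCased = Seq((1L, "1", 1), (2L, "2", 2)).toDF("id", "col1", "col2")

// Fields are now ID, COL1, COL2
val renamed = withPhoenixColumnNames(lowerCased)
renamed.printSchema()
```

The renamed DataFrame can then be written with `.write.format("phoenix")` in the same way as any other DataFrame in this section.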
Given two Phoenix tables with the following DDL:

```sql
CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```

you can load from an input table and save to an output table as a DataFrame as follows:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load INPUT_TABLE
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
  .load

// Save to OUTPUT_TABLE
df
  .write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
  .save()
```

### Save from an external RDD with a schema to a Phoenix table

Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` and `zkUrl` parameters indicating which table and server to persist the DataFrame to.

Note that the schema applied to the RDD must match its row data, and both must match the schema of the Phoenix table that you save to.
Given an output Phoenix table with the following DDL:

```sql
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```

you can save a DataFrame built from an RDD as follows:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))

val schema = StructType(
  Seq(StructField("ID", LongType, nullable = false),
    StructField("COL1", StringType),
    StructField("COL2", IntegerType)))

val rowRDD = spark.sparkContext.parallelize(dataSet)

// Apply the schema to the RDD.
val df = spark.sqlContext.createDataFrame(rowRDD, schema)

// Save to OUTPUT_TABLE
df.write
  .format("phoenix")
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "quorumAddress"))
  .mode(SaveMode.Overwrite)
  .save()
```

## Notes

- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"` instead of `"phoenix"`; however, this is deprecated as of `connectors-1.0.0`.
- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings, as well as an optional `zkUrl` parameter for the Phoenix connection URL.
- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
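As a sketch of the first note, the deprecated DataSourceV1 path differs from the DataSourceV2 examples mainly in the format string and the connection option. The snippet below assumes the V1 source accepts `table` and `zkUrl` as plain string options, and reuses the `TABLE1` table and the `phoenix-server:2181` address from the reading example; adjust both for your cluster.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Deprecated DataSourceV1: use the fully qualified source name instead of "phoenix"
val df = spark.read
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181"))
  .load()

df.show()
```

Since this path is deprecated as of `connectors-1.0.0`, it is probably best reserved for code that cannot yet move to the `phoenix` source.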
## Limitations

- Basic support for column and predicate pushdown using the Data Source API
- The Data Source API does not support passing custom Phoenix settings in configuration; you must create the DataFrame or RDD directly if you need fine-grained configuration.
- No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)

## Deprecated Usages

### Load as a DataFrame directly using a Configuration object

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show
```

### Load as an RDD, using a Zookeeper URL

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD

val sc = new SparkContext("local", "phoenix-test")

// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

rdd.count()

val firstId = rdd.first()("ID").asInstanceOf[Long]
val firstCol = rdd.first()("COL1").asInstanceOf[String]
```

### Saving RDDs to Phoenix

`saveToPhoenix` is an implicit method on `RDD[Product]`, or an RDD of Tuples.
The data types must correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html).

Given a Phoenix table with the following DDL:

```sql
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```

```scala
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )
```
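The notes earlier mention that `saveToPhoenix` also accepts a `conf` Hadoop configuration in place of `zkUrl`. A minimal sketch of that variant, saving an RDD of case-class instances (which are `Product`s) rather than tuples, might look like the following. The `OutputRow` case class is hypothetical, the quorum host `phoenix-server` is a placeholder, and the sketch assumes ZooKeeper listens on its default client port.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

// Hypothetical row type; field order must match the column list passed below.
// In a compiled application, define it at the top level rather than inside a method.
case class OutputRow(id: Long, col1: String, col2: Int)

// Placeholder quorum host; 'hbase.zookeeper.quorum' is required when zkUrl is omitted
val configuration = new Configuration()
configuration.set("hbase.zookeeper.quorum", "phoenix-server")

val sc = new SparkContext("local", "phoenix-test")
val rows = List(OutputRow(1L, "1", 1), OutputRow(2L, "2", 2), OutputRow(3L, "3", 3))

sc
  .parallelize(rows)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID", "COL1", "COL2"),
    conf = configuration
  )
```

Passing a configuration this way is also where other Phoenix client settings would go, which the Data Source API does not currently allow.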