---
title: "Chapter 4 Big Data Cloud Platform"
date: "`r Sys.Date()`"
output:
  html_document:
    toc: true
    toc_depth: 3
    toc_float:
      collapsed: true
      smooth_scroll: true
---

This notebook illustrates how to use a Databricks Community Edition account for the Spark big data cloud platform. We use the R package `sparklyr` to drive Spark computation on Spark data frames. The methods covered in this notebook are only a brief introduction; for more details and additional functions, refer to:

- https://docs.databricks.com/user-guide/faq/sparklyr.html
- http://spark.rstudio.com/index.html

Since both `sparklyr` and the Databricks Community environment are developing quickly, we have only tested the notebook under a specific version. **This notebook was last updated and tested on 2021-01-31, running on a 7.2 ML cluster instance of a Community Edition account.**

# Install and Load Library

First, we need to install the `sparklyr` package, which enables the connection between the local node (i.e., the running R instance) and the Spark cluster environment. As it installs a few dependencies, it may take a few minutes to finish.

```r
install.packages("sparklyr")
```

Load the package:

```{r}
library(sparklyr)
```

# Create Spark Connection

Once the R package is loaded, we need to create a Spark connection to link the local R node to the Spark environment. Here we set `method = "databricks"` in the `spark_connect()` function. In other environments, please consult your IT administrator for the appropriate settings. The created Spark connection (i.e., `sc`) is the bridge that connects the local R node to the Spark cluster.

```{r, echo = FALSE, message=FALSE, error=FALSE}
# spark_install(version = "3.1.2")
# Sys.setenv(JAVA_HOME = "/opt/homebrew/opt/openjdk")
sc <- spark_connect(master = "local", spark_home = "/Users/hui/spark/spark-3.1.2-bin-hadoop3.2")
```

```r
sc <- spark_connect(method = "databricks")
```

# Sample Dataset

To simplify the learning process, let us use the well-known `iris` data, which ships with base R in the `datasets` package. We load the `dplyr` package for data manipulation and take a first look at the `iris` data frame. Here the `iris` data frame is still on the local node where the R notebook is running, and after running the code we can see its first few lines below:

```{r, message = FALSE}
library(dplyr)
head(iris)
```

# IMPORTANT - Copy Data to Spark Environment

In real big data applications, the data sets are usually too big to fit on a single hard disk, and it is very likely that they are already stored as Spark data frames. However, the brand-new Spark system we just created contains no Spark data frames yet. For illustration purposes, we first copy the `iris` R data frame to Spark using the `sdf_copy_to()` function:

```{r}
iris_tbl <- sdf_copy_to(sc = sc, x = iris, overwrite = T)
```

The `sdf_copy_to()` function returns an R object `iris_tbl` that refers to the newly created Spark data frame `iris`. Later in the notebook, we can use `iris_tbl` to refer to the `iris` Spark data frame. To list the existing Spark data frames, we can apply the `src_tbls()` function to the Spark connection (`sc`):

```{r}
src_tbls(sc)
```

# Analyzing the Data

With the `sparklyr` package, we can apply many `dplyr` functions to the Spark data frame `iris` directly through the R object `iris_tbl`, just as we would apply them to a local R data frame.
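As a quick warm-up, here is a minimal sketch that runs a few standard `dplyr` verbs (`filter()`, `select()`, `arrange()`) against the Spark data frame. Note that the Spark copy of `iris` uses underscores in its column names (e.g., `Sepal_Length`), which is the naming used in the rest of this notebook; the verbs are translated to Spark SQL and executed on the Spark side, and only a small preview is returned for printing.

```{r}
# A minimal sketch: standard dplyr verbs executed on the Spark data frame.
# The Spark copy of iris uses underscores in its column names (e.g., Sepal_Length).
iris_tbl %>%
  filter(Species == "setosa") %>%
  select(Sepal_Length, Sepal_Width) %>%
  arrange(desc(Sepal_Length)) %>%
  head()
```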
We can also pass `iris_tbl` to the `count()` function with the `%>%` operator:

```{r}
iris_tbl %>% count()
```

```{r}
head(iris_tbl)
```

Or apply more advanced data operations directly to `iris_tbl`:

```{r}
iris_tbl %>%
  mutate(Sepal_Width = ceiling(Sepal_Width * 2) / 2) %>%
  group_by(Species, Sepal_Width) %>%
  summarize(count = n(),
            avg_Sepal_Length = mean(Sepal_Length),
            std_Sepal_Length = sd(Sepal_Length))
```

# Collect Results Back to R Local Node

We can run many `dplyr` functions on Spark data frames. However, we can NOT apply functions from other packages to Spark data frames through the Spark connection. For functions that only work on local R data frames, we have to copy the processed Spark data frame back to the local node, which can be done using the `collect()` function. The following code collects the results of the data manipulation on the Spark data frame and assigns them to the `iris_summary` variable.

```{r}
iris_summary <- iris_tbl %>%
  mutate(Sepal_Width = ceiling(Sepal_Width * 2) / 2) %>%
  group_by(Species, Sepal_Width) %>%
  summarize(count = n(),
            avg_Sepal_Length = mean(Sepal_Length),
            std_Sepal_Length = sd(Sepal_Length)) %>%
  collect()
iris_summary
```

Here `iris_summary` is a local variable in the R notebook, and we can apply functions from any R package to it. For example, we can use the `ggplot()` function:

```{r}
library(ggplot2)
ggplot(iris_summary, aes(Sepal_Width, avg_Sepal_Length, color = Species)) +
  geom_line(size = 1.2) +
  geom_errorbar(aes(ymin = avg_Sepal_Length - std_Sepal_Length,
                    ymax = avg_Sepal_Length + std_Sepal_Length),
                width = 0.05) +
  geom_text(aes(label = count), vjust = -0.2, hjust = 1.2, color = "black") +
  theme(legend.position = "top")
```

# Fit A Regression Model to Spark Data Frame

Spark provides many statistical and machine learning algorithms that leverage parallel computing on distributed data. For example, we can easily fit a linear regression to a dataset far beyond a single computer's memory limit.

```{r}
fit1 <- ml_linear_regression(x = iris_tbl,
                             response = "Sepal_Length",
                             features = c("Sepal_Width", "Petal_Length", "Petal_Width"))
summary(fit1)
```

# Fit Other Models

Through the `sparklyr` package, we can call Spark's machine learning library (MLlib) to fit many models, such as linear regression, logistic regression, survival regression, generalized linear regression, decision trees, random forests, gradient-boosted trees, principal components analysis, naive Bayes, k-means clustering, and a few other methods. The following code fits a k-means clustering algorithm:

```{r}
fit2 <- ml_kmeans(x = iris_tbl, k = 3,
                  features = c("Petal_Length", "Petal_Width"))
print(fit2)
```

After fitting the model, we can apply it to other datasets through the `ml_predict()` function. The code below applies the fitted model to `iris_tbl` again to predict each row's cluster. The predictions are collected back into a local variable, `prediction`, through the `collect()` function:

```{r}
prediction <- collect(ml_predict(fit2, iris_tbl))
head(prediction)
```

As `prediction` is a local R variable, we can apply R functions from any package to it.
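For instance, as a quick local sanity check (a minimal sketch; it assumes, as is typical for `ml_predict()`, that the returned data frame keeps the original columns such as `Species` and adds a `prediction` column holding the cluster labels), we can cross-tabulate the predicted clusters against the actual species using base R:

```{r}
# Cross-tabulate actual species against predicted cluster labels.
# Assumes `prediction` retains the original Species column and stores the
# cluster label in its `prediction` column (typical ml_predict() output).
table(prediction$Species, prediction$prediction)
```

Because `prediction` is an ordinary local data frame, this step involves no Spark computation at all.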
We can also visualize the predicted clusters, together with the fitted cluster centers, using `ggplot2`:

```{r}
prediction %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
             size = 2, alpha = 0.5) +
  geom_point(data = fit2$centers, aes(Petal_Width, Petal_Length),
             col = scales::muted(c("red", "green", "blue")),
             pch = 'x', size = 12) +
  scale_color_discrete(name = "Predicted Cluster",
                       labels = paste("Cluster", 1:3)) +
  labs(
    x = "Petal Width",
    y = "Petal Length",
    title = "K-Means Clustering",
    subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
  )
```

# Summary

In this notebook, we illustrated:

- The relationship between a local R node and the Spark environment.
- How to copy a local data frame to a Spark data frame (note that if the data is already in the Spark environment, there is no need to copy it; this is likely to be the case in real applications).
- How to perform data operations on a Spark data frame through `dplyr` functions using the `sparklyr` package.
- How to fit statistical and machine learning models to a Spark data frame.
- How to collect information from a processed Spark data frame back into a local R data frame for further analysis.