{ "cells": [ { "cell_type": "code", "execution_count": 1, "id": "6b1989cd-f596-4ecc-ab67-d501fac6cc1a", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\n", "Attaching package: ‘sparklyr’\n", "\n", "\n", "The following object is masked from ‘package:stats’:\n", "\n", " filter\n", "\n", "\n", "\n", "Attaching package: ‘dplyr’\n", "\n", "\n", "The following objects are masked from ‘package:stats’:\n", "\n", " filter, lag\n", "\n", "\n", "The following objects are masked from ‘package:base’:\n", "\n", " intersect, setdiff, setequal, union\n", "\n", "\n" ] } ], "source": [ "# Load necessary libraries\n", "library(sparklyr)\n", "library(dplyr)\n", "library(ggplot2)" ] }, { "cell_type": "code", "execution_count": 2, "id": "633650d2-b9b9-4000-b02c-9521e7f574d8", "metadata": {}, "outputs": [], "source": [ "# Generate a sample data frame with an 'id' and 'value' column\n", "test_data <- data.frame(\n", " id = 1:100,\n", " value = round(runif(100, min = 0, max = 10), 2) # 100 random float values between 0 and 10\n", ")\n", "\n", "# Copy data to Spark (using the existing Spark connection)\n", "sc <- spark_connect(master = \"yarn\") # Adjust with your Spark setup\n", "spark_df <- sdf_copy_to(sc, test_data, overwrite = TRUE)" ] }, { "cell_type": "code", "execution_count": 3, "id": "704f41a1-fd60-4109-8d97-9e81d0e528cf", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "ename": "ERROR", "evalue": "Error in UseMethod(\"spark_write_parquet\"): no applicable method for 'spark_write_parquet' applied to an object of class \"data.frame\"\n", "output_type": "error", "traceback": [ "Error in UseMethod(\"spark_write_parquet\"): no applicable method for 'spark_write_parquet' applied to an object of class \"data.frame\"\nTraceback:\n", "1. .handleSimpleError(function (cnd) \n . {\n . watcher$capture_plot_and_output()\n . cnd <- sanitize_call(cnd)\n . watcher$push(cnd)\n . switch(on_error, continue = invokeRestart(\"eval_continue\"), \n . stop = invokeRestart(\"eval_stop\"), error = invokeRestart(\"eval_error\", \n . cnd))\n . }, \"no applicable method for 'spark_write_parquet' applied to an object of class \\\"data.frame\\\"\", \n . base::quote(UseMethod(\"spark_write_parquet\")))" ] } ], "source": [ "# Write data to HDFS (change the path as needed)\n", "hdfs_path <- \"hdfs:///share/test_data\"\n", "spark_write_parquet(test_data, path = hdfs_path, mode = \"overwrite\")\n", "\n", "# Read data back from HDFS\n", "data_from_hdfs <- spark_read_parquet(sc, name = \"data_from_hdfs\", path = hdfs_path)\n", "\n", "# Show a sample of the data\n", "print(head(data_from_hdfs))\n", "\n", "# Collect data for plotting\n", "data_for_plot <- data_from_hdfs %>% collect()\n", "\n", "# Plotting the data\n", "ggplot(data_for_plot, aes(x = id, y = value)) +\n", " geom_point(color = \"blue\", alpha = 0.5) +\n", " labs(title = \"Random Test Data from HDFS\", x = \"ID\", y = \"Value\")" ] }, { "cell_type": "code", "execution_count": null, "id": "8656534b-ad8a-404b-8fc9-d35b42347dfb", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "R", "language": "R", "name": "ir" }, "language_info": { "codemirror_mode": "r", "file_extension": ".r", "mimetype": "text/x-r-source", "name": "R", "pygments_lexer": "r", "version": "4.4.2" } }, "nbformat": 4, "nbformat_minor": 5 }