{ "cells": [ { "cell_type": "code", "execution_count": 1, "id": "25e5ea3d-6dde-472b-b24f-0129c7ab3109", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello, Hadoop!\n", "Testing Scala with Spark\n" ] }, { "data": { "text/plain": [ "filePath = hdfs:///share/test_scala\n", "data = hdfs:///share/test_scala MapPartitionsRDD[3] at textFile at :31\n" ] }, "metadata": {}, "output_type": "display_data", "source": "user" }, { "data": { "text/plain": [ "hdfs:///share/test_scala MapPartitionsRDD[3] at textFile at :31" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "// Define the HDFS file path\n", "val filePath = \"hdfs:///share/test_scala\"\n", "\n", "// Write data to HDFS\n", "sc.parallelize(Seq(\"Hello, Hadoop!\", \"Testing Scala with Spark\")).saveAsTextFile(filePath)\n", "\n", "// Read back the data\n", "val data = sc.textFile(filePath)\n", "data.collect().foreach(println)" ] }, { "cell_type": "code", "execution_count": 2, "id": "5bd3b56b-1c68-4616-b607-ecb3da0114cf", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(Spark,1)\n", "(with,1)\n", "(Hello,,1)\n", "(Scala,1)\n", "(Hadoop!,1)\n", "(Testing,1)\n" ] }, { "data": { "text/plain": [ "wordCounts = ShuffledRDD[8] at reduceByKey at :29\n" ] }, "metadata": {}, "output_type": "display_data", "source": "user" }, { "data": { "text/plain": [ "ShuffledRDD[8] at reduceByKey at :29" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "// Load the file and perform word count\n", "val wordCounts = sc.textFile(filePath)\n", " .flatMap(line => line.split(\" \"))\n", " .map(word => (word, 1))\n", " .reduceByKey(_ + _)\n", "\n", "// Display the word counts\n", "wordCounts.collect().foreach(println)" ] }, { "cell_type": "code", "execution_count": null, "id": "d2d63bdb-dad8-448b-9c06-5d0ca6c7216b", "metadata": {}, "outputs": [], "source": [ "import org.apache.hadoop.conf.Configuration\n", "import org.apache.hadoop.fs.{FileSystem, Path}\n", "import java.io.OutputStreamWriter\n", "import java.nio.charset.StandardCharsets\n", "\n", "// Set up HDFS configuration\n", "val hdfsConfig = new Configuration()\n", "\n", "// Define HDFS path for the new file\n", "val hdfsPath = new Path(\"/share/test_data.csv\")\n", "\n", "// Generate sample data (in-memory)\n", "val testData = List(\n", " \"id,name,age\",\n", " \"1,John,23\",\n", " \"2,Jane,34\",\n", " \"3,Sam,29\",\n", " \"4,Lucy,45\"\n", ")\n", "\n", "// Initialize HDFS FileSystem\n", "val fs = FileSystem.get(hdfsConfig)\n", "\n", "// Write data to HDFS\n", "val outputStream = new OutputStreamWriter(fs.create(hdfsPath, true), StandardCharsets.UTF_8)\n", "try {\n", " testData.foreach(line => {\n", " outputStream.write(line + \"\\n\") // Write each line to the file\n", " })\n", " println(\"Data successfully written to HDFS at /share/test_data.csv\")\n", "} finally {\n", " outputStream.close() // Ensure the stream is closed after writing\n", "}\n", "\n", "// Confirm the file creation\n", "if (fs.exists(hdfsPath)) {\n", " println(\"File successfully created in HDFS.\")\n", "} else {\n", " println(\"File creation failed.\")\n", "}\n", "\n", "// Close the HDFS FileSystem connection\n", "fs.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Apache Toree - Scala", "language": "scala", "name": "apache_toree_scala" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala", "version": "2.12.15" } }, "nbformat": 4, "nbformat_minor": 5 }