{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Elasticsearch Scala Example\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Elasticsearch" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This example notebook shows how to write data to and read data from Elasticsearch using Spark. It uses the [American Kennel Club dog breed dataset](https://data.world/len/dog-canine-breed-size-akc/workspace/file?filename=AKC+Breed+Info.csv)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. Download the dataset\n", "```\n", "wget -O akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. Upload `akc_breed_info.csv` to the `Resources` dataset of your project." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Writing to Elasticsearch" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "import io.hops.util.Hops\n", "df: org.apache.spark.sql.DataFrame = [Breed: string, height_low_inches: string ... 
3 more fields]\n" ] } ], "source": [ "import io.hops.util.Hops\n", "\n", "val df = spark.read.option(\"header\",\"true\").csv(\"hdfs:///Projects/\"+ Hops.getProjectName() +\"/Resources/akc_breed_info.csv\")\n", "\n", "(df.write\n", " .format(\"org.elasticsearch.spark.sql\")\n", " .options(Hops.getElasticConfiguration(\"newindex\"))\n", " .mode(\"Overwrite\")\n", " .save())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading from Elasticsearch" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "reader: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@416ac438\n", "df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Breed: string, height_high_inches: string ... 3 more fields]\n", "+--------------------+------------------+-----------------+---------------+--------------+\n", "| Breed|height_high_inches|height_low_inches|weight_high_lbs|weight_low_lbs|\n", "+--------------------+------------------+-----------------+---------------+--------------+\n", "| Affenpinscher| 12| 9| 12| 8|\n", "| Afghan Hound| 27| 25| 60| 50|\n", "| Airdale Terrier| 24| 22| 45| 45|\n", "| Akita| 28| 26| 120| 80|\n", "| Alaskan Malamute| na| na| na| na|\n", "| American Eskimo| 19| 9| 30| 25|\n", "| American Foxhound| 25| 22| 70| 65|\n", "|American Stafford...| 19| 17| 50| 40|\n", "|American Water Sp...| 18| 15| 45| 25|\n", "| Anatolian Sheepdog| 29| 27| 150| 100|\n", "|Australian Cattle...| 20| 17| 45| 35|\n", "| Australian Shepherd| 23| 18| 60| 40|\n", "| Australian Terrier| 10| 10| 14| 10|\n", "| Basenji| 17| 17| 22| 20|\n", "| Basset Hound| 14| 14| 50| 40|\n", "| Beagle| 16| 13| 30| 18|\n", "| Bearded Collie| 22| 20| 60| 40|\n", "| Beauceron| 27| 24| 120| 100|\n", "| Bedlington Terrier| 16| 15| 23| 18|\n", "| Belgian Malinois| 26| 22| 65| 60|\n", "+--------------------+------------------+-----------------+---------------+--------------+\n", "only 
showing top 20 rows\n", "\n" ] } ], "source": [ "// Configure a reader for the index written in the previous section\n", "val reader = spark.read.format(\"org.elasticsearch.spark.sql\").options(Hops.getElasticConfiguration(\"newindex\"))\n", "// Load the index, drop rows that contain null values, and sort by breed name\n", "val df = reader.load().na.drop().orderBy($\"Breed\")\n", "df.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Spark", "language": "scala", "name": "sparkkernel" }, "language_info": { "codemirror_mode": "text/x-scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 4 }