{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Ngrams with pyspark\n", "\n", "### Create a Spark context\n", "\n", "A Spark context (or a session, that encapsulates a context) is the entry gate for Spark. \n", "It represents the Spark engine (whether on the local machine or on a cluster) and provides an API for creating and running data pipelines.\n", "\n", "In this example, we're going to load a text file into a RDD, split the text into ngrams, and count the frequency of ngrams." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark import SparkContext\n", "from operator import add" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "filename = \"wiki429MB\"\n", "\n", "sc = SparkContext(\n", " appName = \"Ngrams with pyspark \" + filename\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View Spark context" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v2.4.0-cdh6.3.2
\n", "
Master
\n", "
yarn
\n", "
AppName
\n", "
Ngrams with pyspark wiki429MB
\n", "
\n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check that data is there\n", "\n", "We are going to use the file `/data/wiki429MB` that has been been previously uploaded to HDFS. The file has size $429$MB.\n", "\n", "**Note:** the file contains one whole document per line." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "put: `/data/wiki429MB': File exists\n" ] } ], "source": [ "!hdfs dfs -put wiki429MB /data/wiki429MB" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-r--r-- 3 datalab hdfs 428.8 M 2020-02-14 08:54 /data/wiki429MB\n" ] } ], "source": [ "!hdfs dfs -ls -h /data/wiki429MB" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create RDD from file\n", "\n", "The second parameter ($80$) indicates the desired number of partitions." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "textFile is of type: \n", "Number of partitions: 80\n" ] } ], "source": [ "textFile = sc.textFile(\"/data/wiki429MB\", 80)\n", "print(\"textFile is of type: {}\\nNumber of partitions: {}\". \\\n", " format(type(textFile), textFile.getNumPartitions()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate trigrams" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "n = 3\n", "ngrams = textFile \\\n", " .flatMap(lambda x: [x.split()]) \\\n", " .flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])]) \\\n", " .map(lambda x: (x, 1)) \\\n", " .reduceByKey(add) \\\n", " .sortBy(lambda x: x[1], ascending=False)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyspark.rdd.PipelinedRDD" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(ngrams)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note:** up to now we've just carried out a series of _transformations_. Spark hasn't jet done any computation. It's by applying the _action_ `take` that we first act on the data to get a result." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "one of the 27795\n", "as well as 25145\n", "part of the 17984\n", "the United States 17224\n", "such as the 13886\n", "the end of 13878\n", "a number of 12986\n", "in the United 11760\n", "known as the 10172\n", "end of the 9842\n" ] } ], "source": [ "for (ngram, count) in ngrams.take(10):\n", " print(\"{:<20}{:>d}\".format(' '.join(ngram), count))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Stop context\n", "We're going to create a new context. In order to do that, we first need to stop the current Spark context to free resources." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "sc.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Transformations and actions seen so far\n", "\n", "**Transformations**\n", "- `map`\n", "- `flatMap`\n", "- `reduceByKey`\n", "- `sortBy`\n", "\n", "**Actions**\n", "- `take`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create new context\n", "\n", "We also explicitly set the Hadoop property `textinputformat.record.delimiter` to `'\\n'`, so that the genome file is initially read one line per record." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext(\n", " appName = \"Remove newlines\"\n", ")" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\\n')" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-r--r-- 3 groda supergroup 1.3 G 2020-02-18 15:19 GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna\n" ] } ], "source": [ "!hdfs dfs -ls -h GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "rmr: DEPRECATED: Please use '-rm -r' instead.\n", "20/02/19 23:37:53 INFO fs.TrashPolicyDefault: Moved: 'hdfs://nameservice1/user/groda/GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN' to trash at: hdfs://nameservice1/user/groda/.Trash/Current/user/groda/GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN\n" ] } ], "source": [ "!hadoop fs -rmr GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['>QVRK01000602.1 Homo sapiens isolate HG02106 chromosome 1 1-100500000:0, whole genome shotgun sequence',\n", " 'CCCCAGCCACCCTTgcttccctgccccagccttccatcTCATCTCTCTTGCTTCCATCTCTGGCTTTTCCACTCCAGCCA']" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "genomeFile = sc.textFile(\"GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fna\", minPartitions=12)\n", "genomeFile.take(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the next passage we remove newlines and use header lines (beginning with \">\") as block delimiters: headers become `---` markers, the sequence is uppercased, and empty lines are preserved. The new file is saved as `GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN`."
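] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a minimal pure-Python sketch of those substitutions (the `toy` fragment below is made up for illustration), here is their effect on a few FASTA-style lines:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "# made-up FASTA fragment: two records, each introduced by a '>' header line\n", "toy = ['>seq1 first record', 'acgtAC', 'GTTA', '>seq2 second record', 'ggtt']\n", "\n", "# same three substitutions as the Spark pipeline in the next cell, line by line\n", "print([re.sub('^$', '\\n', re.sub('^>.*', '---', line).upper()) for line in toy])\n", "# -> ['---', 'ACGTAC', 'GTTA', '---', 'GGTT']"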
] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "import re\n", "# header lines become '---' markers, sequence lines are uppercased,\n", "# and empty lines are kept as explicit newlines\n", "genomeFile \\\n", " .map(lambda x: re.sub('^>.*', '---', x)) \\\n", " .map(lambda x: x.upper()) \\\n", " .map(lambda x: re.sub('^$', '\\n', x)) \\\n", " .saveAsTextFile(\"GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN\")" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "sc.stop()\n", "sc = SparkContext(\n", " appName = \"Genome\"\n", ")\n", "# from now on, records are delimited by '---', i.e. one sequence per record\n", "sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '---')" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "genomeFile = sc.textFile(\"GCA_003711455.1_HG02106_EEE_SV-Pop.1_genomic.fnaNN\", minPartitions=12)" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "n = 3\n", "# strip the newlines left inside each record, slide a window of length n\n", "# over the concatenated sequence, then count and sort the trigrams\n", "ngrams = genomeFile \\\n", " .map(lambda x: re.sub('\\n', '', x)) \\\n", " .flatMap(lambda x: x.split()) \\\n", " .flatMap(lambda x: [tuple(y) for y in zip(*[x[i:] for i in range(n)])]) \\\n", " .map(lambda x: (x, 1)) \\\n", " .reduceByKey(add) \\\n", " .sortBy(lambda x: x[1], ascending=False)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "T T T 48189867\n", "A A A 48113618\n", "A A T 30902280\n", "A T T 30887259\n", "C A G 30608314\n", "C T G 30478385\n", "A G A 29926360\n", "T C T 29887506\n", "A C A 27652593\n", "T G T 27622134\n" ] } ], "source": [ "for (ngram, count) in ngrams.take(10):\n", " print(\"{:<20}{:>d}\".format(' '.join(ngram), count))" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "sc.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }