{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Big Data in Little Laptop with Toolz\n", "\n", "> GRACIE: A knife? The guy's twelve feet tall!
\n", "> JACK: Seven. Hey, don't worry, I think I can handle him.\n", ">\n", "> — Jack Burton, *Big Trouble in Little China*\n", "\n", "Streaming is not a SciPy feature per se, but rather an approach that\n", "allows us to efficiently process large datasets, like those often\n", "seen in science. The Python language contains some useful primitives\n", "for streaming data processing, and these can be combined with Matt Rocklin's\n", "Toolz library to generate elegant, concise code that is extremely\n", "memory-efficient. In this chapter, we will show you how to apply these\n", "streaming concepts to enable you to handle much larger datasets than can fit\n", "in your computer's RAM.\n", "\n", "You have probably already done some streaming, perhaps without thinking about it in these terms.\n", "The simplest form is probably iterating through lines in a files, processing each line without ever reading the entire file into memory.\n", "For example a loop like this to calculate the mean of each row and sum them:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "with open('data/expr.tsv') as f:\n", " sum_of_means = 0\n", " for line in f:\n", " sum_of_means += np.mean(np.fromstring(line, dtype=int, sep='\\t'))\n", "print(sum_of_means)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This strategy works really well for cases where your problem can be neatly solved with by-row processing.\n", "But things can quickly get out of hand when your code becomes more sophisticated.\n", "\n", "In streaming programs, a function processes *some* of the input data, returns the\n", "processed chunk, then, while downstream functions are dealing with that chunk,\n", "the function receives a bit more, and so on... All these things are going on\n", "at the same time! 
How can one keep them straight?\n", "\n", "We too found this difficult, until we discovered the `toolz` library.\n", "Its constructs make streaming programs so elegant to write that\n", "it was impossible to contemplate writing this book without including a chapter\n", "about it.\n", "\n", "Let us clarify what we mean by \"streaming\" and why you might want to do it.\n", "Suppose you have some data in a text file, and you want to compute the column-wise average of $\\log(x+1)$ of the values.\n", "The common way to do this would be to use NumPy to load the values, compute the log function for all values in the full matrix, and then take the mean over the 1st axis:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "expr = np.loadtxt('data/expr.tsv')\n", "logexpr = np.log(expr + 1)\n", "np.mean(logexpr, axis=0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This works, and it follows a reassuringly familiar input-output model of computation.\n", "But it's a pretty inefficient way to go about it!\n", "We load the full matrix into memory (1), then make a copy with 1 added to each value (2), then make another copy to compute the log (3), before finally passing it on to `np.mean`.\n", "That's three instances of the data array, to perform an operation that doesn't require keeping even *one* instance in memory.\n", "For any kind of \"big data\" operation, this approach won't work.\n", "\n", "Python's creators knew this, and created the \"yield\" keyword, which enables a function to process just one \"sip\" of the data, pass the result on to the next process, and *let the chain of processing complete* for that one piece of data before moving on to the next one.\n", "\"Yield\" is a rather nice name for it: the function *yields* control to the next function, waiting to resume processing the data until all the downstream steps have processed that data point.\n", "\n", "## Streaming with `yield`\n", "\n", "The flow of control described above can be rather hard to follow.\n", "An awesome feature of Python is that it abstracts this complexity away, allowing you to focus on the analysis functionality.\n", "Here's one way to think about it: for every processing function that would normally take a list (a collection of data) and transform that list, you can rewrite that function as taking a *stream* and *yielding* the result of every element of that stream.\n", "\n", "Here's an example where we take the log of each element in a list, using either a standard data-copying method or a streaming method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def log_all_standard(input):\n", " output = []\n", " for elem in input:\n", " output.append(np.log(elem))\n", " return output\n", "\n", "def log_all_streaming(input_stream):\n", " for elem in input_stream:\n", " yield np.log(elem)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check that we get the same result with both methods:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We set the random seed so we will get consistent results\n", "np.random.seed(seed=7)\n", "\n", "arr = np.random.rand(1000) + 0.5\n", "result_batch = sum(log_all_standard(arr))\n", "print('Batch result: ', result_batch)\n", "result_stream = sum(log_all_streaming(arr))\n", "print('Stream result: ', result_stream)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The advantage of 
the streaming approach is that elements of a stream aren't processed until they're needed, whether it's for computing a running sum, or for writing out to disk, or something else.\n", "This can conserve a lot of memory when you have many input items, or when each item is very big.\n", "(Or both!)\n", "This quote from one of Matt's blog posts very succinctly summarizes the utility of streaming data analysis:\n", "\n", "> In my brief experience people rarely take this [streaming] route.\n", "They use single-threaded in-memory Python until it breaks, and then seek out Big Data Infrastructure like Hadoop/Spark at relatively high productivity overhead.\n", "\n", "Indeed, this describes our computational careers perfectly.\n", "But the intermediate approach can get you a *lot* farther than you think.\n", "In some cases, it can get you there even faster than the supercomputing approach, by eliminating the overhead of multi-core communication and random-access to databases.\n", "(For example, see [this post](http://www.frankmcsherry.org/graph/scalability/cost/2015/02/04/COST2.html) by Frank McSherry, where he processes a 128 billion edge graph on his laptop *faster* than using a graph database on a supercomputer.)\n", "\n", "To clarify the flow of control when using streaming-style functions, it's useful to make *verbose* versions of the functions, which print out a message with each operation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "def tsv_line_to_array(line):\n", " lst = [float(elem) for elem in line.rstrip().split('\\t')]\n", " return np.array(lst)\n", "\n", "def readtsv(filename):\n", " print('starting readtsv')\n", " with open(filename) as fin:\n", " for i, line in enumerate(fin):\n", " print(f'reading line {i}')\n", " yield tsv_line_to_array(line)\n", " print('finished readtsv')\n", "\n", "def add1(arrays_iter):\n", " print('starting adding 1')\n", " for i, arr in enumerate(arrays_iter):\n", " print(f'adding 1 to line {i}')\n", " yield arr + 1\n", " print('finished adding 1')\n", "\n", "def log(arrays_iter):\n", " print('starting log')\n", " for i, arr in enumerate(arrays_iter):\n", " print(f'taking log of array {i}')\n", " yield np.log(arr)\n", " print('finished log')\n", "\n", "def running_mean(arrays_iter):\n", " print('starting running mean')\n", " for i, arr in enumerate(arrays_iter):\n", " if i == 0:\n", " mean = arr\n", " mean += (arr - mean) / (i + 1)\n", " print(f'adding line {i} to the running mean')\n", " print('returning mean')\n", " return mean" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see it in action for a small sample file:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fin = 'data/expr.tsv'\n", "print('Creating lines iterator')\n", "lines = readtsv(fin)\n", "print('Creating loglines iterator')\n", "loglines = log(add1(lines))\n", "print('Computing mean')\n", "mean = running_mean(loglines)\n", "print(f'the mean log-row is: {mean}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note:\n", "\n", "- None of the computation is run when creating the lines and loglines iterators. 
This is because iterators are *lazy*, meaning they are not evaluated (or *consumed*) until a result is needed.\n", "- When the computation is finally triggered, by the call to `running_mean`, it jumps back and forth between all the functions, as various computations are performed on each line, before moving on to the next line.\n", "\n", "## Introducing the Toolz streaming library\n", "\n", "In this chapter's code example, contributed by Matt Rocklin, we create a Markov model from an entire fly genome in under 5 minutes on a laptop, using just a few lines of code.\n", "(We have slightly edited it for easier downstream processing.)\n", "Matt's example uses a human genome, but apparently our laptops weren't quite so fast, so we're going to use a fly genome instead (it's about 1/20 the size).\n", "Over the course of the chapter we'll actually augment it a little bit to start from compressed data (who wants to keep an uncompressed dataset on their hard drive?).\n", "This modification is almost *trivial*, which speaks to the elegance of his example." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import toolz as tz\n", "from toolz import curried as c\n", "from glob import glob\n", "import itertools as it\n", "\n", "LDICT = dict(zip('ACGTacgt', range(8)))\n", "PDICT = {(a, b): (LDICT[a], LDICT[b])\n", " for a, b in it.product(LDICT, LDICT)}\n", "\n", "def is_sequence(line):\n", " return not line.startswith('>')\n", "\n", "def is_nucleotide(letter):\n", " return letter in LDICT # ignore 'N'\n", "\n", "@tz.curry\n", "def increment_model(model, index):\n", " model[index] += 1\n", "\n", "\n", "def genome(file_pattern):\n", " \"\"\"Stream a genome, letter by letter, from a list of FASTA filenames.\"\"\"\n", " return tz.pipe(file_pattern, glob, sorted, # Filenames\n", " c.map(open), # lines\n", " # concatenate lines from all files:\n", " tz.concat,\n", " # drop header from each sequence\n", " c.filter(is_sequence),\n", " # concatenate characters from all lines\n", " tz.concat,\n", " # discard newlines and 'N'\n", " c.filter(is_nucleotide))\n", "\n", "\n", "def markov(seq):\n", " \"\"\"Get a 1st-order Markov model from a sequence of nucleotides.\"\"\"\n", " model = np.zeros((8, 8))\n", " tz.last(tz.pipe(seq,\n", " c.sliding_window(2), # each successive tuple\n", " c.map(PDICT.__getitem__), # location in matrix of tuple\n", " c.map(increment_model(model)))) # increment matrix\n", " # convert counts to transition probability matrix\n", " model /= np.sum(model, axis=1)[:, np.newaxis]\n", " return model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can then do the following to obtain a Markov model of repetitive sequences\n", "in the fruit-fly genome:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%timeit -r 1 -n 1\n", "dm = 'data/dm6.fa'\n", "model = tz.pipe(dm, genome, c.take(10**7), markov)\n", "# we use `take` to just run on the first 10 million bases, to speed things up.\n", "# the take step can just be removed if you have ~5-10 mins to wait." 
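] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If that pipeline looks dense at first, the short sketch below pokes at two of its pieces, `tz.sliding_window` and the `PDICT` lookup, on a tiny made-up sequence (the string `'ACGTAC'` is ours, purely for illustration):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A tiny made-up sequence, just to see what two pieces of the pipeline produce.\n", "toy_seq = 'ACGTAC'\n", "pairs = list(tz.sliding_window(2, toy_seq)) # successive letter pairs\n", "print(pairs)\n", "print([PDICT[p] for p in pairs]) # (row, column) positions in the 8x8 model"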
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There's a *lot* going on in that example, so we are going to unpack it little by little.\n", "We'll actually run the example at the end of the chapter.\n", "\n", "The first thing to note is how many functions come from the [Toolz library](http://toolz.readthedocs.org/en/latest/).\n", "For example from Toolz we've used, `pipe`, `sliding_window`, `frequencies`, and a curried version of `map` (more on this later).\n", "That's because Toolz is written specifically to take advantage of Python's iterators, and easily manipulate streams.\n", "\n", "Let's start with `pipe`.\n", "This function is simply syntactic sugar to make nested function calls easier to read.\n", "This is important because that pattern becomes increasingly common when dealing with iterators.\n", "\n", "As a simple example, let's rewrite our running mean using `pipe`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import toolz as tz\n", "filename = 'data/expr.tsv'\n", "mean = tz.pipe(filename, readtsv, add1, log, running_mean)\n", "\n", "# This is equivalent to nesting the functions like this:\n", "# running_mean(log(add1(readtsv(filename))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What was originally multiple lines, or an unwieldy mess of parentheses, is now a clean description of the sequential transformations of the input data.\n", "Much easier to understand!\n", "\n", "This strategy also has an advantage over the original NumPy implementation: if we scale our data to millions or billions of rows, our computer might struggle to hold all the data in memory.\n", "In contrast, here we are only loading lines from disk one at a time, and maintaining only a single line's worth of data.\n", "\n", "## k-mer counting and error correction\n", "\n", "You might want to review chapters 1 and 2 for information about DNA and genomics.\n", "Briefly, your genetic information, the blueprint for making *you*, is encoded as a sequence of chemical *bases* in your *genome*.\n", "These are really, really tiny, so you can't just look in a microscope and read them.\n", "You also can't read a long string of them: errors accumulate and the readout becomes unusable.\n", "(New technology is changing this, but here we will focus on short-read sequencing data, the most common today.)\n", "Luckily, every one of your cells has an identical copy of your genome, so what we can do is shred those copies into tiny segments (about 100 bases long), and then assemble those like an enormous puzzle of 30 million pieces.\n", "\n", "Before performing assembly, it is vital to perform read correction.\n", "During DNA sequencing some bases are incorrectly read out, and must be fixed, or they will mess up the assembly.\n", "(Imagine having puzzle pieces with the wrong shape.)\n", "\n", "One correction strategy is to find similar reads in your dataset and fix the error by grabbing the correct information from those reads. 
Or alternatively, you may choose to completely discard those reads containing errors.\n", "\n", "However, this is a very inefficient way to do it, because finding similar reads means you would compare each read to every other read.\n", "This takes $N^2$ operations, or $9 \\times 10^{14}$ for a 30 million read dataset!\n", "(And these are not cheap operations.)\n", "\n", "There is another way.\n", "[Pavel Pevzner and others](http://www.pnas.org/content/98/17/9748.full) realized that reads could be broken down into smaller, overlapping *k-mers*, substrings of length k, which can then be stored in a hash table (a dictionary, in Python).\n", "This has tons of advantages, but the main one is that instead of computing on the total number of reads, which can be arbitrarily large, we can compute on the total number of k-mers, which can only be as large as the genome itself — usually 1-2 orders of magnitude smaller than the reads.\n", "\n", "If we choose a value for k that is large enough to ensure any k-mer appears only once in the genome, the number of times a k-mer appears is exactly the number of reads that originate from that part of the genome.\n", "This is called the *coverage* of that region.\n", "\n", "If a read has an error in it, there is a high probability that the k-mers overlapping the error will be unique or close to unique in the genome.\n", "Think of the equivalent in English: if you were to take reads from Shakespeare, and one read was \"to be or nob to be\", the 6-mer \"nob to\" will appear rarely or not at all, whereas \"not to\" will be very frequent.\n", "\n", "This is the basis for k-mer error correction: split the reads into k-mers, count the occurrence of each k-mer, and use some logic to replace rare k-mers in reads with similar common ones.\n", "(Or, alternatively, discard reads with erroneous k-mers.\n", "This is possible because reads are so abundant that we can afford to toss out erroneous data.)\n", "\n", "This is also an example in which streaming is *essential*.\n", "As mentioned before, the number of reads can be enormous, so we don't want to store them in memory.\n", "\n", "DNA sequence data is commonly represented in FASTA format.\n", "This is a plaintext format, consisting of one or many DNA sequences per file, each with a name and the actual sequence.\n", "\n", "A sample FASTA file:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```\n", " > sequence_name1\n", " TCAATCTCTTTTATATTAGATCTCGTTAAAGTAAAATTTTGGTTTGTGTTAAAGTACAAG\n", " GGGTACCTATGACCACGGAACCAACAAAGTGCCTAAATAGGACATCAAGTAACTAGCGGT\n", " ACGT\n", "\n", " > sequence_name2\n", " ATGTCCCAGGCGTTCCTTTTGCATTTGCTTCGCATTAACAGAATATCCAGCGTACTTAGG\n", " ATTGTCGACCTGTCTTGTCGTACGTGGCCGCAACACCAGGTATAGTGCCAATACAAGTCA\n", " GACTAAAACTGGTTC\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we have the required information to convert a stream of lines from a FASTA file to a count of k-mers:\n", "\n", "- filter lines so that only sequence lines are used\n", "- for each sequence line, produce a stream of k-mers\n", "- add each k-mer to a dictionary counter\n", "\n", "Here's how you would do this in pure Python, using nothing but built-ins:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def is_sequence(line):\n", " line = line.rstrip() # remove '\\n' at end of line\n", " return len(line) > 0 and not line.startswith('>')\n", "\n", "def reads_to_kmers(reads_iter, k=7):\n", " for read in reads_iter:\n", " for start in range(0, len(read) - 
k):\n", " yield read[start : start + k] # note yield, so this is a generator\n", "\n", "def kmer_counter(kmer_iter):\n", " counts = {}\n", " for kmer in kmer_iter:\n", " if kmer not in counts:\n", " counts[kmer] = 0\n", " counts[kmer] += 1\n", " return counts\n", "\n", "with open('data/sample.fasta') as fin:\n", " reads = filter(is_sequence, fin)\n", " kmers = reads_to_kmers(reads)\n", " counts = kmer_counter(kmers)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This totally works and is streaming, so reads are loaded from disk one at a time and piped through the k-mer converter and to the k-mer counter.\n", "We can then plot a histogram of the counts, and confirm that there are indeed two well-separated populations of correct and erroneous k-mers:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Make plots appear inline, set custom plotting style\n", "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "plt.style.use('style/elegant.mplstyle')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def integer_histogram(counts, normed=True, xlim=[], ylim=[],\n", " *args, **kwargs):\n", " hist = np.bincount(counts)\n", " if normed:\n", " hist = hist / np.sum(hist)\n", " fig, ax = plt.subplots()\n", " ax.plot(np.arange(hist.size), hist, *args, **kwargs)\n", " ax.set_xlabel('counts')\n", " ax.set_ylabel('frequency')\n", " ax.set_xlim(*xlim)\n", " ax.set_ylim(*ylim)\n", "\n", "counts_arr = np.fromiter(counts.values(), dtype=int, count=len(counts))\n", "integer_histogram(counts_arr, xlim=(-1, 250))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "Notice the nice distribution of k-mer frequencies, along with a big bump of k-mers (at the left of the plot) that appear only once.\n", "Such low frequency k-mers are likely to be errors.\n", "\n", "But, with the code above, we are actually doing a bit too much work.\n", "A lot of the functionality we wrote in for loops and yields is actually *stream manipulation*: transforming a stream of data into a different kind of data, and accumulating it at the end.\n", "Toolz has a lot of stream manipulation primitives that make it easy to write the above in just one function call; and, once you know the names of the transforming functions, it also becomes easier to visualize what is happening to your data stream at each point.\n", "\n", "For example, the *sliding window* function is exactly what we need to make k-mers:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(tz.sliding_window.__doc__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Additionally, the *frequencies* function counts the appearance of individual items in a data stream.\n", "Together with pipe, we can now count k-mers in a single function call:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from toolz import curried as c\n", "\n", "k = 7\n", "counts = tz.pipe('data/sample.fasta', open,\n", " c.filter(is_sequence),\n", " c.map(str.rstrip),\n", " c.map(c.sliding_window(k)),\n", " tz.concat, c.map(''.join),\n", " tz.frequencies)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But, just a minute: what are all those `c.function` calls from `toolz.curried`?\n", "\n", "## Currying: the spice of streaming\n", "\n", "Earlier, we briefly used a *curried* version of the `map` function, which\n", "applies a given function to each 
element in a sequence. Now that we've mixed a\n", "few more curried calls in there, it's time to share with you what it means!\n", "Currying is not named after the spice blend (though it does spice up your code).\n", "It is named for Haskell Curry, the mathematician who invented the concept.\n", "Haskell Curry is also the namesake of the Haskell programming language — in which\n", "*all* functions are curried!\n", "\n", "\"Currying\" means *partially* evaluating a function and returning another, \"smaller\" function.\n", "Normally in Python, if you don't give a function all of its required arguments, it will throw a fit.\n", "In contrast, a curried function can just take *some* of those arguments.\n", "If the curried function doesn't get enough arguments, it returns a new function that takes the leftover arguments.\n", "Once that second function is called with the remaining arguments, it can perform the original task.\n", "Another word for currying is partial evaluation.\n", "In functional programming, currying is a way to produce a function that can wait for the rest of the arguments to show up later.\n", "\n", "So, while the function call `map(np.log, numbers_list)` applies the `np.log`\n", "function to all of the numbers in `numbers_list` (returning a sequence of the\n", "logged numbers), the call `toolz.curried.map(np.log)` returns a *function* that\n", "takes in a sequence of numbers and returns a sequence of logged numbers.\n", "\n", "It turns out that having a function that already knows about some of the arguments is perfect for streaming!\n", "We've seen a hint of how powerful currying and pipes can be together in the\n", "above code snippet.\n", "\n", "But currying can be a bit of a mind-bend when you first start, so we'll try it with some simple examples to demonstrate how it works.\n", "Let's start by writing a simple, non-curried function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def add(a, b):\n", " return a + b\n", "\n", "add(2, 5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we write a similar function which we curry manually:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def add_curried(a, b=None):\n", " if b is None:\n", " # second argument not given, so make a function and return it\n", " def add_partial(b):\n", " return add(a, b)\n", " return add_partial\n", " else:\n", " # Both values were given, so we can just return a value\n", " return add(a, b)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's try out our curried function to make sure it does what we expect." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "add_curried(2, 5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Okay, it acts like a normal function when given both variables.\n", "Now let's leave out the second variable."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "add_curried(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we expected, it returned a function.\n", "Now let's use that function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "add2 = add_curried(2)\n", "add2(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, that worked, but `add_curried` was a hard function to read.\n", "Future us will probably have trouble remembering how we wrote that code.\n", "Luckily, Toolz has the, well, tools to help us out." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import toolz as tz\n", "\n", "@tz.curry # Use curry as a decorator\n", "def add(x, y):\n", " return x + y\n", "\n", "add_partial = add(2)\n", "add_partial(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To summarize what we did, `add` is now a curried function, so it can take one of the arguments and returns another function, `add_partial`, which “remembers” that argument.\n", "\n", "In fact, all of the Toolz functions are also available as curried functions in the `toolz.curried` namespace.\n", "Toolz also includes curried version of some handy higher order Python functions like `map`, `filter` and `reduce`.\n", "We will import the `curried` namespace as `c` so our code doesn't get too cluttered.\n", "So for example the curried version of `map` will be `c.map`.\n", "Note, that the curried functions (e.g. `c.map`) are different from the `@curry` decorator, which is used to create a curried function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from toolz import curried as c\n", "c.map" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a reminder, `map` is a built-in function.\n", "From the [docs](https://docs.python.org/3.4/library/functions.html#map):\n", "\n", "> map(function, iterable, ...)\n", "> Return an iterator that applies function to every item of iterable, yielding the results.\n", "\n", "A curried version of `map` is particularly handy when working in a Toolz pipe.\n", "You can just pass a function to `c.map` and then stream in the iterator later using `tz.pipe`.\n", "Take another look at our function for reading in the genome to see how this works in practice." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def genome(file_pattern):\n", " \"\"\"Stream a genome, letter by letter, from a list of FASTA filenames.\"\"\"\n", " return tz.pipe(file_pattern, glob, sorted, # Filenames\n", " c.map(open), # lines\n", " # concatenate lines from all files:\n", " tz.concat,\n", " # drop header from each sequence\n", " c.filter(is_sequence),\n", " # concatenate characters from all lines\n", " tz.concat,\n", " # discard newlines and 'N'\n", " c.filter(is_nucleotide))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Back to counting k-mers\n", "\n", "Okay, so now we've got our heads around curried, let's get back to our k-mer counting code.\n", "Here's that code again that used those curried functions:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from toolz import curried as c\n", "\n", "k = 7\n", "counts = tz.pipe('data/sample.fasta', open,\n", " c.filter(is_sequence),\n", " c.map(str.rstrip),\n", " c.map(c.sliding_window(k)),\n", " tz.concat, c.map(''.join),\n", " tz.frequencies)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now observe the frequency of different k-mers:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "counts = np.fromiter(counts.values(), dtype=int, count=len(counts))\n", "integer_histogram(counts, xlim=(-1, 250), lw=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "> **Tips for working with streams {.callout}**\n", "> - Convert \"list of list\" to \"long list\" with `tz.concat`\n", "> - Don’t get caught out:\n", "> * Iterators get consumed.\n", "So if you make a generator object and do some processing on it, and then a later step fails, you need to re-create the generator.\n", "The original is already gone.\n", "> * Iterators are lazy. You need to force evaluation sometimes.\n", "> - When you have lots of functions in a pipe, it’s sometimes hard to figure out where things go wrong.\n", " Take a small stream and add functions to your pipe one by one from the first/leftmost until you find the broken one.\n", " You can also insert `map(do(print))` (`map` and `do` are from\n", " `toolz.curried`) at any point in a stream to print each element while it\n", " streams through.\n", "\n", "\n", "\n", "**Exercise:**\n", "The scikit-learn library has an IncrementalPCA class, which allows you to run\n", "principal components analysis on a dataset without loading the full dataset\n", "into memory.\n", "But you need to chunk your data yourself, which makes the code a bit awkward to\n", "use.\n", "Make a function that can take a stream of data samples and perform PCA.\n", "Then, use the function to compute the PCA of the `iris` machine learning\n", "dataset, which is in `data/iris.csv`. (You can also access it from the\n", "`datasets` module of scikit-learn, using `datasets.load_iris()`.) Optionally,\n", "you can color the points with the species number, found in\n", "`data/iris-target.csv`.\n", "\n", "*Hint:* The `IncrementalPCA` class is in `sklearn.decomposition`, and\n", "requires a *batch size* greater than 1 to train the model. Look at the\n", "`toolz.curried.partition` function for how to create a stream of batches from a\n", "stream of data points.\n", "\n", "\n", "\n", "**Solution:**\n", "First, we write the function to train the model. 
The function should take in a\n", "stream of samples and output a PCA model, which can *transform* new samples by\n", "projecting them from the original n-dimensional space to the principal\n", "component space." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import toolz as tz\n", "from toolz import curried as c\n", "from sklearn import decomposition\n", "from sklearn import datasets\n", "import numpy as np\n", "\n", "def streaming_pca(samples, n_components=2, batch_size=100):\n", " ipca = decomposition.IncrementalPCA(n_components=n_components,\n", " batch_size=batch_size)\n", " tz.pipe(samples, # iterator of 1D arrays\n", " c.partition(batch_size), # iterator of tuples\n", " c.map(np.array), # iterator of 2D arrays\n", " c.map(ipca.partial_fit), # partial_fit on each\n", " tz.last) # Suck the stream of data through the pipeline\n", " return ipca" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we can use this function to *train* (or *fit*) a PCA model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reshape = tz.curry(np.reshape)\n", "\n", "def array_from_txt(line, sep=',', dtype=float):\n", " return np.array(line.rstrip().split(sep), dtype=dtype)\n", "\n", "with open('data/iris.csv') as fin:\n", " pca_obj = tz.pipe(fin, c.map(array_from_txt), streaming_pca)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we can stream our original samples through the `transform` function of\n", "our model. We stack them together to obtain an `n_samples` by `n_components`\n", "matrix of data:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with open('data/iris.csv') as fin:\n", " components = tz.pipe(fin,\n", " c.map(array_from_txt),\n", " c.map(reshape(newshape=(1, -1))),\n", " c.map(pca_obj.transform),\n", " np.vstack)\n", "\n", "print(components.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now plot the components:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iris_types = np.loadtxt('data/iris-target.csv')\n", "plt.scatter(*components.T, c=iris_types, cmap='viridis');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "You can verify that this gives (approximately) the same result as a standard\n", "PCA:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iris = np.loadtxt('data/iris.csv', delimiter=',')\n", "components2 = decomposition.PCA(n_components=2).fit_transform(iris)\n", "plt.scatter(*components2.T, c=iris_types, cmap='viridis');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "The difference, of course, is that streaming PCA can scale to extremely large\n", "datasets.\n", "\n", "\n", "\n", "\n", "\n", "## Markov model from a full genome\n", "\n", "Back to our original code example.\n", "What is a Markov model, and why is it useful?\n", "\n", "In general, a Markov model assumes that the probability of the system moving to a given state depends only on the state that it was in just previously.\n", "For example, if it is sunny right now, there is a high probability that it will be sunny tomorrow.\n", "The fact that it was raining yesterday is irrelevant.\n", "In this theory, all the information required to predict the future is encoded in the current state of things.\n", "The past is irrelevant.\n", "This assumption is useful 
for simplifying otherwise intractable problems, and\n", "often gives good results.\n", "Markov models are behind much of the signal processing in mobile phone and\n", "satellite communications, for example.\n", "\n", "In the context of genomics, as we will see, different functional regions of a\n", "genome have different *transition probabilities* between similar states.\n", "Observing these in a new genome, we can predict something about the function of\n", "those regions. Going back to the weather analogy, the probability of going from\n", "a sunny day to a rainy day is very different depending on whether you are in\n", "Los Angeles or London. Therefore, if I give you a string of (sunny, sunny,\n", "sunny, rainy, sunny, ...) days, you can predict whether it came from Los\n", "Angeles or London, assuming you have a previously trained model.\n", "\n", "In this chapter, we'll cover just the model building, for now.\n", "\n", "- You can download the *Drosophila melanogaster* (fruit fly) genome file dm6.fa.gz from\n", "http://hgdownload.cse.ucsc.edu/goldenPath/dm6/bigZips/.\n", "You will need to unzip it using: `gzip -d dm6.fa.gz`\n", "\n", "In the genome data, genetic sequence, which consists of the letters A, C, G,\n", "and T, is encoded as belonging to *repetitive elements*, a specific class of\n", "DNA, by whether it is in lower case (repetitive) or upper case\n", "(non-repetitive). We can use this information when we build the Markov model.\n", "\n", "We want to encode the Markov model as a NumPy array, so we will make\n", "dictionaries to index from letters to indices in [0, 7] (`LDICT` for \"letters\n", "dictionary\"), and from pairs of letters to 2D indices in ([0, 7], [0, 7])\n", "(`PDICT` or \"pairs dictionary\"):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import itertools as it\n", "\n", "LDICT = dict(zip('ACGTacgt', range(8)))\n", "PDICT = {(a, b): (LDICT[a], LDICT[b])\n", " for a, b in it.product(LDICT, LDICT)}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We also want to filter out non-sequence data: the sequence names, which are in\n", "lines starting with `>`, and unknown sequence, which is labeled as `N`, so we\n", "will make functions to filter on:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def is_sequence(line):\n", " return not line.startswith('>')\n", "\n", "def is_nucleotide(letter):\n", " return letter in LDICT # ignore 'N'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, whenever we get a new nucleotide pair, say, ('A', 'T'), we want to\n", "increment our Markov model (our NumPy matrix) at the corresponding position. We\n", "make a curried function to do so:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import toolz as tz\n", "\n", "@tz.curry\n", "def increment_model(model, index):\n", " model[index] += 1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now combine these elements to stream a genome into our NumPy matrix.\n", "Note that, if `seq` below is a stream, we never need to store the whole genome,\n", "or even a big chunk of the genome, in memory!" 
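] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, a quick sanity check of the curried `increment_model`, on a tiny made-up 2-by-2 matrix rather than the real 8-by-8 model: fixing the `model` argument leaves a one-argument function that increments the matrix in place." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A quick sanity check on a tiny made-up matrix (not the real model).\n", "toy_model = np.zeros((2, 2))\n", "increment_toy = increment_model(toy_model) # curried: the 'model' argument is now fixed\n", "increment_toy((0, 1)) # increment row 0, column 1\n", "increment_toy((0, 1))\n", "increment_toy((1, 0))\n", "print(toy_model)"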
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from toolz import curried as c\n", "\n", "def markov(seq):\n", " \"\"\"Get a 1st-order Markov model from a sequence of nucleotides.\"\"\"\n", " model = np.zeros((8, 8))\n", " tz.last(tz.pipe(seq,\n", " c.sliding_window(2), # each successive tuple\n", " c.map(PDICT.__getitem__), # location in matrix of tuple\n", " c.map(increment_model(model)))) # increment matrix\n", " # convert counts to transition probability matrix\n", " model /= np.sum(model, axis=1)[:, np.newaxis]\n", " return model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we simply need to produce that genome stream, and make our Markov model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from glob import glob\n", "\n", "def genome(file_pattern):\n", " \"\"\"Stream a genome, letter by letter, from a list of FASTA filenames.\"\"\"\n", " return tz.pipe(file_pattern, glob, sorted, # Filenames\n", " c.map(open), # lines\n", " # concatenate lines from all files:\n", " tz.concat,\n", " # drop header from each sequence\n", " c.filter(is_sequence),\n", " # concatenate characters from all lines\n", " tz.concat,\n", " # discard newlines and 'N'\n", " c.filter(is_nucleotide))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try it out on the Drosophila (fruit fly) genome:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Download dm6.fa.gz from ftp://hgdownload.cse.ucsc.edu/goldenPath/dm6/bigZips/\n", "# Unzip before using: gzip -d dm6.fa.gz\n", "dm = 'data/dm6.fa'\n", "model = tz.pipe(dm, genome, c.take(10**7), markov)\n", "# we use `take` to just run on the first 10 million bases, to speed things up.\n", "# the take step can just be removed if you have ~5-10 mins to wait." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at the resulting matrix:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(' ', ' '.join('ACGTacgt'), '\\n')\n", "print(model)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It's probably clearer to look at the result as an image:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot_model(model, labels, figure=None):\n", " fig = figure or plt.figure()\n", " ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])\n", " im = ax.imshow(model, cmap='magma');\n", " axcolor = fig.add_axes([0.91, 0.1, 0.02, 0.8])\n", " plt.colorbar(im, cax=axcolor)\n", " for axis in [ax.xaxis, ax.yaxis]:\n", " axis.set_ticks(range(8))\n", " axis.set_ticks_position('none')\n", " axis.set_ticklabels(labels)\n", " return ax\n", "\n", "plot_model(model, labels='ACGTacgt');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "Notice how the C-A and G-C transitions are different between the repeat and\n", "non-repeat parts of the genome. This information can be used to classify\n", "previously unseen DNA sequence.\n", "\n", "\n", "\n", "**Exercise:** add a step to the start of the pipe to unzip the data so you don't\n", "have to keep a decompressed version on your hard drive. The Drosophila genome,\n", "for example, takes less than a third of the space on disk when compressed with\n", "gzip. 
And yes, unzipping can be streamed, too!\n", "\n", "*Hint:* The `gzip` package, part of Python's standard library, allows you\n", "to open `.gz` files as if they were normal files.\n", "\n", "\n", "\n", "**Solution:** We can replace `open` in the original `genome` code with a\n", "curried version of `gzip.open`. The default mode of `gzip`'s `open` function is\n", "`rb` (**r**ead **b**ytes), instead of `rt` for Python's built-in `open`\n", "(**r**ead **t**ext), so we have to provide it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import gzip\n", "\n", "gzopen = tz.curry(gzip.open)\n", "\n", "\n", "def genome_gz(file_pattern):\n", " \"\"\"Stream a genome, letter by letter, from a list of FASTA filenames.\"\"\"\n", " return tz.pipe(file_pattern, glob, sorted, # Filenames\n", " c.map(gzopen(mode='rt')), # lines\n", " # concatenate lines from all files:\n", " tz.concat,\n", " # drop header from each sequence\n", " c.filter(is_sequence),\n", " # concatenate characters from all lines\n", " tz.concat,\n", " # discard newlines and 'N'\n", " c.filter(is_nucleotide))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can try this out with the compressed drosophila genome file:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dm = 'data/dm6.fa.gz'\n", "model = tz.pipe(dm, genome_gz, c.take(10**7), markov)\n", "plot_model(model, labels='ACGTacgt')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you want to have a single `genome` function, you could write a custom `open`\n", "function that decides from filename or from trying and failing whether a file\n", "is a gzip file.\n", "\n", "Similarly, if you have a `.tar.gz` full of FASTA files, you can use Python's\n", "`tarfile` module instead of `glob` to read each file individually. The only\n", "caveat is that you will have to use the `bytes.decode` function to decode each\n", "line, as `tarfile` returns them as bytes, not as text.\n", "\n", "\n", "\n", "\n", "\n", "We hope to have shown you at least a hint that streaming in Python can be easy\n", "when you use a few abstractions, like the ones Toolz provides.\n", "\n", "Streaming can make you more productive, because big data takes linearly longer\n", "than small data. In batch analysis, big data can take forever to run, because\n", "the operating system has to keep transferring data from RAM to the hard disk\n", "and back. Or, Python might refuse altogether and simply show a `MemoryError`!\n", "This means that, for many analyses, you don’t need a bigger machine to analyse\n", "bigger datasets. And, if your tests pass on small data, they’ll pass on big\n", "data, too!\n", "\n", "Our take home message from this chapter is this: when writing an algorithm, or\n", "analysis, think about whether you can do it streaming. If you can, just do it\n", "from the beginning. Your future self will thank you.\n", "Doing it later is harder, and results in things like this:\n", "\n", "![TODOs in history. Comic by Manu Cornet, http://www.bonkersworld.net/all-engineers-are-the-same/, used with permission.](http://bonkersworld.net/img/2012.08.15_all_engineers_are_the_same.png)" ] } ], "metadata": {}, "nbformat": 4, "nbformat_minor": 4 }