{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Count-min sketch" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Think of the count-min sketch as a generalization of the Bloom filter: instead of overestimating _whether or not_ we've seen a certain key, the count-min sketch overestimates _how many times_ we've seen it. You could implement a precise structure to solve this problem with a map from keys to counts (a tree, an associative array, or a hash table, for example), but -- just as with the Bloom filter -- there are cases in which the space requirements of a precise structure may be unacceptable.\n", "\n", "We'll start by importing some necessary libraries -- `numpy`, `pandas`, and our hash functions -- again." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datasketching.hashing import hashes_for\n", "import numpy as np\n", "import pandas as pd" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class CMS(object):\n", " def __init__(self, width, hashes):\n", " \"\"\" Initializes a Count-min sketch with the\n", " given width and a collection of hashes, \n", " which are functions taking arbitrary \n", " values and returning integers. The depth\n", " of the sketch structure is taken from the\n", " number of supplied hash functions.\n", " \n", " hashes can be either a function taking \n", " a value and returning a list of results\n", " or a list of functions. 
In the latter \n",
 "            case, this constructor will synthesize\n",
 "            the former. \"\"\"\n",
 "        self.__width = width\n",
 "        \n",
 "        if hasattr(hashes, '__call__'):\n",
 "            self.__hashes = hashes\n",
 "            # inspect the tuple returned by the hash function to get a depth\n",
 "            self.__depth = len(hashes(bytes()))\n",
 "        else:\n",
 "            funs = hashes[:]\n",
 "            self.__depth = len(hashes)\n",
 "            def h(value):\n",
 "                return [int(f(value)) for f in funs]\n",
 "            self.__hashes = h\n",
 "        \n",
 "        self.__buckets = np.zeros((int(width), int(self.__depth)), np.uint64)\n",
 "    \n",
 "    def width(self):\n",
 "        return self.__width\n",
 "    \n",
 "    def depth(self):\n",
 "        return self.__depth\n",
 "    \n",
 "    def insert(self, value):\n",
 "        \"\"\" Inserts a value into this sketch \"\"\"\n",
 "        for (row, col) in enumerate(self.__hashes(value)):\n",
 "            self.__buckets[col % self.__width][row] += 1\n",
 "    \n",
 "    def lookup(self, value):\n",
 "        \"\"\" Returns a biased estimate of the number of times value has been inserted into this sketch \"\"\"\n",
 "        return min([self.__buckets[col % self.__width][row] for (row, col) in enumerate(self.__hashes(value))])\n",
 "    \n",
 "    def merge_from(self, other):\n",
 "        \"\"\" Merges other into this sketch by\n",
 "            adding the counts from each bucket in other\n",
 "            to the corresponding buckets in this.\n",
 "            \n",
 "            Updates this. \"\"\"\n",
 "        self.__buckets += other.__buckets\n",
 "    \n",
 "    def merge(self, other):\n",
 "        \"\"\" Creates a new sketch by merging this sketch's\n",
 "            counts with those of another sketch. 
\"\"\"\n", " \n", " cms = CMS(self.width(), self.__hashes)\n", " cms.__buckets += self.__buckets\n", " cms.__buckets += other.__buckets\n", " return cms\n", " \n", " def inner(self, other):\n", " \"\"\" returns the inner product of self and other, estimating \n", " the equijoin size between the streams modeled by \n", " self and other \"\"\"\n", " r, = np.tensordot(self.__buckets, other.__buckets).flat\n", " return r\n", " \n", " def minimum(self, other):\n", " \"\"\" Creates a new sketch by taking the elementwise minimum \n", " of this sketch and another. \"\"\"\n", " cms = CMS(self.width(), self.__hashes)\n", " np.minimum(self.__buckets, other.__buckets, cms.__buckets)\n", " return cms\n", "\n", " def dup(self):\n", " cms = CMS(self.width(), self.__hashes)\n", " cms.merge_from(self)\n", " return cms" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cms = CMS(16384, hashes_for(3,8))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cms.lookup(\"foo\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cms.insert(\"foo\")\n", "cms.lookup(\"foo\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "While hash collisions in Bloom filters lead to false positives, hash collisions in count-min sketches lead to overestimating counts. To see how much this will affect us in practice, we can design an empirical experiment to plot the cumulative distribution of the factors that we've overestimated counts by in sketches of various sizes." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def cms_experiment(sample_count, size, hashes, seed=0x15300625):\n", " import random\n", " from collections import namedtuple\n", " \n", " random.seed(seed)\n", " cms = CMS(size, hashes)\n", " \n", " result = []\n", " total_count = 0\n", " \n", " # update the counts\n", " for i in range(sample_count):\n", " bits = random.getrandbits(64)\n", " if i % 100 == 0:\n", " # every hundredth entry is a heavy hitter\n", " insert_count = (bits % 512) + 1\n", " else:\n", " insert_count = (bits % 8) + 1\n", " \n", " for i in range(insert_count):\n", " cms.insert(bits)\n", " \n", " random.seed(seed)\n", " # look up the bit sequences again\n", " for i in range(sample_count):\n", " bits = random.getrandbits(64)\n", " if i % 100 == 0:\n", " # every hundredth entry is a heavy hitter\n", " expected_count = (bits % 512) + 1\n", " else:\n", " expected_count = (bits % 8) + 1\n", "\n", " result.append((int(cms.lookup(bits)), int(expected_count)))\n", " \n", " return result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import altair as alt\n", "alt.renderers.enable('notebook')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = cms_experiment(1 << 14, 4096, hashes_for(3, 8))\n", "df = pd.DataFrame.from_records(results)\n", "df.rename(columns={0: \"actual count\", 1: \"expected count\"}, inplace=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from statsmodels.distributions.empirical_distribution import ECDF\n", "df2 = pd.DataFrame()\n", "df2[\"overestimation factor\"] = (df[\"actual count\"] / df[\"expected count\"]).sort_values()\n", "ecdf = ECDF(df2[\"overestimation factor\"])\n", "df2[\"percentage of samples overestimated by less than\"] = ecdf(df2[\"overestimation factor\"])\n", 
"alt.Chart(df2.drop_duplicates()).mark_line().encode(x=\"overestimation factor\", y=\"percentage of samples overestimated by less than\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see, about 55% of our counts for this small sketch are overestimated by less than a factor of three, although the worst overestimates are quite large indeed. Let's try with a larger sketch structure." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = cms_experiment(1 << 14, 8192, hashes_for(3, 8))\n", "df = pd.DataFrame.from_records(results)\n", "df.rename(columns={0: \"actual count\", 1: \"expected count\"}, inplace=True)\n", "\n", "df2 = pd.DataFrame()\n", "df2[\"overestimation factor\"] = (df[\"actual count\"] / df[\"expected count\"]).sort_values()\n", "ecdf = ECDF(df2[\"overestimation factor\"])\n", "df2[\"percentage of samples overestimated by less than\"] = ecdf(df2[\"overestimation factor\"])\n", "alt.Chart(df2.drop_duplicates()).mark_line().encode(x=\"overestimation factor\", y=\"percentage of samples overestimated by less than\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With a larger filter size (columns) *and* more hash functions (rows), we can dramatically reduce the bias." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = cms_experiment(1 << 14, 8192, hashes_for(8, 5))\n", "df = pd.DataFrame.from_records(results)\n", "df.rename(columns={0: \"actual count\", 1: \"expected count\"}, inplace=True)\n", "\n", "df2 = pd.DataFrame()\n", "df2[\"overestimation factor\"] = (df[\"actual count\"] / df[\"expected count\"]).sort_values()\n", "ecdf = ECDF(df2[\"overestimation factor\"])\n", "df2[\"percentage of samples overestimated by less than\"] = ecdf(df2[\"overestimation factor\"])\n", "alt.Chart(df2.drop_duplicates()).mark_line().encode(x=\"overestimation factor\", y=\"percentage of samples overestimated by less than\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise: tracking the top _k_ elements in a stream or set\n", "\n", "In this exercise, you'll combine a count-min sketch and an auxiliary data structure in order to approximately track the top _k_ elements in a stream. When we insert something into a count-min sketch, we can immediately look it up to get an estimate of how many times we've seen it. We can then check a running list of (some of) the most frequent items we've seen and update that list if the count-min sketch indicates that the item we've just added is one of the top elements.\n", "\n", "A *priority heap* is an efficient way to track items by a priority ordering; we can use it to track (some of) the most frequent items we've seen. Python's `heapq` module provides an implementation of priority queues. If we store tuples of counts and items, we can use the count as the priority ordering; because Python's `heapq` module places the minimum element first, we'll need to invert the priority of counts. 
\n", "\n", "Let's see a couple of ways to do this:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import heapq\n", "\n", "one = []\n", "two = []\n", "\n", "counts = [700, 600, 500, 400]\n", "items = [\"seven hundred\", \"six hundred\", \"five hundred\", \"four hundred\"]\n", "\n", "for item in zip(counts, items):\n", " # either negate the count...\n", " heapq.heappush(one, (-item[0], item[1]))\n", " \n", " # ...or put things in normally and use a \n", " # different ordering function later\n", " heapq.heappush(two, item)\n", "\n", "# note that we'll ask for the \"smallest\"\n", "# items since we're inverting the priority\n", "\n", "print([(-count, item) for count, item in heapq.nsmallest(4, one)])\n", "print(heapq.nsmallest(4, two, key=lambda t: -t[0]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now it's time to implement the top-k structure. Read the constructor code (`__init__`) to see what members the `TopK` object has, and fill in the code that the `FIXME` comments ask you to. If you get stuck, [check out the solution](solutions/top-k.ipynb)!" 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datasketching.cms as cms\n", "import heapq\n", "\n", "MAX_OVERHEAD = 10\n", "\n", "class TopK(object):\n", "\n", " def __init__(self, k, width, hashes):\n", " self.width = width\n", " self.hashes = hashes\n", " self.cms = cms.CMS(width, hashes)\n", " self.k = k\n", " \n", " # this is a priority queue; manage it with the heapq module\n", " self.queue = []\n", " \n", " # Hint #1: you'll use this to see whether you're inserting\n", " # a new item or updating one that's already in the queue\n", " # Hint #2: you won't keep _everything_ you've seen in this\n", " # set; just the objects that are in the queue\n", " self.seen = set()\n", "\n", " def insert(self, obj):\n", " \"\"\" Inserts _obj_ in this summary and updates the top k elements if this\n", " element is likely to be in the top k elements as given by the underlying top-k sketch \"\"\"\n", "\n", " # Identify how many times you've seen `obj` already\n", " self.cms.insert(obj)\n", " count = self.cms.lookup(obj)\n", "\n", " # FIXME: write code to insert _obj_ into \n", " # the priority queue (`self.queue`).\n", "\n", " # Hint #1: How would you sort the priority queue to ensure that it contains the top k elements?\n", " # Hint #2: Python's `heapq.heappush` function puts the smallest things first\n", " # Hint #3: What happens when you need to update the count of something that's already in the queue?\n", " # Hint #4: How will you ensure that the priority queue doesn't grow unbounded?\n", "\n", "\n", " def topk(self):\n", " \"\"\" Returns a list of 2-tuples (value, count) for \n", " the top k elements in this structure \"\"\"\n", " \n", " # FIXME: replace this line with code that \n", " # uses heapq.nsmallest to return the top k elements\n", " return self.queue\n", "\n", " def merge(self, other):\n", " # Merge the two count-min sketches\n", " result = TopK(self.width, self.hashes)\n", " 
result.cms.merge_from(self.cms)\n",
 "        result.cms.merge_from(other.cms)\n",
 "        \n",
 "        # determine how many elements to keep \n",
 "        # from the combined priority queue\n",
 "        newsize = max(int(self.k * MAX_OVERHEAD / 2), 1)\n",
 "        \n",
 "        # FIXME: write code to merge the two queues here\n",
 "        # Hint #1: consider combining both queues and keeping\n",
 "        # only the best newsize entries of the combination\n",
 "        \n",
 "        return result" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can run an experiment to see how our top-k structure behaves:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
 "import datasketching.cms as cms\n",
 "\n",
 "def topk_experiment(sample_count, size, hashes, k=10, seed=0x15300625):\n",
 "    import random\n",
 "    \n",
 "    random.seed(seed)\n",
 "    topk = TopK(k, size, hashes)\n",
 "    \n",
 "    # update the counts\n",
 "    for i in range(sample_count):\n",
 "        bits = random.getrandbits(64)\n",
 "        if i % 100 == 0:\n",
 "            # every hundredth entry is a heavy hitter\n",
 "            insert_count = (bits % 512) + 1\n",
 "        else:\n",
 "            insert_count = (bits % 8) + 1\n",
 "        \n",
 "        # use a throwaway variable so we don't shadow the outer loop index\n",
 "        for _ in range(insert_count):\n",
 "            topk.insert(bits)\n",
 "    \n",
 "    return topk.topk()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datasketching.hashing import hashes_for\n", "topk_experiment(40000, 16384, hashes_for(3,8), k=20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## More exercises\n", "\n", "Here are some more exercises to try out if you're interested in extending the count-min sketch:\n", "\n", "* The count-min sketch is a biased estimator. Implement a technique to adjust the estimates for expected bias.\n", "* Consider how you'd handle negative inserts. How would you need to change the query code? What else might change?\n", "* The implementation includes a `minimum` method. What might it be useful for? 
What limitations might it have?\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" } }, "nbformat": 4, "nbformat_minor": 2 }