# Count-min sketch

Think of the count-min sketch as a generalization of the Bloom filter: where a Bloom filter may falsely report _that_ we've seen a certain key, the count-min sketch may overestimate _how many times_ we've seen it. You could solve this problem precisely with a map from keys to counts (a tree, an associative array, or a hash table, for example), but -- just as with the Bloom filter -- there are cases in which the space requirements of a precise structure are unacceptable.
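
For comparison, a precise counter takes only a few lines of Python. This is a minimal sketch using the standard library's `collections.Counter`; the sample stream is made up for illustration. Its counts are exact, but it keeps one entry per distinct key, so its memory use grows with the number of distinct keys instead of staying fixed.

```python
from collections import Counter

# a made-up stream of keys, for illustration only
stream = ["foo", "bar", "foo", "baz", "foo", "bar"]

exact = Counter()
for key in stream:
    exact[key] += 1

# exact counts, at the cost of one dictionary entry per distinct key
distinct_keys = len(exact)
foo_count = exact["foo"]
missing_count = exact["never seen"]  # a Counter reports 0 for unseen keys
```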

We'll start, once again, by importing some necessary libraries: `numpy`, `pandas`, and our hash functions.

In [None]:
from datasketching.hashing import hashes_for
import numpy as np
import pandas as pd

In [None]:
class CMS(object):
    def __init__(self, width, hashes):
        """ Initializes a count-min sketch with the
            given width and a collection of hashes,
            which are functions taking arbitrary
            values and returning integers. The depth
            of the sketch structure is taken from the
            number of supplied hash functions.

            hashes can be either a function taking
            a value and returning a list of results
            or a list of functions. In the latter
            case, this constructor will synthesize
            the former. """
        self.__width = width

        if callable(hashes):
            self.__hashes = hashes
            # inspect the sequence returned by the hash function to get a depth
            self.__depth = len(hashes(bytes()))
        else:
            funs = hashes[:]
            self.__depth = len(hashes)
            def h(value):
                return [int(f(value)) for f in funs]
            self.__hashes = h

        # buckets are indexed as [bucket index][hash function index]
        self.__buckets = np.zeros((int(width), int(self.__depth)), np.uint64)

    def width(self):
        return self.__width

    def depth(self):
        return self.__depth

    def insert(self, value):
        """ Inserts a value into this sketch """
        for (row, col) in enumerate(self.__hashes(value)):
            self.__buckets[col % self.__width][row] += 1

    def lookup(self, value):
        """ Returns a biased estimate of the number of times
            value has been inserted in this sketch """
        return min([self.__buckets[col % self.__width][row] for (row, col) in enumerate(self.__hashes(value))])

    def merge_from(self, other):
        """ Merges other into this sketch by
            adding the counts from each bucket in other
            to the corresponding buckets in this

            Updates this. """
        self.__buckets += other.__buckets

    def merge(self, other):
        """ Creates a new sketch by merging this sketch's
            counts with those of another sketch. """
        cms = CMS(self.width(), self.__hashes)
        cms.__buckets += self.__buckets
        cms.__buckets += other.__buckets
        return cms

    def inner(self, other):
        """ Returns the inner product of self and other, estimating
            the equijoin size between the streams modeled by
            self and other """
        # take the dot product separately for each hash function and keep
        # the smallest one; summing over every hash function would
        # inflate the estimate by roughly a factor of the depth
        return (self.__buckets * other.__buckets).sum(axis=0).min()

    def minimum(self, other):
        """ Creates a new sketch by taking the elementwise minimum
            of this sketch and another. """
        cms = CMS(self.width(), self.__hashes)
        np.minimum(self.__buckets, other.__buckets, cms.__buckets)
        return cms

    def dup(self):
        """ Returns a copy of this sketch """
        cms = CMS(self.width(), self.__hashes)
        cms.merge_from(self)
        return cms

In [None]:
cms = CMS(16384, hashes_for(3,8))

In [None]:
cms.lookup("foo")

In [None]:
cms.insert("foo")
cms.lookup("foo")
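
With a single insert into an otherwise empty sketch, the estimate is exact. To see what happens when many keys share few buckets, here is a minimal self-contained sketch of the same insert and lookup logic. It is deliberately tiny, and it uses salted BLAKE2b from `hashlib` in place of `hashes_for`; that choice of hash, along with the names `bucket_indices`, `WIDTH`, and `DEPTH`, is an assumption made for illustration and not how `datasketching` works.

```python
import hashlib

import numpy as np

WIDTH, DEPTH = 4, 2  # deliberately tiny so that collisions are unavoidable

def bucket_indices(value):
    """ One bucket index per hash function; each function is
        BLAKE2b with a different salt (an illustrative choice) """
    data = repr(value).encode("utf-8")
    return [
        int.from_bytes(
            hashlib.blake2b(data, digest_size=8, salt=bytes([row])).digest(), "big"
        ) % WIDTH
        for row in range(DEPTH)
    ]

buckets = np.zeros((WIDTH, DEPTH), np.uint64)

def insert(value):
    for row, col in enumerate(bucket_indices(value)):
        buckets[col][row] += 1

def lookup(value):
    return int(min(buckets[col][row] for row, col in enumerate(bucket_indices(value))))

# insert 100 distinct keys exactly once each
keys = ["key-%d" % i for i in range(100)]
for key in keys:
    insert(key)

estimates = [lookup(key) for key in keys]

# no estimate falls below the true count of 1 ...
never_under = all(e >= 1 for e in estimates)
# ... but with 100 keys sharing 4 buckets per hash function, most are too high
overestimated = sum(1 for e in estimates if e > 1)
```

A key's estimate is exact here only if it has a bucket to itself under at least one hash function, so a sketch this small overestimates nearly every key while never underestimating any.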

While hash collisions in Bloom filters lead to false positives, hash collisions in count-min sketches lead to overestimated counts. To see how much this affects us in practice, we can run an empirical experiment and plot the cumulative distribution of the overestimation factor (the estimated count divided by the true count) for sketches of various sizes.

In [None]:
def cms_experiment(sample_count, size, hashes, seed=0x15300625):
    import random

    random.seed(seed)
    cms = CMS(size, hashes)

    result = []

    # update the counts
    for i in range(sample_count):
        bits = random.getrandbits(64)
        if i % 100 == 0:
            # every hundredth entry is a heavy hitter
            insert_count = (bits % 512) + 1
        else:
            insert_count = (bits % 8) + 1

        for _ in range(insert_count):
            cms.insert(bits)

    # reseed so that we regenerate the same bit sequences
    random.seed(seed)
    # look up the bit sequences again
    for i in range(sample_count):
        bits = random.getrandbits(64)
        if i % 100 == 0:
            # every hundredth entry is a heavy hitter
            expected_count = (bits % 512) + 1
        else:
            expected_count = (bits % 8) + 1

        result.append((int(cms.lookup(bits)), int(expected_count)))

    return result

In [None]:
import altair as alt
alt.renderers.enable('notebook')

In [None]:
results = cms_experiment(1 << 14, 4096, hashes_for(3, 8))
df = pd.DataFrame.from_records(results)
df.rename(columns={0: "actual count", 1: "expected count"}, inplace=True)

In [None]:
from statsmodels.distributions.empirical_distribution import ECDF
df2 = pd.DataFrame()
df2["overestimation factor"] = (df["actual count"] / df["expected count"]).sort_values()
ecdf = ECDF(df2["overestimation factor"])
df2["percentage of samples overestimated by less than"] = ecdf(df2["overestimation factor"])
alt.Chart(df2.drop_duplicates()).mark_line().encode(x="overestimation factor", y="percentage of samples overestimated by less than")

As you can see, about 55% of our counts for this small sketch are overestimated by less than a factor of three, although the worst overestimates are quite large indeed. Let's try with a larger sketch structure.

In [None]:
results = cms_experiment(1 << 14, 8192, hashes_for(3, 8))
df = pd.DataFrame.from_records(results)
df.rename(columns={0: "actual count", 1: "expected count"}, inplace=True)

df2 = pd.DataFrame()
df2["overestimation factor"] = (df["actual count"] / df["expected count"]).sort_values()
ecdf = ECDF(df2["overestimation factor"])
df2["percentage of samples overestimated by less than"] = ecdf(df2["overestimation factor"])
alt.Chart(df2.drop_duplicates()).mark_line().encode(x="overestimation factor", y="percentage of samples overestimated by less than")

By keeping the larger sketch width (more columns) *and* using more hash functions (more rows), we can dramatically reduce the bias.

In [None]:
results = cms_experiment(1 << 14, 8192, hashes_for(8, 5))
df = pd.DataFrame.from_records(results)
df.rename(columns={0: "actual count", 1: "expected count"}, inplace=True)

df2 = pd.DataFrame()
df2["overestimation factor"] = (df["actual count"] / df["expected count"]).sort_values()
ecdf = ECDF(df2["overestimation factor"])
df2["percentage of samples overestimated by less than"] = ecdf(df2["overestimation factor"])
alt.Chart(df2.drop_duplicates()).mark_line().encode(x="overestimation factor", y="percentage of samples overestimated by less than")
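
The experiments above vary the width and the number of hash functions by hand. The standard analysis of the count-min sketch (due to Cormode and Muthukrishnan) ties both to an error target: with a width of at least e/ε and a depth of at least ln(1/δ), an estimate exceeds the true count by more than ε times the total number of inserts with probability at most δ, assuming pairwise-independent hash functions. The helper below is a minimal sketch of that calculation; `cms_dimensions` is a hypothetical name and not part of `datasketching`.

```python
import math

def cms_dimensions(epsilon, delta):
    """ Returns (width, depth) for a sketch whose estimates exceed
        the true count by more than epsilon * (total inserts) with
        probability at most delta """
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1.0 / delta))
    return width, depth

# overestimate by at most 0.1% of all inserts, 99% of the time
width, depth = cms_dimensions(0.001, 0.01)
```

Note that the bound is additive in the total number of inserts, not relative to each key's own count, which is why rarely seen keys can show large overestimation factors even in a well-sized sketch.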

## Exercise: tracking the top _k_ elements in a stream or set

In this exercise, you'll combine a count-min sketch and an auxiliary data structure in order to approximately track the top _k_ elements in a stream. When we insert something into a count-min sketch, we can immediately look it up to get an estimate of how many times we've seen it. We can then check a running list of (some of) the most frequent items we've seen and update that list if the count-min sketch indicates that the item we've just added is one of the top elements.

A *priority queue* is an efficient way to track items by a priority ordering; we can use it to track (some of) the most frequent items we've seen. Python's `heapq` module provides a heap-based implementation of priority queues. If we store tuples of counts and items, we can use the count as the priority ordering; because Python's `heapq` module places the minimum element first, we'll need to invert the priority of counts.

Let's see a couple of ways to do this:

In [None]:
import heapq

one = []
two = []

counts = [700, 600, 500, 400]
items = ["seven hundred", "six hundred", "five hundred", "four hundred"]

for item in zip(counts, items):
    # either negate the count...
    heapq.heappush(one, (-item[0], item[1]))

    # ...or put things in normally and use a
    # different ordering function later
    heapq.heappush(two, item)

# note that we'll ask for the "smallest"
# items since we're inverting the priority

print([(-count, item) for count, item in heapq.nsmallest(4, one)])
print(heapq.nsmallest(4, two, key=lambda t: -t[0]))
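
As an aside, `heapq` also provides `nlargest`, which avoids inverting the priority when all we want is to read off the biggest entries. A short sketch using the same counts and items as above:

```python
import heapq

counts = [700, 600, 500, 400]
items = ["seven hundred", "six hundred", "five hundred", "four hundred"]

heap = []
for pair in zip(counts, items):
    heapq.heappush(heap, pair)

# tuples compare by their first element (the count) first
top_two = heapq.nlargest(2, heap)

# the first element of a heapq list is always its smallest entry
smallest = heap[0]
```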

Now it's time to implement the top-k structure. Read the constructor code (`__init__`) to see what members the `TopK` object has, and fill in the code that the `FIXME` comments ask for. If you get stuck, [check out the solution](solutions/top-k.ipynb)!

In [None]:
import datasketching.cms as cms
import heapq

MAX_OVERHEAD = 10

class TopK(object):

    def __init__(self, k, width, hashes):
        self.width = width
        self.hashes = hashes
        self.cms = cms.CMS(width, hashes)
        self.k = k

        # this is a priority queue; manage it with the heapq module
        self.queue = []

        # Hint #1: you'll use this to see whether you're inserting
        # a new item or updating one that's already in the queue
        # Hint #2: you won't keep _everything_ you've seen in this
        # set; just the objects that are in the queue
        self.seen = set()

    def insert(self, obj):
        """ Inserts _obj_ in this summary and updates the top k elements if this
            element is likely to be in the top k elements as given by the
            underlying count-min sketch """

        # Identify how many times you've seen `obj` already
        self.cms.insert(obj)
        count = self.cms.lookup(obj)

        # FIXME: write code to insert _obj_ into
        # the priority queue (`self.queue`).

        # Hint #1: How would you sort the priority queue to ensure that it contains the top k elements?
        # Hint #2: Python's `heapq.heappush` function puts the smallest things first
        # Hint #3: What happens when you need to update the count of something that's already in the queue?
        # Hint #4: How will you ensure that the priority queue doesn't grow unbounded?

    def topk(self):
        """ Returns a list of 2-tuples (value, count) for
            the top k elements in this structure """

        # FIXME: replace this line with code that
        # uses heapq.nsmallest to return the top k elements
        return self.queue

    def merge(self, other):
        # Merge the two count-min sketches
        # (the constructor takes k first, then the width and hashes)
        result = TopK(self.k, self.width, self.hashes)
        result.cms.merge_from(self.cms)
        result.cms.merge_from(other.cms)

        # determine how many elements to keep
        # from the combined priority queue
        newsize = max(int(self.k * MAX_OVERHEAD / 2), 1)

        # FIXME: write code to merge the two queues here
        # Hint #1:

        return result

We can run an experiment to see how our top-k structure behaves:

In [None]:
import datasketching.cms as cms

def topk_experiment(sample_count, size, hashes, k=10, seed=0x15300625):
    import random

    random.seed(seed)
    topk = TopK(k, size, hashes)

    # update the counts
    for i in range(sample_count):
        bits = random.getrandbits(64)
        if i % 100 == 0:
            # every hundredth entry is a heavy hitter
            insert_count = (bits % 512) + 1
        else:
            insert_count = (bits % 8) + 1

        for _ in range(insert_count):
            topk.insert(bits)

    return topk.topk()

In [None]:
from datasketching.hashing import hashes_for
topk_experiment(40000, 16384, hashes_for(3,8), k=20)

## More exercises

Here are some more exercises to try out if you're interested in extending the count-min sketch:

* The count-min sketch is a biased estimator. Implement a technique to adjust the estimates for expected bias.
* Consider how you'd handle negative inserts. How would you need to change the query code? What else might change?
* The implementation includes a `minimum` method. What might it be useful for? What limitations might it have?

