{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Simple De Bruijn Graph implementation with Eulerian walk-finder"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"class DeBruijnGraph:\n",
" ''' De Bruijn directed multigraph built from a collection of\n",
" strings. User supplies strings and k-mer length k. Nodes\n",
" are k-1-mers. An Edge corresponds to the k-mer that joins\n",
" a left k-1-mer to a right k-1-mer. '''\n",
" \n",
" @staticmethod\n",
" def chop(st, k):\n",
" ''' Chop string into k-mers of given length '''\n",
" for i in range(len(st)-(k-1)):\n",
" yield (st[i:i+k], st[i:i+k-1], st[i+1:i+k])\n",
" \n",
" class Node:\n",
" ''' Node representing a k-1 mer. Keep track of # of\n",
" incoming/outgoing edges so it's easy to check for\n",
" balanced, semi-balanced. '''\n",
" \n",
" def __init__(self, km1mer):\n",
" self.km1mer = km1mer\n",
" self.nin = 0\n",
" self.nout = 0\n",
" \n",
" def isSemiBalanced(self):\n",
" return abs(self.nin - self.nout) == 1\n",
" \n",
" def isBalanced(self):\n",
" return self.nin == self.nout\n",
" \n",
" def __hash__(self):\n",
" return hash(self.km1mer)\n",
" \n",
" def __str__(self):\n",
" return self.km1mer\n",
" \n",
" def __init__(self, strIter, k, circularize=False):\n",
" ''' Build de Bruijn multigraph given string iterator and k-mer\n",
" length k '''\n",
" self.G = {} # multimap from nodes to neighbors\n",
" self.nodes = {} # maps k-1-mers to Node objects\n",
" for st in strIter:\n",
" if circularize:\n",
" st += st[:k-1]\n",
" for kmer, km1L, km1R in self.chop(st, k):\n",
" nodeL, nodeR = None, None\n",
" if km1L in self.nodes:\n",
" nodeL = self.nodes[km1L]\n",
" else:\n",
" nodeL = self.nodes[km1L] = self.Node(km1L)\n",
" if km1R in self.nodes:\n",
" nodeR = self.nodes[km1R]\n",
" else:\n",
" nodeR = self.nodes[km1R] = self.Node(km1R)\n",
" nodeL.nout += 1\n",
" nodeR.nin += 1\n",
" self.G.setdefault(nodeL, []).append(nodeR)\n",
" # Iterate over nodes; tally # balanced, semi-balanced, neither\n",
" self.nsemi, self.nbal, self.nneither = 0, 0, 0\n",
" # Keep track of head and tail nodes in the case of a graph with\n",
" # Eularian walk (not cycle)\n",
" self.head, self.tail = None, None\n",
" for node in iter(self.nodes.values()):\n",
" if node.isBalanced():\n",
" self.nbal += 1\n",
" elif node.isSemiBalanced():\n",
" if node.nin == node.nout + 1:\n",
" self.tail = node\n",
" if node.nin == node.nout - 1:\n",
" self.head = node\n",
" self.nsemi += 1\n",
" else:\n",
" self.nneither += 1\n",
" \n",
" def nnodes(self):\n",
" ''' Return # nodes '''\n",
" return len(self.nodes)\n",
" \n",
" def nedges(self):\n",
" ''' Return # edges '''\n",
" return len(self.G)\n",
" \n",
" def hasEulerianWalk(self):\n",
" ''' Return true iff graph has Eulerian walk. '''\n",
" return self.nneither == 0 and self.nsemi == 2\n",
" \n",
" def hasEulerianCycle(self):\n",
" ''' Return true iff graph has Eulerian cycle. '''\n",
" return self.nneither == 0 and self.nsemi == 0\n",
" \n",
" def isEulerian(self):\n",
" ''' Return true iff graph has Eulerian walk or cycle '''\n",
" # technically, if it has an Eulerian walk\n",
" return self.hasEulerianWalk() or self.hasEulerianCycle()\n",
" \n",
" def eulerianWalkOrCycle(self):\n",
" ''' Find and return sequence of nodes (represented by\n",
" their k-1-mer labels) corresponding to Eulerian walk\n",
" or cycle '''\n",
" assert self.isEulerian()\n",
" g = self.G\n",
" if self.hasEulerianWalk():\n",
" g = g.copy()\n",
" g.setdefault(self.tail, []).append(self.head)\n",
" # graph g has an Eulerian cycle\n",
" tour = []\n",
" src = next(iter(g.keys())) # pick arbitrary starting node\n",
" \n",
" def __visit(n):\n",
" while len(g[n]) > 0:\n",
" dst = g[n].pop()\n",
" __visit(dst)\n",
" tour.append(n)\n",
" \n",
" __visit(src)\n",
" tour = tour[::-1][:-1] # reverse and then take all but last node\n",
" \n",
" if self.hasEulerianWalk():\n",
" # Adjust node list so that it starts at head and ends at tail\n",
" sti = tour.index(self.head)\n",
" tour = tour[sti:] + tour[:sti]\n",
" \n",
" # Return node list\n",
" return list(map(str, tour))"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"g = DeBruijnGraph(['AAABBBA'], k=3)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(True, True, False)"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# the figure we drew in class had 4 nodes\n",
"g.nnodes()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['AA', 'AB', 'BB', 'BB', 'BA', 'AA']"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"g.eulerianWalkOrCycle()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"g = DeBruijnGraph(['AAABBBBA'], k=3) # Add 1 more B to the run of Bs"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(True, True, False)"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# the figure we drew in class again had 4 nodes\n",
"g.nnodes()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['AA', 'AB', 'BB', 'BB', 'BB', 'BA', 'AA']"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"g.eulerianWalkOrCycle()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# circularize makes DeBruijnGraph treat string as circular,\n",
"# returning to left-hand side at extreme right end\n",
"g = DeBruijnGraph(['AAABBBBA'], k=3, circularize=True)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(True, False, True)"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['AA', 'AA', 'AB', 'BB', 'BB', 'BB', 'BA', 'AA']"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"g.eulerianWalkOrCycle()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['a_lo',\n",
" '_lon',\n",
" 'long',\n",
" 'ong_',\n",
" 'ng_l',\n",
" 'g_lo',\n",
" '_lon',\n",
" 'long',\n",
" 'ong_',\n",
" 'ng_l',\n",
" 'g_lo',\n",
" '_lon',\n",
" 'long',\n",
" 'ong_',\n",
" 'ng_t',\n",
" 'g_ti',\n",
" '_tim',\n",
" 'time']"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"DeBruijnGraph([\"a_long_long_long_time\"], 5).eulerianWalkOrCycle()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"3"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"DeBruijnGraph(['a_long_long_long_time'], 5).eulerianWalkOrCycle().count('long')"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'to_every_thing_turn_turn_turn_there_is_a_season'"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# see if we can get correct reconstruction at k=4\n",
"walk = DeBruijnGraph(['to_every_thing_turn_turn_turn_there_is_a_season'], 4).eulerianWalkOrCycle()\n",
"walk[0] + ''.join(map(lambda x: x[-1], walk[1:]))"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'to_every_thing_turn_turn_turn_there_is_a_season'"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# that worked, but k=3 doesn't! unresolvable repeat at k=3\n",
"walk = DeBruijnGraph(['to_every_thing_turn_turn_turn_there_is_a_season'], 3).eulerianWalkOrCycle()\n",
"walk[0] + ''.join(map(lambda x: x[-1], walk[1:]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Visualizing the graph"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We define a new Python class extending `DeBruijnGraph`, but adding a `to_dot` member function. `to_dot` returns a Digraph object - a graph with directed edges. See the [graphviz package user guide](http://graphviz.readthedocs.io/en/latest/manual.html) for more details on Digraph. Jupyter is kind enough to render Digraphs into pretty pictures."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"import graphviz"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"class DeBruijnGraph2(DeBruijnGraph):\n",
" def to_dot(self, weights=False):\n",
" \"\"\" Return string with graphviz representation. If 'weights'\n",
" is true, label edges corresponding to distinct k-1-mers\n",
" with weights, instead of drawing separate edges for\n",
" k-1-mer copies. \"\"\"\n",
" g = graphviz.Digraph(comment='DeBruijn graph')\n",
" for node in iter(self.G.keys()):\n",
" g.node(node.km1mer, node.km1mer)\n",
" for src, dsts in iter(self.G.items()):\n",
" if weights:\n",
" weightmap = {}\n",
" if weights:\n",
" for dst in dsts:\n",
" weightmap[dst] = weightmap.get(dst, 0) + 1\n",
" for dst, v in weightmap.items():\n",
" g.edge(src.km1mer, dst.km1mer, label=str(v))\n",
" else:\n",
" for dst in dsts:\n",
" g.edge(src.km1mer, dst.km1mer)\n",
" return g"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
"\n",
"\n",
"\n",
"\n"
],
"text/plain": [
""
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# sing along\n",
"DeBruijnGraph2(['to_every_thing_turn_turn_turn_there_is_a_season_turn_turn_turn'], 4).to_dot()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
"\n",
"\n",
"\n",
"\n"
],
"text/plain": [
""
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# now simplified with edge weights\n",
"DeBruijnGraph2(['to_every_thing_turn_turn_turn_there_is_a_season_turn_turn_turn'], 4).to_dot(True)"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 1
}