{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "For blog: [Part-of-Speech-Tagging](https://yam.gift/2019/06/11/SLP/2019-06-11-Part-of-Speech-Tagging/)\n", "\n", "Cython version:\n", "\n", "[seqlearn/viterbi.pyx at master · larsmans/seqlearn](https://github.com/larsmans/seqlearn/blob/master/seqlearn/_decode/viterbi.pyx)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![](http://qnimg.lovevivian.cn/slp-ch8-3.jpeg)\n", "\n", "![](http://qnimg.lovevivian.cn/slp-ch8-4.jpeg)\n", "\n", "![](http://qnimg.lovevivian.cn/slp-ch8-5.jpeg)\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import math\n", "import numpy as np\n", "from pprint import pprint\n", "import timeit" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "def simple_beam_search(data, k):\n", " dd = dict(zip(range(len(data)), data))\n", " sort = sorted(dd.items(), key=lambda x:x[1], reverse=True)\n", " res = [w for w,p in sort[:k]]\n", " return sorted(res)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "A = np.array(\n", " [0.3777,0.0110,0.0009,0.0084,0.0584,0.0090,0.0025,\n", " 0.0008,0.0002,0.7968,0.0005,0.0008,0.1698,0.0041,\n", " 0.0322,0.0005,0.0050,0.0837,0.0615,0.0514,0.2231,\n", " 0.0366,0.0004,0.0001,0.0733,0.4509,0.0036,0.0036,\n", " 0.0096,0.0176,0.0014,0.0086,0.1216,0.0177,0.0068,\n", " 0.0068,0.0102,0.1011,0.1012,0.0120,0.0728,0.0479,\n", " 0.1147,0.0021,0.0002,0.2157,0.4744,0.0102,0.0017]).reshape(7, 7)\n", "B = np.array(\n", " [0.000032,0,0,0.000048,0,\n", " 0,0.308431,0,0,0,\n", " 0,0.000028,0.000672,0,0.000028,\n", " 0,0,0.000340,0,0,\n", " 0,0.000200,0.000223,0,0.002337,\n", " 0,0,0.010446,0,0,\n", " 0,0,0,0.506099,0]).reshape(7, 5)\n", "# B = np.array(\n", "# [0, 0.000048, 0, 0, 0.000032,\n", "# 0, 0, 0, 0.308431, 0,\n", "# 0.000028, 0, 0.000672, 0.000028, 0,\n", "# 0, 0, 0.000340, 0, 0,\n", "# 0.002337, 0, 0.000223, 0.000200, 0,\n", "# 0, 0, 0.010446, 0, 0,\n", "# 0, 0.506099, 0, 0, 0]).reshape(7, 5)\n", "pi = np.array([0.2767,0.0006,0.0031,0.0453,0.0449,0.0510,0.2026])" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "O = \"Janet will back the hill\".split()\n", "S = \"NNP MD VB JJ NN RB DT\".split()\n", "lens = len(S)\n", "leno = len(O)\n", "P = np.zeros((lens, leno))\n", "V = np.zeros((lens, leno))\n", "bp = np.zeros(leno, dtype=np.int8)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "for i in range(lens):\n", " V[i, 0] = pi[i] * B[i, 0]\n", " P[i, 0] = 0" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([0, 1, 2, 6, 4], dtype=int8)" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "index = range(V.shape[0])\n", "for o in range(1, leno):\n", " for s in range(lens):\n", " prob = V[index, o-1] * A[index, s] * B[s, o]\n", " V[s, o] = max(prob)\n", " P[s, o] = index[np.argmax(prob)]\n", " index = simple_beam_search(V[:, o], 2)\n", "bp[-1] = np.argmax(V[:, -1])\n", "for i in reversed(range(1, leno)):\n", " bp[i-1] = P[bp[i], i]\n", "bp" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "def viterbi(y, A, B, pi, beam_size=None):\n", " \"\"\"\n", " Parameters\n", " ----------\n", " y : array (T,)\n", " Observation state sequence. int dtype. sequence index in B\n", " \"\"\"\n", " lens = A.shape[0]\n", " leno = len(y)\n", " P = np.zeros((lens, leno))\n", " V = np.zeros((lens, leno))\n", " bp = np.zeros(leno, dtype=np.int8)\n", " # 因为只有 一个,所以不需要像下面那样取 max 处理\n", " for i in range(lens):\n", " V[i, 0] = pi[i] * B[i, y[0]]\n", " P[i, 0] = 0\n", " # 初始化选择所有的 states\n", " index = range(V.shape[0])\n", " for o in range(1, leno):\n", " for s in range(lens):\n", " # 这里的概率是上一层所有的 states 到这一步某一个 state 的概率,array(len(states), )\n", " prob = V[index, o-1] * A[index, s] * B[s, y[o]]\n", " # 选择(实际上是上一层)最大的概率\n", " V[s, o] = max(prob)\n", " # P 的计算用 V[:, o-1] * A[:, s] 更容易理解,表示上一层的最大概率\n", " # 选择上一层最大概率对应的 state\n", " # beam search 时这里需要注意修改(从注释的样子修改为现在的样子)\n", " P[s, o] = index[np.argmax(prob)] # np.argmax(prob)\n", " # 这一步可以使用 beam search 对 S 进行过滤\n", " if beam_size:\n", " index = simple_beam_search(V[:, o], beam_size)\n", " # 因为是往前计算的,所以最后得到的是最后一层的概率,之前的需要回溯\n", " bp[-1] = np.argmax(V[:, -1])\n", " for i in reversed(range(1, leno)):\n", " bp[i-1] = P[bp[i], i]\n", " return V, P, bp" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([0, 1, 2, 6, 4], dtype=int8)" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "V, P, bp = viterbi(np.array(range(5)), A, B, pi, beam_size=10)\n", "bp" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "775 µs ± 65.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n" ] } ], "source": [ "%timeit viterbi(np.array(range(5)), A, B, pi, beam_size=None)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "569 µs ± 26.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n" ] } ], "source": [ "# 结果很直观\n", "%timeit viterbi(np.array(range(5)), A, B, pi, beam_size=2)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "array([0, 1, 2, 6, 4], dtype=int8)" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 将 B 反序后(见 B 定义处),4 3 2 1 0 表示 O 中的每一个 token 在 B 中对应的 index\n", "V, P, bp = viterbi(np.array([4, 3, 2, 1, 0]), A, B, pi)\n", "bp" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Janet/NNP', 'will/MD', 'back/VB', 'the/DT', 'hill/NN']" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "[O[o]+\"/\"+S[s] for o,s in enumerate(bp)]" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[8.85440000e-06, 0.00000000e+00, 0.00000000e+00, 2.48613983e-17,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 3.00406859e-08, 0.00000000e+00, 0.00000000e+00,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 2.23130880e-13, 1.60852733e-11, 0.00000000e+00,\n", " 1.01707158e-20],\n", " [0.00000000e+00, 0.00000000e+00, 5.10691660e-15, 0.00000000e+00,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 1.03419392e-10, 5.35925837e-15, 0.00000000e+00,\n", " 2.01357071e-15],\n", " [0.00000000e+00, 0.00000000e+00, 5.32840899e-11, 0.00000000e+00,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.81619925e-12,\n", " 0.00000000e+00]])" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "V" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[0., 0., 0., 2., 0.],\n", " [0., 0., 0., 0., 0.],\n", " [0., 0., 1., 0., 6.],\n", " [0., 0., 1., 0., 0.],\n", " [0., 0., 1., 0., 6.],\n", " [0., 0., 1., 0., 0.],\n", " [0., 0., 0., 2., 0.]])" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "P" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "scrolled": false }, "outputs": [ { "data": { "text/plain": [ "(array([0, 1, 2, 6, 4], dtype=uint8),\n", " array([[8.85440000e-06, 0.00000000e+00, 0.00000000e+00, 2.48613983e-17,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 3.00406859e-08, 0.00000000e+00, 0.00000000e+00,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 2.23130880e-13, 1.60852733e-11, 0.00000000e+00,\n", " 1.01707158e-20],\n", " [0.00000000e+00, 0.00000000e+00, 5.10691660e-15, 0.00000000e+00,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 1.03419392e-10, 5.35925837e-15, 0.00000000e+00,\n", " 2.01357071e-15],\n", " [0.00000000e+00, 0.00000000e+00, 5.32840899e-11, 0.00000000e+00,\n", " 0.00000000e+00],\n", " [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.81619925e-12,\n", " 0.00000000e+00]]),\n", " array([[0, 0, 1, 2, 6],\n", " [0, 0, 1, 5, 6],\n", " [0, 0, 1, 5, 6],\n", " [0, 0, 1, 5, 6],\n", " [0, 0, 1, 2, 6],\n", " [0, 0, 1, 5, 6],\n", " [0, 0, 1, 2, 6]], dtype=uint8))" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "viterbi2(np.array(range(len(O))), A, B, pi)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# from https://stackoverflow.com/questions/9729968/python-implementation-of-viterbi-algorithm\n", "def viterbi2(y, A, B, Pi=None):\n", " \"\"\"\n", " Return the MAP estimate of state trajectory of Hidden Markov Model.\n", "\n", " Parameters\n", " ----------\n", " y : array (T,)\n", " Observation state sequence. int dtype.\n", " A : array (K, K)\n", " State transition matrix. See HiddenMarkovModel.state_transition for\n", " details.\n", " B : array (K, M)\n", " Emission matrix. See HiddenMarkovModel.emission for details.\n", " Pi: optional, (K,)\n", " Initial state probabilities: Pi[i] is the probability x[0] == i. If\n", " None, uniform initial distribution is assumed (Pi[:] == 1/K).\n", "\n", " Returns\n", " -------\n", " x : array (T,)\n", " Maximum a posteriori probability estimate of hidden state trajectory,\n", " conditioned on observation sequence y under the model parameters A, B,\n", " Pi.\n", " T1: array (K, T)\n", " the probability of the most likely path so far\n", " T2: array (K, T)\n", " the x_j-1 of the most likely path so far\n", " \"\"\"\n", " # Cardinality of the state space\n", " K = A.shape[0]\n", " # Initialize the priors with default (uniform dist) if not given by caller\n", " Pi = Pi if Pi is not None else np.full(K, 1 / K)\n", " T = len(y)\n", " T1 = np.empty((K, T), 'd')\n", " T2 = np.empty((K, T), 'B')\n", "\n", " # Initilaize the tracking tables from first observation\n", " T1[:, 0] = Pi * B[:, y[0]]\n", " T2[:, 0] = 0\n", "\n", " # Iterate throught the observations updating the tracking tables\n", " for i in range(1, T):\n", " T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)\n", " T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)\n", "\n", " # Build the output, optimal model trajectory\n", " x = np.empty(T, 'B')\n", " x[-1] = np.argmax(T1[:, T - 1])\n", " for i in reversed(range(1, T)):\n", " x[i - 1] = T2[x[i], i]\n", "\n", " return x, T1, T2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" }, "latex_envs": { "LaTeX_envs_menu_present": true, "autoclose": false, "autocomplete": true, "bibliofile": "biblio.bib", "cite_by": "apalike", "current_citInitial": 1, "eqLabelWithNumbers": true, "eqNumInitial": 1, "hotkeys": { "equation": "Ctrl-E", "itemize": "Ctrl-I" }, "labels_anchors": false, "latex_user_defs": false, "report_style_numbering": false, "user_envs_cfg": false }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }