{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "code", "collapsed": false, "input": [ "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import scipy.stats" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 1 }, { "cell_type": "code", "collapsed": false, "input": [ "# Experiment: `NUM_SETS` times, pick one of the `NUM_COINS` coins and\n", "# flip it `TOSSES_PER_SET` times.\n", "NUM_SETS = 5\n", "TOSSES_PER_SET = 10\n", "NUM_COINS = 2\n", "\n", "# Data collected from experiment: Number of heads/tails in each set.\n", "X_HEAD_COUNTS = np.array([5, 9, 8, 4, 7])\n", "assert len(X_HEAD_COUNTS) == NUM_SETS\n", "X_TAIL_COUNTS = TOSSES_PER_SET - X_HEAD_COUNTS" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 2 }, { "cell_type": "code", "collapsed": false, "input": [ "def log_likelihood(head_counts, assignments, theta):\n", " \"\"\"Log likelihood of coin flips given coin assignments and theta.\"\"\"\n", " set_probs = theta[assignments]\n", " ll = 0.0\n", " for prob_heads, head_count in zip(set_probs, head_counts):\n", " ll += np.log(scipy.stats.binom.pmf(head_count, TOSSES_PER_SET, prob_heads))\n", " return ll" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 3 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define a function to do a single run of EM given an initial guess of the parameters, then call it multiple times with random initial guesses and keep the one that maximizes the [log] likelihood." ] }, { "cell_type": "code", "collapsed": false, "input": [ "def _coin_flip_em(theta_guess, hard_assignments=False):\n", " \"\"\"EM with an initial guess of the parameters.\"\"\"\n", " theta = np.asarray(theta_guess)\n", " \n", " log_likelihoods = []\n", "\n", " while True:\n", " # E-step\n", " coin_probs = np.zeros((NUM_SETS, NUM_COINS), dtype=float)\n", " for set_idx in xrange(NUM_SETS):\n", " num_heads = X_HEAD_COUNTS[set_idx]\n", " num_tails = TOSSES_PER_SET - num_heads\n", " for coin_idx in xrange(NUM_COINS):\n", " p_heads = theta[coin_idx]\n", " p_tails = 1.0 - p_heads\n", " unnormalized_prob = (p_heads ** num_heads) * (p_tails ** num_tails)\n", " coin_probs[set_idx, coin_idx] = unnormalized_prob\n", " # Normalize the probabilities in each row.\n", " if hard_assignments:\n", " hard_probs = np.zeros_like(coin_probs)\n", " hard_probs[coin_probs.argmax(axis=1) == 1, 1] = 1\n", " hard_probs[:, 0] = 1 - hard_probs[:, 1]\n", " coin_probs = hard_probs\n", " else:\n", " coin_probs = coin_probs / np.tile(coin_probs.sum(axis=1), (2, 1)).T\n", " \n", " # M-step.\n", " # Get the expected number of head and tail counts from each coin.\n", " expected_head_counts = (coin_probs * np.tile(X_HEAD_COUNTS, (2, 1)).T).sum(axis=0)\n", " expected_tail_counts = (coin_probs * np.tile(X_TAIL_COUNTS, (2, 1)).T).sum(axis=0)\n", " # Re-estimate the probability of heads from each coin.\n", " for coin_idx in xrange(len(theta)):\n", " denom = expected_head_counts[coin_idx] + expected_tail_counts[coin_idx]\n", " if denom != 0:\n", " theta[coin_idx] = expected_head_counts[coin_idx] / denom\n", "\n", " # Quit when converged.\n", " log_likelihoods.append(log_likelihood(X_HEAD_COUNTS, coin_probs.argmax(axis=1), theta))\n", " if len(log_likelihoods) >= 2 and abs(log_likelihoods[-1] - log_likelihoods[-2]) < 10e-10:\n", " break\n", "\n", " return theta, log_likelihoods" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 4 }, { "cell_type": "code", "collapsed": false, "input": [ "def coin_flip_em(hard_assignments=False, num_trials=100):\n", " \"\"\"Do repeated EM trials and keep the best one.\"\"\"\n", " best_theta, best_ll, all_lls = None, None, []\n", " for _ in xrange(num_trials):\n", " theta_guess = np.random.random(NUM_COINS)\n", " theta, lls = _coin_flip_em(theta_guess, hard_assignments=hard_assignments)\n", " all_lls.append(lls[-1])\n", " if best_ll is None or lls[-1] > best_ll:\n", " best_theta = theta\n", " best_ll = lls[-1]\n", " return best_theta, best_ll, all_lls" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 5 }, { "cell_type": "code", "collapsed": false, "input": [ "print 'Best theta from hard assignments:', coin_flip_em(hard_assignments=True)[0]\n", "print 'Best theta from soft assignments:', coin_flip_em(hard_assignments=False)[0]" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "Best theta from hard assignments: " ] }, { "output_type": "stream", "stream": "stdout", "text": [ "[ 0.45 0.8 ]\n", "Best theta from soft assignments: " ] }, { "output_type": "stream", "stream": "stdout", "text": [ "[ 0.51958312 0.79678907]\n" ] } ], "prompt_number": 6 } ], "metadata": {} } ] }