{ "metadata": { "name": "outlier_detect" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 1, "metadata": {}, "source": [ "A quick outlier detection" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%pylab inline" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "import numpy as np\n", "import scipy.stats as sst\n", "import matplotlib.pylab as plt" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Algorithm\n", "\n", "1. select h observations as inliers\n", "2. compute mean (mu) and variance (var)\n", "3. compute Malhanobis distance (Md) for all observations, using mu and var\n", "4. take the h observ that have smallest Md\n", "5. Repeat 2, 3, 4 until\n", "6. Stop criteria: compare with previous var and mu\n", "7. Correct the variance bias (not done in this notebook yet)\n", "\n", "Return outlier corrected mu and var" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def compute_mu_var(y):\n", " \"\"\" Compute mean and (co)variance for one or more measures\n", "\n", " Parameters\n", " ----------\n", " y : (N,) or (P, N) ndarray\n", " One row per measure, one column per observation. If a vector, treat as a\n", " (1, N) array\n", "\n", " Returns\n", " -------\n", " mu : (P,) ndarray\n", " Mean of each measure across columns.\n", " var : (P, P) ndarray\n", " Variances (diagonal) and covariances of measures\n", " \"\"\"\n", " # Make sure a vector becumes a 1, N 2D array\n", " y = np.atleast_2d(y)\n", " # Degrees of freedom\n", " P, N = y.shape\n", " df = N - 1\n", " # Mean for each row\n", " mu = y.mean(axis=1)\n", " # The mean removed the second axis. Restore it (length 1) so we can subtract\n", " subtracting_mu = np.reshape(mu, (P, 1))\n", " # Remove mean\n", " yc = y - subtracting_mu\n", " # Variance(s) and covariances\n", " var = yc.dot(yc.T) / df\n", " return mu, var" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# test the mu / var function\n", "assert compute_mu_var(np.asarray([[1, 1, 1, 1]])) == (1,0)\n", "assert compute_mu_var(np.asarray([[-1, 0, 1]])) == (0,1)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def compute_mahal(y, mu, var):\n", " \"\"\" Compute Mahalanobis distance for `y` given `mu`, `var`\n", "\n", " Parameters\n", " ----------\n", " y : (N,) or (P, N) ndarray\n", " One row per measure, one column per observation. If a vector, treat as a\n", " (1, N) array\n", " mu : (P,) array-like\n", " Mean of each measure across columns. Can be scalar, array or sequence\n", " (list, tuple)\n", " var : (P, P) array-like\n", " Variances (diagonal) and covariances of measures. Can be scalar, array\n", " or sequence (list, tuple)\n", "\n", " Returns\n", " -------\n", " mahals : (N,) ndarray\n", " Mahalanobis distances of each observation from `mu`, given `var`\n", " \"\"\"\n", " # Make sure y is a row vector, if it was only a 1D vector\n", " y = np.atleast_2d(y)\n", " # Shapes\n", " P, N = y.shape\n", " # Make sure mu and var are arrays\n", " mu = np.asarray(mu)\n", " # Variance should also be 2D (even if shape (1, 1)) - for np.linalg.inv\n", " var = np.atleast_2d(var)\n", " # The mean should be shape (P,). It needs to be (P, 1) shape to subtract\n", " subtracting_mu = np.reshape(mu, (P, 1))\n", " # Mean correct\n", " yc = y - subtracting_mu\n", " # Correct for (co)variances. For single row, this is the same as dividing by\n", " # the variance\n", " y_white = np.linalg.inv(var).dot(yc)\n", " # Z score for one row is (y - mu) / sqrt(var).\n", " # Z**2 is (y - mu) (y-nu) / var, which is:\n", " z2 = yc * y_white\n", " # Mahalanobis distance is mean z2 over measures\n", " return z2.mean(axis=0)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Test the Mahalonbis function\n", "distances = compute_mahal([-1, 0, 1], 1, 1), \n", "assert np.allclose(distances, [ 4., 1., 0.])" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def estimate_mu_var(y, n_inliers=0.7, maxiter=10, tol=1e-6):\n", " \"\"\" Estimate corrected `mu` and `var` for `y` given number of inliers\n", "\n", " Algorithm from:\n", "\n", " Fritsch, V., Varoquaux, G., Thyreau, B., Poline, J. B., & Thirion, B. (2012).\n", " Detecting outliers in high-dimensional neuroimaging datasets with robust\n", " covariance estimators. Medical Image Analysis.\n", "\n", " Parameters\n", " ----------\n", " y : (N,) or (P, N) ndarray\n", " One row per measure, one column per observation. If a vector, treat as a\n", " (1, N) array\n", " n_inliers : int or float, optional\n", " If int, the number H (H < N) of observations in the center of the\n", " distributions that can be assumed to be non-outliers. If float, should\n", " be between 0 and 1, and give proportion of inliers\n", " maxiter : int, optional\n", " Maximum number of iterations to refine estimate of outliers\n", " tol : float, optional\n", " Smallest change in variance estimate for which we contine to iterate.\n", " Changes smaller than `tol` indicate that iteration of outlier detection\n", " has converged\n", "\n", " Returns\n", " -------\n", " mu : (P,) ndarray\n", " Mean per measure in `y`, correcting for possible outliers\n", " var : (P, P) ndarray\n", " Variances (diagonal) and covariances (off-diagonal) for measures `y`,\n", " correcting for possible outliers\n", " \"\"\"\n", " y = np.atleast_2d(y)\n", " P, N = y.shape\n", " if n_inliers > 0 and n_inliers < 1: # Proportion of inliers\n", " n_inliers = int(np.round(N * n_inliers))\n", " if n_inliers <= 0:\n", " raise ValueError('n_inliers should be > 0')\n", " # Compute first estimate of mu and varances\n", " mu, var = compute_mu_var(y)\n", " if n_inliers >= N:\n", " return mu, var\n", " # Initialize estimate of which are the inlier values.\n", " prev_inlier_indices = np.arange(n_inliers)\n", " # Keep pushing outliers to the end until we are done\n", " for i in range(maxiter):\n", " distances = compute_mahal(y, mu, var)\n", " # Pick n_inliers observatons with lowest distances\n", " inlier_indices = np.argsort(distances)[:n_inliers]\n", " # If we found the same inliers as last time, we'll get the same mu, var,\n", " # so stop iterating\n", " if np.all(inlier_indices == prev_inlier_indices):\n", " break\n", " # Re-estimate mu and var with new inliers\n", " mu_new, var_new = compute_mu_var(y[:, inlier_indices])\n", " # Check if var has changed - if not - stop iterating\n", " if np.max(np.abs(var - var_new)) < tol:\n", " break\n", " # Set mu, var, indices for next loop iteration\n", " var = var_new\n", " mu = mu_new\n", " prev_inlier_indices = inlier_indices\n", " return mu, var" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Do a run through of the outlier detection\n", "# Number of samples\n", "n_samples = 100\n", "# Standard deviation\n", "sigma = 1.\n", "# Data vectors (no outliers yet)\n", "Y = np.random.normal(0., sigma, size=(1, n_samples))\n", "# Proportion of inliers (1-\n", "inlier_proportion = .70\n", "# Number of inliers, outliers\n", "n_inliers = int(inlier_proportion * n_samples)\n", "n_outliers = n_samples - n_inliers\n", "# Make the correct number of outliers\n", "outliers = np.random.normal(0., sigma*10, size=(1, n_outliers))\n", "Y[:, 0:n_outliers] = outliers\n", "\n", "# Estimate the outlier corrected mean and variance\n", "print('Uncorrected mu and var')\n", "print(compute_mu_var(Y))\n", "corr_mu, corr_var = estimate_mu_var(Y)\n", "print('Robust estimates of mu and var (\"corrected\" for outliers)')\n", "print(corr_mu, corr_var)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Detection step" ] }, { "cell_type": "code", "collapsed": false, "input": [ "# choose a false positive rate - Bonferroni corrected for number of samples\n", "alpha = 0.1 / n_samples\n", "\n", "# Standard deviation - for our 1D case\n", "corr_sigma = np.sqrt(corr_var)\n", "# z - in 1D case\n", "z = (Y - corr_mu) / corr_sigma\n", "\n", "_out = plt.hist(z[0,:])" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Normal distribution object\n", "normdist = sst.norm(corr_mu, corr_sigma)\n", "# Probability of z value less than or equal to each observation\n", "p_values = normdist.cdf(z)\n", "# Where probability too high therefore z value too large. We're only looking\n", "# for z's that are too positive, disregarding zs that are too negative\n", "some_bad_ones = p_values > (1 - alpha)\n", "\n", "# Show what we found\n", "print \"volumes to remove :\", some_bad_ones\n", "print z[some_bad_ones]\n", "print Y[some_bad_ones]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# exercise : \n", "# print histogram of the good ones:" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# scrap cell\n", "#========================================#\n", "\n", "# here - just one variable\n", "#z0 = z[0,:]\n", "#mu0 = mu[0]\n", "#sig0 = sig[0,0]\n", "#print good_ones\n", "#good_ones = np.where(some_bad_ones == False)\n", "#print good_ones.shape\n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }