{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# k-means Clustering from scratch" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: matplotlib in /usr/local/lib/python3.6/dist-packages (3.1.3)\n", "Requirement already satisfied: cycler>=0.10 in /usr/local/lib/python3.6/dist-packages (from matplotlib) (0.10.0)\n", "Requirement already satisfied: python-dateutil>=2.1 in /usr/local/lib/python3.6/dist-packages (from matplotlib) (2.8.1)\n", "Requirement already satisfied: kiwisolver>=1.0.1 in /usr/local/lib/python3.6/dist-packages (from matplotlib) (1.1.0)\n", "Requirement already satisfied: numpy>=1.11 in /usr/local/lib/python3.6/dist-packages (from matplotlib) (1.18.1)\n", "Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /usr/local/lib/python3.6/dist-packages (from matplotlib) (2.4.6)\n", "Requirement already satisfied: six in /usr/local/lib/python3.6/dist-packages (from cycler>=0.10->matplotlib) (1.14.0)\n", "Requirement already satisfied: setuptools in /usr/local/lib/python3.6/dist-packages (from kiwisolver>=1.0.1->matplotlib) (45.2.0)\n" ] } ], "source": [ "!pip3 install matplotlib" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import csv\n", "from random import sample\n", "from statistics import mean\n", "from math import sqrt, inf\n", "from pathlib import Path\n", "from collections import defaultdict\n", "from typing import List, Dict, Tuple\n", "from matplotlib import pyplot as plt" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Ensure that we have a `data` directory we use to store downloaded data\n", "!mkdir -p data\n", "data_dir: Path = Path('data')" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "File ‘data/iris.data’ already there; not retrieving.\n", "\n" ] } ], "source": [ "# Downloading the Iris data set\n", "!wget -nc -P data https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "6.9,3.1,5.1,2.3,Iris-virginica\n", "5.8,2.7,5.1,1.9,Iris-virginica\n", "6.8,3.2,5.9,2.3,Iris-virginica\n", "6.7,3.3,5.7,2.5,Iris-virginica\n", "6.7,3.0,5.2,2.3,Iris-virginica\n", "6.3,2.5,5.0,1.9,Iris-virginica\n", "6.5,3.0,5.2,2.0,Iris-virginica\n", "6.2,3.4,5.4,2.3,Iris-virginica\n", "5.9,3.0,5.1,1.8,Iris-virginica\n", "\n" ] } ], "source": [ "# The structure of the Iris data set is as follows:\n", "# Sepal Length, Sepal Width, Petal Length, Petal Width, Class\n", "!tail data/iris.data" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Create the Python path pointing to the `iris.data` file\n", "data_path: Path = data_dir / 'iris.data'" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# The list in which we store the \"petal length\" and \"sepal width\" as vectors (a vector is a list of floats)\n", "data_points: List[List[float]] = []\n", "\n", "# Indexes according to the data set description\n", "petal_length_idx: int = 2\n", "sepal_width_idx: int = 1\n", "\n", "# Read the `iris.data` file and parse it line-by-line\n", "with open(data_path) as csv_file:\n", " reader = csv.reader(csv_file, delimiter=',')\n", " for row in reader:\n", " # Check if the given row is a valid iris data point\n", " if len(row) == 5:\n", " label: str = row[-1]\n", " x1: float = float(row[petal_length_idx])\n", " x2: float = float(row[sepal_width_idx])\n", " data_points.append([x1, x2])" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "150" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(data_points)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[1.4, 3.5], [1.4, 3.0], [1.3, 3.2], [1.5, 3.1], [1.4, 3.6]]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data_points[:5]" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Plot the `data_points`\n", "plt.scatter([item[0] for item in data_points], [item[1] for item in data_points])\n", "plt.xlabel('Petal Length')\n", "plt.ylabel('Sepal Width');" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "# Function to compute the Euclidean distance\n", "# See: https://en.wikipedia.org/wiki/Euclidean_distance\n", "def distance(a: List[float], b: List[float]) -> float:\n", " assert len(a) == len(b)\n", " return sqrt(sum((a_i - b_i) ** 2 for a_i, b_i in zip(a, b)))\n", "\n", "assert distance([1, 2], [1, 2]) == 0\n", "assert distance([1, 2, 3, 4], [5, 6, 7, 8]) == 8" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# Function which computes the element-wise average of a list of vectors (a vector is a list of floats)\n", "def vector_mean(xs: List[List[float]]) -> List[float]:\n", " # Check that all arrays have the same number of dimensions\n", " for prev, curr in zip(xs, xs[1:]):\n", " assert len(prev) == len(curr)\n", " num_items: int = len(xs)\n", " # Figure out how many dimensions we have to support\n", " num_dims: int = len(xs[0])\n", " # Dynamically create a list which contains lists for each dimension\n", " # to simplify the mean calculation later on\n", " dim_values: List[List[float]] = [[] for _ in range(num_dims)]\n", " for x in xs:\n", " for dim, val in enumerate(x):\n", " dim_values[dim].append(val)\n", " # Calculate the mean across the dimensions\n", " return [mean(item) for item in dim_values]\n", "\n", "assert vector_mean([[1], [2], [3]]) == [2]\n", "assert vector_mean([[1, 2], [3, 4], [5, 6]]) == [3, 4]\n", "assert vector_mean([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) == [4, 5, 6]" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "class KMeans:\n", " def __init__(self, k: int) -> None:\n", " self._k: int = k\n", " self._centroids: Dict[int, List[float]] = defaultdict(list)\n", " self._clusters: Dict[int, List[List[float]]] = defaultdict(list)\n", "\n", " def train(self, data_points: List[List[float]]) -> None:\n", " # Pick `k` random samples from the `data_points` and use them as the initial centroids\n", " centroids: List[List[float]] = sample(data_points, self._k)\n", " # Initialize the `_centroids` lookup dict with such centroids\n", " for i, centroid in enumerate(centroids):\n", " self._centroids[i] = centroid\n", " # Star the training process\n", " while True:\n", " # Starting a new round, removing all previous `cluster` associations (if any)\n", " self._clusters.clear() \n", " # Iterate over all items in the `data_points` and compute their distances to all `centroids`\n", " item: List[float]\n", " for item in data_points:\n", " smallest_distance: float = inf\n", " closest_centroid_idx: int = None\n", " # Identify the closest `centroid`\n", " centroid_idx: int\n", " centroid: List[float]\n", " for centroid_idx, centroid in self._centroids.items():\n", " current_distance: float = distance(item, centroid)\n", " if current_distance < smallest_distance:\n", " smallest_distance: float = current_distance\n", " closest_centroid_idx: int = centroid_idx\n", " # Append the current `item` to the `Cluster` whith the nearest `centroid`\n", " self._clusters[closest_centroid_idx].append(item)\n", " # The `vector_mean` of all items in the `cluster` should be the `cluster`s new centroid\n", " old_centroid: List[float]\n", " centroids_to_update: List[Tuple[int, List[float]]] = []\n", " for old_centroid_idx, old_centroid in self._centroids.items():\n", " items: List[List[float]] = self._clusters[old_centroid_idx]\n", " new_centroid: List[float] = vector_mean(items)\n", " if new_centroid != old_centroid:\n", " centroids_to_update.append((old_centroid_idx, new_centroid))\n", " # Update centroids if they changed\n", " if len(centroids_to_update):\n", " idx: int\n", " centroid: List[float]\n", " for idx, centroid in centroids_to_update:\n", " self._centroids[idx] = centroid\n", " # If nothing changed, we're done\n", " else:\n", " break\n", " \n", " @property\n", " def centroids(self) -> Dict[int, List[float]]:\n", " return self._centroids\n", " \n", " @property\n", " def clusters(self) -> Dict[int, List[List[float]]]:\n", " return self._clusters" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The clusters centroids are: [[1.4941176470588236, 3.4], [4.925252525252525, 2.875757575757576]]\n", "The number of elements in each cluster are: [51, 99]\n" ] } ], "source": [ "# Create a new KMeans instance and train it\n", "km: KMeans = KMeans(2)\n", "km.train(data_points)\n", "\n", "print(f'The clusters centroids are: {list(km.centroids.values())}')\n", "print(f'The number of elements in each cluster are: {[len(items) for items in km.clusters.values()]}')" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Plot the `clusters` and their `centroids`\n", "# Gather all the necessary data to plot the `clusters`\n", "xs: List[float] = []\n", "ys: List[float] = []\n", "cs: List[int] = []\n", "for cluster_idx, items in km.clusters.items():\n", " for item in items:\n", " cs.append(cluster_idx)\n", " xs.append(item[0])\n", " ys.append(item[1])\n", "\n", "fig = plt.figure()\n", "ax = fig.add_subplot()\n", "ax.scatter(xs, ys, c=cs)\n", "\n", "# Add the centroids\n", "for c in km.centroids.values():\n", " ax.scatter(c[0], c[1], c='red', marker='+')\n", "\n", "# Set labels\n", "ax.set_xlabel('Petal Length')\n", "ax.set_ylabel('Sepal Width');" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "# Function which quantifies how far apart two values are\n", "# We'll use it to calculate errors later on\n", "def squared_error(a: float, b: float) -> float:\n", " return (a - b) ** 2\n", "\n", "assert squared_error(2, 2) == 0\n", "assert squared_error(1, 2) == 1\n", "assert squared_error(1, 10) == 81" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Create an \"Elbow chart\" to find the \"best\" `k`\n", "# See: https://en.wikipedia.org/wiki/Elbow_method_(clustering)\n", "\n", "# Lists to record the `k` values and the computed `error` sums\n", "# which are used for plotting later on\n", "ks: List[int] = []\n", "error_sums: List[float] = []\n", "\n", "# Create clusterings for the range of `k` values\n", "for k in range(1, 10):\n", " # Create and train a new KMeans instance for the current `k`\n", " km: KMeans = KMeans(k)\n", " km.train(data_points)\n", " # List to keep track of the individual KMean errors\n", " errors: List[float] = []\n", " # Iterate over all `clusters` and extract their `centroid_idx`s and `items`\n", " centroid_idx: List[float]\n", " items: List[List[float]]\n", " for centroid_idx, items in km.clusters.items():\n", " # Lookup `centroid` coordinates based on its index\n", " centroid: List[float] = km.centroids[centroid_idx]\n", " # Iterate over each `item` in the cluster\n", " item: List[float]\n", " for item in items:\n", " # Calculate how far the current `cluster`s `item` is from the `centroid`\n", " dist: float = distance(centroid, item)\n", " # The closer the `item` in question, the better (less error)\n", " # (the closest one can be is `0`)\n", " error: float = squared_error(dist, 0)\n", " # Record the `error` value\n", " errors.append(error)\n", " # Append the current `k` and the sum of all `errors`\n", " ks.append(k)\n", " error_sums.append(sum(errors))\n", "\n", "# Plot the `k` and error values to see which `k` is \"best\"\n", "plt.plot(ks, error_sums)\n", "plt.xlabel('K')\n", "plt.ylabel('Error');" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }