{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "from sklearn.datasets import load_iris\n",
    "from sklearn.decomposition import KernelPCA as skKernelPCA"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implementation 1\n",
    "- linear kernel\n",
    "- similar to sklearn kernel='linear'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "class KernelCenterer():\n",
    "    \"\"\"Centers a kernel matrix using statistics stored from a fitted kernel.\"\"\"\n",
    "\n",
    "    def fit(self, K):\n",
    "        \"\"\"Store the column means of ``K`` and their sum divided by the row count.\n",
    "\n",
    "        Returns ``self`` so the call can be chained.\n",
    "        \"\"\"\n",
    "        row_count = K.shape[0]\n",
    "        column_means = np.sum(K, axis=0) / row_count\n",
    "        self.K_fit_rows_ = column_means\n",
    "        self.K_fit_all_ = column_means.sum() / row_count\n",
    "        return self\n",
    "\n",
    "    def transform(self, K):\n",
    "        \"\"\"Return ``K`` minus its own row means and the fitted column means,\n",
    "        plus the fitted overall term.\n",
    "        \"\"\"\n",
    "        # keepdims keeps the row means as a column so they broadcast over each row\n",
    "        own_row_means = np.sum(K, axis=1, keepdims=True) / K.shape[1]\n",
    "        centered = K - own_row_means\n",
    "        centered = centered - self.K_fit_rows_\n",
    "        return centered + self.K_fit_all_"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "class KernelPCA():\n",
    "    \"\"\"Kernel PCA with a linear kernel.\n",
    "\n",
    "    ``fit`` centers the Gram matrix of the training data, eigendecomposes it\n",
    "    and keeps the ``n_components`` largest eigenpairs in ``lambdas_`` and\n",
    "    ``alphas_``; ``transform`` projects data onto those components.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, n_components):\n",
    "        self.n_components = n_components\n",
    "\n",
    "    @staticmethod\n",
    "    def _linear_kernel(X, Y):\n",
    "        \"\"\"Return the inner products between the rows of X and the rows of Y.\"\"\"\n",
    "        return np.dot(X, Y.T)\n",
    "\n",
    "    def fit(self, X):\n",
    "        \"\"\"Eigendecompose the centered kernel of X; returns ``self``.\"\"\"\n",
    "        self.X_fit_ = X\n",
    "        K = self._linear_kernel(X, X)\n",
    "        self._centerer = KernelCenterer().fit(K)\n",
    "        Kt = self._centerer.transform(K)\n",
    "        # eigh returns eigenvalues in ascending order: take the tail, then reverse\n",
    "        eigval, eigvec = np.linalg.eigh(Kt)\n",
    "        self.lambdas_ = eigval[-self.n_components:][::-1]\n",
    "        self.alphas_ = eigvec[:, -self.n_components:][:, ::-1]\n",
    "        return self\n",
    "\n",
    "    def transform(self, X):\n",
    "        \"\"\"Project X onto the fitted components.\n",
    "\n",
    "        Components whose eigenvalue is not strictly positive yield a zero\n",
    "        column instead of NaN/inf.\n",
    "        \"\"\"\n",
    "        K = self._linear_kernel(X, self.X_fit_)\n",
    "        Kt = self._centerer.transform(K)\n",
    "        # Dividing by sqrt(lambda) is only defined for lambda > 0; round-off can\n",
    "        # leave null directions with zero or slightly negative eigenvalues.\n",
    "        usable = self.lambdas_ > 0\n",
    "        scaled_alphas = np.zeros_like(self.alphas_)\n",
    "        scaled_alphas[:, usable] = self.alphas_[:, usable] / np.sqrt(self.lambdas_[usable])\n",
    "        return np.dot(Kt, scaled_alphas)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "X, _ = load_iris(return_X_y=True)\n",
    "kpca1 = KernelPCA(n_components=2).fit(X)\n",
    "kpca2 = skKernelPCA(n_components=2).fit(X)\n",
    "# scikit-learn 1.0 renamed lambdas_/alphas_ to eigenvalues_/eigenvectors_ and removed\n",
    "# the old names in 1.2, so read whichever pair the installed version exposes.\n",
    "if hasattr(kpca2, 'eigenvalues_'):\n",
    "    sk_lambdas, sk_alphas = kpca2.eigenvalues_, kpca2.eigenvectors_\n",
    "else:\n",
    "    sk_lambdas, sk_alphas = kpca2.lambdas_, kpca2.alphas_\n",
    "assert np.allclose(kpca1.lambdas_, sk_lambdas)\n",
    "# Eigenvectors (and therefore projections) are only defined up to sign.\n",
    "for i in range(kpca1.alphas_.shape[1]):\n",
    "    assert np.allclose(kpca1.alphas_[:, i], sk_alphas[:, i]) or np.allclose(kpca1.alphas_[:, i], -sk_alphas[:, i])\n",
    "Xt1 = kpca1.transform(X)\n",
    "Xt2 = kpca2.transform(X)\n",
    "for i in range(Xt1.shape[1]):\n",
    "    assert np.allclose(Xt1[:, i], Xt2[:, i]) or np.allclose(Xt1[:, i], -Xt2[:, i])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Implementation 2\n",
    "- rbf kernel\n",
    "- similar to sklearn kernel='rbf'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "class KernelPCA():\n",
    "    \"\"\"Kernel PCA with an RBF kernel.\n",
    "\n",
    "    ``fit`` centers the Gram matrix of the training data, eigendecomposes it\n",
    "    and keeps the ``n_components`` largest eigenpairs in ``lambdas_`` and\n",
    "    ``alphas_``; ``transform`` projects data onto those components.\n",
    "    ``gamma=None`` means ``1 / n_features``.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, n_components, gamma=None):\n",
    "        self.n_components = n_components\n",
    "        self.gamma = gamma\n",
    "\n",
    "    @staticmethod\n",
    "    def _rbf_kernel(X, Y, gamma):\n",
    "        \"\"\"Return ``exp(-gamma * ||x - y||^2)`` for every row x of X and row y of Y.\n",
    "\n",
    "        The result has shape ``(X.shape[0], Y.shape[0])``.\n",
    "        \"\"\"\n",
    "        X = np.asarray(X)\n",
    "        Y = np.asarray(Y)\n",
    "        if gamma is None:\n",
    "            gamma = 1 / X.shape[1]\n",
    "        # Broadcast to (n_x, n_y, n_features) instead of looping in Python;\n",
    "        # this trades an intermediate array of that size for speed.\n",
    "        diff = X[:, np.newaxis, :] - Y[np.newaxis, :, :]\n",
    "        return np.exp(-gamma * np.sum(np.square(diff), axis=-1))\n",
    "\n",
    "    def fit(self, X):\n",
    "        \"\"\"Eigendecompose the centered kernel of X; returns ``self``.\"\"\"\n",
    "        self.X_fit_ = X\n",
    "        K = self._rbf_kernel(X, X, self.gamma)\n",
    "        self._centerer = KernelCenterer().fit(K)\n",
    "        Kt = self._centerer.transform(K)\n",
    "        # eigh returns eigenvalues in ascending order: take the tail, then reverse\n",
    "        eigval, eigvec = np.linalg.eigh(Kt)\n",
    "        self.lambdas_ = eigval[-self.n_components:][::-1]\n",
    "        self.alphas_ = eigvec[:, -self.n_components:][:, ::-1]\n",
    "        return self\n",
    "\n",
    "    def transform(self, X):\n",
    "        \"\"\"Project X onto the fitted components.\n",
    "\n",
    "        Components whose eigenvalue is not strictly positive yield a zero\n",
    "        column instead of NaN/inf.\n",
    "        \"\"\"\n",
    "        K = self._rbf_kernel(X, self.X_fit_, self.gamma)\n",
    "        Kt = self._centerer.transform(K)\n",
    "        # Dividing by sqrt(lambda) is only defined for lambda > 0; round-off can\n",
    "        # leave null directions with zero or slightly negative eigenvalues.\n",
    "        usable = self.lambdas_ > 0\n",
    "        scaled_alphas = np.zeros_like(self.alphas_)\n",
    "        scaled_alphas[:, usable] = self.alphas_[:, usable] / np.sqrt(self.lambdas_[usable])\n",
    "        return np.dot(Kt, scaled_alphas)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "X, _ = load_iris(return_X_y=True)\n",
    "kpca1 = KernelPCA(n_components=2).fit(X)\n",
    "kpca2 = skKernelPCA(n_components=2, kernel='rbf').fit(X)\n",
    "# scikit-learn 1.0 renamed lambdas_/alphas_ to eigenvalues_/eigenvectors_ and removed\n",
    "# the old names in 1.2, so read whichever pair the installed version exposes.\n",
    "if hasattr(kpca2, 'eigenvalues_'):\n",
    "    sk_lambdas, sk_alphas = kpca2.eigenvalues_, kpca2.eigenvectors_\n",
    "else:\n",
    "    sk_lambdas, sk_alphas = kpca2.lambdas_, kpca2.alphas_\n",
    "assert np.allclose(kpca1.lambdas_, sk_lambdas)\n",
    "# Eigenvectors (and therefore projections) are only defined up to sign.\n",
    "for i in range(kpca1.alphas_.shape[1]):\n",
    "    assert np.allclose(kpca1.alphas_[:, i], sk_alphas[:, i]) or np.allclose(kpca1.alphas_[:, i], -sk_alphas[:, i])\n",
    "Xt1 = kpca1.transform(X)\n",
    "Xt2 = kpca2.transform(X)\n",
    "for i in range(Xt1.shape[1]):\n",
    "    assert np.allclose(Xt1[:, i], Xt2[:, i]) or np.allclose(Xt1[:, i], -Xt2[:, i])"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "dev",
   "language": "python",
   "name": "dev"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}