{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [
"## Signed Cumulative Distribution Transform Nearest Local Subspace (SCDT-NLS) Classifier\n",
"\n",
"This tutorial demonstrates how to use the SCDT-NLS classifier from the *PyTransKit* package to classify 1D signals."
] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"## Class:: SCDT_NLS\n",
"**Functions**:\n",
"\n",
"1. Constructor function:\n",
"        scdt_nls_obj = SCDT_NLS(num_classes, rm_edge)\n",
"        \n",
"        Inputs:\n",
"        ----------------\n",
"        num_classes : integer value\n",
"            total number of classes in the dataset.\n",
"        rm_edge : [optional] boolean\n",
"            If True, the first and last points of the CDTs are removed.\n",
"        \n",
"        Outputs:\n",
"        ----------------\n",
"        scdt_nls_obj : class object\n",
"            Instance of the class SCDT_NLS.\n",
"        \n",
"2. Fit function:\n",
"        scdt_nls_obj.fit(Xtrain, Ytrain, Ttrain)\n",
"        \n",
"        Inputs:\n",
"        ----------------\n",
"        Xtrain : array-like, shape (n_samples, n_columns)\n",
"            1D data for training.\n",
"        Ytrain : ndarray of shape (n_samples,)\n",
"            Labels of the training samples.\n",
"        Ttrain : [optional] array-like, shape (n_samples, n_columns)\n",
"            domain for corresponding training signals.\n",
"        \n",
"3. 
Predict function:\n",
"        preds = scdt_nls_obj.predict(Xtest, Ttest, k, N)\n",
"        \n",
"        Inputs:\n",
"        ----------------\n",
"        Xtest : array-like, shape (n_samples, n_columns)\n",
"            1D data for testing.\n",
"        Ttest : [optional] array-like, shape (n_samples, n_columns)\n",
"            domain for corresponding test signals.\n",
"        k : [optional] pre-tuned parameter\n",
"            number of closest points to test sample\n",
"        N : [optional] pre-tuned parameter\n",
"            number of sinusoidal bases used for subspace enrichment\n",
"        \n",
"        Outputs:\n",
"        ----------------\n",
"        preds : 1d array, shape (n_samples,)\n",
"            Predicted labels for test samples.\n",
"        "
] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"## Example\n",
"The following example demonstrates how to:\n",
"* create and initialize an instance of the class SCDT_NLS\n",
"* train the model using 1D signals\n",
"* apply the model to predict class labels of the test 1D samples\n",
"\n",
"This example uses a synthetic 1D dataset. The dataset contains three classes, each containing data generated from two templates."
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Import Python libraries" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [
"import numpy as np\n",
"from numpy import interp\n",
"import math\n",
"import matplotlib.pyplot as plt\n",
"from scipy.linalg import lstsq\n",
"from scipy import signal\n",
"import random\n",
"\n",
"import sys\n",
"sys.path.append('../')\n",
"from pytranskit.classification.utils import *"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Import SCDT-NLS class from PyTransKit package" ] },
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from pytranskit.classification.scdt_nls import SCDT_NLS" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Define six template signals (three classes, each with two templates)" ] },
{ "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [
"def signal_gabor_phi1(t):\n",
" w = 25\n",
" s1 = np.real(math.pi**-0.25 * np.exp(2*math.pi*1j*w*(t-0.5)) * np.exp(-250*((t-0.5)**2)))\n",
" return s1\n",
"\n",
"def signal_gabor_phi2(t):\n",
" w = 25\n",
" s1 = (np.real(math.pi**-0.25 * np.exp(2*math.pi*1j*w*(t-0.35)) * np.exp(-250*((t-0.35)**2)))\n",
" +0.25*np.real(math.pi**-0.25 * np.exp(2*math.pi*1j*w*(t-0.7)) * np.exp(-250*((t-0.7)**2))) )\n",
" return s1\n",
"\n",
"def signal_sawtooth_phi1(t):\n",
" s2 = signal.sawtooth(2 * np.pi * 15 * (t-0.5))* np.exp(-250*((t-0.5)**2))\n",
" return s2\n",
"\n",
"def signal_sawtooth_phi2(t):\n",
" s2 = (signal.sawtooth(2 * np.pi * 15 * (t-0.35))* np.exp(-250*((t-0.35)**2))\n",
" +0.25*signal.sawtooth(2 * np.pi * 15 * (t-0.7))* np.exp(-250*((t-0.7)**2)) )\n",
" return s2\n",
"\n",
"def signal_square_phi1(t):\n",
" s3 = signal.square(2 * np.pi * 15 * 
(t-0.5))*np.exp(-250*((t-0.5)**2))\n",
" return s3\n",
"\n",
"def signal_square_phi2(t):\n",
" s3 = (signal.square(2 * np.pi * 15 * (t-0.35))*np.exp(-250*((t-0.35)**2)) \n",
" + 0.2*signal.square(2 * np.pi * 15 * (t-0.70))*np.exp(-250*((t-0.70)**2)))\n",
" return s3"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Generate template signals for three classes" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [
"N = 300 # number of discrete samples per signal\n",
"\n",
"## Define a set of signals\n",
"t = np.linspace(0.,1.,N)\n",
"\n",
"s_template = []\n",
"\n",
"s_template.append(signal_gabor_phi1(t))\n",
"s_template.append(signal_gabor_phi2(t))\n",
"\n",
"s_template.append(signal_sawtooth_phi1(t))\n",
"s_template.append(signal_sawtooth_phi2(t))\n",
"\n",
"s_template.append(signal_square_phi1(t))\n",
"s_template.append(signal_square_phi2(t))\n",
"\n",
"num_classes = len(s_template)//2\n",
"\n",
"## Plotting\n",
"lw = 4.0\n",
"fig, ax = plt.subplots(2, 3, sharex=False, sharey=False, figsize=(24,2*4))\n",
"\n",
"ax[0,0].plot(t,s_template[0],'tab:blue',lw=lw)\n",
"ax[0,0].set_title(\"Class: 1\",fontsize=20)\n",
"ax[0,0].set_yticks([])\n",
"ax[0,0].set_xticks([])\n",
"\n",
"ax[0,1].plot(t,s_template[2],'tab:orange',lw=lw)\n",
"ax[0,1].set_title(\"Class: 2\",fontsize=20)\n",
"ax[0,1].set_yticks([])\n",
"ax[0,1].set_xticks([])\n",
"\n",
"ax[0,2].plot(t,s_template[4],'k',lw=lw)\n",
"ax[0,2].set_title(\"Class: 3\",fontsize=20)\n",
"ax[0,2].set_yticks([])\n",
"ax[0,2].set_xticks([])\n",
"\n",
"ax[1,0].plot(t,s_template[1],'tab:blue',ls='--',lw=lw)\n",
"ax[1,0].set_yticks([])\n",
"ax[1,0].set_xticks([])\n",
"\n",
"ax[1,1].plot(t,s_template[3],'tab:orange',ls='--',lw=lw)\n",
"ax[1,1].set_yticks([])\n",
"ax[1,1].set_xticks([])\n",
"\n",
"ax[1,2].plot(t,s_template[5],'k',ls='--',lw=lw)\n",
"ax[1,2].set_yticks([])\n",
"ax[1,2].set_xticks([])\n",
"\n",
"plt.show()"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Define $g(t)$ to generate confounds, i.e., $s_g = g'\\,(s\\circ g)$" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [
"def g_function_gaussian_mix(x,centers, widths, scales):\n",
"    X = np.zeros(len(x))\n",
"    for center, width, scale in zip(centers, widths, scales):\n",
"        X = X + scale * np.exp(-(x - center) * (x - center) * width)\n",
"    X/=X.sum()\n",
"    return X"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Generate confounds for training set" ] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "6\n", "(100,)\n" ] } ], "source": [
"num_classes = len(s_template)//2\n",
"\n",
"# generate confounds\n",
"L_slope = 10 # number of slope variations\n",
"L_shift = 10 # number of shift variations\n",
"s_conf = []\n",
"slope_array = np.random.uniform(0.85, 1.15, L_slope) #np.linspace(0.96,1.04,L_slope)\n",
"shift_array = np.random.uniform(-0.09,0.09,L_shift) #np.linspace(-0.01,0.01,L_shift)\n",
"\n",
"x_train = []\n",
"y_train = []\n",
"t_train = []\n",
"tp_train = []\n",
"coeff = np.zeros(5)\n",
"for k in range(num_classes):\n",
"    for tmp in range(2):\n",
"        Nc = 0\n",
"        tmp_ind = 2*k + tmp\n",
"        s_conf = []\n",
"        y_conf = []\n",
"        t_conf = []\n",
"        for i_slope in range(L_slope):\n",
"            for i_shift in range(L_shift):\n",
"                nn = random.randint(2,10)\n",
"                centers = np.random.normal(0.5, 0.2, nn)\n",
"                widths = np.random.uniform(10.0, 50.0, nn)\n",
"                coeffs = np.random.uniform(0.1, 1.0, nn)\n",
"                coeffs /= coeffs.sum()\n",
"\n",
"                gm = g_function_gaussian_mix(t,centers, widths, coeffs)\n",
"\n",
"                slope = slope_array[i_slope]\n",
"                shift = shift_array[i_shift]\n",
"\n",
"                g = np.cumsum(gm)*slope+shift\n",
"                dt = np.gradient(t)\n",
"                g_prime = np.gradient(g)/dt\n",
"                t_conf.append(g)\n",
"\n",
"                if tmp_ind==0:\n",
"                    sc = g_prime*signal_gabor_phi1(g)\n",
"                elif tmp_ind==1:\n",
"                    sc = g_prime*signal_gabor_phi2(g)\n",
"                elif tmp_ind==2:\n",
"                    sc = g_prime*signal_sawtooth_phi1(g)\n",
"                elif tmp_ind==3:\n",
"                    sc = g_prime*signal_sawtooth_phi2(g)\n",
"                elif tmp_ind==4:\n",
"                    sc = g_prime*signal_square_phi1(g)\n",
"                elif tmp_ind==5:\n",
"                    sc = g_prime*signal_square_phi2(g)\n",
"                s_conf.append(sc)\n",
"                y_conf.append(k)\n",
"                Nc = Nc+1\n",
"        x_train.append(np.stack(s_conf, axis=0))\n",
"        y_train.append(np.stack(y_conf, axis=0))\n",
"        t_train.append(np.stack(t_conf, axis=0))\n",
"print(len(x_train))\n",
"print(y_train[0].shape)"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Generate confounds for testing set" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(300, 300)\n", "(300,)\n" ] } ], "source": [
"# generate confounds\n",
"L_slope = 10 # number of slope variations\n",
"L_shift = 5 # number of shift variations\n",
"s_conf = []\n",
"slope_array = np.random.uniform(0.85, 1.15, L_slope) #np.linspace(0.96,1.04,L_slope)\n",
"shift_array = np.random.uniform(-0.09,0.09,L_shift) #np.linspace(-0.01,0.01,L_shift)\n",
"\n",
"s_conf = []\n",
"y_conf = []\n",
"t_conf = []\n",
"coeff = np.zeros(5)\n",
"for k in range(num_classes):\n",
"    for tmp in range(2):\n",
"        tmp_ind = 2*k + tmp\n",
"        for i_slope in range(L_slope):\n",
"            for i_shift in range(L_shift):\n",
"                nn = random.randint(2,10)\n",
"                centers = np.random.normal(0.5, 0.2, nn)\n",
"                widths = np.random.uniform(10.0, 50.0, nn)\n",
"                coeffs = np.random.uniform(0.1, 1.0, nn)\n",
"                coeffs /= coeffs.sum()\n",
"\n",
"                gm = g_function_gaussian_mix(t,centers, widths, coeffs)\n",
"\n",
"                slope = slope_array[i_slope]\n",
"                shift = shift_array[i_shift]\n",
"\n",
"                g = np.cumsum(gm)*slope+shift\n",
"                dt = np.gradient(t)\n",
"                g_prime = np.gradient(g)/dt\n",
"                t_conf.append(g)\n",
"                if tmp_ind==0:\n",
"                    sc = g_prime*signal_gabor_phi1(g)\n",
"                elif tmp_ind==1:\n",
"                    sc = g_prime*signal_gabor_phi2(g)\n",
"                elif tmp_ind==2:\n",
"                    sc = g_prime*signal_sawtooth_phi1(g)\n",
"                elif tmp_ind==3:\n",
"                    sc = g_prime*signal_sawtooth_phi2(g)\n",
"                elif tmp_ind==4:\n",
"                    sc = g_prime*signal_square_phi1(g)\n",
"                elif tmp_ind==5:\n",
"                    sc = g_prime*signal_square_phi2(g)\n",
"                s_conf.append(sc)\n",
"                y_conf.append(k)\n",
"x_test = np.stack(s_conf, axis=0)\n",
"y_test = np.stack(y_conf, axis=0)\n",
"t_test = np.stack(t_conf, axis=0)\n",
"\n",
"print(x_test.shape)\n",
"print(y_test.shape)"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [
"### Choose number of training samples per class\n",
"This example trains the model on 64 randomly chosen samples per class, drawn so that each of a class's two templates contributes an equal number of samples (32)."
] },
{ "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [
"n_samples_perclass = 64 ## number of training samples per class\n",
"n_counts = x_train[0].shape[0] ## number of total samples per template\n",
"x_train_sub = []\n",
"y_train_sub = []\n",
"\n",
"for l in range(len(x_train)): ## iterate over set from each template\n",
"    data = x_train[l]\n",
"    y_data = y_train[l]\n",
"    index_array = random.sample(range(n_counts), n_samples_perclass//2)\n",
"    for i in range(len(index_array)):\n",
"        x_train_sub.append(data[index_array[i]])\n",
"        y_train_sub.append(y_data[index_array[i]])\n",
"x_train_sub = np.stack(x_train_sub,axis = 0)\n",
"y_train_sub = np.stack(y_train_sub,axis = 0)"
] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create an instance of the SCDT_NLS class" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "scdt_nls_obj = SCDT_NLS(num_classes)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Training Phase" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
"+++++++++++ Training Phase +++++++++++\n",
"\n",
"Calculating SCDTs for training data ...\n",
"\n",
"Tune parameters using validation set ...\n",
"\n"
] } ], "source": [ "scdt_nls_obj.fit(x_train_sub, y_train_sub, no_local_enrichment=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Testing Phase" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
"+++++++++++ Testing Phase +++++++++++\n",
"Calculating SCDTs for testing data ...\n", "\n", "Apply NLS algorithm in SCDT domain\n", "\n", "+++++++++++ Result +++++++++++\n", "\n", "Test accuracy: 100.0%\n" ] } ], "source": [ "from sklearn.metrics import accuracy_score\n", "\n", "preds = scdt_nls_obj.predict(x_test)\n", "\n", "print('+++++++++++ Result +++++++++++')\n", "print('\\nTest accuracy: {}%'.format(100*accuracy_score(y_test, preds)))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }