{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1.0.1\n" ] } ], "source": [ "import tensorflow as tf\n", "from tensorflow.examples.tutorials.mnist import input_data\n", "import numpy as np\n", "import math\n", "import matplotlib.pyplot as plt\n", "import pandas as pd\n", "%matplotlib inline\n", "print(tf.__version__)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def drawErrorValues(epoch_list, train_error_value_list, validation_error_value_list, test_error_value_list):\n", " fig = plt.figure(figsize=(20, 5))\n", " plt.subplot(121)\n", " plt.plot(epoch_list, train_error_value_list, 'r', label='Train')\n", " plt.plot(epoch_list, validation_error_value_list, 'g', label='Validation')\n", " plt.ylabel('Total Error')\n", " plt.xlabel('Epochs')\n", " plt.grid(True)\n", " plt.legend(loc='upper right')\n", "\n", " plt.subplot(122)\n", " plt.plot(epoch_list, test_error_value_list, 'b', label='Test')\n", " plt.ylabel('Accuracy')\n", " plt.xlabel('Epochs')\n", " plt.yticks(np.arange(min(test_error_value_list), max(test_error_value_list), 100))\n", " plt.grid(True)\n", " plt.legend(loc='lower right') \n", " plt.show()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [], "source": [ "class RNN:\n", " def __init__(self, batch_size, n_steps, n_inputs, n_state_units, n_classes):\n", " self.batch_size = batch_size\n", " self.n_steps = n_steps\n", " self.n_inputs = n_inputs\n", " self.n_state_units = n_state_units\n", " self.n_classes = n_classes\n", " \n", " def set_data(self, train_data, val_data, test_data):\n", " self.train = train_data\n", " self.validation = val_data\n", " self.test = test_data\n", " \n", " def make_rnn(self, learning_rate):\n", " self.x = tf.placeholder(tf.float32, [None, self.n_steps, self.n_inputs])\n", " xt = tf.transpose(self.x, perm = [1, 0, 2])\n", " xr = tf.reshape(xt, shape = [-1, self.n_inputs])\n", " xs = tf.split(value=xr, num_or_size_splits=self.n_steps, axis=0)\n", " self.y = tf.placeholder(tf.float32, [None, n_classes])\n", " \n", " self.real_batch_size = tf.placeholder(tf.int32)\n", " \n", " rnn_cell = tf.contrib.rnn.BasicLSTMCell(num_units = n_state_units)\n", " initial_state = rnn_cell.zero_state(batch_size=self.real_batch_size, dtype=tf.float32)\n", " self.outputs, self.final_state = tf.contrib.rnn.static_rnn(rnn_cell, xs, initial_state=initial_state, dtype=tf.float32)\n", " \n", " self.W2 = tf.Variable(tf.random_normal([self.n_state_units, self.n_classes]))\n", " self.B = tf.Variable(tf.random_normal([self.n_classes]))\n", "\n", " self.pred = tf.matmul(self.outputs[-1], self.W2) + self.B\n", " self.error = tf.reduce_sum(tf.square(self.pred - self.y), axis=0)\n", " self.optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate).minimize(self.error)\n", " \n", " def learning(self, max_training_epochs):\n", " epoch_list = []\n", " train_error_value_list = []\n", " validation_error_value_list = []\n", " test_error_value_list = []\n", " test_accuracy_list = []\n", "\n", " init = tf.global_variables_initializer()\n", "\n", " with tf.Session() as sess:\n", " sess.run(init)\n", "\n", " print(\"batch_size\", self.batch_size)\n", " total_batch = int(math.ceil(len(self.train['x']) / float(self.batch_size)))\n", " print(\"Total batch: {0}\".format(total_batch))\n", "\n", " for epoch in range(max_training_epochs):\n", " for i in range(total_batch):\n", " batch_x = self.train['x'][i * self.batch_size: (i+1) * self.batch_size]\n", " batch_y = self.train['y'][i * self.batch_size: (i+1) * self.batch_size]\n", " sess.run((self.optimizer, self.error), feed_dict={\n", " self.x: batch_x, \n", " self.y: batch_y,\n", " self.real_batch_size: len(batch_x)\n", " })\n", "\n", " epoch_list.append(epoch)\n", " t_error_value = sess.run(self.error, feed_dict={\n", " self.x: self.train['x'], \n", " self.y: self.train['y'],\n", " self.real_batch_size: len(self.train['x'])\n", " })\n", " train_error_value_list.append(t_error_value)\n", "\n", " v_error_value = sess.run(self.error, feed_dict={\n", " self.x: self.validation['x'], \n", " self.y: self.validation['y'],\n", " self.real_batch_size: len(self.validation['x'])\n", " })\n", " validation_error_value_list.append(v_error_value)\n", "\n", " error_value = sess.run(self.error, feed_dict={\n", " self.x: self.test['x'], \n", " self.y: self.test['y'],\n", " self.real_batch_size: len(self.test['x']) \n", " })\n", " test_error_value_list.append(error_value)\n", " if (epoch % 100 == 0):\n", " print(\"epoch: {0}, test_error_value: {1}\".format(epoch, error_value))\n", "\n", " print(\"train complete!\")\n", " drawErrorValues(epoch_list, train_error_value_list, validation_error_value_list, test_error_value_list)\n", " \n", " def get_final_test_pred(self, data_x):\n", " init = tf.global_variables_initializer()\n", " with tf.Session() as sess:\n", " sess.run(init)\n", " pred = sess.run(self.pred, feed_dict={self.x: data_x})\n", " return pred" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def split_data(data, val_size=0.1, test_size=0.2):\n", " \"\"\"\n", " splits data to training, validation and testing parts\n", " \"\"\"\n", " ntest = int(round(len(data) * (1 - test_size)))\n", " nval = int(round(len(data) * (1 - (val_size + test_size))))\n", "\n", " train_data, val_data, test_data = data[:nval], data[nval:ntest], data[ntest:]\n", " return train_data, val_data, test_data\n", "\n", "def prepare_data(data, n_steps):\n", " x = []\n", " y = []\n", " for i in range(len(data) - n_steps - 1):\n", " x_item = []\n", " for j in range(n_steps):\n", " x_item.append([data[i + j]])\n", " x.append(x_item)\n", " y.append([data[i + n_steps]])\n", " x = np.asarray(x)\n", " y = np.asarray(y)\n", " return x, y \n", "\n", "def generate_data(func, x, n_steps):\n", " \"\"\"\n", " generates data with based on a function fct\n", " creates new data frame based on previous observation\n", " * example:\n", " l = [1, 2, 3, 4, 5]\n", " n_steps = 3\n", " -> [[[1], [2], [3]], [[2], [3], [4]], [[3], [4], [5]]]\n", " \"\"\"\n", " data = func(x)\n", " train_data, val_data, test_data = split_data(data, val_size=0.1, test_size=0.2)\n", " train = {}\n", " val = {}\n", " test = {}\n", " train['x'], train['y'] = prepare_data(train_data, n_steps)\n", " val['x'], val['y'] = prepare_data(val_data, n_steps)\n", " test['x'], test['y'] = prepare_data(test_data, n_steps)\n", " return train, val, test" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "batch_size = 128\n", "n_steps = 10\n", "n_inputs = 1\n", "n_state_units = 4\n", "n_classes = 1\n", "learning_rate = 0.01\n", "max_training_epochs = 10000\n", "\n", "if __name__ == '__main__':\n", " rnn = RNN(batch_size, n_steps, n_inputs, n_state_units, n_classes)\n", " train, val, test = generate_data(np.sin, np.linspace(0, 100, 100000), n_steps)\n", " rnn.set_data(train, val, test)\n", " rnn.make_rnn(learning_rate)\n", " rnn.learning(max_training_epochs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "if __name__ == '__main__':\n", " pred = rnn.get_final_test_pred(train['x'])\n", " fig = plt.figure(figsize=(20, 5))\n", " plt.subplot(111)\n", " plt.plot(range(0, len(train['x'])), train['y'], 'b', label='True Sin Curv')\n", " plt.plot(range(0, len(train['x'])), pred, 'r', label='Predicted Sin Curv')\n", " plt.ylabel('Sine')\n", " plt.xlabel('-')\n", " plt.grid(True)\n", " plt.legend(loc='upper right')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "if __name__ == '__main__':\n", " pred = rnn.get_final_test_pred(val['x'])\n", " fig = plt.figure(figsize=(20, 5))\n", " plt.subplot(111)\n", " plt.plot(range(0, len(val['x'])), val['y'], 'b', label='True Sin Curv')\n", " plt.plot(range(0, len(val['x'])), pred, 'r', label='Predicted Sin Curv')\n", " plt.ylabel('Sine')\n", " plt.xlabel('-')\n", " plt.grid(True)\n", " plt.legend(loc='upper right')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "if __name__ == '__main__':\n", " pred = rnn.get_final_test_pred(test['x'])\n", " fig = plt.figure(figsize=(20, 5))\n", " plt.subplot(111)\n", " plt.plot(range(0, len(test['x'])), test['y'], 'b', label='True Sin Curv')\n", " plt.plot(range(0, len(test['x'])), pred, 'r', label='Predicted Sin Curv')\n", " plt.ylabel('Sine')\n", " plt.xlabel('-')\n", " plt.grid(True)\n", " plt.legend(loc='upper right')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.0" } }, "nbformat": 4, "nbformat_minor": 0 }