{
 "metadata": {
  "name": "BASIC_theano_III_RBM"
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "## load data\n",
      "import cPickle\n",
      "import numpy as np\n",
      "from sklearn.cross_validation import train_test_split\n",
      "## NOTE: unpickling can execute arbitrary code -- only load trusted files\n",
      "with open('data/digits.pkl', 'rb') as data_file:\n",
      "    X, y = cPickle.load(data_file)\n",
      "print X.shape, y.shape\n",
      "classes = np.unique(y)\n",
      "## fixed seed so the train/validation split is the same on every run\n",
      "train_X, validation_X, train_y, validation_y = train_test_split(\n",
      "    X, y, test_size = 0.2, random_state = 0)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "(42000, 784) (42000,)\n"
       ]
      }
     ],
     "prompt_number": 1
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import theano\n",
      "import theano.tensor as T\n",
      "import time\n",
      "from theanoml2 import formula, optimize\n",
      "from theano.tensor.shared_randomstreams import RandomStreams\n",
      "v_train_X, v_validation_X = map(formula.share_data, [train_X, validation_X])\n",
      "v_train_y, v_validation_y = map(lambda data: formula.share_data(data, dtype='int32'),\n",
      "                                [train_y, validation_y])"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 15
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "## Restricted Boltzmann Machine\n",
      "class FRBM(object):\n",
      "    \"\"\"Binary Restricted Boltzmann Machine expressed as a Theano graph.\n",
      "\n",
      "    n_visible / n_hidden: number of visible / hidden units.\n",
      "    W, b_hid, b_vis: optional shared variables to reuse as parameters;\n",
      "        each one is freshly initialized when left as None.\n",
      "    X: optional symbolic input matrix; T.matrix('X') is created when None.\n",
      "    \"\"\"\n",
      "    def __init__(self, n_visible, n_hidden,\n",
      "                 W = None, b_hid = None, b_vis = None,\n",
      "                 X = None):\n",
      "        self.rng = np.random.RandomState(0)\n",
      "        self.theano_rng = RandomStreams(self.rng.randint(2**30))\n",
      "        ## model inputs\n",
      "        ## explicit None test: do not rely on the truth value of a symbolic variable\n",
      "        self.X = X if X is not None else T.matrix('X')\n",
      "        ## model params\n",
      "        self.n_visible = n_visible\n",
      "        self.n_hidden = n_hidden\n",
      "        if W is None:\n",
      "            W_bound = np.sqrt(6. / (n_hidden + n_visible))\n",
      "            W = theano.shared(value = np.asarray(\n",
      "                                  self.rng.uniform(\n",
      "                                      low = -4. * W_bound,\n",
      "                                      high = 4. * W_bound,\n",
      "                                      size = (n_visible, n_hidden)),\n",
      "                                  dtype = theano.config.floatX),\n",
      "                              name = 'RBM_W', borrow = True)\n",
      "        if b_hid is None:\n",
      "            b_hid = theano.shared(np.zeros(n_hidden, dtype = theano.config.floatX),\n",
      "                                  name = 'RBM_b_hid', borrow = True)\n",
      "        if b_vis is None:\n",
      "            b_vis = theano.shared(np.zeros(n_visible, dtype = theano.config.floatX),\n",
      "                                  name = 'RBM_b_vis', borrow = True)\n",
      "        self.W = W\n",
      "        self.b_hid = b_hid\n",
      "        self.b_vis = b_vis\n",
      "        self.params = [self.W, self.b_hid, self.b_vis]\n",
      "\n",
      "    ## model cost and error\n",
      "    def free_energy(self, v_sample):\n",
      "        \"\"\"Return the symbolic free energy of each row of v_sample.\"\"\"\n",
      "        wx_b = T.dot(v_sample, self.W) + self.b_hid\n",
      "        vbias_term = T.dot(v_sample, self.b_vis)\n",
      "        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis = 1)\n",
      "        return -hidden_term - vbias_term\n",
      "\n",
      "    def propup(self, vis):\n",
      "        \"\"\"Return (pre-sigmoid activation, sigmoid mean) of the hidden units.\"\"\"\n",
      "        pre_sigmoid_activation = T.dot(vis, self.W) + self.b_hid\n",
      "        return (pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation))\n",
      "\n",
      "    def sample_h_given_v(self, v0_sample):\n",
      "        \"\"\"Return [pre-sigmoid, mean, binary sample] of hidden given visible.\"\"\"\n",
      "        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)\n",
      "        h1_sample = self.theano_rng.binomial(size = h1_mean.shape,\n",
      "                                             n = 1, p = h1_mean,\n",
      "                                             dtype = theano.config.floatX)\n",
      "        return [pre_sigmoid_h1, h1_mean, h1_sample]\n",
      "\n",
      "    def propdown(self, hid):\n",
      "        \"\"\"Return [pre-sigmoid activation, sigmoid mean] of the visible units.\"\"\"\n",
      "        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.b_vis\n",
      "        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]\n",
      "\n",
      "    def sample_v_given_h(self, h0_sample):\n",
      "        \"\"\"Return (pre-sigmoid, mean, binary sample) of visible given hidden.\"\"\"\n",
      "        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)\n",
      "        v1_sample = self.theano_rng.binomial(size = v1_mean.shape,\n",
      "                                             n = 1, p = v1_mean,\n",
      "                                             dtype = theano.config.floatX)\n",
      "        return (pre_sigmoid_v1, v1_mean, v1_sample)\n",
      "\n",
      "    def gibbs_hvh(self, h0_sample):\n",
      "        \"\"\"One Gibbs step hidden -> visible -> hidden.\"\"\"\n",
      "        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)\n",
      "        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)\n",
      "        return (pre_sigmoid_v1, v1_mean, v1_sample,\n",
      "                pre_sigmoid_h1, h1_mean, h1_sample)\n",
      "\n",
      "    def gibbs_vhv(self, v0_sample):\n",
      "        \"\"\"One Gibbs step visible -> hidden -> visible.\"\"\"\n",
      "        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)\n",
      "        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)\n",
      "        return (pre_sigmoid_h1, h1_mean, h1_sample,\n",
      "                pre_sigmoid_v1, v1_mean, v1_sample)\n",
      "\n",
      "    def get_cost_updates(self, lr, persistent = None, k = 1):\n",
      "        \"\"\"Build one training step of CD-k (persistent is None) or PCD-k.\n",
      "\n",
      "        lr: learning rate for the gradient step on self.params.\n",
      "        persistent: None for contrastive divergence, or a shared variable\n",
      "            holding the hidden state of the persistent chain (it is\n",
      "            overwritten with the end of the chain on every update).\n",
      "        k: number of Gibbs steps.\n",
      "\n",
      "        Returns (monitoring_cost, updates): a symbolic monitoring cost and\n",
      "        the update dictionary to pass to theano.function.\n",
      "        \"\"\"\n",
      "        ## positive phase\n",
      "        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.X)\n",
      "        ## CD starts the chain at the fresh hidden sample, PCD at the stored state\n",
      "        if persistent is None:\n",
      "            chain_start = ph_sample\n",
      "        else:\n",
      "            chain_start = persistent\n",
      "        ## only the 6th output of gibbs_hvh (the hidden sample) is recurrent\n",
      "        ([pre_sigmoid_nvs, nv_means, nv_samples,\n",
      "          pre_sigmoid_nhs, nh_means, nh_samples],\n",
      "         updates) = theano.scan(\n",
      "            self.gibbs_hvh,\n",
      "            outputs_info = [None, None, None, None, None, chain_start],\n",
      "            n_steps = k)\n",
      "        chain_end = nv_samples[-1]\n",
      "        cost = T.mean(self.free_energy(self.X)) - T.mean(self.free_energy(chain_end))\n",
      "        ## the gradient must not flow through the Gibbs sampling\n",
      "        gparams = T.grad(cost, self.params, consider_constant = [chain_end])\n",
      "        for gparam, param in zip(gparams, self.params):\n",
      "            updates[param] = param - gparam * T.cast(lr, dtype = theano.config.floatX)\n",
      "        if persistent is not None:\n",
      "            ## carry the chain over to the next call\n",
      "            updates[persistent] = nh_samples[-1]\n",
      "            ## pseudo-likelihood is the usual monitoring proxy for PCD\n",
      "            monitoring_cost = self._pseudo_likelihood_cost(updates)\n",
      "        else:\n",
      "            ## reconstruction cross-entropy is the usual monitoring proxy for CD\n",
      "            monitoring_cost = self._reconstruction_cost(pre_sigmoid_nvs[-1])\n",
      "        return monitoring_cost, updates\n",
      "\n",
      "    def _pseudo_likelihood_cost(self, updates):\n",
      "        \"\"\"Stochastic pseudo-likelihood proxy; adds its bit counter to updates.\"\"\"\n",
      "        ## index of the visible unit flipped on this call, cycled through updates\n",
      "        bit_i_idx = theano.shared(value = 0, name = 'bit_i_idx')\n",
      "        ## binarize the input by rounding\n",
      "        xi = T.round(self.X)\n",
      "        fe_xi = self.free_energy(xi)\n",
      "        ## same configuration with unit bit_i_idx flipped\n",
      "        xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])\n",
      "        fe_xi_flip = self.free_energy(xi_flip)\n",
      "        cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip - fe_xi)))\n",
      "        updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible\n",
      "        return cost\n",
      "\n",
      "    def _reconstruction_cost(self, pre_sigmoid_nv):\n",
      "        \"\"\"Cross-entropy between the input and its reconstruction.\"\"\"\n",
      "        nv_mean = T.nnet.sigmoid(pre_sigmoid_nv)\n",
      "        return T.mean(T.sum(self.X * T.log(nv_mean) +\n",
      "                            (1 - self.X) * T.log(1 - nv_mean),\n",
      "                            axis = 1))\n",
      "\n",
      "def run_rbm(v_train_X, batch_size = 20, learning_rate = 0.1, n_epochs = 15,\n",
      "            n_chains = 20, n_samples = 10, n_hidden = 500):\n",
      "    \"\"\"Train an FRBM on v_train_X with PCD-15 and return the trained model.\n",
      "\n",
      "    v_train_X: shared variable holding the training matrix (rows are examples).\n",
      "    n_chains and n_samples are accepted but not used by this function.\n",
      "    Prints the mean monitoring cost after every epoch.\n",
      "    \"\"\"\n",
      "    train_shape = v_train_X.get_value(borrow = True).shape\n",
      "    n_train_batches = train_shape[0] // batch_size\n",
      "    ## take the visible layer size from the data instead of assuming 28 * 28\n",
      "    n_visible = train_shape[1]\n",
      "    index = T.lscalar()\n",
      "    x = T.matrix('x')\n",
      "    ## initialize storage for the persistent chain\n",
      "    persistent_chain = theano.shared(np.zeros((batch_size, n_hidden),\n",
      "                                              dtype = theano.config.floatX),\n",
      "                                     borrow = True)\n",
      "    rbm = FRBM(X = x, n_visible = n_visible, n_hidden = n_hidden)\n",
      "    cost, updates = rbm.get_cost_updates(lr = learning_rate,\n",
      "                                         persistent = persistent_chain,\n",
      "                                         k = 15)\n",
      "    ## train the rbm\n",
      "    train_rbm = theano.function([index],\n",
      "                                cost,\n",
      "                                updates = updates,\n",
      "                                givens = {\n",
      "                                    x: v_train_X[index*batch_size:(index+1)*batch_size]\n",
      "                                },\n",
      "                                name = 'train_rbm')\n",
      "    for epoch in xrange(n_epochs):\n",
      "        mean_cost = np.mean([train_rbm(i) for i in xrange(n_train_batches)])\n",
      "        print 'training epoch %d, cost is ' % epoch, mean_cost\n",
      "    return rbm"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 11
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "\n",
      "rbm = FRBM(n_visible = 28 * 28, n_hidden = 100)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 12
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [],
     "language": "python",
     "metadata": {},
     "outputs": []
    }
   ],
   "metadata": {}
  }
 ]
}