{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Load the TensorBoard notebook extension\n",
    "%load_ext tensorboard"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "tags": [
     "imports"
    ]
   },
   "outputs": [],
   "source": [
    "import json\n",
    "import logging\n",
    "import os\n",
    "import shutil\n",
    "import sys\n",
    "import time\n",
    "\n",
    "import boto3\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import tensorflow as tf\n",
    "from botocore.exceptions import ClientError\n",
    "from elasticsearch import Elasticsearch\n",
    "from elasticsearch import exceptions as expf\n",
    "from tensorflow.python.lib.io import file_io\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "tags": [
     "functions"
    ]
   },
   "outputs": [],
   "source": [
    "def tf_calc_confusion_matrix_ops(actuals, predictions):\n",
    "    \"\"\"Constructs the Tensorflow operations for obtaining the confusion matrix counts.\n",
    "\n",
    "    Class 1 is treated as the positive class and class 0 as the negative class.\n",
    "\n",
    "    Args:\n",
    "        actuals (tf.tensor): tensor that contains the actual classes\n",
    "        predictions (tf.tensor): tensor that contains the predicted classes\n",
    "\n",
    "    Returns:\n",
    "        tensors: true_positive, true_negative, false_positive, false_negative\n",
    "            counts (float scalars)\n",
    "\n",
    "    \"\"\"\n",
    "    ones_like_actuals = tf.ones_like(actuals)\n",
    "    zeros_like_actuals = tf.zeros_like(actuals)\n",
    "    ones_like_predictions = tf.ones_like(predictions)\n",
    "    zeros_like_predictions = tf.zeros_like(predictions)\n",
    "\n",
    "    def _count(actual_target, predicted_target):\n",
    "        # number of elements where actuals == actual_target and predictions == predicted_target\n",
    "        return tf.reduce_sum(\n",
    "            tf.cast(\n",
    "                tf.logical_and(\n",
    "                    tf.equal(actuals, actual_target),\n",
    "                    tf.equal(predictions, predicted_target)\n",
    "                ),\n",
    "                \"float\"\n",
    "            )\n",
    "        )\n",
    "\n",
    "    tp_op = _count(ones_like_actuals, ones_like_predictions)\n",
    "    tn_op = _count(zeros_like_actuals, zeros_like_predictions)\n",
    "    fp_op = _count(zeros_like_actuals, ones_like_predictions)\n",
    "    fn_op = _count(ones_like_actuals, zeros_like_predictions)\n",
    "\n",
    "    return tp_op, tn_op, fp_op, fn_op\n",
    "\n",
    "\n",
    "def tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg):\n",
    "    \"\"\"Computes precision, recall, F1 score and accuracy from confusion matrix counts.\n",
    "\n",
    "    The counts are plain numbers (e.g. the values returned by ``session.run``), not\n",
    "    tensors. A ratio whose denominator is zero is reported as 0. The metrics are also\n",
    "    printed.\n",
    "\n",
    "    Args:\n",
    "        true_pos (number): amount of true positives\n",
    "        true_neg (number): amount of true negatives\n",
    "        false_pos (number): amount of false positives\n",
    "        false_neg (number): amount of false negatives\n",
    "\n",
    "    Returns:\n",
    "        dict: keys 'precision', 'recall', 'f1' and 'accuracy'\n",
    "\n",
    "    \"\"\"\n",
    "    true_pos, true_neg = float(true_pos), float(true_neg)\n",
    "    false_pos, false_neg = float(false_pos), float(false_neg)\n",
    "\n",
    "    tpfn = true_pos + false_neg\n",
    "    recall = 0 if tpfn == 0 else true_pos / tpfn\n",
    "\n",
    "    total = true_pos + false_pos + false_neg + true_neg\n",
    "    accuracy = 0 if total == 0 else (true_pos + true_neg) / total\n",
    "\n",
    "    tpfp = true_pos + false_pos\n",
    "    precision = 0 if tpfp == 0 else true_pos / tpfp\n",
    "\n",
    "    # recall > 0 implies true_pos > 0 and hence precision > 0, so the denominator is safe\n",
    "    f1_score = 0 if recall == 0 else (2 * (precision * recall)) / (precision + recall)\n",
    "\n",
    "    print('Precision = ', precision)\n",
    "    print('Recall = ', recall)\n",
    "    print('F1 Score = ', f1_score)\n",
    "    print('Accuracy = ', accuracy)\n",
    "\n",
    "    return {'precision': precision, 'recall': recall, 'f1': f1_score,\n",
    "            'accuracy': accuracy}\n",
    "\n",
    "\n",
    "def tf_confusion_matrix(model, actual_classes, session, feed_dict):\n",
    "    \"\"\"Calculates the confusion matrix metrics of a model on the data in feed_dict.\n",
    "\n",
    "    Args:\n",
    "        model (tf.tensor): model output; the argmax along axis 1 is the predicted class\n",
    "        actual_classes (tf.tensor): tensor with the actual classes; the argmax along\n",
    "            axis 1 is the actual class\n",
    "        session (tf.session): tensorflow session in which the tensors are evaluated\n",
    "        feed_dict (dict): dictionary with features and actual classes\n",
    "\n",
    "    Returns:\n",
    "        dict: precision, recall, f1 and accuracy (see tf_calc_confusion_metrics)\n",
    "\n",
    "    \"\"\"\n",
    "    predictions = tf.argmax(model, 1)\n",
    "    actuals = tf.argmax(actual_classes, 1)\n",
    "    tp_op, tn_op, fp_op, fn_op = tf_calc_confusion_matrix_ops(actuals, predictions)\n",
    "    true_pos, true_neg, false_pos, false_neg = session.run(\n",
    "        [tp_op, tn_op, fp_op, fn_op],\n",
    "        feed_dict\n",
    "    )\n",
    "\n",
    "    return tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "tags": [
     "functions"
    ]
   },
   "outputs": [],
   "source": [
    "class FlatModel():\n",
    "    \"\"\"Neural network model that contains only a single (softmax) layer.\"\"\"\n",
    "\n",
    "    def __init__(self, nr_predictors, nr_classes):\n",
    "        \"\"\"Stores the model dimensions.\n",
    "\n",
    "        Args:\n",
    "            nr_predictors (int): amount of predictors\n",
    "            nr_classes (int): amount of classes\n",
    "        \"\"\"\n",
    "        self._nr_predictors = nr_predictors\n",
    "        self._nr_classes = nr_classes\n",
    "\n",
    "    @property\n",
    "    def nr_predictors(self):\n",
    "        \"\"\"Amount of predictors property.\"\"\"\n",
    "        return self._nr_predictors\n",
    "\n",
    "    @property\n",
    "    def nr_classes(self):\n",
    "        \"\"\"Amount of classes property.\"\"\"\n",
    "        return self._nr_classes\n",
    "\n",
    "    def build_model(self, feature_data):\n",
    "        \"\"\"Builds the tensorflow model.\n",
    "\n",
    "        Args:\n",
    "            feature_data (tf.tensor): feature tensor with nr_predictors columns\n",
    "\n",
    "        Returns:\n",
    "            tf.tensor: softmax output with nr_classes columns\n",
    "\n",
    "        \"\"\"\n",
    "        weights = tf.Variable(tf.truncated_normal([self._nr_predictors, self._nr_classes],\n",
    "                                                  stddev=0.0001))\n",
    "        biases = tf.Variable(tf.ones([self._nr_classes]))\n",
    "\n",
    "        model = tf.nn.softmax(tf.matmul(feature_data, weights) + biases)\n",
    "\n",
    "        return model\n",
    "\n",
    "\n",
    "class DeepModel():\n",
    "    \"\"\"Neural network model that contains two hidden (ReLU) layers.\"\"\"\n",
    "\n",
    "    def __init__(self, nr_predictors, nr_classes, dim_hidden1=50, dim_hidden2=25):\n",
    "        \"\"\"Stores the model dimensions.\n",
    "\n",
    "        Args:\n",
    "            nr_predictors (int): amount of predictors\n",
    "            nr_classes (int): amount of classes\n",
    "            dim_hidden1 (int): amount of neurons in first hidden layer\n",
    "            dim_hidden2 (int): amount of neurons in second hidden layer\n",
    "        \"\"\"\n",
    "        self._nr_predictors = nr_predictors\n",
    "        self._nr_classes = nr_classes\n",
    "        self.dim_hidden1 = dim_hidden1\n",
    "        self.dim_hidden2 = dim_hidden2\n",
    "\n",
    "    @property\n",
    "    def nr_predictors(self):\n",
    "        \"\"\"Amount of predictors property.\"\"\"\n",
    "        return self._nr_predictors\n",
    "\n",
    "    @property\n",
    "    def nr_classes(self):\n",
    "        \"\"\"Amount of classes property.\"\"\"\n",
    "        return self._nr_classes\n",
    "\n",
    "    def build_model(self, feature_data):\n",
    "        \"\"\"Builds the tensorflow model.\n",
    "\n",
    "        Args:\n",
    "            feature_data (tf.tensor): feature tensor with nr_predictors columns\n",
    "\n",
    "        Returns:\n",
    "            tf.tensor: softmax output with nr_classes columns\n",
    "\n",
    "        \"\"\"\n",
    "        weights1 = tf.Variable(tf.truncated_normal([self._nr_predictors, self.dim_hidden1],\n",
    "                                                   stddev=0.0001))\n",
    "        biases1 = tf.Variable(tf.ones([self.dim_hidden1]))\n",
    "\n",
    "        weights2 = tf.Variable(tf.truncated_normal([self.dim_hidden1, self.dim_hidden2],\n",
    "                                                   stddev=0.0001))\n",
    "        biases2 = tf.Variable(tf.ones([self.dim_hidden2]))\n",
    "\n",
    "        weights3 = tf.Variable(tf.truncated_normal([self.dim_hidden2, self._nr_classes],\n",
    "                                                   stddev=0.0001))\n",
    "        biases3 = tf.Variable(tf.ones([self._nr_classes]))\n",
    "\n",
    "        hidden_layer_1 = tf.nn.relu(tf.matmul(feature_data, weights1) + biases1)\n",
    "        hidden_layer_2 = tf.nn.relu(tf.matmul(hidden_layer_1, weights2) + biases2)\n",
    "        model = tf.nn.softmax(tf.matmul(hidden_layer_2, weights3) + biases3)\n",
    "\n",
    "        return model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "tags": [
     "functions"
    ]
   },
   "outputs": [],
   "source": [
    "def load_data(tickers, es_address, year_cutoff=2010, max_hits=2000):\n",
    "    \"\"\"Load stock market data (close values for each day) for given tickers.\n",
    "\n",
    "    Each ticker is read from the Elasticsearch index of the same name.\n",
    "\n",
    "    Args:\n",
    "        tickers (list): list of tickers (also used as index names)\n",
    "        es_address (str): host of the Elasticsearch cluster (queried on port 9200)\n",
    "        year_cutoff (int or str): inclusive lower bound of the ``Date`` range query\n",
    "        max_hits (int): maximum number of documents fetched per ticker; documents\n",
    "            beyond this limit are not returned, so raise it for long date ranges\n",
    "\n",
    "    Returns:\n",
    "        pandas.dataframe: dataframe with close values of tickers, indexed by date\n",
    "\n",
    "    \"\"\"\n",
    "    query = {\n",
    "        \"query\": {\n",
    "            \"range\": {\n",
    "                \"Date\": {\n",
    "                    \"gte\": year_cutoff,\n",
    "                }\n",
    "            }\n",
    "        }\n",
    "    }\n",
    "    # get the data\n",
    "    es = Elasticsearch([es_address], port=9200)\n",
    "    res_source = {}\n",
    "    for ticker in tickers:\n",
    "        hits = es.search(index=ticker,\n",
    "                         body=query,\n",
    "                         size=max_hits,\n",
    "                         _source=[\"Date\", \"Close\"])['hits']['hits']\n",
    "        res_source[ticker] = [hit[\"_source\"] for hit in hits]\n",
    "\n",
    "    results = {}\n",
    "    for ticker in tickers:\n",
    "        results[ticker] = pd.DataFrame(res_source[ticker]).set_index('Date')\n",
    "\n",
    "    # sort and fill blanks\n",
    "    closing_data = pd.DataFrame()\n",
    "    for ticker in tickers:\n",
    "        column = '{}_close'.format(ticker)\n",
    "        closing_data[column] = results[ticker]['Close']\n",
    "        closing_data[column] = pd.to_numeric(closing_data[column], errors='coerce')\n",
    "    closing_data.sort_index(inplace=True)\n",
    "    closing_data.index = pd.to_datetime(closing_data.index, format='%Y-%m-%d%H:%M:%S%Z',\n",
    "                                        errors='coerce')\n",
    "    # forward fill: a day without a close value reuses the previous close\n",
    "    closing_data = closing_data.ffill()\n",
    "\n",
    "    return closing_data\n",
    "\n",
    "\n",
    "def preprocess_data(closing_data):\n",
    "    \"\"\"Preprocesses data into time series.\n",
    "\n",
    "    Close values are converted into log returns. Every row of the result holds the\n",
    "    sign of the ``snp`` log return of one day (``snp_log_return_positive`` /\n",
    "    ``snp_log_return_negative``, one-hot) followed by lagged log returns of all\n",
    "    tickers. The first 7 rows are skipped so that every lag is available.\n",
    "\n",
    "    Args:\n",
    "        closing_data (pandas.dataframe): dataframe with close values of tickers\n",
    "            (columns ``<ticker>_close`` for snp, nyse, djia, nikkei, hangseng,\n",
    "            ftse, dax and aord)\n",
    "\n",
    "    Returns:\n",
    "        pandas.dataframe: dataframe with time series\n",
    "\n",
    "    \"\"\"\n",
    "    # transform into log return\n",
    "    log_return_data = pd.DataFrame()\n",
    "    tickers = [column_header.split(\"_\")[0] for column_header in closing_data.columns.values]\n",
    "    for ticker in tickers:\n",
    "        log_return_data['{}_log_return'.format(ticker)] = np.log(\n",
    "            closing_data['{}_close'.format(ticker)] /\n",
    "            closing_data['{}_close'.format(ticker)].shift())\n",
    "\n",
    "    # .loc replaces the .ix indexer (deprecated, removed in pandas 1.0)\n",
    "    log_return_data['snp_log_return_positive'] = 0\n",
    "    log_return_data.loc[log_return_data['snp_log_return'] >= 0, 'snp_log_return_positive'] = 1\n",
    "    log_return_data['snp_log_return_negative'] = 0\n",
    "    log_return_data.loc[log_return_data['snp_log_return'] < 0, 'snp_log_return_negative'] = 1\n",
    "\n",
    "    # predictor columns are named <ticker>_log_return_<lag>; this order is the column\n",
    "    # order of the result (snp, nyse and djia use lags 1-3, the others lags 0-2)\n",
    "    feature_lags = [\n",
    "        ('snp', (1, 2, 3)),\n",
    "        ('nyse', (1, 2, 3)),\n",
    "        ('djia', (1, 2, 3)),\n",
    "        ('nikkei', (0, 1, 2)),\n",
    "        ('hangseng', (0, 1, 2)),\n",
    "        ('ftse', (0, 1, 2)),\n",
    "        ('dax', (0, 1, 2)),\n",
    "        ('aord', (0, 1, 2)),\n",
    "    ]\n",
    "    columns = ['snp_log_return_positive', 'snp_log_return_negative'] + [\n",
    "        '{}_log_return_{}'.format(ticker, lag)\n",
    "        for ticker, lags in feature_lags for lag in lags]\n",
    "\n",
    "    # collect plain dicts and build the dataframe once; appending row by row is\n",
    "    # quadratic (and DataFrame.append was removed in pandas 2.0)\n",
    "    records = []\n",
    "    for i in range(7, len(log_return_data)):\n",
    "        record = {\n",
    "            'snp_log_return_positive': log_return_data['snp_log_return_positive'].iloc[i],\n",
    "            'snp_log_return_negative': log_return_data['snp_log_return_negative'].iloc[i],\n",
    "        }\n",
    "        for ticker, lags in feature_lags:\n",
    "            returns = log_return_data['{}_log_return'.format(ticker)]\n",
    "            for lag in lags:\n",
    "                record['{}_log_return_{}'.format(ticker, lag)] = returns.iloc[i - lag]\n",
    "        records.append(record)\n",
    "\n",
    "    return pd.DataFrame(records, columns=columns)\n",
    "\n",
    "\n",
    "def train_test_split(training_test_data, train_test_ratio=0.8):\n",
    "    \"\"\"Splits the data into a training and test set according to the provided ratio.\n",
    "\n",
    "    The first two columns are the classes, the remaining columns the predictors.\n",
    "    Rows are not shuffled: the first ``train_test_ratio`` share of rows is the\n",
    "    training set and the rest the test set.\n",
    "\n",
    "    Args:\n",
    "        training_test_data (pandas.dataframe): dataframe with time series\n",
    "        train_test_ratio (float): ratio of train test split\n",
    "\n",
    "    Returns:\n",
    "        dict: predictors and classes dataframes for the training and the test set\n",
    "\n",
    "    \"\"\"\n",
    "    predictors_tf = training_test_data[training_test_data.columns[2:]]\n",
    "    classes_tf = training_test_data[training_test_data.columns[:2]]\n",
    "\n",
    "    training_set_size = int(len(training_test_data) * train_test_ratio)\n",
    "\n",
    "    train_test_dict = {'training_predictors_tf': predictors_tf[:training_set_size],\n",
    "                       'training_classes_tf': classes_tf[:training_set_size],\n",
    "                       'test_predictors_tf': predictors_tf[training_set_size:],\n",
    "                       'test_classes_tf': classes_tf[training_set_size:]}\n",
    "\n",
    "    return train_test_dict\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "tags": [
     "functions"
    ]
   },
   "outputs": [],
   "source": [
    "def _s3_client(endpoint_url, access_key, secret_key):\n",
    "    \"\"\"Creates a boto3 S3 client for the given endpoint and credentials.\"\"\"\n",
    "    return boto3.client('s3', endpoint_url=endpoint_url, aws_access_key_id=access_key,\n",
    "                        aws_secret_access_key=secret_key)\n",
    "\n",
    "\n",
    "def _s3_resource(endpoint_url, access_key, secret_key):\n",
    "    \"\"\"Creates a boto3 S3 resource for the given endpoint and credentials.\"\"\"\n",
    "    return boto3.resource('s3', endpoint_url=endpoint_url, aws_access_key_id=access_key,\n",
    "                          aws_secret_access_key=secret_key)\n",
    "\n",
    "\n",
    "def upload_to_storage(bucket, export_path, endpoint_url, access_key, secret_key):\n",
    "    \"\"\"Upload files from export path to RGW ceph storage.\n",
    "\n",
    "    Every file below ``export_path`` is uploaded with its local path as key\n",
    "    (e.g. ``data/data_2010.csv``). The bucket is created if it does not exist yet.\n",
    "\n",
    "    Args:\n",
    "        bucket (str): RGW Cloud Storage bucket\n",
    "        export_path (str): export path\n",
    "        endpoint_url (str): S3 endpoint of the storage\n",
    "        access_key (str): S3 access key\n",
    "        secret_key (str): S3 secret key\n",
    "\n",
    "    Returns:\n",
    "        bool: True if every file was uploaded, False if an upload failed (the\n",
    "            remaining files are then not uploaded)\n",
    "\n",
    "    \"\"\"\n",
    "    s3 = _s3_client(endpoint_url, access_key, secret_key)\n",
    "    try:\n",
    "        s3.create_bucket(Bucket=bucket)\n",
    "    except ClientError as e:\n",
    "        # re-running the pipeline must not fail just because the bucket exists\n",
    "        if e.response.get('Error', {}).get('Code') not in ('BucketAlreadyOwnedByYou',\n",
    "                                                           'BucketAlreadyExists'):\n",
    "            raise\n",
    "    for root, _, files in os.walk(export_path):\n",
    "        for file in files:\n",
    "            path = os.path.join(root, file)\n",
    "            key = path.replace(os.sep, '/')\n",
    "            try:\n",
    "                s3.upload_file(path, bucket, key)\n",
    "            # upload_file wraps client errors in S3UploadFailedError\n",
    "            except (ClientError, boto3.exceptions.S3UploadFailedError) as e:\n",
    "                logging.error('uploading %s to bucket %s failed: %s', path, bucket, e)\n",
    "                return False\n",
    "    return True\n",
    "\n",
    "\n",
    "def download_blob(bucket_name, source_blob_name, destination_file_name, endpoint_url, access_key, secret_key):\n",
    "    \"\"\"Downloads the object ``source_blob_name`` of the bucket to a local file.\"\"\"\n",
    "    s3 = _s3_client(endpoint_url, access_key, secret_key)\n",
    "    s3.download_file(bucket_name, source_blob_name, destination_file_name)\n",
    "\n",
    "    print('File {} downloaded to {}.'.format(\n",
    "        source_blob_name,\n",
    "        destination_file_name))\n",
    "\n",
    "\n",
    "def list_blobs(bucket_name, endpoint_url, access_key, secret_key, prefix):\n",
    "    \"\"\"Lists the objects of the bucket whose key starts with ``prefix``.\"\"\"\n",
    "    bucket = _s3_resource(endpoint_url, access_key, secret_key).Bucket(bucket_name)\n",
    "    return bucket.objects.filter(Prefix=prefix)\n",
    "\n",
    "\n",
    "def copy_objects(bucket_name, endpoint_url, access_key, secret_key, path, new_path):\n",
    "    \"\"\"Copies every object whose key starts with ``path`` to the same key with that\n",
    "    prefix replaced by ``new_path`` (within the same bucket).\n",
    "    \"\"\"\n",
    "    bucket = _s3_resource(endpoint_url, access_key, secret_key).Bucket(bucket_name)\n",
    "\n",
    "    for obj in bucket.objects.filter(Prefix=path):\n",
    "        old_source = {'Bucket': bucket_name,\n",
    "                      'Key': obj.key}\n",
    "        # replace the prefix (keys are filtered on it, so the first match is the prefix)\n",
    "        new_key = obj.key.replace(path, new_path, 1)\n",
    "        bucket.Object(new_key).copy(old_source)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "tags": [
     "pipeline-parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# NOTE(review): the S3 credentials below are committed in plain text; rotate them\n",
    "# and inject them at run time instead of hardcoding them in the notebook.\n",
    "# store_path must match the file written by the preprocess step (data/data_<cutoff_year>.csv).\n",
    "cutoff_year=\"2010\"\n",
    "bucket=\"KaleBucket\"\n",
    "endpoint_url=\"http://10.168.209.191\"\n",
    "access_key=\"MBVPDGAX9I28CFYRWLJ3\"\n",
    "secret_key=\"CTzANSlTT1ONhMKe4SKplL3arzyHinkHMqqIMOaS\"\n",
    "elastic_url=\"10.168.209.177\"\n",
    "model=\"DeepModel\"\n",
    "store_path=\"data/data_2010.csv\"\n",
    "epochs=30001\n",
    "tag=\"v1\"\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
"metadata": { "tags": [ "block:preprocess" ] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:67: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:69: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:87: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:88: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:89: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:90: DeprecationWarning: \n", ".ix is deprecated. 
Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:91: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:92: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:93: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:94: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:95: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:96: DeprecationWarning: \n", ".ix is deprecated. 
Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:97: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:98: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:99: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:100: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:101: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:102: DeprecationWarning: \n", ".ix is deprecated. 
Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:103: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:104: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:105: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:106: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:107: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:108: DeprecationWarning: \n", ".ix is deprecated. 
Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:109: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:110: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:111: DeprecationWarning: \n", ".ix is deprecated. Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n", "/usr/local/lib/python3.6/dist-packages/ipykernel_launcher.py:112: DeprecationWarning: \n", ".ix is deprecated. 
Please use\n", ".loc for label based indexing or\n", ".iloc for positional indexing\n", "\n", "See the documentation here:\n", "http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated\n" ] } ], "source": [ "logging.info('starting preprocessing of data..')\n", "tickers = ['snp', 'nyse', 'djia', 'nikkei', 'hangseng', 'ftse', 'dax', 'aord']\n", "closing_data = load_data(tickers, elastic_url, cutoff_year)\n", "time_series = preprocess_data(closing_data)\n", "logging.info('preprocessing of data complete..')\n", "\n", "logging.info('starting uploading of the preprocessed data on Ceph..')\n", "temp_folder = 'data'\n", "if not os.path.exists(temp_folder):\n", " os.mkdir(temp_folder)\n", "file_path = os.path.join(temp_folder, 'data_{}.csv'.format(cutoff_year))\n", "time_series.to_csv(file_path, index=False)\n", "upload_to_storage(bucket, temp_folder, endpoint_url, access_key, secret_key)\n", "shutil.rmtree(temp_folder)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "tags": [ "block:train", "prev:preprocess" ] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "File data/data_2010.csv downloaded to data/data.csv.\n", "30000 0.7664931\n", "Precision = 0.782608695652174\n", "Recall = 0.625\n", "F1 Score = 0.694980694980695\n", "Accuracy = 0.726643598615917\n", "WARNING:tensorflow:From :70: simple_save (from tensorflow.python.saved_model.simple_save) is deprecated and will be removed in a future version.\n", "Instructions for updating:\n", "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.simple_save.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "WARNING:tensorflow:From :70: simple_save (from tensorflow.python.saved_model.simple_save) is deprecated and will be removed in a future version.\n", "Instructions for updating:\n", "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.simple_save.\n" ] 
}, { "name": "stdout", "output_type": "stream", "text": [ "WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.\n", "Instructions for updating:\n", "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.\n", "Instructions for updating:\n", "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:Assets added to graph.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "INFO:tensorflow:Assets added to graph.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:No assets to write.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "INFO:tensorflow:No assets to write.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "INFO:tensorflow:SavedModel written to: models/v1/saved_model.pb\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "INFO:tensorflow:SavedModel written to: models/v1/saved_model.pb\n" ] } ], "source": [ "logging.info('getting the ML model...')\n", "model = DeepModel(nr_predictors=24, nr_classes=2)\n", "logging.info('getting the data...')\n", "temp_folder = 'data'\n", "if not os.path.exists(temp_folder):\n", " os.mkdir(temp_folder)\n", "file_path = os.path.join(temp_folder, 
'data.csv')\n", "download_blob(bucket, store_path, file_path, endpoint_url, access_key, secret_key)\n", "time_series = pd.read_csv(file_path)\n", "training_test_data = train_test_split(time_series, 0.8)\n", " # define training objective\n", "logging.info('defining the training objective...')\n", "sess = tf.Session()\n", "feature_data = tf.placeholder(\"float\", [None, 24])\n", "actual_classes = tf.placeholder(\"float\", [None, 2])\n", "\n", "model = model.build_model(feature_data)\n", "cost = -tf.reduce_sum(actual_classes * tf.log(model))\n", "train_opt = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)\n", "init = tf.global_variables_initializer()\n", "sess.run(init)\n", "\n", "correct_prediction = tf.equal(tf.argmax(model, 1), tf.argmax(actual_classes, 1))\n", "accuracy = tf.reduce_mean(tf.cast(correct_prediction, \"float\"))\n", "\n", "logging.info('training the model...')\n", "time_dct = {}\n", "time_dct['start'] = time.time()\n", "for i in range(1, epochs):\n", " sess.run(\n", " train_opt,\n", " feed_dict={\n", " feature_data: training_test_data['training_predictors_tf'].values,\n", " actual_classes: training_test_data['training_classes_tf'].values.reshape(\n", " len(training_test_data['training_classes_tf'].values), 2)\n", " }\n", " )\n", "if i % 5000 == 0:\n", " train_acc = sess.run(\n", " accuracy,\n", " feed_dict={\n", " feature_data: training_test_data['training_predictors_tf'].values,\n", " actual_classes: training_test_data['training_classes_tf'].values.reshape(\n", " len(training_test_data['training_classes_tf'].values), 2)\n", " }\n", " )\n", " print(i, train_acc)\n", "time_dct['end'] = time.time()\n", "logging.info('training took {0:.2f} sec'.format(time_dct['end'] - time_dct['start']))\n", "\n", "\n", " # print results of confusion matrix\n", "logging.info('validating model on test set...')\n", "feed_dict = {\n", " feature_data: training_test_data['test_predictors_tf'].values,\n", " actual_classes: 
training_test_data['test_classes_tf'].values.reshape(\n", " len(training_test_data['test_classes_tf'].values), 2)\n", " }\n", "test_acc = tf_confusion_matrix(model, actual_classes, sess,\n", " feed_dict)['accuracy']\n", "\n", " # create signature for TensorFlow Serving\n", "logging.info('Exporting model for tensorflow-serving...')\n", "\n", "export_path = os.path.join(\"models\", tag)\n", "tf.saved_model.simple_save(\n", " sess,\n", " export_path,\n", " inputs={'predictors': feature_data},\n", " outputs={'prediction': tf.argmax(model, 1),\n", " 'model-tag': tf.constant([str(tag)])}\n", " )\n" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "tags": [ "block:deploy", "prev:train" ] }, "outputs": [], "source": [ "# save model on Ceph\n", "logging.info(\"uploading to \" + bucket + \"/\" + export_path)\n", "upload_to_storage(bucket, export_path, endpoint_url, access_key, secret_key)\n", "\n", "metrics_info = {\n", " 'metrics': [{\n", " 'name': 'accuracy-train',\n", " 'numberValue': float(train_acc),\n", " 'format': \"PERCENTAGE\"\n", " }, {\n", " 'name': 'accuracy-test',\n", " 'numberValue': float(test_acc),\n", " 'format': \"PERCENTAGE\"\n", "}]}\n", "with file_io.FileIO('mlpipeline-metrics.json', 'w') as f:\n", " json.dump(metrics_info, f)\n", "\n", "with open(\"/tmp/accuracy\", \"w\") as output_file:\n", " output_file.write(str(float(test_acc)))\n", "\n", " # remove local files\n", "\n", "# get latest active version for TF serving directory\n", "serving_dir = 'tfserving'\n", "blobs = list_blobs(bucket, endpoint_url, \n", " access_key, secret_key, \n", " prefix=serving_dir)\n", "version = set()\n", "for blob in blobs:\n", " version.add(int(blob.key.split('/')[1]))\n", "if version:\n", " new_version = max(version)+1\n", "else:\n", " new_version = 1\n", "\n", " # copy the files\n", "logging.info('deploying model %s as model number %s on TF serving', tag, new_version)\n", "\n", " # #TODO use upload from storage helper\n", "copy_objects(bucket, 
endpoint_url, \n", " access_key, secret_key, \n", " 'models/{}/'.format(tag), \n", " 'tfserving/{}/'.format(new_version) )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "kubeflow_notebook": { "autosnapshot": false, "docker_image": "localhost:32000/kale-custom", "experiment": { "id": "new", "name": "test" }, "experiment_name": "test", "katib_metadata": { "algorithm": { "algorithmName": "grid" }, "maxFailedTrialCount": 3, "maxTrialCount": 12, "objective": { "objectiveMetricName": "", "type": "minimize" }, "parallelTrialCount": 3, "parameters": [] }, "katib_run": false, "pipeline_description": "Share some candies between three lovely kids", "pipeline_name": "financial-time-series", "snapshot_volumes": false, "steps_defaults": [], "volumes": [] }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 4 }