In [13]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
 %reload_ext tensorboard


In [14]:
import logging
import sys
import shutil
import os
import time
import json
import sys
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow.python.lib.io import file_io
from elasticsearch import exceptions as expf
from elasticsearch import Elasticsearch


In [15]:
def tf_calc_confusion_matrix_ops(actuals, predictions):
 """Constructs the Tensorflow operations for obtaining the confusion matrix operators.

 Args:
 actuals (tf.tensor): tensor that contain actuals
 predictions (tf.tensor): tensor that contains predictions

 Returns:
 tensors: true_postive, true_negative, false_positive, false_negative

 """

 ones_like_actuals = tf.ones_like(actuals)
 zeros_like_actuals = tf.zeros_like(actuals)
 ones_like_predictions = tf.ones_like(predictions)
 zeros_like_predictions = tf.zeros_like(predictions)

 tp_op = tf.reduce_sum(
 tf.cast(
 tf.logical_and(
 tf.equal(actuals, ones_like_actuals),
 tf.equal(predictions, ones_like_predictions)
 ),
 "float"
 )
 )

 tn_op = tf.reduce_sum(
 tf.cast(
 tf.logical_and(
 tf.equal(actuals, zeros_like_actuals),
 tf.equal(predictions, zeros_like_predictions)
 ),
 "float"
 )
 )

 fp_op = tf.reduce_sum(
 tf.cast(
 tf.logical_and(
 tf.equal(actuals, zeros_like_actuals),
 tf.equal(predictions, ones_like_predictions)
 ),
 "float"
 )
 )

 fn_op = tf.reduce_sum(
 tf.cast(
 tf.logical_and(
 tf.equal(actuals, ones_like_actuals),
 tf.equal(predictions, zeros_like_predictions)
 ),
 "float"
 )
 )

 return tp_op, tn_op, fp_op, fn_op


def tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg):
 """Construct the Tensorflow operations for obtaining the confusion matrix.

 Args:
 true_pos (tf.tensor): tensor with true positives
 true_neg (tf.tensor): tensor with true negatives
 false_pos (tf.tensor): tensor with false positives
 false_neg (tf.tensor): tensor with false negatives

 Returns:
 tensor calculations: precision, recall, f1_score and accuracy

 """
 tpfn = float(true_pos) + float(false_neg)
 tpr = 0 if tpfn == 0 else float(true_pos) / tpfn

 total = float(true_pos) + float(false_pos) + float(false_neg) + float(true_neg)
 accuracy = 0 if total == 0 else (float(true_pos) + float(true_neg)) / total

 recall = tpr
 tpfp = float(true_pos) + float(false_pos)
 precision = 0 if tpfp == 0 else float(true_pos) / tpfp

 f1_score = 0 if recall == 0 else (2 * (precision * recall)) / (precision + recall)

 print('Precision = ', precision)
 print('Recall = ', recall)
 print('F1 Score = ', f1_score)
 print('Accuracy = ', accuracy)

 return {'precision': precision, 'recall': recall, 'f1': f1_score,
 'accuracy': accuracy}


def tf_confusion_matrix(model, actual_classes, session, feed_dict):
 """Calculates confusion matrix when training.

 Args:
 model (object): instance of the model class Object
 actual_classes (tf.tensor): tensor that contains the actual classes
 session (tf.session): tensorflow session in which the tensors are evaluated
 feed_dict (dict): dictionary with features and actual classes


 """

 predictions = tf.argmax(model, 1)
 actuals = tf.argmax(actual_classes, 1)
 tp_op, tn_op, fp_op, fn_op = tf_calc_confusion_matrix_ops(actuals, predictions)
 true_pos, true_neg, false_pos, false_neg = \
 session.run(
 [tp_op, tn_op, fp_op, fn_op],
 feed_dict
 )

 return tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg)

In [16]:

class FlatModel():
 """Neural network model that contains only single layer."""

 def __init__(self, nr_predictors, nr_classes):
 """

 Args:
 nr_predictors (int): amount of predictors
 nr_classes (int): amount of classes
 """
 self._nr_predictors = nr_predictors
 self._nr_classes = nr_classes

 @property
 def nr_predictors(self):
 """Amount of predictors property."""
 return self._nr_predictors

 @property
 def nr_classes(self):
 """Amount of classes property."""
 return self._nr_classes

 def build_model(self, feature_data):
 """Builds the tensorflow model.

 Args:
 feature_data (tf. tensors): feature tensors

 Returns:
 model: tensorflow graph

 """
 weights = tf.Variable(tf.truncated_normal([self._nr_predictors, self._nr_classes],
 stddev=0.0001))
 biases = tf.Variable(tf.ones([self._nr_classes]))

 model = tf.nn.softmax(tf.matmul(feature_data, weights) + biases)

 return model


class DeepModel():
 """Neural network model that contains two hidden layers."""

 def __init__(self, nr_predictors, nr_classes, dim_hidden1=50, dim_hidden2=25):
 """

 Args:
 nr_predictors (int): amount of predictors
 nr_classes (int): amount of classes
 dim_hidden1 (int): amount of neurons in first hidden layer
 dim_hidden2 (int): amount of neurons in second hidden layer
 """
 self._nr_predictors = nr_predictors
 self._nr_classes = nr_classes
 self.dim_hidden1 = dim_hidden1
 self.dim_hidden2 = dim_hidden2

 @property
 def nr_predictors(self):
 """Amount of predictors property."""
 return self._nr_predictors

 @property
 def nr_classes(self):
 """Amount of classes property."""
 return self._nr_classes

 def build_model(self, feature_data):
 """Builds the tensorflow model.

 Args:
 feature_data (tf. tensors): feature tensors

 Returns:
 model: tensorflow graph

 """
 weights1 = tf.Variable(tf.truncated_normal([self._nr_predictors, self.dim_hidden1],
 stddev=0.0001))
 biases1 = tf.Variable(tf.ones([self.dim_hidden1]))

 weights2 = tf.Variable(tf.truncated_normal([self.dim_hidden1, self.dim_hidden2],
 stddev=0.0001))
 biases2 = tf.Variable(tf.ones([self.dim_hidden2]))

 weights3 = tf.Variable(tf.truncated_normal([self.dim_hidden2, self.nr_classes],
 stddev=0.0001))
 biases3 = tf.Variable(tf.ones([self._nr_classes]))

 hidden_layer_1 = tf.nn.relu(tf.matmul(feature_data, weights1) + biases1)
 hidden_layer_2 = tf.nn.relu(tf.matmul(hidden_layer_1, weights2) + biases2)
 model = tf.nn.softmax(tf.matmul(hidden_layer_2, weights3) + biases3)

 return model

In [17]:

def load_data(tickers, es_address, year_cutoff=2010):
 """Load stock market data (close values for each day) for given tickers.

 Args:
 tickers (list): list of tickers

 Returns:
 pandas.dataframe: dataframe with close values of tickers

 """
 query = {
 "query": {
 "range" : {
 "Date" : {
 "gte": year_cutoff,
 }
 }
 }
}
 # get the data
 es = Elasticsearch( [es_address], port=9200 )
 res_source = {}
 for ticker in tickers:
 res_source[ticker] = [hit["_source"] 
 for hit in es.search(index=ticker, 
 body=query, 
 size=2000, 
 _source=["Date", "Close"]
 )['hits']['hits']]

 results = {}
 for ticker in tickers:
 results[ticker] = pd.DataFrame(res_source[ticker]).set_index('Date')

 # sort and fill blanks
 closing_data = pd.DataFrame()
 for ticker in tickers:
 closing_data['{}_close'.format(ticker)] = results[ticker]['Close']
 closing_data['{}_close'.format(ticker)] = pd.to_numeric(closing_data['{}_close'.format(ticker)],errors='coerce')
 closing_data.sort_index(inplace=True)
 closing_data.index = pd.to_datetime(closing_data.index, format='%Y-%m-%d%H:%M:%S%Z', 
 errors='coerce') 
 closing_data = closing_data.fillna(method='ffill')

 return closing_data


def preprocess_data(closing_data):
 """Preprocesses data into time series.

 Args:
 closing_data (pandas.dataframe): dataframe with close values of tickers

 Returns:
 pandas.dataframe: dataframe with time series

 """
 # transform into log return
 log_return_data = pd.DataFrame()
 tickers = [column_header.split("_")[0] for column_header in closing_data.columns.values]
 for ticker in tickers:
 log_return_data['{}_log_return'.format(ticker)] = np.log(
 closing_data['{}_close'.format(ticker)] /
 closing_data['{}_close'.format(ticker)].shift())

 log_return_data['snp_log_return_positive'] = 0
 log_return_data.ix[log_return_data['snp_log_return'] >= 0, 'snp_log_return_positive'] = 1
 log_return_data['snp_log_return_negative'] = 0
 log_return_data.ix[log_return_data['snp_log_return'] < 0, 'snp_log_return_negative'] = 1

 # create dataframe
 training_test_data = pd.DataFrame(
 columns=[
 'snp_log_return_positive', 'snp_log_return_negative',
 'snp_log_return_1', 'snp_log_return_2', 'snp_log_return_3',
 'nyse_log_return_1', 'nyse_log_return_2', 'nyse_log_return_3',
 'djia_log_return_1', 'djia_log_return_2', 'djia_log_return_3',
 'nikkei_log_return_0', 'nikkei_log_return_1', 'nikkei_log_return_2',
 'hangseng_log_return_0', 'hangseng_log_return_1', 'hangseng_log_return_2',
 'ftse_log_return_0', 'ftse_log_return_1', 'ftse_log_return_2',
 'dax_log_return_0', 'dax_log_return_1', 'dax_log_return_2',
 'aord_log_return_0', 'aord_log_return_1', 'aord_log_return_2'])

 # fill dataframe with time series
 for i in range(7, len(log_return_data)):
 training_test_data = training_test_data.append(
 {'snp_log_return_positive': log_return_data['snp_log_return_positive'].ix[i],
 'snp_log_return_negative': log_return_data['snp_log_return_negative'].ix[i],
 'snp_log_return_1': log_return_data['snp_log_return'].ix[i - 1],
 'snp_log_return_2': log_return_data['snp_log_return'].ix[i - 2],
 'snp_log_return_3': log_return_data['snp_log_return'].ix[i - 3],
 'nyse_log_return_1': log_return_data['nyse_log_return'].ix[i - 1],
 'nyse_log_return_2': log_return_data['nyse_log_return'].ix[i - 2],
 'nyse_log_return_3': log_return_data['nyse_log_return'].ix[i - 3],
 'djia_log_return_1': log_return_data['djia_log_return'].ix[i - 1],
 'djia_log_return_2': log_return_data['djia_log_return'].ix[i - 2],
 'djia_log_return_3': log_return_data['djia_log_return'].ix[i - 3],
 'nikkei_log_return_0': log_return_data['nikkei_log_return'].ix[i],
 'nikkei_log_return_1': log_return_data['nikkei_log_return'].ix[i - 1],
 'nikkei_log_return_2': log_return_data['nikkei_log_return'].ix[i - 2],
 'hangseng_log_return_0': log_return_data['hangseng_log_return'].ix[i],
 'hangseng_log_return_1': log_return_data['hangseng_log_return'].ix[i - 1],
 'hangseng_log_return_2': log_return_data['hangseng_log_return'].ix[i - 2],
 'ftse_log_return_0': log_return_data['ftse_log_return'].ix[i],
 'ftse_log_return_1': log_return_data['ftse_log_return'].ix[i - 1],
 'ftse_log_return_2': log_return_data['ftse_log_return'].ix[i - 2],
 'dax_log_return_0': log_return_data['dax_log_return'].ix[i],
 'dax_log_return_1': log_return_data['dax_log_return'].ix[i - 1],
 'dax_log_return_2': log_return_data['dax_log_return'].ix[i - 2],
 'aord_log_return_0': log_return_data['aord_log_return'].ix[i],
 'aord_log_return_1': log_return_data['aord_log_return'].ix[i - 1],
 'aord_log_return_2': log_return_data['aord_log_return'].ix[i - 2]},
 ignore_index=True)

 return training_test_data


def train_test_split(training_test_data, train_test_ratio=0.8):
 """Splits the data into a training and test set according to the provided ratio.

 Args:
 training_test_data (pandas.dataframe): dict with time series
 train_test_ratio (float): ratio of train test split

 Returns:
 tensors: predictors and classes tensors for training and respectively test set

 """
 predictors_tf = training_test_data[training_test_data.columns[2:]]
 classes_tf = training_test_data[training_test_data.columns[:2]]

 training_set_size = int(len(training_test_data) * train_test_ratio)

 train_test_dict = {'training_predictors_tf': predictors_tf[:training_set_size],
 'training_classes_tf': classes_tf[:training_set_size],
 'test_predictors_tf': predictors_tf[training_set_size:],
 'test_classes_tf': classes_tf[training_set_size:]}

 return train_test_dict


In [18]:
import os
import boto3
from botocore.exceptions import ClientError

def upload_to_storage(bucket, export_path, endpoint_url, access_key, secret_key):
 """Upload files from export path to RGW ceph storage.

 Args:
 bucket (str): RGW Cloud Storage bucket
 export_path (str): export path

 Returns:

 """
 s3 = boto3.client('s3', endpoint_url=endpoint_url ,aws_access_key_id=access_key,
 aws_secret_access_key=secret_key)
 s3.create_bucket(Bucket=bucket)
 for root, _, files in os.walk(export_path):
 for file in files:
 path = os.path.join(root, file)
 try:
 s3.upload_file(path, bucket, path)
 except ClientError as e:
 return False

def download_blob(bucket_name, source_blob_name, destination_file_name, endpoint_url, access_key, secret_key):
 """Downloads a blob from the bucket."""
 s3 = boto3.client('s3', endpoint_url=endpoint_url ,aws_access_key_id=access_key,
 aws_secret_access_key=secret_key)
 s3.download_file(bucket_name, source_blob_name, destination_file_name)
 
 print('File {} downloaded to {}.'.format(
 source_blob_name,
 destination_file_name))


def list_blobs(bucket_name, endpoint_url, access_key, secret_key, prefix):
 """Lists all the blobs in the bucket."""
 s3 = boto3.resource('s3', endpoint_url=endpoint_url ,aws_access_key_id=access_key,
 aws_secret_access_key=secret_key)
 bucket = s3.Bucket(bucket_name)
 return bucket.objects.filter(Prefix=prefix)

def copy_objects(bucket_name, endpoint_url, access_key, secret_key, path, new_path):
 s3 = boto3.resource('s3', endpoint_url=endpoint_url ,aws_access_key_id=access_key,
 aws_secret_access_key=secret_key)
 bucket = s3.Bucket(bucket_name)
 
 for obj in bucket.objects.filter(Prefix=path):
 old_source = { 'Bucket': bucket_name,
 'Key': obj.key}
 # replace the prefix
 new_key = obj.key.replace(path, new_path, 1)
 new_obj = bucket.Object(new_key)
 new_obj.copy(old_source) 


In [19]:
cutoff_year="2010"
bucket="KaleBucket"
endpoint_url="http://10.168.209.191"
access_key="MBVPDGAX9I28CFYRWLJ3"
secret_key="CTzANSlTT1ONhMKe4SKplL3arzyHinkHMqqIMOaS"
elastic_url="10.168.209.177"
model="DeepModel"
store_path="data/data_2010.csv"
epochs=30001
tag="v1"


In [20]:
logging.info('starting preprocessing of data..')
tickers = ['snp', 'nyse', 'djia', 'nikkei', 'hangseng', 'ftse', 'dax', 'aord']
closing_data = load_data(tickers, elastic_url, cutoff_year)
time_series = preprocess_data(closing_data)
logging.info('preprocessing of data complete..')

logging.info('starting uploading of the preprocessed data on Ceph..')
temp_folder = 'data'
if not os.path.exists(temp_folder):
 os.mkdir(temp_folder)
file_path = os.path.join(temp_folder, 'data_{}.csv'.format(cutoff_year))
time_series.to_csv(file_path, index=False)
upload_to_storage(bucket, temp_folder, endpoint_url, access_key, secret_key)
shutil.rmtree(temp_folder)

.ix is deprecated. Please use
.loc for label based indexing or
.iloc for positional indexing

See the documentation here:
http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated
.ix is deprecated. Please use
.loc for label based indexing or
.iloc for positional indexing

See the documentation here:
http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated
.ix is deprecated. Please use
.loc for label based indexing or
.iloc for positional indexing

See the documentation here:
http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated
.ix is deprecated. Please use
.loc for label based indexing or
.iloc for positional indexing

See the documentation here:
http://pandas.pydata.org/pandas-docs/stable/indexing.html#ix-indexer-is-deprecated
.ix is deprecated. Please use
.loc for label based indexing or
.iloc for positional indexing

See the documentation here:
http://pandas.pydata.org/pandas-docs/stable/indexing.html#

In [9]:
logging.info('getting the ML model...')
model = DeepModel(nr_predictors=24, nr_classes=2)
logging.info('getting the data...')
temp_folder = 'data'
if not os.path.exists(temp_folder):
 os.mkdir(temp_folder)
file_path = os.path.join(temp_folder, 'data.csv')
download_blob(bucket, store_path, file_path, endpoint_url, access_key, secret_key)
time_series = pd.read_csv(file_path)
training_test_data = train_test_split(time_series, 0.8)
 # define training objective
logging.info('defining the training objective...')
sess = tf.Session()
feature_data = tf.placeholder("float", [None, 24])
actual_classes = tf.placeholder("float", [None, 2])

model = model.build_model(feature_data)
cost = -tf.reduce_sum(actual_classes * tf.log(model))
train_opt = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)
init = tf.global_variables_initializer()
sess.run(init)

correct_prediction = tf.equal(tf.argmax(model, 1), tf.argmax(actual_classes, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

logging.info('training the model...')
time_dct = {}
time_dct['start'] = time.time()
for i in range(1, epochs):
 sess.run(
 train_opt,
 feed_dict={
 feature_data: training_test_data['training_predictors_tf'].values,
 actual_classes: training_test_data['training_classes_tf'].values.reshape(
 len(training_test_data['training_classes_tf'].values), 2)
 }
 )
if i % 5000 == 0:
 train_acc = sess.run(
 accuracy,
 feed_dict={
 feature_data: training_test_data['training_predictors_tf'].values,
 actual_classes: training_test_data['training_classes_tf'].values.reshape(
 len(training_test_data['training_classes_tf'].values), 2)
 }
 )
 print(i, train_acc)
time_dct['end'] = time.time()
logging.info('training took {0:.2f} sec'.format(time_dct['end'] - time_dct['start']))


 # print results of confusion matrix
logging.info('validating model on test set...')
feed_dict = {
 feature_data: training_test_data['test_predictors_tf'].values,
 actual_classes: training_test_data['test_classes_tf'].values.reshape(
 len(training_test_data['test_classes_tf'].values), 2)
 }
test_acc = tf_confusion_matrix(model, actual_classes, sess,
 feed_dict)['accuracy']

 # create signature for TensorFlow Serving
logging.info('Exporting model for tensorflow-serving...')

export_path = os.path.join("models", tag)
tf.saved_model.simple_save(
 sess,
 export_path,
 inputs={'predictors': feature_data},
 outputs={'prediction': tf.argmax(model, 1),
 'model-tag': tf.constant([str(tag)])}
 )


File data/data_2010.csv downloaded to data/data.csv.
30000 0.7664931
Precision = 0.782608695652174
Recall = 0.625
F1 Score = 0.694980694980695
Accuracy = 0.726643598615917
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.simple_save.


Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.simple_save.


Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.


Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: models/v1/saved_model.pb


INFO:tensorflow:SavedModel written to: models/v1/saved_model.pb


In [10]:
# save model on Ceph
logging.info("uploading to " + bucket + "/" + export_path)
upload_to_storage(bucket, export_path, endpoint_url, access_key, secret_key)

metrics_info = {
 'metrics': [{
 'name': 'accuracy-train',
 'numberValue': float(train_acc),
 'format': "PERCENTAGE"
 }, {
 'name': 'accuracy-test',
 'numberValue': float(test_acc),
 'format': "PERCENTAGE"
}]}
with file_io.FileIO('mlpipeline-metrics.json', 'w') as f:
 json.dump(metrics_info, f)

with open("/tmp/accuracy", "w") as output_file:
 output_file.write(str(float(test_acc)))

 # remove local files

# get latest active version for TF serving directory
serving_dir = 'tfserving'
blobs = list_blobs(bucket, endpoint_url, 
 access_key, secret_key, 
 prefix=serving_dir)
version = set()
for blob in blobs:
 version.add(int(blob.key.split('/')[1]))
if version:
 new_version = max(version)+1
else:
 new_version = 1

 # copy the files
logging.info('deploying model %s as model number %s on TF serving', tag, new_version)

 # #TODO use upload from storage helper
copy_objects(bucket, endpoint_url, 
 access_key, secret_key, 
 'models/{}/'.format(tag), 
 'tfserving/{}/'.format(new_version) )