---
template: overrides/blogs.html
tags:
  - machine learning
---

# A Top Competitor's Open-Sourced Solution to the Kaggle Optiver Competition

!!! info
    Author: Void · Published: 2021-08-27 · Reading time: about 10 minutes · WeChat article: [:fontawesome-solid-link:](https://mp.weixin.qq.com/s/tIowQSB7mX2j-gVQCHw1eg)

## 1 Introduction

This all started on August 26, when I came across a WeChat article: a top-ranked (top 50) competitor in the Kaggle Optiver competition had actually made their solution public. Partly out of curiosity and partly to learn something, let's walk through this generously open-sourced solution together.

## 2 Competition Overview

Optiver is a leading electronic market maker, and the task is to predict the volatility of high-frequency financial (stock) data. Why predict volatility? First, volatility is easier to forecast than the price itself; second, accurate volatility forecasts feed directly into applications such as option pricing.

In finance, volatility can be measured in several ways: the simplest is the standard deviation of stock returns, but there are others, such as realized volatility in high-frequency settings and implied volatility.

The competition is a regression problem: given 10 minutes of order book data, we need to predict the realized volatility over the next 10 minutes:

$$
\sigma=\sqrt{\sum_{t} r_{t-1, t}^{2}}
$$

where $r_{t-1,t}$ is the stock's log return, i.e., the first difference of the log price. The evaluation metric is RMSPE (root mean square percentage error):

$$
\mathrm{RMSPE}=\sqrt{\frac{1}{n} \sum_{i=1}^{n}\left(\left(y_{i}-\hat{y}_{i}\right) / y_{i}\right)^{2}}
$$

## 3 The Code in Detail

Now let's take a look at the competitor's [code](https://www.kaggle.com/alexioslyon/lgbm-baseline/comments).

### 3.1 Import packages

The first part, as usual, imports a pile of packages.

```python
from IPython.core.display import display, HTML
import pandas as pd
import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
import glob
import os
import gc
from joblib import Parallel, delayed
from sklearn import preprocessing, model_selection
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import QuantileTransformer
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
import seaborn as sns
import numpy.matlib

path_submissions = '/'
target_name = 'target'
scores_folds = {}
```

### 3.2 Feature Engineering

The second part constructs a large set of features. The competition defines the stock price as the WAP (weighted average price):

$$
\mathrm{WAP}=\frac{\mathrm{BidPrice}_{1} \times \mathrm{AskSize}_{1}+\mathrm{AskPrice}_{1} \times \mathrm{BidSize}_{1}}{\mathrm{BidSize}_{1}+\mathrm{AskSize}_{1}}
$$

This formulation is tailored to order book data with bid/ask quotes, and it takes both prices and quoted sizes into account.
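To make the formula concrete, here is a minimal sanity check on a single made-up level-1 snapshot (the numbers are purely illustrative, not competition data). Note how the thinner side of the book pulls the WAP toward its own price:

```python
import pandas as pd

# One made-up level-1 snapshot (illustrative numbers only, not competition data)
snapshot = pd.DataFrame({
    'bid_price1': [100.0], 'ask_price1': [102.0],
    'bid_size1': [10], 'ask_size1': [1],
})

# WAP as defined above: each side's price is weighted by the opposite side's size
wap = (snapshot['bid_price1'] * snapshot['ask_size1']
       + snapshot['ask_price1'] * snapshot['bid_size1']) / (
       snapshot['bid_size1'] + snapshot['ask_size1'])

print(wap.iloc[0])  # ~101.82: the thin ask side pulls the WAP toward the ask price
```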
#### 3.2.1 Order Book Features

For the order book features (built in the `book_preprocessor` function), four WAP variants are computed: `wap1` and `wap2` apply the formula above to the level-1 and level-2 quotes, while `wap3` and `wap4` weight each side's price by its own size instead. From the log returns of each variant a realized volatility is computed, along with book-level features such as price spreads, bid/ask spreads, total volume and volume imbalance. The same statistics are then recomputed over progressively shorter tail windows of the 10-minute bucket (rows with `seconds_in_bucket` >= 100, 200, ..., 500).

```python
# data directory
data_dir = '../input/optiver-realized-volatility-prediction/'

# Function to calculate first WAP
def calc_wap1(df):
    wap = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

# Function to calculate second WAP
def calc_wap2(df):
    wap = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

def calc_wap3(df):
    wap = (df['bid_price1'] * df['bid_size1'] + df['ask_price1'] * df['ask_size1']) / (df['bid_size1'] + df['ask_size1'])
    return wap

def calc_wap4(df):
    wap = (df['bid_price2'] * df['bid_size2'] + df['ask_price2'] * df['ask_size2']) / (df['bid_size2'] + df['ask_size2'])
    return wap

# Function to calculate the log of the return
# Remember that logb(x / y) = logb(x) - logb(y)
def log_return(series):
    return np.log(series).diff()

# Calculate the realized volatility
def realized_volatility(series):
    return np.sqrt(np.sum(series**2))

# Function to count unique elements of a series
def count_unique(series):
    return len(np.unique(series))

# Function to read our base train and test set
def read_train_test():
    train = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
    test = pd.read_csv('../input/optiver-realized-volatility-prediction/test.csv')
    # Create a key to merge with book and trade data
    train['row_id'] = train['stock_id'].astype(str) + '-' + train['time_id'].astype(str)
    test['row_id'] = test['stock_id'].astype(str) + '-' + test['time_id'].astype(str)
    print(f'Our training set has {train.shape[0]} rows')
    return train, test

# Function to preprocess book data (for each stock id)
def book_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    # Calculate Wap
    df['wap1'] = calc_wap1(df)
    df['wap2'] = calc_wap2(df)
    df['wap3'] = calc_wap3(df)
    df['wap4'] = calc_wap4(df)
    # Calculate log returns
    df['log_return1'] = df.groupby(['time_id'])['wap1'].apply(log_return)
    df['log_return2'] = df.groupby(['time_id'])['wap2'].apply(log_return)
    df['log_return3'] = df.groupby(['time_id'])['wap3'].apply(log_return)
    df['log_return4'] = df.groupby(['time_id'])['wap4'].apply(log_return)
    # Calculate wap balance
    df['wap_balance'] = abs(df['wap1'] - df['wap2'])
    # Calculate spread
    df['price_spread'] = (df['ask_price1'] - df['bid_price1']) / ((df['ask_price1'] + df['bid_price1']) / 2)
    df['price_spread2'] = (df['ask_price2'] - df['bid_price2']) / ((df['ask_price2'] + df['bid_price2']) / 2)
    df['bid_spread'] = df['bid_price1'] - df['bid_price2']
    df['ask_spread'] = df['ask_price1'] - df['ask_price2']
    df["bid_ask_spread"] = abs(df['bid_spread'] - df['ask_spread'])
    df['total_volume'] = (df['ask_size1'] + df['ask_size2']) + (df['bid_size1'] + df['bid_size2'])
    df['volume_imbalance'] = abs((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

    # Dict for aggregations
    create_feature_dict = {
        'wap1': [np.sum, np.std],
        'wap2': [np.sum, np.std],
        'wap3': [np.sum, np.std],
        'wap4': [np.sum, np.std],
        'log_return1': [realized_volatility],
        'log_return2': [realized_volatility],
        'log_return3': [realized_volatility],
        'log_return4': [realized_volatility],
        'wap_balance': [np.sum, np.max],
        'price_spread': [np.sum, np.max],
        'price_spread2': [np.sum, np.max],
        'bid_spread': [np.sum, np.max],
        'ask_spread': [np.sum, np.max],
        'total_volume': [np.sum, np.max],
        'volume_imbalance': [np.sum, np.max],
        "bid_ask_spread": [np.sum, np.max],
    }
    create_feature_dict_time = {
        'log_return1': [realized_volatility],
        'log_return2': [realized_volatility],
        'log_return3': [realized_volatility],
        'log_return4': [realized_volatility],
    }

    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(fe_dict, seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(fe_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Get the stats for different windows
    df_feature = get_stats_window(create_feature_dict, seconds_in_bucket = 0, add_suffix = False)
    df_feature_500 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 500, add_suffix = True)
    df_feature_400 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 400, add_suffix = True)
    df_feature_300 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 300, add_suffix = True)
    df_feature_200 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 200, add_suffix = True)
    df_feature_100 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 100, add_suffix = True)

    # Merge all
    df_feature = df_feature.merge(df_feature_500, how = 'left', left_on = 'time_id_', right_on = 'time_id__500')
    df_feature = df_feature.merge(df_feature_400, how = 'left', left_on = 'time_id_', right_on = 'time_id__400')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_200, how = 'left', left_on = 'time_id_', right_on = 'time_id__200')
    df_feature = df_feature.merge(df_feature_100, how = 'left', left_on = 'time_id_', right_on = 'time_id__100')
    # Drop unnecessary time_ids
    df_feature.drop(['time_id__500', 'time_id__400', 'time_id__300', 'time_id__200', 'time_id__100'], axis = 1, inplace = True)

    # Create row_id so we can merge
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['time_id_'], axis = 1, inplace = True)
    return df_feature
```
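For readers who want to poke at the output, a minimal usage sketch follows (it assumes the Kaggle directory layout used later in `preprocessor()`, where each stock's book data lives under `book_train.parquet/stock_id=<id>`):

```python
# Build the per-bucket book features for a single stock
# (path layout assumed from preprocessor() in section 3.3)
book_features = book_preprocessor(data_dir + 'book_train.parquet/stock_id=0')

# One row per time_id, keyed by row_id ('<stock_id>-<time_id>') for merging with the trade features
print(book_features.shape)
print(book_features[['row_id', 'log_return1_realized_volatility']].head())
```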
#### 3.2.2 Trade Features

These features are derived from the actual trades: the realized volatility of trade-price log returns, plus statistics of traded size and amount such as sums, minimums and maximums. The block also defines `get_time_stock`, which aggregates the volatility features by `stock_id` and by `time_id` (mean, std, max, min) and merges them back in.

```python
# Function to preprocess trade data (for each stock id)
def trade_preprocessor(file_path):
    df = pd.read_parquet(file_path)
    df['log_return'] = df.groupby('time_id')['price'].apply(log_return)
    df['amount'] = df['price'] * df['size']
    # Dict for aggregations
    create_feature_dict = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum, np.max, np.min],
        'order_count': [np.sum, np.max],
        'amount': [np.sum, np.max, np.min],
    }
    create_feature_dict_time = {
        'log_return': [realized_volatility],
        'seconds_in_bucket': [count_unique],
        'size': [np.sum],
        'order_count': [np.sum],
    }

    # Function to get group stats for different windows (seconds in bucket)
    def get_stats_window(fe_dict, seconds_in_bucket, add_suffix = False):
        # Group by the window
        df_feature = df[df['seconds_in_bucket'] >= seconds_in_bucket].groupby(['time_id']).agg(fe_dict).reset_index()
        # Rename columns joining suffix
        df_feature.columns = ['_'.join(col) for col in df_feature.columns]
        # Add a suffix to differentiate windows
        if add_suffix:
            df_feature = df_feature.add_suffix('_' + str(seconds_in_bucket))
        return df_feature

    # Get the stats for different windows
    df_feature = get_stats_window(create_feature_dict, seconds_in_bucket = 0, add_suffix = False)
    df_feature_500 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 500, add_suffix = True)
    df_feature_400 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 400, add_suffix = True)
    df_feature_300 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 300, add_suffix = True)
    df_feature_200 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 200, add_suffix = True)
    df_feature_100 = get_stats_window(create_feature_dict_time, seconds_in_bucket = 100, add_suffix = True)

    def tendency(price, vol):
        df_diff = np.diff(price)
        val = (df_diff / price[1:]) * 100
        power = np.sum(val * vol[1:])
        return power

    lis = []
    for n_time_id in df['time_id'].unique():
        df_id = df[df['time_id'] == n_time_id]
        tendencyV = tendency(df_id['price'].values, df_id['size'].values)
        f_max = np.sum(df_id['price'].values > np.mean(df_id['price'].values))
        f_min = np.sum(df_id['price'].values < np.mean(df_id['price'].values))
        df_max = np.sum(np.diff(df_id['price'].values) > 0)
        df_min = np.sum(np.diff(df_id['price'].values) < 0)
        # new
        abs_diff = np.median(np.abs(df_id['price'].values - np.mean(df_id['price'].values)))
        energy = np.mean(df_id['price'].values**2)
        iqr_p = np.percentile(df_id['price'].values, 75) - np.percentile(df_id['price'].values, 25)
        # vol vars
        abs_diff_v = np.median(np.abs(df_id['size'].values - np.mean(df_id['size'].values)))
        energy_v = np.sum(df_id['size'].values**2)
        iqr_p_v = np.percentile(df_id['size'].values, 75) - np.percentile(df_id['size'].values, 25)
        lis.append({'time_id': n_time_id, 'tendency': tendencyV, 'f_max': f_max, 'f_min': f_min,
                    'df_max': df_max, 'df_min': df_min, 'abs_diff': abs_diff, 'energy': energy,
                    'iqr_p': iqr_p, 'abs_diff_v': abs_diff_v, 'energy_v': energy_v, 'iqr_p_v': iqr_p_v})

    df_lr = pd.DataFrame(lis)
    df_feature = df_feature.merge(df_lr, how = 'left', left_on = 'time_id_', right_on = 'time_id')

    # Merge all
    df_feature = df_feature.merge(df_feature_500, how = 'left', left_on = 'time_id_', right_on = 'time_id__500')
    df_feature = df_feature.merge(df_feature_400, how = 'left', left_on = 'time_id_', right_on = 'time_id__400')
    df_feature = df_feature.merge(df_feature_300, how = 'left', left_on = 'time_id_', right_on = 'time_id__300')
    df_feature = df_feature.merge(df_feature_200, how = 'left', left_on = 'time_id_', right_on = 'time_id__200')
    df_feature = df_feature.merge(df_feature_100, how = 'left', left_on = 'time_id_', right_on = 'time_id__100')
    # Drop unnecessary time_ids
    df_feature.drop(['time_id__500', 'time_id__400', 'time_id__300', 'time_id__200', 'time_id', 'time_id__100'], axis = 1, inplace = True)

    df_feature = df_feature.add_prefix('trade_')
    stock_id = file_path.split('=')[1]
    df_feature['row_id'] = df_feature['trade_time_id_'].apply(lambda x: f'{stock_id}-{x}')
    df_feature.drop(['trade_time_id_'], axis = 1, inplace = True)
    return df_feature

# Function to get group stats for the stock_id and time_id
def get_time_stock(df):
    vol_cols = ['log_return1_realized_volatility', 'log_return2_realized_volatility',
                'log_return1_realized_volatility_400', 'log_return2_realized_volatility_400',
                'log_return1_realized_volatility_300', 'log_return2_realized_volatility_300',
                'log_return1_realized_volatility_200', 'log_return2_realized_volatility_200',
                'trade_log_return_realized_volatility', 'trade_log_return_realized_volatility_400',
                'trade_log_return_realized_volatility_300', 'trade_log_return_realized_volatility_200']

    # Group by the stock id
    df_stock_id = df.groupby(['stock_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_stock_id.columns = ['_'.join(col) for col in df_stock_id.columns]
    df_stock_id = df_stock_id.add_suffix('_' + 'stock')

    # Group by the time id
    df_time_id = df.groupby(['time_id'])[vol_cols].agg(['mean', 'std', 'max', 'min', ]).reset_index()
    # Rename columns joining suffix
    df_time_id.columns = ['_'.join(col) for col in df_time_id.columns]
    df_time_id = df_time_id.add_suffix('_' + 'time')

    # Merge with original dataframe
    df = df.merge(df_stock_id, how = 'left', left_on = ['stock_id'], right_on = ['stock_id__stock'])
    df = df.merge(df_time_id, how = 'left', left_on = ['time_id'], right_on = ['time_id__time'])
    df.drop(['stock_id__stock', 'time_id__time'], axis = 1, inplace = True)
    return df
```
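The `tendency` feature above is essentially a volume-weighted sum of percentage price changes within a bucket. A tiny illustrative check (made-up numbers, same logic as the nested function above) shows that its sign follows the direction of the more heavily traded moves:

```python
import numpy as np

def tendency(price, vol):
    # Volume-weighted sum of percentage price changes (same logic as above)
    df_diff = np.diff(price)
    val = (df_diff / price[1:]) * 100
    return np.sum(val * vol[1:])

# Toy trade sequence (illustrative only): an up-tick on heavy size, a down-tick on light size
price = np.array([100.0, 101.0, 100.5])
size = np.array([5, 10, 2])
print(tendency(price, size))  # ~8.9 > 0: the heavily traded up-move dominates
```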
#### 3.2.3 Aggregated Cluster Features

This part clusters similar stocks with k-means, using the correlation matrix of their targets, and then averages the features above within each cluster to create new aggregate features.

```python
from sklearn.cluster import KMeans

# making agg features
train_p = pd.read_csv('../input/optiver-realized-volatility-prediction/train.csv')
train_p = train_p.pivot(index='time_id', columns='stock_id', values='target')

corr = train_p.corr()
ids = corr.index

kmeans = KMeans(n_clusters=7, random_state=0).fit(corr.values)
print(kmeans.labels_)

l = []
for n in range(7):
    # Collect the stock ids assigned to cluster n (equivalent to ids[kmeans.labels_ == n])
    l.append([(x - 1) for x in ((ids + 1) * (kmeans.labels_ == n)) if x > 0])

mat = []
matTest = []
n = 0
for ind in l:
    print(ind)
    newDf = train.loc[train['stock_id'].isin(ind)]
    newDf = newDf.groupby(['time_id']).agg(np.nanmean)
    newDf.loc[:, 'stock_id'] = str(n) + 'c1'
    mat.append(newDf)

    newDf = test.loc[test['stock_id'].isin(ind)]
    newDf = newDf.groupby(['time_id']).agg(np.nanmean)
    newDf.loc[:, 'stock_id'] = str(n) + 'c1'
    matTest.append(newDf)

    n += 1

mat1 = pd.concat(mat).reset_index()
mat1.drop(columns=['target'], inplace=True)
mat2 = pd.concat(matTest).reset_index()
```

### 3.3 Parallelization and the Loss Function

Feature computation is parallelized across stocks: each stock id is handled by its own joblib job, and the per-stock book and trade features are merged on `row_id`. This block also defines the RMSPE loss and the custom evaluation function `feval_rmspe` used for early stopping.

```python
# Function to run the preprocessing functions in parallel (for each stock id)
def preprocessor(list_stock_ids, is_train = True):
    # Parallel for loop
    def for_joblib(stock_id):
        # Train
        if is_train:
            file_path_book = data_dir + "book_train.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_train.parquet/stock_id=" + str(stock_id)
        # Test
        else:
            file_path_book = data_dir + "book_test.parquet/stock_id=" + str(stock_id)
            file_path_trade = data_dir + "trade_test.parquet/stock_id=" + str(stock_id)
        # Preprocess book and trade data and merge them
        df_tmp = pd.merge(book_preprocessor(file_path_book), trade_preprocessor(file_path_trade), on = 'row_id', how = 'left')
        # Return the merged dataframe
        return df_tmp

    # Use the joblib API to run the loop in parallel
    df = Parallel(n_jobs = -1, verbose = 1)(delayed(for_joblib)(stock_id) for stock_id in list_stock_ids)
    # Concatenate all the dataframes returned by Parallel
    df = pd.concat(df, ignore_index = True)
    return df

# Function to calculate the root mean squared percentage error
def rmspe(y_true, y_pred):
    return np.sqrt(np.mean(np.square((y_true - y_pred) / y_true)))

# Function to early stop with root mean squared percentage error
def feval_rmspe(y_pred, lgb_train):
    y_true = lgb_train.get_label()
    return 'RMSPE', rmspe(y_true, y_pred), False
```

With these helpers in place, the train and test feature tables are built:

```python
# Read train and test
train, test = read_train_test()

# Get unique stock ids
train_stock_ids = train['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
train_ = preprocessor(train_stock_ids, is_train = True)
train = train.merge(train_, on = ['row_id'], how = 'left')

# Get unique stock ids
test_stock_ids = test['stock_id'].unique()
# Preprocess them using Parallel and our single stock id functions
test_ = preprocessor(test_stock_ids, is_train = False)
test = test.merge(test_, on = ['row_id'], how = 'left')

# Get group stats of time_id and stock_id
train = get_time_stock(train)
test = get_time_stock(test)
```

### 3.4 Model Training

For the models, the author ensembles a LightGBM model with a neural network. There is nothing particularly unusual in this part, so the training code is not reproduced here; interested readers can check the original notebook.
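Since the training loop is omitted, below is a minimal sketch of how a LightGBM model could be trained with the `feval_rmspe` metric from section 3.3. This is an illustration only, not the author's exact setup: the feature list, hyperparameters and fold scheme are placeholders, and the 1/y² sample weighting is a common trick for making an RMSE objective approximate RMSPE rather than something taken from the original notebook. It assumes the `train` dataframe, `rmspe` and `feval_rmspe` defined above.

```python
import lightgbm as lgb
import numpy as np
from sklearn.model_selection import KFold

# Placeholder feature list: everything except identifiers and the target
features = [c for c in train.columns if c not in ['row_id', 'time_id', 'target']]
X, y = train[features], train['target']

# 'metric': 'None' disables the built-in metric so only feval_rmspe drives early stopping
params = {'objective': 'rmse', 'metric': 'None', 'learning_rate': 0.05, 'verbosity': -1}

oof = np.zeros(len(train))
for trn_idx, val_idx in KFold(n_splits=5, shuffle=True, random_state=42).split(X):
    # Weighting squared errors by 1/y^2 makes the RMSE objective behave like RMSPE
    trn_set = lgb.Dataset(X.iloc[trn_idx], y.iloc[trn_idx], weight=1 / np.square(y.iloc[trn_idx]))
    val_set = lgb.Dataset(X.iloc[val_idx], y.iloc[val_idx], weight=1 / np.square(y.iloc[val_idx]))
    model = lgb.train(params, trn_set, num_boost_round=1000,
                      valid_sets=[val_set], feval=feval_rmspe,
                      callbacks=[lgb.early_stopping(50)])
    oof[val_idx] = model.predict(X.iloc[val_idx])

print('OOF RMSPE:', rmspe(train['target'].values, oof))
```

The weighting matters because RMSPE penalizes relative rather than absolute errors, so without it the low-volatility buckets would contribute almost nothing to the loss.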
### 3.5 Ensembling and Submission

The ensemble is simply the average of the LightGBM and NN predictions.

```python
# test_nn, train_nn, test_predictions_nn, predictions_lgb, pred_name and model_name
# are produced by the (omitted) training code in the original notebook
test_nn["row_id"] = test_nn["stock_id"].astype(str) + "-" + test_nn["time_id"].astype(str)
test_nn[target_name] = (test_predictions_nn + predictions_lgb) / 2

score = round(rmspe(y_true = train_nn[target_name].values, y_pred = train_nn[pred_name].values), 5)
print('RMSPE {}: {} - Folds: {}'.format(model_name, score, scores_folds[model_name]))

display(test_nn[['row_id', target_name]].head(3))
test_nn[['row_id', target_name]].to_csv('submission.csv', index = False)
```

## 4 Summary

As you can see, this top competitor put a great deal of effort into feature engineering, and clean engineering is a necessary foundation for a good result. The models themselves are not fancy, but the outcome is strong nonetheless, and there is plenty here worth learning from.