<center>
<img src="../../img/ods_stickers.jpg">
## Открытый курс по машинному обучению
<center>Автор материала: Трунов Артем Геннадьевич, @datamove.

# <center>Pipeline, FeatureUnion – практика применения

## <center>Введение</center>
В этой статье будем разбираться с классами пакета sklearn, которые представляют значительное удобство и экономию времени в работе. Многие, наверное, любят, когда код иллюстрируется диаграммами классов или каким-нибудь метакодом, который позволяет убрать из поля зрения все детали реализации и оставить на виду только самое главное. Pipeline в sklearn - это и есть такой вот метакод, с помошью которого модель видна как на ладони.

В качестве же примеров будем использовать не измусоленные со всех сторон встроенные в sklearn датасеты, а знакомые читателю по домашним работам и соревнованию 'Alice' данные. Надеюсь, что кого-то эта статья побудит исправить свой код и вдохновит на новые засылки на Kaggle!

## <center>Pipeline</center>
Итак, начнем с перевода и определения. Русские слова "труба" и,тем паче, "трубопровод", мы, пожалуй, использовать не будем, а вот вариант "конвейер данных" кажется мне наиболее подходящим и благозвучным.

Документация Pipeline определяет этот класс как конвейер преобразования данных с финальным эстиматором (обучающей моделью), применяющийся для того, чтобы можно было легко менять параметры на каждом этапе конвейера и сравнивать результаты. С такой же легкостью можно заменять и сами этапы преобразований данных и финальную модель.

Давайте сразу окунемся в пример. Рассмотрим датасет Самсунга из домашней работы №7. Мы применяли  к данным алгоритм PCA для уменьшения размерности, а что бы он работал как надо, предварительно масштабировали данные. Для классификации использовали метод опорных векторов. Таким образом, наш конвейер будет состоять из двух шагов обработки (StandardScaler, PCA) и финальной модели (LinearSVC).

In [None]:
#изменить соответственно
PATH_TO_DATA="../../"

In [None]:
#загрузка данных
#На всякий случай - ссылка https://cloud.mail.ru/public/3EJK/cB2VXsyrP
import numpy as np

X_train = np.loadtxt(PATH_TO_DATA+"data/samsung_HAR/samsung_train.txt")
y_train = np.loadtxt(PATH_TO_DATA+"data/samsung_HAR/samsung_train_labels.txt").astype(int)

X_test = np.loadtxt(PATH_TO_DATA+"data/samsung_HAR/samsung_test.txt")
y_test = np.loadtxt(PATH_TO_DATA+"data/samsung_HAR/samsung_test_labels.txt").astype(int)

In [None]:
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=65)),
    ('svc', LinearSVC())
])

Ну как, красиво? Давайте разберем. Конструктор Pipeline() принимает массив кортежей, в каждом из которых мнемоническое обозначение этапа преобразования и экземпляр класса преобразователя, инстанциированный "на лету". Первый этап предразования, 'scaler', принимает исходные данные, и выдает отмасштабированные на выход, который является входом второго этапа - 'pca'. Из 'pca' в 'svc' поступает урезанная матрица главных компонентов числом 65 штук. Обучение проводится именно на ней.

Давайте запустим обучение и получим результат.


In [None]:
pipeline.fit(X_train, y_train)

In [None]:
#"Валидируемся" на том же тренировочном датасете
pred = pipeline.predict(X_train)

from sklearn.metrics import accuracy_score, roc_auc_score

accuracy_score(pred, y_train)

Заметим, что метод fit() вызывается на всех этапах конвейера, а метод predict() - только для финального эстиматора. Тоже самое, разумеется, происходит и с другим набором данных:

In [None]:
#Валидируемся на тестовом датасете, для которого у нас есть разметка
test_pred = pipeline.predict(X_test)
accuracy_score(test_pred, y_test)

Это значит, что нам не надо тащить за собой хвост из преобразований тестовой выборки, об этом позаботится наш конвейер!

Так как конвейер обладает интерфейсом модели обучения (fit(), predict() etc), то мы можем использовать его напрямую с полюбившимися методами, такими как кросс-валидация:

In [None]:
from sklearn.model_selection import cross_val_score

cross_val_score(pipeline, X_train, y_train, cv=3)

а также GridSearchCV:

In [None]:
from sklearn.model_selection import GridSearchCV

gcv_params = {'pca__n_components': [20,60,100], 
              'svc__C': [0.001, 0.01, 0.1, 1, 10] }

gcv = GridSearchCV(pipeline, gcv_params, cv=3)
gcv.fit(X_train,y_train)

In [None]:
gcv.best_params_

"Так," - скажет внимательный читатель,- "а ведь нам надо было выбрать число компонент так, чтобы оставить 90% дисперсии исходных данных". Как же мы используем это условие в конвейере?". Вообще-то оно уже реализовано в классе PCA - достаточно передать параметер n_components=0.9 в конструктор класса. Но давайте сделаем сами. Для этого нам придется реализовать собственный класс эстиматора, для многих - первый в их жизни! Сейчас увидим, что на самом деле, это - легко!

Мы унаследуем класс PCA и перегрузим методы fit(), transform() и fit_tranaform() так, чтобы возвращать матрицу с числом компонентов, объясняющих exp_var% дисперсии.

Затем построим конвейер с новым классом.

In [None]:
class PCAExplainedVariance(PCA):
    #констуктор принимает и сохраняет значение желаемой дисперсии
    def __init__(self, exp_var=1.0 ):
        super().__init__(copy=True)
        self.exp_var = exp_var #желаемая дисперсия исходных данных
        self.N_ = 0 #число компонент, тербуемых для достижения заданной дисперсии

    # Находим соответствующее число компонент
    def fit(self, X, y=None):
        super().fit(X, y)
        self.N_ = len(X)
        cum_var = 0
        for i, component in enumerate(self.components_):
            cum_var += self.explained_variance_ratio_[i]
            if cum_var>=self.exp_var:
                self.N_ = i + 1
                break
              
    # возвращаем усеченный по числу компонент датасет
    def transform(self, X, y=None):
        U = X[:,:self.N_]
        return U
    
    # fit + transform в одном флаконе
    def fit_transform(self, X, y=None):
        self.fit(X)
        U = X[:, :self.N_]

        return U

In [None]:
#Снова собираем конвейер
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCAExplainedVariance(exp_var=0.9)),
    ('svc', LinearSVC())
])

In [None]:
#На этот раз запустим с GridSearchCV
gcv_params = {'svc__C': [0.001, 0.01, 0.1, 1, 10] }
gcv = GridSearchCV(pipeline, gcv_params, cv=3)
gcv.fit(X_train, y_train)

In [None]:
gcv.best_params_

Объект конвейера предоставляет доступ и к экземплярам составляющих его классов. Например, чтобы посмотреть, какое число компонент оставил наш новый PCA-эстиматор:

In [None]:
gcv.best_estimator_.named_steps['pca'].N_

## <center>Feature Union</center>

Давайте идти дальше и расширять диапазон применяемых средств. Для этого возьмем в качестве примера более сложный случай.

В соревновании <a href="https://www.kaggle.com/c/catch-me-if-you-can-intruder-detection-through-webpage-session-tracking2">Catch me if you can</a> (aka "Alice") на Kaggle, мы отдельно обрабатываем посещаемые пользователями сайты с помощью техники Bag of Words, и отдельно конструируем новые признаки из чего только можно. Затем объединяем частотную матрицу с матрицей признаков и применяем логистическую регрессию.

Попробуем запрограммировать этот сценарий в конвейер.


In [None]:
#Загрузка и предобработка данных - код от @yorko
import pandas as pd

train_df = pd.read_csv(PATH_TO_DATA+"../Alice-comp/train_sessions.csv", index_col="session_id")
#test_df = pd.read_csv(PATH_TO_DATA+"../Alice-comp/test_sessions.csv", index_col="session_id")
        
# приведем колонки time1, ..., time10 к временному формату
times = ['time%s' % i for i in range(1, 11)]
train_df[times] = train_df[times].apply(pd.to_datetime).fillna(method='ffill', axis=1)
#test_df[times] = test_df[times].apply(pd.to_datetime).fillna(method='ffill', axis=1)

# отсортируем данные по времени
train_df = train_df.sort_values(by='time1')

sites = ['site%s' % i for i in range(1,11)]
train_df[sites] = train_df[sites].fillna(0).astype('int')
#test_df[sites] = test_df[sites].fillna(0).astype('int')

#целевая переменая
y_train = train_df['target']
train_df.drop('target', axis=1, inplace=True)

train_df.head()

Итак, у нас есть такой вот датафрейм и мы хотим:

    а) составить Bag Of Words из сайтов - код взят из ноутбука @yorko
    б) нагенерить признаки, связанные со временем, любезно подсказанные @yorko: year_month, start_hour, morning (последний признак - бинарный)
    
Реализуем a), б) по отдельности как классы-трансформеры, а потом объединим результаты.


In [None]:
from scipy.sparse import csr_matrix
# Этот класс-трансформер возвращает разреженную матрицу сайтов
#
from sklearn.base import BaseEstimator, TransformerMixin


class ColsToCountMatrix(BaseEstimator, TransformerMixin):
    #констуктор принимает и сохраняет название колонок для сливания в текст
    def __init__(self, columns=[]):
        self.columns=columns
        
    # fit() ничего не делает
    def fit(self, X, y = None):
        return self
    
    #преобразуем посещения сайтов в частотную матрицу
    def transform(self, X):
        # последовательность с индексами
        sites_flatten = X[self.columns].values.flatten()

        # искомая матрица
        sites_sparse = csr_matrix(([1] * sites_flatten.shape[0],
                                    sites_flatten,
                                    range(0, sites_flatten.shape[0] + 10, 10)))[:, 1:]
        return sites_sparse
#Unit test
sparse_matrix = ColsToCountMatrix(columns=sites).transform(train_df.head(3))
print(sparse_matrix.shape)
print(sparse_matrix)

In [None]:
# Этот класс-трансформер возвращает матрицу с новыми признаками
#
from sklearn.base import BaseEstimator, TransformerMixin


class TimeToFeatures(BaseEstimator, TransformerMixin):
    # берем и сохраняем колонки, которые используем для приготовления новых признаков
    def __init__(self, columns=[]):
        self.columns = columns
    # бездельник опять
    def fit(self,X,y=None):
        return self
    # работяга
    def transform(self, X):
        # это колонка 'time1' начального датафрейма
        time1=self.columns[0] 
        # создаем пустой датафрейм для новых признаков
        new_features = pd.DataFrame(index=X.index)
        # делаем новые признаки
        new_features['year_month'] = X[time1].apply(lambda ts: ts.year*100 + ts.month)
        new_features['start_hour'] = X[time1].apply(lambda ts: ts.hour)
        new_features['morning'] = new_features['start_hour'].apply(lambda sh: 1 if 4<sh<12 else 0)
        return new_features[['year_month','start_hour','morning']]
#Unit test
TimeToFeatures(columns=times).transform(train_df.head()).values

Давайте теперь применим FeatureUnion. Конструктор класса FeatureUnion, как и конструктор Pipeline, принимает список кортежей (название, класс-трансформер), а его метод transform() просто объединяет колонки, получившиеся после применения метода transform() для каждого из составных классов.


In [None]:
from sklearn.pipeline import FeatureUnion

fu=FeatureUnion([
            ('cols_to_text', ColsToCountMatrix(columns=sites)),
            ('time_to_features', TimeToFeatures(columns=times)),
])

In [None]:
#используем todense() для наглядности
full_matrix = fu.transform(train_df.head(3)).todense()
print(full_matrix.shape)
print(full_matrix)

В итоге - было 951 колонка частотной матрицы, 3 колонки новых признаков, стало 954 колонки.

Это еще не все! Раскроем возможность использовать FeatureUnion и Pipeline вместе.
Сразу поразим читателя, добавив этапы преобразования полученных данных, а так же модель обучения на объединенных данных.


In [None]:
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import LogisticRegressionCV
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import StandardScaler

logit_params={'scoring':'roc_auc','class_weight':'balanced',
             'Cs':range(1,5),'n_jobs':3, 'random_state':17}

#используем немного другой формат вызова FeatureUnion, 
#хотя веса для нашей модели не пригодятся, читатель будет знать о таких возможностях
pipeline = Pipeline([
    ('union', FeatureUnion(
        transformer_list=[
            ('text', Pipeline([
                ('cols_to_text', ColsToCountMatrix(columns=sites)),
                ('tfidf',TfidfTransformer()), 
            ])),
            ('new_features', Pipeline([
                ('time_to_features', TimeToFeatures(columns=times)),
                ('scaler', StandardScaler()),
            ])),
        ],
        transformer_weights={'text':1.0, 'features':1.0}
    )),
    ('logit',LogisticRegressionCV(**logit_params))
])

In [None]:
pipeline.fit(train_df, y_train)

In [None]:
# это подобранный перебором коэффициент регуляризации
pipeline.named_steps['logit'].C_

In [None]:
#таблица метрики ROC_AUC для С=[1,2,3,4] и трех выборок кросс-валидации (cv=3)
pipeline.named_steps['logit'].scores_

## <center>Заметки</center>

Что ж, неплохо получилось! Наша модель описывается 16-ю строками, после того как мы реализовали преобразования данных в классах-трансформерах. 

Давайте теперь разберем некоторые вопросы применения конвейеров и объединителей признаков.

1. Для того, чтобы сделать предсказания обученной модели для тестовой выборки, вызовите метод pipeline.predict_proba(df_proba)

2. Мы не можем (по крайней мере, с легкостью) в нашем конвейере сделать чаcтотную матрицу на объединенной тренировочной и тестовой выборках, как @yorko делал это на мастер-классе. Автор решил эту проблему таким образом. Вместо класса ColsToCountMatrix, который работает с колонками sites, используем класс ColsToText, определенный ниже. Он собирает сайты из всех колонок в "текст", который принимает библиотечный CountVectorizer. В констукторе этого класса читатель найдет не только опцию vocabulary для передачи словаря объединенной тренировочной и тестовой выборки, но и некоторые опции, которые имеет смысл попробовать для улучшения результатов модели.

3. Если читатель решит применить другую модель обучения, например SGDClassifier, в котором не реализована кросс-валидация, то можно "обернуть" его в GridSearchCV:

    <p>('gcv', GridSearchCV(SGDClassifier(**sgd_params), gcv_sgd_params, **gcv_params))</p>
    

Надеюсь, что читатель сможет теперь сам улучшать свою модель для соревнования - работать над признаками и подбирать параметры.


## <center>В заключение</center>
Что можно посоветовать читателю в плане дальнейшего изучения предмета?

1. Изучить официальную документацию: <a href="http://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html">PipeLine</a>, <a href="http://scikit-learn.org/stable/modules/generated/sklearn.pipeline.FeatureUnion.html">FeatureUnion</a>, и разобрать статьи-примеры.
  
2. Взять на вооружение библиотеку <a href="https://github.com/rasbt/mlxtend">mlextend</a> Себастьяна Рашки. Там можно найти много интересных классов, не реализованных в стандартной библиотеке sklearn.

3. Посмотреть <a href="https://github.com/scikit-learn-contrib/sklearn-pandas">sklearn-pandas</a> - облегчение работы именно с датафреймами. Например, можно некоторые колонки преобразовать масштабированием, другие - по принципу one-hot-encoding.

4. Стремиться создавать такие конвейеры, которые позволяют быстро проверять модели и признаки.

Успехов!

In [None]:
#это подготовительный этап трансформации данных, 
#перед тем как применим CountVectorizer
from sklearn.base import BaseEstimator, TransformerMixin


class ColsToText(BaseEstimator, TransformerMixin):
    #конструктор принимает и сохраняет название колонок для сливания в текст
    def __init__(self,columns=[]):
        self.columns = columns
    # fit() отдыхает - делать нечего
    def fit(self, X, y= None):
        return self
    # сливаем содержимое колонок в одну строку, кроме нулей
    def transform(self, X):
        return X[self.columns]\
               .apply(lambda x: " ".join([str(a) for a in x.values if not a==0]), axis=1)\
               .values.reshape(len(X),1)
    #заметьте - возвращаем numpy.ndarray
# Unit test
ColsToText(columns=sites).transform(train_df.head())