{ "cells": [
 { "cell_type": "markdown", "metadata": {}, "source": [
  "**Important Disclaimer:** Mockup. So far, the example here does not support\n",
  "fit or predict, let alone hyperparameter tuning etc.\n",
  "\n",
  "The pipeline shown here assumes the example input tables from\n",
  "Fig. 2\n",
  "of the following paper:\n",
  "Hoang Thanh Lam, Johann-Michael Thiebaut, Mathieu Sinn, Bei Chen, Tiep Mai, and Oznur Alkan.\n",
  "\"One button machine for automating feature engineering in relational databases\". 2017. \n",
  "https://arxiv.org/abs/1706.00327" ] },
 { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [
  "from lale.expressions import it, replace, sum, max, count, month, day_of_month, item\n",
  "import numpy as np" ] },
 { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [
  "from lale.lib.lale import Scan, Join, Map, GroupBy, Aggregate, ConcatFeatures\n",
  "from sklearn.feature_selection import SelectKBest as SelectFeatures\n",
  "from sklearn.pipeline import Pipeline\n",
  "from lale.lib.autoai_libs import NumpyColumnSelector, CatEncoder, OptStandardScaler, FS1\n",
  "from sklearn.linear_model import LogisticRegression as LR\n",
  "from sklearn.neighbors import KNeighborsClassifier as KNN\n",
  "from xgboost import XGBClassifier as XGBoost\n",
  "import lale\n",
  "lale.wrap_imported_operators()" ] },
 { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [
  "# Three feature paths, each starting from the main table.\n",
  "# one-to-one path doesn't need GroupBy >> Aggregate\n",
  "info_features = (\n",
  "    (Scan(table=it.main) & Scan(table=it.info))\n",
  "    >> Join(pred=[it.main.TrainId == it.info.Train_Id, #note the underscore\n",
  "                  it.main['Arrival time'] >= it.info.TimeStamp])\n",
  "    >> Map(columns=[replace(it['Train class'], {'Regional': 0, 'Intercity': 1}),\n",
  "                    it['Max Speed (km/h)'],\n",
  "                    month(it['Arrival time'], fmt='YYYY-MM-DD HH:MM:SS'),\n",
  "                    day_of_month(it['Arrival time'])]))\n",
  "# one-to-many path (multiple delay rows per main-table row)\n",
  "delay_features = (\n",
  "    (Scan(table=it.main) & Scan(table=it.delay))\n",
  "    >> Join(pred=[it.main.TrainId == it.delay.TrainId,\n",
  "                  it.main['Arrival time'] >= it.delay.TimeStamp])\n",
  "    >> GroupBy(key=it.MessageId) #primary key of main table\n",
  "    >> Aggregate(columns=[sum(it.Delay), max(it.Delay)]))\n",
  "# multi-hop one-to-many path uses multi-way join\n",
  "event_features = (\n",
  "    (Scan(table=it.main) & Scan(table=it.delay) & Scan(table=it.event))\n",
  "    >> Join(pred=[it.main.TrainId == it.delay.TrainId,\n",
  "                  it.main['Arrival time'] >= it.delay.TimeStamp,\n",
  "                  it.delay.StationId == it.event.StationId,\n",
  "                  # same main-table time column as every other predicate\n",
  "                  # NOTE(review): confirm column name against Fig. 2 of the paper\n",
  "                  it.main['Arrival time'] >= it.event.TimeStamp])\n",
  "    >> GroupBy(key=it.MessageId) #primary key of main table\n",
  "    >> Aggregate(columns=[count(it.Event),\n",
  "                          # 'Roadwork' is looked up in the Event column;\n",
  "                          # 'Train class' only holds Regional/Intercity (see info_features)\n",
  "                          item(it.Event, 'Roadwork')]))" ] },
 { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [
  "all_features = Pipeline(steps=[('data_joins',\n",
  "    (info_features & delay_features & event_features)\n",
  "    >> ConcatFeatures >> SelectFeatures())])" ] },
 { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [
  "cats_prep = NumpyColumnSelector(columns=[0]) >> CatEncoder(dtype=np.float64)\n",
  "cont_prep = NumpyColumnSelector(columns=[1,2]) >> OptStandardScaler(use_scaler_flag=True)\n",
  "all_prep = Pipeline(steps=[('preprocessing',\n",
  "    (cats_prep & cont_prep) >> ConcatFeatures >> FS1(additional_col_count_to_keep=3))])" ] },
 { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [
  "classifier = LR | KNN | XGBoost" ] },
 { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [
  "pipeline = all_features >> all_prep >> classifier" ] },
 { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "image/svg+xml": [ "\n", "\n", "\n", "\n", "\n", "\n", "cluster:(root)\n", "\n", "\n", "\n", "\n", "cluster:pipeline_1\n", "\n", "\n", "Pipeline: data_joins\n", "\n",
"\n", "\n", "cluster:pipeline_2\n", "\n", "\n", "\n", "\n", "\n", "cluster:pipeline_3\n", "\n", "\n", "Pipeline: preprocessing\n", "\n", "\n", "\n", "cluster:pipeline_4\n", "\n", "\n", "\n", "\n", "\n", "cluster:choice\n", "\n", "\n", "Choice\n", "\n", "\n", "\n", "\n", "scan\n", "\n", "\n", "Scan:\n", "it.main\n", "\n", "\n", "\n", "\n", "join\n", "\n", "\n", "Join\n", "\n", "\n", "\n", "\n", "scan->join\n", "\n", "\n", "\n", "\n", "scan_0\n", "\n", "\n", "Scan:\n", "it.info\n", "\n", "\n", "\n", "\n", "scan_0->join\n", "\n", "\n", "\n", "\n", "map\n", "\n", "\n", "Map\n", "\n", "\n", "\n", "\n", "join->map\n", "\n", "\n", "\n", "\n", "concat_features\n", "\n", "\n", "Concat-\n", "Features\n", "\n", "\n", "\n", "\n", "map->concat_features\n", "\n", "\n", "\n", "\n", "scan_1\n", "\n", "\n", "Scan:\n", "it.main\n", "\n", "\n", "\n", "\n", "join_0\n", "\n", "\n", "Join\n", "\n", "\n", "\n", "\n", "scan_1->join_0\n", "\n", "\n", "\n", "\n", "scan_2\n", "\n", "\n", "Scan:\n", "it.delay\n", "\n", "\n", "\n", "\n", "scan_2->join_0\n", "\n", "\n", "\n", "\n", "group_by\n", "\n", "\n", "Group-\n", "By\n", "\n", "\n", "\n", "\n", "join_0->group_by\n", "\n", "\n", "\n", "\n", "aggregate\n", "\n", "\n", "Aggregate\n", "\n", "\n", "\n", "\n", "group_by->aggregate\n", "\n", "\n", "\n", "\n", "aggregate->concat_features\n", "\n", "\n", "\n", "\n", "scan_3\n", "\n", "\n", "Scan:\n", "it.main\n", "\n", "\n", "\n", "\n", "join_1\n", "\n", "\n", "Join\n", "\n", "\n", "\n", "\n", "scan_3->join_1\n", "\n", "\n", "\n", "\n", "scan_4\n", "\n", "\n", "Scan:\n", "it.delay\n", "\n", "\n", "\n", "\n", "scan_4->join_1\n", "\n", "\n", "\n", "\n", "scan_5\n", "\n", "\n", "Scan:\n", "it.event\n", "\n", "\n", "\n", "\n", "scan_5->join_1\n", "\n", "\n", "\n", "\n", "group_by_0\n", "\n", "\n", "Group-\n", "By\n", "\n", "\n", "\n", "\n", "join_1->group_by_0\n", "\n", "\n", "\n", "\n", "aggregate_0\n", "\n", "\n", "Aggregate\n", "\n", "\n", "\n", "\n", "group_by_0->aggregate_0\n", "\n", "\n", "\n", 
"\n", "aggregate_0->concat_features\n", "\n", "\n", "\n", "\n", "select_features\n", "\n", "\n", "Select-\n", "Features\n", "\n", "\n", "\n", "\n", "concat_features->select_features\n", "\n", "\n", "\n", "\n", "numpy_column_selector\n", "\n", "\n", "Numpy-\n", "Column-\n", "Selector\n", "\n", "\n", "\n", "\n", "select_features->numpy_column_selector\n", "\n", "\n", "\n", "\n", "cat_encoder\n", "\n", "\n", "Cat-\n", "Encoder\n", "\n", "\n", "\n", "\n", "numpy_column_selector->cat_encoder\n", "\n", "\n", "\n", "\n", "concat_features_0\n", "\n", "\n", "Concat-\n", "Features\n", "\n", "\n", "\n", "\n", "cat_encoder->concat_features_0\n", "\n", "\n", "\n", "\n", "numpy_column_selector_0\n", "\n", "\n", "Numpy-\n", "Column-\n", "Selector\n", "\n", "\n", "\n", "\n", "opt_standard_scaler\n", "\n", "\n", "Opt-\n", "Standard-\n", "Scaler\n", "\n", "\n", "\n", "\n", "numpy_column_selector_0->opt_standard_scaler\n", "\n", "\n", "\n", "\n", "opt_standard_scaler->concat_features_0\n", "\n", "\n", "\n", "\n", "fs1\n", "\n", "\n", "FS1\n", "\n", "\n", "\n", "\n", "concat_features_0->fs1\n", "\n", "\n", "\n", "\n", "lr\n", "\n", "\n", "LR\n", "\n", "\n", "\n", "\n", "fs1->lr\n", "\n", "\n", "\n", "\n", "knn\n", "\n", "\n", "KNN\n", "\n", "\n", "\n", "\n", "xg_boost\n", "\n", "\n", "XG-\n", "Boost\n", "\n", "\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "pipeline.visualize()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "scrolled": false }, "outputs": [ { "data": { "text/markdown": [ "```python\n", "from sklearn.pipeline import Pipeline\n", "from lale.lib.lale import Scan\n", "from lale.expressions import it\n", "from lale.lib.lale import Join\n", "from lale.lib.lale import Map\n", "from lale.expressions import replace\n", "from lale.expressions import month\n", "from lale.expressions import day_of_month\n", "from lale.lib.lale import GroupBy\n", "from lale.lib.lale import Aggregate\n", "from 
lale.expressions import sum\n", "from lale.expressions import max\n", "from lale.expressions import count\n", "from lale.expressions import item\n", "from lale.lib.lale import ConcatFeatures\n", "from sklearn.feature_selection import SelectKBest as SelectFeatures\n", "from autoai_libs.transformers.exportable import NumpyColumnSelector\n", "from autoai_libs.transformers.exportable import CatEncoder\n", "import numpy as np\n", "from autoai_libs.transformers.exportable import OptStandardScaler\n", "from autoai_libs.cognito.transforms.transform_utils import FS1\n", "from sklearn.linear_model import LogisticRegression as LR\n", "from sklearn.neighbors import KNeighborsClassifier as KNN\n", "from xgboost import XGBClassifier as XGBoost\n", "import lale\n", "\n", "lale.wrap_imported_operators()\n", "scan = Scan(table=it.main)\n", "scan_0 = Scan(table=it.info)\n", "join = Join(\n", " pred=[\n", " (it.main.TrainId == it.info.Train_Id),\n", " (it.main[\"Arrival time\"] >= it.info.TimeStamp),\n", " ]\n", ")\n", "scan_1 = Scan(table=it.main)\n", "scan_2 = Scan(table=it.delay)\n", "join_0 = Join(\n", " pred=[\n", " (it.main.TrainId == it.delay.TrainId),\n", " (it.main[\"Arrival time\"] >= it.delay.TimeStamp),\n", " ]\n", ")\n", "group_by = GroupBy(key=it.MessageId)\n", "aggregate = Aggregate(columns=[sum(it.Delay), max(it.Delay)])\n", "scan_3 = Scan(table=it.main)\n", "scan_4 = Scan(table=it.delay)\n", "scan_5 = Scan(table=it.event)\n", "join_1 = Join(\n", " pred=[\n", " (it.main.TrainId == it.delay.TrainId),\n", " (it.main[\"Arrival time\"] >= it.delay.TimeStamp),\n", " (it.delay.StationId == it.event.StationId),\n", " (it.main.TimeStamp >= it.event.TimeStamp),\n", " ]\n", ")\n", "group_by_0 = GroupBy(key=it.MessageId)\n", "aggregate_0 = Aggregate(\n", " columns=[count(it.Event), item(it[\"Train class\"], \"Roadwork\")]\n", ")\n", "numpy_column_selector = NumpyColumnSelector(columns=[0])\n", "cat_encoder = CatEncoder(\n", " encoding=\"ordinal\",\n", " categories=\"auto\",\n", 
" dtype=np.float64,\n", " handle_unknown=\"ignore\",\n", ")\n", "numpy_column_selector_0 = NumpyColumnSelector(columns=[1, 2])\n", "fs1 = FS1(\n", " cols_ids_must_keep=[],\n", " additional_col_count_to_keep=3,\n", " ptype=\"classification\",\n", ")\n", "pipeline_3 = Pipeline(\n", " steps=[\n", " (\n", " \"preprocessing\",\n", " (\n", " (numpy_column_selector >> cat_encoder)\n", " & (numpy_column_selector_0 >> OptStandardScaler())\n", " )\n", " >> ConcatFeatures\n", " >> fs1,\n", " )\n", " ]\n", ")\n", "pipeline = (\n", " Pipeline(\n", " steps=[\n", " (\n", " \"data_joins\",\n", " (\n", " (\n", " (scan & scan_0)\n", " >> join\n", " >> Map(\n", " columns=[\n", " replace({\"Intercity\": 1, \"Regional\": 0}),\n", " it[\"Max Speed (km/h)\"],\n", " month(\n", " it[\"Arrival time\"], \"YYYY-MM-DD HH:MM:SS\"\n", " ),\n", " day_of_month(it[\"Arrival time\"]),\n", " ]\n", " )\n", " )\n", " & ((scan_1 & scan_2) >> join_0 >> group_by >> aggregate)\n", " & (\n", " (scan_3 & scan_4 & scan_5)\n", " >> join_1\n", " >> group_by_0\n", " >> aggregate_0\n", " )\n", " )\n", " >> ConcatFeatures\n", " >> SelectFeatures(),\n", " )\n", " ]\n", " )\n", " >> pipeline_3\n", " >> (LR | KNN | XGBoost)\n", ")\n", "```" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "pipeline.pretty_print(ipython_display=True)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }