{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Anomaly detection on taxi ridership data with Athena and SageMaker Random Cut Forest\n",
    "\n",
    "This notebook:\n",
    "\n",
    "1. installs PyAthena and opens an Athena connection,\n",
    "2. creates an external Athena table over the CSV data in `s3://athena-examples-us-east-1/workshop-ml/`,\n",
    "3. loads the whole table into a pandas DataFrame,\n",
    "4. trains a SageMaker `RandomCutForest` model on the `number` column, and\n",
    "5. deploys the model to an endpoint and prints the endpoint name for use from Athena.\n",
    "\n",
    "**Before running:** change `bucket` in the configuration cell to an S3 bucket you can use for model output and training data.\n",
    "\n",
    "**Note:** the last two code cells start a training job and a hosted endpoint on `ml.c4.8xlarge` instances. Delete the endpoint when you no longer need it."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Install dependencies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "# Install into the interpreter that backs this kernel\n",
    "!{sys.executable} -m pip install PyAthena"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports and configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyathena import connect\n",
    "import pandas as pd\n",
    "import sagemaker\n",
    "from sagemaker import RandomCutForest\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "#TODO: Change the bucket to point to an s3 bucket to use for model output and training data\n",
    "bucket = 'athena-federation-test'\n",
    "# S3 staging directory handed to PyAthena for this connection\n",
    "output_location = 's3://' + bucket + '/athena-ml/'\n",
    "connection = connect(s3_staging_dir=output_location, region_name='us-east-1')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create the Athena table\n",
    "\n",
    "`IF NOT EXISTS` keeps this section re-runnable: without it the statement fails once the table has already been created."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "create_table = \"\"\"\n",
    "CREATE EXTERNAL TABLE IF NOT EXISTS `taxi_ridership_data`(\n",
    "  `time` string,\n",
    "  `number` int)\n",
    "ROW FORMAT SERDE\n",
    "  'org.apache.hadoop.hive.serde2.OpenCSVSerde'\n",
    "WITH SERDEPROPERTIES (\n",
    "  'separatorChar'=',')\n",
    "STORED AS INPUTFORMAT\n",
    "  'org.apache.hadoop.mapred.TextInputFormat'\n",
    "OUTPUTFORMAT\n",
    "  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n",
    "LOCATION\n",
    "  's3://athena-examples-us-east-1/workshop-ml/'\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## Create a new Athena table holding data we will use to predict anomalies\n",
    "pd.read_sql(create_table, connection)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load the data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## Select the entire dataset and save it to a variable to be used later to fit the model.\n",
    "results = pd.read_sql(\"SELECT * FROM default.taxi_ridership_data\", connection)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "## Let's see the data we are working with\n",
    "results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train the Random Cut Forest model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "prefix = 'athena-ml/anomalydetection'\n",
    "execution_role = sagemaker.get_execution_role()\n",
    "session = sagemaker.Session()\n",
    "\n",
    "# specify general training job information\n",
    "rcf = RandomCutForest(role=execution_role,\n",
    "                      train_instance_count=1,\n",
    "                      train_instance_type='ml.c4.8xlarge',\n",
    "                      data_location='s3://{}/{}/'.format(bucket, prefix),\n",
    "                      output_path='s3://{}/{}/output'.format(bucket, prefix),\n",
    "                      num_samples_per_tree=512,\n",
    "                      num_trees=50)\n",
    "\n",
    "# Run the training job using the results we got from the Athena query earlier.\n",
    "# reshape(-1, 1) turns the `number` column into a 2-D array with a single feature column.\n",
    "rcf.fit(rcf.record_set(results.number.values.reshape(-1,1)))\n",
    "\n",
    "print('Training job name: {}'.format(rcf.latest_training_job.job_name))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Deploy the model\n",
    "\n",
    "The endpoint name printed below is the value to use from Athena."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rcf_inference = rcf.deploy(\n",
    "    initial_instance_count=1,\n",
    "    instance_type='ml.c4.8xlarge',\n",
    ")\n",
    "\n",
    "print('\\nEndpoint name (used by Athena): {}'.format(rcf_inference.endpoint))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}