{ "cells": [ { "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "<h1> Feature Engineering </h1>\n", "\n", "In this notebook, you will learn how to incorporate feature engineering into your pipeline.\n", "" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Apache Beam works better with Python 2 at the moment, so we're going to work within the Python 2 kernel." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "source activate py2env\n", "conda install -y pytz\n", "pip uninstall -y google-cloud-dataflow\n", "pip install --upgrade apache-beam[gcp]==2.9.0" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "After doing a pip install, you have to click ```Reset Session``` in the menu above so that the new packages are picked up." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "import tensorflow as tf\n", "import apache_beam as beam\n", "import shutil\n", "print(tf.__version__)" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "<h2> 1. Environment variables for project and bucket </h2>\n", "\n", "<ol>\n", "<li> Your project ID is the *unique* string that identifies your project (not the project name). You can find it on the Home page of the GCP Console dashboard. My dashboard reads: Project ID: cloud-training-demos </li>\n", "<li> Cloud training often involves saving and restoring model files. Therefore, we should create a single-region bucket. If you don't have a bucket already, I suggest that you create one from the GCP Console (because it will dynamically check whether the bucket name you want is available). </li>\n", "</ol>\n", "\n", "Change the cell below to reflect your Project ID and bucket name.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "import os\n", "REGION = 'us-central1' # Choose an available region for Cloud MLE from https://cloud.google.com/ml-engine/docs/regions.\n", "BUCKET = 'cloud-training-demos-ml' # REPLACE WITH YOUR BUCKET NAME. Use a regional bucket in the region you selected.\n", "PROJECT = 'cloud-training-demos' # REPLACE WITH YOUR PROJECT ID" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# expose these settings as environment variables for the %%bash cells\n", "os.environ['PROJECT'] = PROJECT\n", "os.environ['BUCKET'] = BUCKET\n", "os.environ['REGION'] = REGION\n", "os.environ['TFVERSION'] = '1.8'\n", "\n", "## ensure we're using python2 env\n", "os.environ['CLOUDSDK_PYTHON'] = 'python2'" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "## ensure gcloud is up to date\n", "gcloud components update\n", "\n", "gcloud config set project $PROJECT\n", "gcloud config set compute/region $REGION\n", "\n", "## ensure we predict locally with our current Python environment\n", "gcloud config set ml_engine/local_python `which python`" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "<h2> 2. Specifying query to pull the data </h2>\n", "\n", "Let's pull out a few extra columns from the pickup timestamp: the day of the week and the hour of the day." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "def create_query(phase, EVERY_N):\n", "  if EVERY_N is None:\n", "    EVERY_N = 4 # use full dataset (split 2:1:1 into train/valid/test)\n", "  \n", "  # select and pre-process fields\n", "  base_query = \"\"\"\n", "SELECT\n", "  (tolls_amount + fare_amount) AS fare_amount,\n", "  DAYOFWEEK(pickup_datetime) AS dayofweek,\n", "  HOUR(pickup_datetime) AS hourofday,\n", "  pickup_longitude AS pickuplon,\n", "  pickup_latitude AS pickuplat,\n", "  dropoff_longitude AS dropofflon,\n", "  dropoff_latitude AS dropofflat,\n", "  passenger_count*1.0 AS passengers,\n", "  CONCAT(STRING(pickup_datetime), STRING(pickup_longitude), STRING(pickup_latitude), STRING(dropoff_latitude), STRING(dropoff_longitude)) AS key\n", "FROM\n", "  [nyc-tlc:yellow.trips]\n", "WHERE\n", "  trip_distance > 0\n", "  AND fare_amount >= 2.5\n", "  AND pickup_longitude > -78\n", "  AND pickup_longitude < -70\n", "  AND dropoff_longitude > -78\n", "  AND dropoff_longitude < -70\n", "  AND pickup_latitude > 37\n", "  AND pickup_latitude < 45\n", "  AND dropoff_latitude > 37\n", "  AND dropoff_latitude < 45\n", "  AND passenger_count > 0\n", "  \"\"\"\n", "  \n", "  # add subsampling criteria by modding with hashkey\n", "  if phase == 'train':\n", "    query = \"{} AND ABS(HASH(pickup_datetime)) % {} < 2\".format(base_query, EVERY_N)\n", "  elif phase == 'valid':\n", "    query = \"{} AND ABS(HASH(pickup_datetime)) % {} == 2\".format(base_query, EVERY_N)\n", "  elif phase == 'test':\n", "    query = \"{} AND ABS(HASH(pickup_datetime)) % {} == 3\".format(base_query, EVERY_N)\n", "  return query\n", "\n", "print create_query('valid', 100) # example query using 1% of data" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Try the query above in https://bigquery.cloud.google.com/table/nyc-tlc:yellow.trips if you want to see what it does (ADD LIMIT 10 to the query!)" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "<h2> 3. Preprocessing Dataflow job from BigQuery </h2>\n", "\n", "This code reads from BigQuery and saves the data on Google Cloud Storage almost as-is (only the day of the week is converted from a number to a string). We can do additional preprocessing and cleanup inside Dataflow, but then we'll have to remember to repeat that preprocessing during inference. It is better to use tf.transform, which will do this bookkeeping for you, or to do preprocessing within your TensorFlow model. We will look at this in future notebooks. For now, we are simply moving data from BigQuery to CSV using Dataflow.\n", "\n", "While we could read from BigQuery directly in TensorFlow (see: https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader), it is quite convenient to export to CSV and train from the CSV files. Let's use Dataflow to do this at scale.\n", "\n", "When the pipeline runs on the cloud (DataflowRunner), go to the GCP Console (https://console.cloud.google.com/dataflow) to look at the status of the job. It will take several minutes for the preprocessing job to launch." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "gsutil -m rm -rf gs://$BUCKET/taxifare/ch4/taxi_preproc/" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "import datetime\n", "\n", "####\n", "# Arguments:\n", "#   -rowdict: Dictionary. The beam bigquery reader returns a PCollection in\n", "#             which each row is represented as a python dictionary\n", "# Returns:\n", "#   -rowstring: a comma separated string representation of the record with dayofweek\n", "#               converted from int to string (e.g. 3 --> Tue)\n", "####\n", "def to_csv(rowdict):\n", "  days = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']\n", "  CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')\n", "  rowdict['dayofweek'] = days[rowdict['dayofweek']]\n", "  rowstring = ','.join([str(rowdict[k]) for k in CSV_COLUMNS])\n", "  return rowstring\n", "\n", "\n", "####\n", "# Arguments:\n", "#   -EVERY_N: Integer. Controls the sample size: the training split keeps 2 of every N rows\n", "#             and the validation split keeps 1 of every N rows. Larger values yield a\n", "#             smaller sample. Pass None to use the full dataset.\n", "#   -RUNNER: 'DirectRunner' or 'DataflowRunner'. Specifies whether to run the pipeline\n", "#            locally or on Google Cloud, respectively.\n", "# Side-effects:\n", "#   -Creates and executes a Dataflow pipeline.\n", "#    See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline\n", "####\n", "def preprocess(EVERY_N, RUNNER):\n", "  job_name = 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')\n", "  print 'Launching Dataflow job {} ... hang on'.format(job_name)\n", "  OUTPUT_DIR = 'gs://{0}/taxifare/ch4/taxi_preproc/'.format(BUCKET)\n", "\n", "  # dictionary of pipeline options\n", "  options = {\n", "    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),\n", "    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),\n", "    'job_name': job_name,\n", "    'project': PROJECT,\n", "    'runner': RUNNER\n", "  }\n", "  # instantiate PipelineOptions object using options dictionary\n", "  opts = beam.pipeline.PipelineOptions(flags=[], **options)\n", "  # instantiate Pipeline object using PipelineOptions\n", "  with beam.Pipeline(options=opts) as p:\n", "    for phase in ['train', 'valid']:\n", "      query = create_query(phase, EVERY_N)\n", "      outfile = os.path.join(OUTPUT_DIR, '{}.csv'.format(phase))\n", "      (\n", "        p | 'read_{}'.format(phase) >> ##TODO: read from BigQuery\n", "          | 'tocsv_{}'.format(phase) >> ##TODO: apply the to_csv function to every row\n", "          | 'write_{}'.format(phase) >> ##TODO: write to outfile\n", "      )\n", "  print(\"Done\")" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Run the pipeline locally on a small sample." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "preprocess(50*10000, 'DirectRunner')" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Run the pipeline on the cloud on a larger sample size." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "preprocess(50*100, 'DataflowRunner')\n", "# change the first argument to None to preprocess the full dataset" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Once the job completes, examine the files it created in Google Cloud Storage." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "gsutil ls -l gs://$BUCKET/taxifare/ch4/taxi_preproc/" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "# print the first 10 lines of the first shard of train.csv\n", "gsutil cat \"gs://$BUCKET/taxifare/ch4/taxi_preproc/train.csv-00000-of-*\" | head" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "<h2> 4. Develop model with new inputs </h2>\n", "\n", "Download the first shard of the preprocessed data to enable local development." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "mkdir -p sample\n", "gsutil cp \"gs://$BUCKET/taxifare/ch4/taxi_preproc/train.csv-00000-of-*\" sample/train.csv\n", "gsutil cp \"gs://$BUCKET/taxifare/ch4/taxi_preproc/valid.csv-00000-of-*\" sample/valid.csv" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Complete the TODOs in taxifare/trainer/model.py so that the code below works." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "rm -rf taxifare.tar.gz taxi_trained\n", "export PYTHONPATH=${PYTHONPATH}:${PWD}/taxifare\n", "python -m trainer.task \\\n", "  --train_data_paths=${PWD}/sample/train.csv \\\n", "  --eval_data_paths=${PWD}/sample/valid.csv \\\n", "  --output_dir=${PWD}/taxi_trained \\\n", "  --train_steps=1000 \\\n", "  --job-dir=/tmp" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "!ls taxi_trained/export/exporter/" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%writefile /tmp/test.json\n", "{\"dayofweek\": \"Sun\", \"hourofday\": 17, \"pickuplon\": -73.885262, \"pickuplat\": 40.773008, \"dropofflon\": -73.987232, \"dropofflat\": 40.732403, \"passengers\": 2}" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "model_dir=$(ls ${PWD}/taxi_trained/export/exporter)\n", "gcloud ai-platform local predict \\\n", "  --model-dir=${PWD}/taxi_trained/export/exporter/${model_dir} \\\n", "  --json-instances=/tmp/test.json" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "# if gcloud ai-platform local predict fails, you might need to update gcloud\n", "#!gcloud --quiet components update" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "<h2> 5. Train on cloud </h2>\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "deletable": true, "editable": true }, "outputs": [], "source": [ "%%bash\n", "OUTDIR=gs://${BUCKET}/taxifare/ch4/taxi_trained\n", "JOBNAME=lab4a_$(date -u +%y%m%d_%H%M%S)\n", "echo $OUTDIR $REGION $JOBNAME\n", "gsutil -m rm -rf $OUTDIR\n", "gcloud ai-platform jobs submit training $JOBNAME \\\n", "  --region=$REGION \\\n", "  --module-name=trainer.task \\\n", "  --package-path=${PWD}/taxifare/trainer \\\n", "  --job-dir=$OUTDIR \\\n", "  --staging-bucket=gs://$BUCKET \\\n", "  --scale-tier=BASIC \\\n", "  --runtime-version=$TFVERSION \\\n", "  -- \\\n", "  --train_data_paths=\"gs://$BUCKET/taxifare/ch4/taxi_preproc/train*\" \\\n", "  --eval_data_paths=\"gs://${BUCKET}/taxifare/ch4/taxi_preproc/valid*\" \\\n", "  --train_steps=5000 \\\n", "  --output_dir=$OUTDIR" ] },
{ "cell_type": "markdown", "metadata": { "deletable": true, "editable": true }, "source": [ "Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License" ] }
], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.13" } }, "nbformat": 4, "nbformat_minor": 2 }