{"nbformat":4,"nbformat_minor":0,"metadata":{"colab":{"name":"2022-01-25-mlops-tfx.ipynb","provenance":[{"file_id":"https://github.com/recohut/nbs/blob/main/raw/T328749%20%7C%20MLOps%20with%20TFX%20Pipeline.ipynb","timestamp":1644671164117},{"file_id":"https://gist.github.com/rafiqhasan/2164304ede002f4a8bfe56e5434e1a34#file-dl-e2e-taxi-dataset-tfx-e2e-ipynb","timestamp":1628656487313}],"collapsed_sections":[]},"kernelspec":{"display_name":"Python 3","name":"python3"}},"cells":[{"cell_type":"markdown","source":["# MLOps with TFX Pipeline"],"metadata":{"id":"DBkcVEByCra5"}},{"cell_type":"markdown","metadata":{"id":"DUH2FIZH73M_"},"source":["Author - [Hasan Rafiq](https://www.linkedin.com/in/sam04/)"]},{"cell_type":"markdown","metadata":{"id":"MZOYTt1RW4TK"},"source":["\n","## **TensorFlow Extended (TFX)** \n","Is an end-to-end platform for deploying production ML pipelines"]},{"cell_type":"markdown","metadata":{"id":"NPI8kU6ClvGe"},"source":["![prog_fin.png]()"]},{"cell_type":"markdown","metadata":{"id":"PemmMleYAtXX"},"source":["### **Enterprise ML is not about the best model:**\n","![mlops-continuous-delivery-and-automation-pipelines-in-machine-learning-1-elements-of-ml.png]()"]},{"cell_type":"markdown","metadata":{"id":"BYwKL3V4n1K5"},"source":["### **Corresponding TFX libraries per component**:\n","\n","![libraries_components.png]()"]},{"cell_type":"code","metadata":{"id":"J9XeXakTLkZ5"},"source":["!pip install tfx"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"YIqpWK9efviJ","colab":{"base_uri":"https://localhost:8080/"},"outputId":"97beba54-58c2-4cef-8e13-7dcb0d1b7e10"},"source":["import os\n","import pprint\n","import numpy as np\n","import tempfile\n","import urllib\n","\n","import absl\n","import pandas as pd\n","import tensorflow as tf\n","import tensorflow_model_analysis as tfma\n","tf.get_logger().propagate = False\n","pp = pprint.PrettyPrinter()\n","\n","import tfx\n","from tfx.components import CsvExampleGen\n","from typing 
import Dict, List, Text\n","from tfx.components import Evaluator\n","from tfx.components import ExampleValidator\n","from tfx.components import Pusher\n","from tfx.components import ResolverNode\n","from tfx.components import SchemaGen\n","from tfx.components import StatisticsGen\n","from tfx.components import Trainer\n","from tfx.components import Transform\n","from tfx.components.base import executor_spec\n","from tfx.components.trainer.executor import GenericExecutor\n","from tfx.dsl.experimental import latest_blessed_model_resolver\n","from tfx.orchestration import metadata\n","from tfx.orchestration import pipeline\n","from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext\n","from tfx.proto import pusher_pb2\n","from tfx.proto import trainer_pb2\n","from tfx.types import Channel\n","from tfx.types.standard_artifacts import Model\n","from tfx.types.standard_artifacts import ModelBlessing\n","from tfx.utils.dsl_utils import external_input\n","\n","\n","%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.\n"],"name":"stderr"}]},{"cell_type":"markdown","metadata":{"id":"wCZTHRy0N1D6"},"source":["Let's check the library versions."]},{"cell_type":"code","metadata":{"id":"eZ4K18_DN2D8","colab":{"base_uri":"https://localhost:8080/"},"outputId":"f5b29d20-33dc-4d9c-eb59-ca903cbee1f1"},"source":["print('TensorFlow version: {}'.format(tf.__version__))\n","print('TFX version: {}'.format(tfx.__version__))"],"execution_count":null,"outputs":[{"output_type":"stream","text":["TensorFlow version: 2.4.1\n","TFX version: 0.28.0\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"id":"grFKIgyXOAVM","colab":{"base_uri":"https://localhost:8080/"},"outputId":"79010cb6-21d5-417e-de56-f59922a57c19"},"source":["!rm -rf data.*\n","!rm -rf *trainer.py\n","!sudo rm -r 
/content/tfx"],"execution_count":null,"outputs":[{"output_type":"stream","text":["rm: cannot remove '/content/tfx': No such file or directory\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"id":"r_17y324BB0K"},"source":["! cd /content/\n","! mkdir /content/tfx/\n","! mkdir /content/tfx/pipelines\n","! mkdir /content/tfx/metadata\n","! mkdir /content/tfx/logs\n","! mkdir /content/tfx/data\n","! mkdir /content/tfx/serving_model"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"n2cMMAbSkGfX"},"source":["### Download example data\n","We download the example dataset for use in our TFX pipeline.\n","\n","The dataset we're using is the [Taxi Trips dataset](https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew) released by the City of Chicago. The columns in this dataset are:\n","\n","\n","\n","\n","\n","\n","\n","\n","
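<table>\n",
"<tr><td>pickup_community_area</td><td>fare</td><td>trip_start_month</td></tr>\n",
"<tr><td>trip_start_hour</td><td>trip_start_day</td><td>trip_start_timestamp</td></tr>\n",
"<tr><td>pickup_latitude</td><td>pickup_longitude</td><td>dropoff_latitude</td></tr>\n",
"<tr><td>dropoff_longitude</td><td>trip_miles</td><td>pickup_census_tract</td></tr>\n",
"<tr><td>dropoff_census_tract</td><td>payment_type</td><td>company</td></tr>\n",
"<tr><td>trip_seconds</td><td>dropoff_community_area</td><td>tips</td></tr>\n",
"</table>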
\n","\n","With this dataset, we will build a model that predicts the `fare` of a trip."]},{"cell_type":"code","metadata":{"id":"BywX6OUEhAqn","colab":{"base_uri":"https://localhost:8080/"},"outputId":"ce71f8dc-7d2d-49ca-f6ab-f181f368e780"},"source":["!wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv"],"execution_count":null,"outputs":[{"output_type":"stream","text":["--2021-03-27 14:45:42-- https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv\n","Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.111.133, ...\n","Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.\n","HTTP request sent, awaiting response... 200 OK\n","Length: 1922812 (1.8M) [text/plain]\n","Saving to: ‘data.csv’\n","\n","data.csv 100%[===================>] 1.83M --.-KB/s in 0.08s \n","\n","2021-03-27 14:45:43 (23.8 MB/s) - ‘data.csv’ saved [1922812/1922812]\n","\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"id":"SakyMaydJ4G5","colab":{"base_uri":"https://localhost:8080/"},"outputId":"adc143eb-9e40-4205-c9ba-bc1fa8d1d13d"},"source":["df = pd.read_csv('/content/data.csv')\n","\n","##Drop useless columns\n","df = df.drop(['trip_start_timestamp','trip_miles','pickup_census_tract',\n"," 'dropoff_census_tract','trip_seconds','payment_type','tips', \n"," 'company','dropoff_community_area','pickup_community_area'], axis=1)\n","\n","#Drop NA rows\n","df = df.dropna()\n","\n","##Keep a test set for final testing( TFX internally splits train and validation data )\n","np.random.seed(seed=2)\n","msk = np.random.rand(len(df)) < 0.9\n","traindf = df[msk]\n","evaldf = df[~msk]\n","\n","print(len(traindf))\n","print(len(evaldf))\n","\n","traindf.to_csv(\"/content/tfx/data/data_trans.csv\", index=False, header=True)\n","evaldf.to_csv(\"eval.csv\", index=False, 
header=False)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["13077\n","1442\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"blZC1sIQOWfH"},"source":["Take a quick look at the training CSV file."]},{"cell_type":"code","metadata":{"id":"c5YPeLPFOXaD","colab":{"base_uri":"https://localhost:8080/"},"outputId":"c6fe1a10-47fa-4b37-97af-55165f11c16b"},"source":["# `_data_filepath` is only defined in the next section, so use the literal path here.\n","!head /content/tfx/data/data_trans.csv"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"ufJKQ6OvkJlY"},"source":["### Set up pipeline paths"]},{"cell_type":"code","metadata":{"id":"RsH0ckYiADx5"},"source":["# Define all path constants\n","_tfx_root = os.path.join(os.getcwd(), 'tfx')  # <cwd>/tfx (here /content/tfx)\n","_pipeline_root = os.path.join(_tfx_root, 'pipelines')  # <cwd>/tfx/pipelines\n","_metadata_db_root = os.path.join(_tfx_root, 'metadata.db')  # <cwd>/tfx/metadata.db\n","_log_root = os.path.join(_tfx_root, 'logs')\n","_model_root = os.path.join(_tfx_root, 'model')\n","_data_root = os.path.join(_tfx_root, 'data')\n","_serving_model_dir = os.path.join(_tfx_root, 'serving_model')\n","_data_filepath = os.path.join(_data_root, \"data_trans.csv\")\n","\n","# Module file names\n","_input_fn_module_file = 'inputfn_trainer.py'\n","_constants_module_file = 'constants_trainer.py'\n","_model_trainer_module_file = 'model_trainer.py'"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"8ONIE_hdkPS4"},"source":["### Create the InteractiveContext\n","Finally, we create an `InteractiveContext`, which lets us run TFX components interactively in this notebook."]},{"cell_type":"code","metadata":{"id":"0Rh6K5sUf9dd","colab":{"base_uri":"https://localhost:8080/"},"outputId":"5591a3d5-6c64-4d58-d26b-b607b5e9b0c9"},"source":["# Create an InteractiveContext rooted at `_tfx_root`. Because no\n","# `metadata_connection_config` is passed, TFX falls back to a SQLite\n","# ML Metadata database under the pipeline root.\n","# To use your own database, pass `metadata_connection_config` to\n","# InteractiveContext. Calls to InteractiveContext are no-ops outside of the\n","# notebook.\n","context = InteractiveContext(pipeline_root=_tfx_root)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /content/tfx/metadata.sqlite.\n"],"name":"stderr"}]},{"cell_type":"markdown","metadata":{"id":"HdQWxfsVkzdJ"},"source":["## Run TFX components interactively\n","In the cells that follow, we create TFX components one by one, run each of them, and visualize their output artifacts."]},{"cell_type":"markdown","metadata":{"id":"L9fwt9gQk3BR"},"source":["### ExampleGen\n","\n","The `ExampleGen` component is usually at the start of a TFX pipeline. It will:\n","\n","1. **Split** the data (placed in `_data_root`) into training and evaluation sets (by default, 2/3 training + 1/3 eval)\n","2. **Convert** the data into the `tf.Example` format\n","3. **Copy the splits** into the `_tfx_root` directory for other components to access\n","\n","`ExampleGen` takes as input the path to your data source. In our case, this is the `_data_root` path that contains the preprocessed training CSV (`data_trans.csv`).\n","\n","Note: In this notebook, we can instantiate components one by one and run them with `InteractiveContext.run()`.
By contrast, in a production setting, we would specify all the components upfront in a `Pipeline` to pass to the orchestrator (see the [Building a TFX Pipeline Guide](../tfx/guide/build_tfx_pipeline))."]},{"cell_type":"code","metadata":{"id":"PyXjuMt8f-9u","colab":{"base_uri":"https://localhost:8080/","height":495},"outputId":"ee240eaf-dd4e-40c1-c381-1e899bf16b36"},"source":["example_gen = CsvExampleGen(input=external_input(_data_root))\n","context.run(example_gen)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:absl:From :1: external_input (from tfx.utils.dsl_utils) is deprecated and will be removed in a future version.\n","Instructions for updating:\n","external_input is deprecated, directly pass the uri to ExampleGen.\n","WARNING:absl:The \"input\" argument to the CsvExampleGen component has been deprecated by \"input_base\". Please update your usage as support for this argument will be removed soon.\n","WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.\n"],"name":"stderr"},{"output_type":"display_data","data":{"application/javascript":["\n"," if (typeof window.interactive_beam_jquery == 'undefined') {\n"," var jqueryScript = document.createElement('script');\n"," jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n"," jqueryScript.type = 'text/javascript';\n"," jqueryScript.onload = function() {\n"," var datatableScript = document.createElement('script');\n"," datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n"," datatableScript.type = 'text/javascript';\n"," datatableScript.onload = function() {\n"," window.interactive_beam_jquery = jQuery.noConflict(true);\n"," window.interactive_beam_jquery(document).ready(function($){\n"," \n"," });\n"," }\n"," 
document.head.appendChild(datatableScript);\n"," };\n"," document.head.appendChild(jqueryScript);\n"," } else {\n"," window.interactive_beam_jquery(document).ready(function($){\n"," \n"," });\n"," }"]},"metadata":{"tags":[]}},{"output_type":"stream","text":["WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.\n"],"name":"stderr"},{"output_type":"execute_result","data":{"text/html":["\n","\n","
<pre>ExecutionResult at 0x7f9767f28650\n",
".execution_id: 1\n",
".component: CsvExampleGen at 0x7f97bc88c290\n",
".component.inputs: {}\n",
".component.outputs['examples']: Channel of type 'Examples' (1 artifact) at 0x7f97bc88c8d0\n",
"    Artifact of type 'Examples' at 0x7f9767e35f50\n",
"    .type: &lt;class 'tfx.types.standard_artifacts.Examples'&gt;\n",
"    .uri: /content/tfx/CsvExampleGen/examples/1\n",
"    .span: 0\n",
"    .split_names: [\"train\", \"eval\"]\n",
"    .version: 0\n",
".exec_properties:\n",
"    ['input_base']: /content/tfx/data\n",
"    ['input_config']: {\"splits\": [{\"name\": \"single_split\", \"pattern\": \"*\"}]}\n",
"    ['output_config']: {\"split_config\": {\"splits\": [{\"hash_buckets\": 2, \"name\": \"train\"}, {\"hash_buckets\": 1, \"name\": \"eval\"}]}}\n",
"    ['output_data_format']: 6\n",
"    ['custom_config']: None\n",
"    ['range_config']: None\n",
"    ['span']: 0\n",
"    ['version']: None\n",
"    ['input_fingerprint']: split:single_split,num_files:1,total_bytes:907007,xor_checksum:1616856396,sum_checksum:1616856396</pre>
"],"text/plain":["ExecutionResult(\n"," component_id: CsvExampleGen\n"," execution_id: 1\n"," outputs:\n"," examples: Channel(\n"," type_name: Examples\n"," artifacts: [Artifact(artifact: id: 1\n"," type_id: 5\n"," uri: \"/content/tfx/CsvExampleGen/examples/1\"\n"," properties {\n"," key: \"split_names\"\n"," value {\n"," string_value: \"[\\\"train\\\", \\\"eval\\\"]\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"input_fingerprint\"\n"," value {\n"," string_value: \"split:single_split,num_files:1,total_bytes:907007,xor_checksum:1616856396,sum_checksum:1616856396\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"payload_format\"\n"," value {\n"," string_value: \"FORMAT_TF_EXAMPLE\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"span\"\n"," value {\n"," string_value: \"0\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 5\n"," name: \"Examples\"\n"," properties {\n"," key: \"span\"\n"," value: INT\n"," }\n"," properties {\n"," key: \"split_names\"\n"," value: STRING\n"," }\n"," properties {\n"," key: \"version\"\n"," value: INT\n"," }\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," ))"]},"metadata":{"tags":[]},"execution_count":10}]},{"cell_type":"markdown","metadata":{"id":"OqCoZh7KPUm9"},"source":["Let's examine the output artifacts of `ExampleGen`. 
This component produces a single `Examples` artifact with two splits, training examples and evaluation examples:"]},{"cell_type":"code","metadata":{"id":"880KkTAkPeUg","colab":{"base_uri":"https://localhost:8080/"},"outputId":"9dbd3b1e-4e13-44c9-e062-8eb1e5b5b799"},"source":["artifact = example_gen.outputs['examples'].get()[0]\n","print(artifact.split_names, artifact.uri)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["[\"train\", \"eval\"] /content/tfx/CsvExampleGen/examples/1\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"J6vcbW_wPqvl"},"source":["We can also take a look at the first three training examples:"]},{"cell_type":"code","metadata":{"id":"H4XIXjiCPwzQ","colab":{"base_uri":"https://localhost:8080/"},"outputId":"f55974d3-9a4b-43b3-b236-e1877625bd7e"},"source":["# Get the URI of the training split, which is a directory inside the output artifact\n","train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')\n","\n","# Get the list of files in this directory (all compressed TFRecord files)\n","tfrecord_filenames = [os.path.join(train_uri, name)\n","                      for name in os.listdir(train_uri)]\n","\n","# Create a `TFRecordDataset` to read these files\n","dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\n","\n","# Iterate over the first 3 records and decode them.\n","for tfrecord in dataset.take(3):\n","    serialized_example = tfrecord.numpy()\n","    example = tf.train.Example()\n","    example.ParseFromString(serialized_example)\n","    pp.pprint(example)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["features {\n"," feature {\n"," key: \"dropoff_latitude\"\n"," value {\n"," float_list {\n"," value: 41.92045211791992\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"dropoff_longitude\"\n"," value {\n"," float_list {\n"," value: -87.6799545288086\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"fare\"\n"," value {\n"," float_list {\n"," value: 3.8499999046325684\n"," }\n"," }\n"," }\n"," 
feature {\n"," key: \"pickup_latitude\"\n"," value {\n"," float_list {\n"," value: 41.8996696472168\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"pickup_longitude\"\n"," value {\n"," float_list {\n"," value: -87.66983795166016\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_day\"\n"," value {\n"," int64_list {\n"," value: 6\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_hour\"\n"," value {\n"," int64_list {\n"," value: 15\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_month\"\n"," value {\n"," int64_list {\n"," value: 3\n"," }\n"," }\n"," }\n","}\n","\n","features {\n"," feature {\n"," key: \"dropoff_latitude\"\n"," value {\n"," float_list {\n"," value: 41.92045211791992\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"dropoff_longitude\"\n"," value {\n"," float_list {\n"," value: -87.6799545288086\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"fare\"\n"," value {\n"," float_list {\n"," value: 7.25\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"pickup_latitude\"\n"," value {\n"," float_list {\n"," value: 41.90665054321289\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"pickup_longitude\"\n"," value {\n"," float_list {\n"," value: -87.66533660888672\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_day\"\n"," value {\n"," int64_list {\n"," value: 7\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_hour\"\n"," value {\n"," int64_list {\n"," value: 21\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_month\"\n"," value {\n"," int64_list {\n"," value: 10\n"," }\n"," }\n"," }\n","}\n","\n","features {\n"," feature {\n"," key: \"dropoff_latitude\"\n"," value {\n"," float_list {\n"," value: 41.849246978759766\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"dropoff_longitude\"\n"," value {\n"," float_list {\n"," value: -87.62413787841797\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"fare\"\n"," value {\n"," float_list {\n"," value: 13.050000190734863\n"," }\n"," }\n"," }\n"," feature {\n"," key: 
\"pickup_latitude\"\n"," value {\n"," float_list {\n"," value: 41.849246978759766\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"pickup_longitude\"\n"," value {\n"," float_list {\n"," value: -87.62413787841797\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_day\"\n"," value {\n"," int64_list {\n"," value: 2\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_hour\"\n"," value {\n"," int64_list {\n"," value: 17\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_month\"\n"," value {\n"," int64_list {\n"," value: 9\n"," }\n"," }\n"," }\n","}\n","\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"2gluYjccf-IP"},"source":["Now that `ExampleGen` has finished ingesting the data, the next step is data analysis."]},{"cell_type":"markdown","metadata":{"id":"csM6BFhtk5Aa"},"source":["### StatisticsGen\n","The `StatisticsGen` component **computes statistics** over your dataset for data analysis, as well as for use in downstream components. It uses the [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) library.\n","\n","`StatisticsGen` takes as input the dataset we just ingested using `ExampleGen`."]},{"cell_type":"code","metadata":{"id":"MAscCCYWgA-9","colab":{"base_uri":"https://localhost:8080/","height":238},"outputId":"94b5e87e-5c5f-4e07-875a-671cc2d72bff"},"source":["statistics_gen = StatisticsGen(\n"," examples=example_gen.outputs['examples'])\n","context.run(statistics_gen)"],"execution_count":null,"outputs":[{"output_type":"execute_result","data":{"text/html":["\n","\n","
<pre>ExecutionResult at 0x7f97668b5050\n",
".execution_id: 2\n",
".component: StatisticsGen at 0x7f97b8815590\n",
".component.inputs['examples']: Channel of type 'Examples' (1 artifact) at 0x7f97bc88c8d0\n",
"    Artifact of type 'Examples' at 0x7f9767e35f50\n",
"    .type: &lt;class 'tfx.types.standard_artifacts.Examples'&gt;\n",
"    .uri: /content/tfx/CsvExampleGen/examples/1\n",
"    .span: 0\n",
"    .split_names: [\"train\", \"eval\"]\n",
"    .version: 0\n",
".component.outputs['statistics']: Channel of type 'ExampleStatistics' (1 artifact) at 0x7f97660af2d0\n",
"    Artifact of type 'ExampleStatistics' at 0x7f97660af310\n",
"    .type: &lt;class 'tfx.types.standard_artifacts.ExampleStatistics'&gt;\n",
"    .uri: /content/tfx/StatisticsGen/statistics/2\n",
"    .span: 0\n",
"    .split_names: [\"train\", \"eval\"]\n",
".exec_properties:\n",
"    ['stats_options_json']: None\n",
"    ['exclude_splits']: []</pre>
"],"text/plain":["ExecutionResult(\n"," component_id: StatisticsGen\n"," execution_id: 2\n"," outputs:\n"," statistics: Channel(\n"," type_name: ExampleStatistics\n"," artifacts: [Artifact(artifact: id: 2\n"," type_id: 7\n"," uri: \"/content/tfx/StatisticsGen/statistics/2\"\n"," properties {\n"," key: \"split_names\"\n"," value {\n"," string_value: \"[\\\"train\\\", \\\"eval\\\"]\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"statistics\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"StatisticsGen\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 7\n"," name: \"ExampleStatistics\"\n"," properties {\n"," key: \"span\"\n"," value: INT\n"," }\n"," properties {\n"," key: \"split_names\"\n"," value: STRING\n"," }\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," ))"]},"metadata":{"tags":[]},"execution_count":13}]},{"cell_type":"markdown","metadata":{"id":"HLI6cb_5WugZ"},"source":["After `StatisticsGen` finishes running, we can visualize the outputted statistics - **TFDV**. Try playing with the different plots!"]},{"cell_type":"code","metadata":{"id":"tLjXy7K6Tp_G","colab":{"base_uri":"https://localhost:8080/","height":735},"outputId":"84c7c6b1-8bc7-4c86-ae31-45aa9c9c9445"},"source":["context.show(statistics_gen.outputs['statistics'])"],"execution_count":null,"outputs":[{"output_type":"display_data","data":{"text/html":["Artifact at /content/tfx/StatisticsGen/statistics/2

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["
'train' split:

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"stream","text":["WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow_data_validation/utils/stats_util.py:247: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.\n","Instructions for updating:\n","Use eager execution and: \n","`tf.data.TFRecordDataset(path)`\n"],"name":"stdout"},{"output_type":"display_data","data":{"text/html":["\n"," "],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["
'eval' split:

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["\n"," "],"text/plain":[""]},"metadata":{"tags":[]}}]},{"cell_type":"markdown","metadata":{"id":"HLKLTO9Nk60p"},"source":["### SchemaGen\n","\n","The `SchemaGen` component generates a schema based on your data statistics( outputs of StatisticsGen ). (A schema defines the expected bounds, types, and properties of the features in your dataset.) It also uses the [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) library.\n","\n","Note: The generated schema is best-effort and only tries to infer basic properties of the data. It is expected that you review and modify it as needed.\n","\n","`SchemaGen` will take as input the statistics that we generated with `StatisticsGen`, looking at the training split by default."]},{"cell_type":"code","metadata":{"id":"ygQvZ6hsiQ_J","colab":{"base_uri":"https://localhost:8080/","height":238},"outputId":"4d3388a2-92ee-414d-c625-961d7b83ea86"},"source":["schema_gen = SchemaGen(\n"," statistics=statistics_gen.outputs['statistics'],\n"," infer_feature_shape=False)\n","context.run(schema_gen)"],"execution_count":null,"outputs":[{"output_type":"execute_result","data":{"text/html":["\n","\n","
<pre>ExecutionResult at 0x7f9766106c50\n",
".execution_id: 3\n",
".component: SchemaGen at 0x7f9761984150\n",
".component.inputs['statistics']: Channel of type 'ExampleStatistics' (1 artifact) at 0x7f97660af2d0\n",
"    Artifact of type 'ExampleStatistics' at 0x7f97660af310\n",
"    .type: &lt;class 'tfx.types.standard_artifacts.ExampleStatistics'&gt;\n",
"    .uri: /content/tfx/StatisticsGen/statistics/2\n",
"    .span: 0\n",
"    .split_names: [\"train\", \"eval\"]\n",
".component.outputs['schema']: Channel of type 'Schema' (1 artifact) at 0x7f9761984b50\n",
"    Artifact of type 'Schema' at 0x7f9761974f90\n",
"    .type: &lt;class 'tfx.types.standard_artifacts.Schema'&gt;\n",
"    .uri: /content/tfx/SchemaGen/schema/3\n",
".exec_properties:\n",
"    ['infer_feature_shape']: 0\n",
"    ['exclude_splits']: []</pre>
"],"text/plain":["ExecutionResult(\n"," component_id: SchemaGen\n"," execution_id: 3\n"," outputs:\n"," schema: Channel(\n"," type_name: Schema\n"," artifacts: [Artifact(artifact: id: 3\n"," type_id: 9\n"," uri: \"/content/tfx/SchemaGen/schema/3\"\n"," custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"schema\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"SchemaGen\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 9\n"," name: \"Schema\"\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," ))"]},"metadata":{"tags":[]},"execution_count":15}]},{"cell_type":"markdown","metadata":{"id":"zi6TxTUKXM6b"},"source":["After `SchemaGen` finishes running, we can visualize the generated schema as a table."]},{"cell_type":"code","metadata":{"id":"Ec9vqDXpXeMb","colab":{"base_uri":"https://localhost:8080/","height":355},"outputId":"83c9cdc4-9d39-4ac6-b15c-3f651729c5c9"},"source":["context.show(schema_gen.outputs['schema'])"],"execution_count":null,"outputs":[{"output_type":"display_data","data":{"text/html":["Artifact at /content/tfx/SchemaGen/schema/3

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["
<table>\n",
"<tr><th>Feature name</th><th>Type</th><th>Presence</th><th>Valency</th><th>Domain</th></tr>\n",
"<tr><td>'dropoff_latitude'</td><td>FLOAT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'dropoff_longitude'</td><td>FLOAT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'fare'</td><td>FLOAT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'pickup_latitude'</td><td>FLOAT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'pickup_longitude'</td><td>FLOAT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'trip_start_day'</td><td>INT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'trip_start_hour'</td><td>INT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"<tr><td>'trip_start_month'</td><td>INT</td><td>required</td><td>single</td><td>-</td></tr>\n",
"</table>
"],"text/plain":[" Type Presence Valency Domain\n","Feature name \n","'dropoff_latitude' FLOAT required single -\n","'dropoff_longitude' FLOAT required single -\n","'fare' FLOAT required single -\n","'pickup_latitude' FLOAT required single -\n","'pickup_longitude' FLOAT required single -\n","'trip_start_day' INT required single -\n","'trip_start_hour' INT required single -\n","'trip_start_month' INT required single -"]},"metadata":{"tags":[]}}]},{"cell_type":"markdown","metadata":{"id":"kZWWdbA-m7zp"},"source":["Each feature in your dataset shows up as a row in the schema table, alongside its properties. The schema also captures all the values that a categorical feature takes on, denoted as its domain.\n","\n","To learn more about schemas, see [the SchemaGen documentation](https://www.tensorflow.org/tfx/guide/schemagen)."]},{"cell_type":"markdown","metadata":{"id":"V1qcUuO9k9f8"},"source":["### ExampleValidator\n","The `ExampleValidator` component detects anomalies in your data, based on the expectations defined by the schema. It also uses the [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) library.\n","\n","`ExampleValidator` will take as input the statistics from `StatisticsGen`, and the schema from `SchemaGen`."]},{"cell_type":"code","metadata":{"id":"XRlRUuGgiXks","colab":{"base_uri":"https://localhost:8080/","height":363},"outputId":"9f5ab67c-2c41-4cc7-f7e7-275c2fd41391"},"source":["example_validator = ExampleValidator(\n"," statistics=statistics_gen.outputs['statistics'],\n"," schema=schema_gen.outputs['schema'])\n","context.run(example_validator)"],"execution_count":null,"outputs":[{"output_type":"execute_result","data":{"text/html":["\n","\n","
ExecutionResult at 0x7f9766186950
.execution_id: 4
.component: ExampleValidator at 0x7f9766186110
  .inputs:
    ['statistics']: Channel of type 'ExampleStatistics' (1 artifact) at 0x7f97660af2d0
      .type_name: ExampleStatistics
      ._artifacts[0]: Artifact of type 'ExampleStatistics' (uri: /content/tfx/StatisticsGen/statistics/2) at 0x7f97660af310
        .type: <class 'tfx.types.standard_artifacts.ExampleStatistics'>
        .uri: /content/tfx/StatisticsGen/statistics/2
        .span: 0
        .split_names: [\"train\", \"eval\"]
    ['schema']: Channel of type 'Schema' (1 artifact) at 0x7f9761984b50
      .type_name: Schema
      ._artifacts[0]: Artifact of type 'Schema' (uri: /content/tfx/SchemaGen/schema/3) at 0x7f9761974f90
        .type: <class 'tfx.types.standard_artifacts.Schema'>
        .uri: /content/tfx/SchemaGen/schema/3
  .outputs:
    ['anomalies']: Channel of type 'ExampleAnomalies' (1 artifact) at 0x7f97661867d0
      .type_name: ExampleAnomalies
      ._artifacts[0]: Artifact of type 'ExampleAnomalies' (uri: /content/tfx/ExampleValidator/anomalies/4) at 0x7f97660a1f10
        .type: <class 'tfx.types.standard_artifacts.ExampleAnomalies'>
        .uri: /content/tfx/ExampleValidator/anomalies/4
        .span: 0
        .split_names: [\"train\", \"eval\"]
  .exec_properties:
    ['exclude_splits']: []
.component.inputs: the same 'statistics' and 'schema' channels listed under .inputs
.component.outputs: the same 'anomalies' channel listed under .outputs
"],"text/plain":["ExecutionResult(\n"," component_id: ExampleValidator\n"," execution_id: 4\n"," outputs:\n"," anomalies: Channel(\n"," type_name: ExampleAnomalies\n"," artifacts: [Artifact(artifact: id: 4\n"," type_id: 11\n"," uri: \"/content/tfx/ExampleValidator/anomalies/4\"\n"," properties {\n"," key: \"split_names\"\n"," value {\n"," string_value: \"[\\\"train\\\", \\\"eval\\\"]\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"anomalies\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"ExampleValidator\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 11\n"," name: \"ExampleAnomalies\"\n"," properties {\n"," key: \"span\"\n"," value: INT\n"," }\n"," properties {\n"," key: \"split_names\"\n"," value: STRING\n"," }\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," ))"]},"metadata":{"tags":[]},"execution_count":17}]},{"cell_type":"markdown","metadata":{"id":"855mrHgJcoer"},"source":["After `ExampleValidator` finishes running, we can visualize the anomalies as a table."]},{"cell_type":"code","metadata":{"id":"TDyAAozQcrk3","colab":{"base_uri":"https://localhost:8080/","height":254},"outputId":"19fe888e-f999-4981-84ed-5b5d5bf42fb9"},"source":["context.show(example_validator.outputs['anomalies'])"],"execution_count":null,"outputs":[{"output_type":"display_data","data":{"text/html":["Artifact at /content/tfx/ExampleValidator/anomalies/4

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["
'train' split:

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"stream","text":["/usr/local/lib/python3.7/dist-packages/tensorflow_data_validation/utils/display_util.py:188: FutureWarning: Passing a negative integer is deprecated in version 1.0 and will not be supported in future version. Instead, use None to not limit the column width.\n"," pd.set_option('max_colwidth', -1)\n"],"name":"stderr"},{"output_type":"display_data","data":{"text/html":["

No anomalies found.

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["
'eval' split:

"],"text/plain":[""]},"metadata":{"tags":[]}},{"output_type":"display_data","data":{"text/html":["

No anomalies found.

"],"text/plain":[""]},"metadata":{"tags":[]}}]},{"cell_type":"markdown","metadata":{"id":"znMoJj60ybZx"},"source":["In the anomalies table, we can see that there are no anomalies. This is what we'd expect, since this the first dataset that we've analyzed and the schema is tailored to it. You should review this schema -- anything unexpected means an anomaly in the data. Once reviewed, the schema can be used to guard future data, and anomalies produced here can be used to debug model performance, understand how your data evolves over time, and identify data errors."]},{"cell_type":"code","metadata":{"id":"pwbW2zPKR_S4","colab":{"base_uri":"https://localhost:8080/"},"outputId":"24553f73-1953-4282-dba1-fde70cd58c9c"},"source":["# Get the URI of the output artifact representing the transformed examples, which is a directory\n","train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')\n","\n","# Get the list of files in this directory (all compressed TFRecord files)\n","tfrecord_filenames = [os.path.join(train_uri, name)\n"," for name in os.listdir(train_uri)]\n","\n","# Create a `TFRecordDataset` to read these files\n","dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\n","\n","# Iterate over the first 1 records and decode them.\n","for tfrecord in dataset.take(1):\n"," serialized_example = tfrecord.numpy()\n"," example = tf.train.Example()\n"," example.ParseFromString(serialized_example)\n"," pp.pprint(example)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["features {\n"," feature {\n"," key: \"dropoff_latitude\"\n"," value {\n"," float_list {\n"," value: 41.92045211791992\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"dropoff_longitude\"\n"," value {\n"," float_list {\n"," value: -87.6799545288086\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"fare\"\n"," value {\n"," float_list {\n"," value: 3.8499999046325684\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"pickup_latitude\"\n"," value 
{\n"," float_list {\n"," value: 41.8996696472168\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"pickup_longitude\"\n"," value {\n"," float_list {\n"," value: -87.66983795166016\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_day\"\n"," value {\n"," int64_list {\n"," value: 6\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_hour\"\n"," value {\n"," int64_list {\n"," value: 15\n"," }\n"," }\n"," }\n"," feature {\n"," key: \"trip_start_month\"\n"," value {\n"," int64_list {\n"," value: 3\n"," }\n"," }\n"," }\n","}\n","\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"JtyRz53ZGNDH"},"source":["### Transform\n","TensorFlow Transform (TFT) could be used here, but I am instead using constants computed with pandas / NumPy. They are all stored in a **constants_trainer.py** file and then used by the trainer."]},{"cell_type":"code","metadata":{"id":"GPsP28Zyr3pk"},"source":["# Quantile bin edges (q=20) over pickup and dropoff coordinates combined\n","bins_lat = pd.qcut(list(df['dropoff_latitude'].values) + list(df['pickup_latitude'].values), q=20, duplicates='drop', retbins=True)[1]\n","bins_lon = pd.qcut(list(df['dropoff_longitude'].values) + list(df['pickup_longitude'].values), q=20, duplicates='drop', retbins=True)[1]"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"T-zikEmDGkpf"},"source":["code = '''\n","bins_lat = {bins_lat}\n","bins_lon = {bins_lon}\n","'''\n","\n","# tolist() converts NumPy floats to plain Python floats so the file holds numeric literals\n","code = code.replace('{bins_lat}', str(bins_lat.tolist()))\n","code = code.replace('{bins_lon}', str(bins_lon.tolist()))\n","\n","with open(_constants_module_file, 'w') as writefile:\n"," writefile.write(code)"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"q_b_V6eN4f69"},"source":["With the feature constants written out (this notebook does not run the `Transform` component), the next step is to train a model."]},{"cell_type":"markdown","metadata":{"id":"OBJFtnl6lCg9"},"source":["### Trainer\n","The `Trainer` component will train a model that you define in TensorFlow. 
The default `Trainer` supports the Estimator API. To use the Keras API, you need to specify the [Generic Trainer](https://github.com/tensorflow/community/blob/master/rfcs/20200117-tfx-generic-trainer.md) by setting `custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor)` in the `Trainer` constructor.\n","\n","In general, `Trainer` takes as input the schema from `SchemaGen`, the transformed data and graph from `Transform`, training parameters, as well as a module that contains user-defined model code. In this notebook it reads the examples from `ExampleGen` directly.\n","\n","The cells below write two files:\n","- **inputfn_trainer.py**: data feeder for the model\n","- **model_trainer.py**: trainer module"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"R0kTyFLBCMNI","outputId":"89740de6-09e8-42a4-ffdc-c360be892853"},"source":["%%writefile {_input_fn_module_file}\n","\n","import os\n","import tensorflow as tf\n","\n","###############################\n","##Feature engineering functions\n","def feature_engg_features(features):\n"," #Add new features\n"," #Straight-line (Euclidean) distance between pickup and dropoff, in coordinate degrees\n"," features['distance'] = ((features['pickup_latitude'] - features['dropoff_latitude'])**2 + (features['pickup_longitude'] - features['dropoff_longitude'])**2)**0.5\n"," #Cast the time attributes to strings so they can be treated as categories\n"," features['trip_start_month'] = tf.strings.as_string(features['trip_start_month'])\n"," features['trip_start_hour'] = tf.strings.as_string(features['trip_start_hour'])\n"," features['trip_start_day'] = tf.strings.as_string(features['trip_start_day'])\n","\n"," return(features)\n","\n","#To be called from TF\n","def feature_engg(features, label):\n"," #Add new features\n"," features = feature_engg_features(features)\n","\n"," return(features, label)\n","\n","def make_input_fn(dir_uri, mode, vnum_epochs = None, batch_size = 512):\n"," def decode_tfr(serialized_example):\n"," # 1. 
define a parser\n"," features = tf.io.parse_example(\n"," serialized_example,\n"," # Pickup coordinates default to 0.0 when missing; all other keys are required.\n"," features={\n"," 'dropoff_latitude': tf.io.FixedLenFeature([], tf.float32),\n"," 'dropoff_longitude': tf.io.FixedLenFeature([], tf.float32),\n"," 'fare': tf.io.FixedLenFeature([], tf.float32),\n"," 'pickup_latitude': tf.io.FixedLenFeature([], tf.float32, default_value = 0.0),\n"," 'pickup_longitude': tf.io.FixedLenFeature([], tf.float32, default_value = 0.0),\n"," 'trip_start_day': tf.io.FixedLenFeature([], tf.int64),\n"," 'trip_start_hour': tf.io.FixedLenFeature([], tf.int64),\n"," 'trip_start_month': tf.io.FixedLenFeature([], tf.int64)\n"," })\n","\n"," return features, features['fare']\n","\n"," def _input_fn(v_test=False):\n"," # Get the list of files in this directory (all compressed TFRecord files)\n"," tfrecord_filenames = tf.io.gfile.glob(dir_uri)\n","\n"," # Create a `TFRecordDataset` to read these files\n"," dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\n","\n"," dataset = dataset.batch(batch_size)\n"," dataset = dataset.prefetch(buffer_size = batch_size)\n","\n"," #Convert TFRecord data to dict\n"," dataset = dataset.map(decode_tfr)\n","\n"," #Feature engineering\n"," dataset = dataset.map(feature_engg)\n","\n"," if mode == tf.estimator.ModeKeys.TRAIN:\n"," num_epochs = vnum_epochs # indefinitely\n"," dataset = dataset.shuffle(buffer_size = batch_size)\n"," else:\n"," num_epochs = 1 # end-of-input after this\n","\n"," dataset = dataset.repeat(num_epochs) \n"," \n"," #Begins - Uncomment for testing only -----------------------------------------------------<\n"," if v_test:\n"," print(next(dataset.__iter__()))\n"," \n"," #End - Uncomment for testing only -----------------------------------------------------<\n"," return 
dataset\n"," return _input_fn"],"execution_count":null,"outputs":[{"output_type":"stream","text":["Writing inputfn_trainer.py\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"Z6QlsOPtDCQE","outputId":"c4ceb299-86c7-45a4-ef88-d55a7872bb85"},"source":["##Test the input function\n","import inputfn_trainer as ift\n","\n","#Test the dataset read and the feature engineering function: prints one batch with the parsed columns plus the engineered ones\n","eval_file = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'eval/*')\n","fn_d = ift.make_input_fn(dir_uri = eval_file,\n"," mode = tf.estimator.ModeKeys.EVAL,\n"," # vnum_epochs = 1,\n"," batch_size = 10)\n","\n","fn_d(v_test=True)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["({'dropoff_latitude': , 'dropoff_longitude': , 'fare': , 'pickup_latitude': , 'pickup_longitude': , 'trip_start_day': , 'trip_start_hour': , 'trip_start_month': , 'distance': }, )\n"],"name":"stdout"},{"output_type":"execute_result","data":{"text/plain":[""]},"metadata":{"tags":[]},"execution_count":23}]},{"cell_type":"code","metadata":{"id":"nf9UuNng4YJu","colab":{"base_uri":"https://localhost:8080/"},"outputId":"b921f3b6-6f70-4057-91d6-8e3240756367"},"source":["%%writefile {_model_trainer_module_file}\n","\n","import os\n","import tensorflow as tf\n","import tensorflow.keras as keras\n","import inputfn_trainer as ift\n","import constants_trainer as ct\n","\n","from tfx.components.trainer.fn_args_utils import FnArgs\n","print(tf.__version__)\n","\n","device = \"gpu\"\n","\n","if device == \"tpu\":\n"," resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])\n"," tf.config.experimental_connect_to_cluster(resolver)\n"," # This is the TPU initialization code that has to be at the beginning.\n"," tf.tpu.experimental.initialize_tpu_system(resolver)\n"," strategy = tf.distribute.experimental.TPUStrategy(resolver)\n","else:\n"," strategy = 
tf.distribute.MultiWorkerMirroredStrategy()\n","\n","#Create model\n","params_default = {\n"," 'lr' : 0.001,\n"," 'beta_1' : 0.99,\n"," 'beta_2' : 0.999,\n"," 'epsilon' : 1e-08,\n"," 'decay' : 0.01,\n"," 'hidden_layers' : 1\n","}\n","\n","# Define feature columns(Including feature engineered ones )\n","# These are the features which come from the TF Data pipeline\n","def create_feature_cols():\n"," #Keras format features\n"," k_month = tf.keras.Input(name='trip_start_month', shape=(1,), dtype=tf.string)\n"," k_hour = tf.keras.Input(name='trip_start_hour', shape=(1,), dtype=tf.string)\n"," k_day = tf.keras.Input(name='trip_start_day', shape=(1,), dtype=tf.string)\n"," k_picklat = tf.keras.Input(name='pickup_latitude', shape=(1,), dtype=tf.float32)\n"," k_picklon = tf.keras.Input(name='pickup_longitude', shape=(1,), dtype=tf.float32)\n"," k_droplat = tf.keras.Input(name='dropoff_latitude', shape=(1,), dtype=tf.float32)\n"," k_droplon = tf.keras.Input(name='dropoff_longitude', shape=(1,), dtype=tf.float32)\n"," k_distance = tf.keras.Input(name='distance', shape=(1,), dtype=tf.float32)\n"," keras_dict_input = {'trip_start_month': k_month, 'trip_start_hour': k_hour, 'trip_start_day' : k_day,\n"," 'pickup_latitude': k_picklat, 'pickup_longitude': k_picklon,\n"," 'dropoff_latitude': k_droplat, 'dropoff_longitude': k_droplon, 'distance' : k_distance\n"," }\n","\n"," return({'K' : keras_dict_input})\n","\n","def create_keras_model(feature_cols, bins_lat, bins_lon, params = params_default):\n"," METRICS = [\n"," keras.metrics.RootMeanSquaredError(name='rmse')\n"," ]\n","\n"," #Input layers\n"," input_feats = []\n"," for inp in feature_cols['K'].keys():\n"," input_feats.append(feature_cols['K'][inp])\n","\n"," ##Input processing\n"," ##https://keras.io/examples/structured_data/structured_data_classification_from_scratch/\n"," ##https://github.com/tensorflow/community/blob/master/rfcs/20191212-keras-categorical-inputs.md\n","\n"," ##Handle categorical attributes( One-hot 
encoding )\n"," cat_day = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=['1','2','3','4','5','6','7'], mask_token=None)(feature_cols['K']['trip_start_day'])\n"," cat_day = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=7)(cat_day)\n","\n"," cat_hour = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=['1','2','3','4','5','6','7','8',\n"," '9','10','11','12','13','14','15','16',\n"," '17','18','19','20','21','22','23','0'\n"," ], mask_token=None)(feature_cols['K']['trip_start_hour'])\n"," cat_hour = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=24)(cat_hour)\n","\n"," cat_month = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=['1','2','3','4','5','6','7','8',\n"," '9','10','11','12'], mask_token=None)(feature_cols['K']['trip_start_month'])\n"," cat_month = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=12)(cat_month)\n","\n"," # cat_company = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=df['company'].unique(), mask_token=None)(feature_cols['K']['company'])\n"," # cat_company = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=len(df['company'].unique()))(cat_company)\n","\n"," ##Binning\n"," bins_pickup_lat = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lat)(feature_cols['K']['pickup_latitude'])\n"," cat_pickup_lat = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lat)+1)(bins_pickup_lat)\n","\n"," bins_pickup_lon = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lon)(feature_cols['K']['pickup_longitude'])\n"," cat_pickup_lon = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lon)+1)(bins_pickup_lon)\n","\n"," bins_drop_lat = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lat)(feature_cols['K']['dropoff_latitude'])\n"," cat_drop_lat = 
tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lat)+1)(bins_drop_lat)\n","\n"," bins_drop_lon = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lon)(feature_cols['K']['dropoff_longitude'])\n"," cat_drop_lon = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lon)+1)(bins_drop_lon)\n","\n"," ##Categorical cross\n"," cross_day_hour = tf.keras.layers.experimental.preprocessing.CategoryCrossing()([cat_day, cat_hour])\n"," hash_cross_day_hour = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=24 * 7)(cross_day_hour)\n"," cat_cross_day_hour = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens = 24* 7)(hash_cross_day_hour)\n","\n"," cross_pick_lon_lat = tf.keras.layers.experimental.preprocessing.CategoryCrossing()([cat_pickup_lat, cat_pickup_lon])\n"," hash_cross_pick_lon_lat = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=(len(bins_lat) + 1) ** 2)(cross_pick_lon_lat)\n","\n"," cross_drop_lon_lat = tf.keras.layers.experimental.preprocessing.CategoryCrossing()([cat_drop_lat, cat_drop_lon])\n"," hash_cross_drop_lon_lat = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=(len(bins_lat) + 1) ** 2)(cross_drop_lon_lat)\n","\n"," # Cross to embedding\n"," embed_cross_pick_lon_lat = tf.keras.layers.Embedding(((len(bins_lat) + 1) ** 2), 4)(hash_cross_pick_lon_lat)\n"," embed_cross_pick_lon_lat = tf.reduce_sum(embed_cross_pick_lon_lat, axis=-2)\n","\n"," embed_cross_drop_lon_lat = tf.keras.layers.Embedding(((len(bins_lat) + 1) ** 2), 4)(hash_cross_drop_lon_lat)\n"," embed_cross_drop_lon_lat = tf.reduce_sum(embed_cross_drop_lon_lat, axis=-2)\n","\n"," # Also pass time attributes as Deep signal( Cast to integer )\n"," int_trip_start_day = tf.strings.to_number(feature_cols['K']['trip_start_day'], tf.float32)\n"," int_trip_start_hour = tf.strings.to_number(feature_cols['K']['trip_start_hour'], tf.float32)\n"," int_trip_start_month = 
tf.strings.to_number(feature_cols['K']['trip_start_month'], tf.float32)\n","\n"," #Add feature engineered columns - LAMBDA layer\n","\n"," ###Create MODEL\n"," ####Concatenate all features( Numerical input )\n"," x_input_numeric = tf.keras.layers.concatenate([\n"," feature_cols['K']['pickup_latitude'], feature_cols['K']['pickup_longitude'],\n"," feature_cols['K']['dropoff_latitude'], feature_cols['K']['dropoff_longitude'],\n"," feature_cols['K']['distance'], embed_cross_pick_lon_lat, embed_cross_drop_lon_lat,\n"," int_trip_start_day, int_trip_start_hour, int_trip_start_month\n"," ])\n","\n"," #DEEP - This Dense layer connects to input layer - Numeric Data\n"," x_numeric = tf.keras.layers.Dense(32, activation='relu', kernel_initializer=\"he_uniform\")(x_input_numeric)\n"," x_numeric = tf.keras.layers.BatchNormalization()(x_numeric)\n","\n"," ####Concatenate all Categorical features( Categorical converted )\n"," x_input_categ = tf.keras.layers.concatenate([\n"," cat_month, cat_cross_day_hour, cat_pickup_lat, cat_pickup_lon,\n"," cat_drop_lat, cat_drop_lon\n"," ])\n"," \n"," #WIDE - This Dense layer connects to input layer - Categorical Data\n"," x_categ = tf.keras.layers.Dense(32, activation='relu', kernel_initializer=\"he_uniform\")(x_input_categ)\n","\n"," ####Concatenate both Wide and Deep layers\n"," x = tf.keras.layers.concatenate([x_categ, x_numeric])\n","\n"," for l_ in range(params['hidden_layers']):\n"," x = tf.keras.layers.Dense(32, activation='relu', kernel_initializer=\"he_uniform\",\n"," activity_regularizer=tf.keras.regularizers.l2(0.00001))(x)\n"," x = tf.keras.layers.BatchNormalization()(x)\n","\n"," #Final Layer\n"," out = tf.keras.layers.Dense(1, activation='relu')(x)\n"," model = tf.keras.Model(input_feats, out)\n","\n"," #Set optimizer\n"," opt = tf.keras.optimizers.Adam(lr= params['lr'], beta_1=params['beta_1'], \n"," beta_2=params['beta_2'], epsilon=params['epsilon'])\n","\n"," #Compile model\n"," model.compile(loss='mean_squared_error', 
optimizer=opt, metrics = METRICS)\n","\n"," #Print Summary\n"," print(model.summary())\n"," return model\n","\n","def keras_train_and_evaluate(model, train_dataset, validation_dataset, epochs=100):\n"," #Add callbacks\n"," reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,\n"," patience=5, min_lr=0.00001, verbose = 1)\n"," \n"," tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=\"./logs\")\n","\n"," #Train and Evaluate\n"," out = model.fit(train_dataset, \n"," validation_data = validation_dataset,\n"," epochs=epochs,\n"," # validation_steps = 3, ###Keep this none for running evaluation on full EVAL data every epoch\n"," steps_per_epoch = 100, ###Has to be passed - Cant help it :) [ Number of batches per epoch ]\n"," callbacks=[reduce_lr, #modelsave_callback, #tensorboard_callback, \n"," keras.callbacks.EarlyStopping(patience=20, restore_best_weights=True, verbose=True)]\n"," )\n","\n"," return model\n","\n","def save_model(model, model_save_path):\n"," @tf.function\n"," def serving(dropoff_latitude, dropoff_longitude, pickup_latitude, pickup_longitude, trip_start_day, trip_start_hour, trip_start_month):\n"," ##Feature engineering( calculate distance )\n"," distance = tf.cast( tf.sqrt((tf.abs(dropoff_latitude - pickup_latitude))**2 + (tf.abs(dropoff_longitude - pickup_longitude))**2), tf.float32)\n","\n"," payload = {\n"," 'dropoff_latitude': dropoff_latitude,\n"," 'dropoff_longitude': dropoff_longitude,\n"," 'pickup_latitude': pickup_latitude,\n"," 'pickup_longitude': pickup_longitude,\n"," 'trip_start_day': trip_start_day,\n"," 'trip_start_hour': trip_start_hour,\n"," 'trip_start_month': trip_start_month,\n"," 'distance': distance\n"," }\n"," \n"," ## Predict\n"," ##IF THERE IS AN ERROR IN NUMBER OF PARAMS PASSED HERE OR DATA TYPE THEN IT GIVES ERROR, \"COULDN'T COMPUTE OUTPUT TENSOR\"\n"," predictions = model(payload)\n"," return predictions\n","\n"," serving = 
serving.get_concrete_function(trip_start_day=tf.TensorSpec([None,], dtype= tf.string, name='trip_start_day'), \n"," trip_start_hour=tf.TensorSpec([None,], dtype= tf.string, name='trip_start_hour'),\n"," trip_start_month=tf.TensorSpec([None], dtype= tf.string, name='trip_start_month'), \n"," dropoff_latitude=tf.TensorSpec([None,], dtype= tf.float32, name='dropoff_latitude'),\n"," dropoff_longitude=tf.TensorSpec([None,], dtype= tf.float32, name='dropoff_longitude'), \n"," pickup_latitude=tf.TensorSpec([None,], dtype= tf.float32, name='pickup_latitude'),\n"," pickup_longitude=tf.TensorSpec([None,], dtype= tf.float32, name='pickup_longitude')\n"," )\n","\n"," # version = \"1\" #{'serving_default': call_output}\n"," tf.saved_model.save(\n"," model,\n"," model_save_path + \"/\",\n"," signatures=serving\n"," )\n","\n","##Main function called by TFX\n","def run_fn(fn_args: FnArgs):\n"," #Create dataset input functions\n"," train_dataset = ift.make_input_fn(dir_uri = fn_args.train_files,\n"," mode = tf.estimator.ModeKeys.TRAIN,\n"," batch_size = 128)()\n","\n"," validation_dataset = ift.make_input_fn(dir_uri = fn_args.eval_files,\n"," mode = tf.estimator.ModeKeys.EVAL,\n"," batch_size = 512)()\n","\n"," #Create model\n"," m_ = create_keras_model(params = params_default, feature_cols = create_feature_cols(),\n"," bins_lat = ct.bins_lat,\n"," bins_lon = ct.bins_lon)\n"," tf.keras.utils.plot_model(m_, show_shapes=True, rankdir=\"LR\")\n","\n"," #Train model\n"," m_ = keras_train_and_evaluate(m_, train_dataset, validation_dataset, fn_args.custom_config['epochs'])\n","\n"," #Save model with custom signature\n"," save_model(m_, fn_args.serving_model_dir)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["Writing model_trainer.py\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"GY4yTRaX4YJx"},"source":["Now, we pass in this model code to the `Trainer` component and run it to train the 
model."]},{"cell_type":"code","metadata":{"id":"429-vvCWibO0","colab":{"base_uri":"https://localhost:8080/","height":1000},"outputId":"1297b620-7bc5-4300-adcf-947527f3ce1d"},"source":["trainer = Trainer(\n"," module_file=os.path.abspath(_model_trainer_module_file),\n"," custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),\n"," examples=example_gen.outputs['examples'],\n"," train_args=trainer_pb2.TrainArgs(),\n"," eval_args=trainer_pb2.EvalArgs(),\n"," custom_config=({\"epochs\": 1})\n"," )\n","\n","context.run(trainer)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:absl:From :3: The name tfx.components.base.executor_spec.ExecutorClassSpec is deprecated. Please use tfx.dsl.components.base.executor_spec.ExecutorClassSpec instead.\n"],"name":"stderr"},{"output_type":"stream","text":["2.4.1\n","WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.\n","INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)\n","INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","Model: \"model\"\n","__________________________________________________________________________________________________\n","Layer (type) Output Shape Param # Connected to \n","==================================================================================================\n","pickup_latitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","pickup_longitude (InputLayer) [(None, 1)] 0 
\n","__________________________________________________________________________________________________\n","dropoff_latitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","dropoff_longitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","discretization (Discretization) (None, 1) 0 pickup_latitude[0][0] \n","__________________________________________________________________________________________________\n","discretization_1 (Discretizatio (None, 1) 0 pickup_longitude[0][0] \n","__________________________________________________________________________________________________\n","discretization_2 (Discretizatio (None, 1) 0 dropoff_latitude[0][0] \n","__________________________________________________________________________________________________\n","discretization_3 (Discretizatio (None, 1) 0 dropoff_longitude[0][0] \n","__________________________________________________________________________________________________\n","trip_start_day (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","trip_start_hour (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","category_encoding_3 (CategoryEn (None, 21) 0 discretization[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_4 (CategoryEn (None, 21) 0 discretization_1[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_5 (CategoryEn (None, 21) 0 discretization_2[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_6 (CategoryEn (None, 21) 0 
discretization_3[0][0] \n","__________________________________________________________________________________________________\n","string_lookup (StringLookup) (None, 1) 0 trip_start_day[0][0] \n","__________________________________________________________________________________________________\n","string_lookup_1 (StringLookup) (None, 1) 0 trip_start_hour[0][0] \n","__________________________________________________________________________________________________\n","category_crossing_1 (CategoryCr (None, None) 0 category_encoding_3[0][0] \n"," category_encoding_4[0][0] \n","__________________________________________________________________________________________________\n","category_crossing_2 (CategoryCr (None, None) 0 category_encoding_5[0][0] \n"," category_encoding_6[0][0] \n","__________________________________________________________________________________________________\n","category_encoding (CategoryEnco (None, 7) 0 string_lookup[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_1 (CategoryEn (None, 24) 0 string_lookup_1[0][0] \n","__________________________________________________________________________________________________\n","hashing_1 (Hashing) (None, None) 0 category_crossing_1[0][0] \n","__________________________________________________________________________________________________\n","hashing_2 (Hashing) (None, None) 0 category_crossing_2[0][0] \n","__________________________________________________________________________________________________\n","trip_start_month (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","category_crossing (CategoryCros (None, None) 0 category_encoding[0][0] \n"," category_encoding_1[0][0] \n","__________________________________________________________________________________________________\n","embedding (Embedding) (None, None, 4) 1764 
hashing_1[0][0] \n","__________________________________________________________________________________________________\n","embedding_1 (Embedding) (None, None, 4) 1764 hashing_2[0][0] \n","__________________________________________________________________________________________________\n","string_lookup_2 (StringLookup) (None, 1) 0 trip_start_month[0][0] \n","__________________________________________________________________________________________________\n","hashing (Hashing) (None, None) 0 category_crossing[0][0] \n","__________________________________________________________________________________________________\n","distance (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","tf.math.reduce_sum (TFOpLambda) (None, 4) 0 embedding[0][0] \n","__________________________________________________________________________________________________\n","tf.math.reduce_sum_1 (TFOpLambd (None, 4) 0 embedding_1[0][0] \n","__________________________________________________________________________________________________\n","tf.strings.to_number (TFOpLambd (None, 1) 0 trip_start_day[0][0] \n","__________________________________________________________________________________________________\n","tf.strings.to_number_1 (TFOpLam (None, 1) 0 trip_start_hour[0][0] \n","__________________________________________________________________________________________________\n","tf.strings.to_number_2 (TFOpLam (None, 1) 0 trip_start_month[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_2 (CategoryEn (None, 12) 0 string_lookup_2[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_7 (CategoryEn (None, 168) 0 hashing[0][0] \n","__________________________________________________________________________________________________\n","concatenate 
(Concatenate) (None, 16) 0 pickup_latitude[0][0] \n"," pickup_longitude[0][0] \n"," dropoff_latitude[0][0] \n"," dropoff_longitude[0][0] \n"," distance[0][0] \n"," tf.math.reduce_sum[0][0] \n"," tf.math.reduce_sum_1[0][0] \n"," tf.strings.to_number[0][0] \n"," tf.strings.to_number_1[0][0] \n"," tf.strings.to_number_2[0][0] \n","__________________________________________________________________________________________________\n","concatenate_1 (Concatenate) (None, 264) 0 category_encoding_2[0][0] \n"," category_encoding_7[0][0] \n"," category_encoding_3[0][0] \n"," category_encoding_4[0][0] \n"," category_encoding_5[0][0] \n"," category_encoding_6[0][0] \n","__________________________________________________________________________________________________\n","dense (Dense) (None, 32) 544 concatenate[0][0] \n","__________________________________________________________________________________________________\n","dense_1 (Dense) (None, 32) 8480 concatenate_1[0][0] \n","__________________________________________________________________________________________________\n","batch_normalization (BatchNorma (None, 32) 128 dense[0][0] \n","__________________________________________________________________________________________________\n","concatenate_2 (Concatenate) (None, 64) 0 dense_1[0][0] \n"," batch_normalization[0][0] \n","__________________________________________________________________________________________________\n","dense_2 (Dense) (None, 32) 2080 concatenate_2[0][0] \n","__________________________________________________________________________________________________\n","batch_normalization_1 (BatchNor (None, 32) 128 dense_2[0][0] \n","__________________________________________________________________________________________________\n","dense_3 (Dense) (None, 1) 33 batch_normalization_1[0][0] \n","==================================================================================================\n","Total params: 14,921\n","Trainable params: 
14,793\n","Non-trainable params: 128\n","__________________________________________________________________________________________________\n","None\n"],"name":"stdout"},{"output_type":"stream","text":["/usr/local/lib/python3.7/dist-packages/tensorflow/python/keras/engine/functional.py:595: UserWarning: Input dict contained keys ['fare'] which did not match any model input. They will be ignored by the model.\n"," [n for n in tensors.keys() if n not in ref_input_names])\n"],"name":"stderr"},{"output_type":"stream","text":["WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","100/100 [==============================] - ETA: 0s - loss: 292.6946 - rmse: 16.9802WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","100/100 [==============================] - 20s 173ms/step - loss: 292.2560 - rmse: 16.9681 - val_loss: 199.9450 - val_rmse: 14.1400\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting 
BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting 
BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","INFO:tensorflow:Assets written to: /content/tfx/Trainer/model/5/serving_model_dir/assets\n"],"name":"stdout"},{"output_type":"execute_result","data":{"text/html":["\n","\n","
ExecutionResult at 0x7f976628f610
  .execution_id: 5
  .component: Trainer at 0x7f97662bee10
    .inputs
      ['examples']: Channel of type 'Examples' (1 artifact) at 0x7f97bc88c8d0
        .type_name: Examples
        ._artifacts
          [0]: Artifact of type 'Examples' (uri: /content/tfx/CsvExampleGen/examples/1) at 0x7f9767e35f50
            .type: <class 'tfx.types.standard_artifacts.Examples'>
            .uri: /content/tfx/CsvExampleGen/examples/1
            .span: 0
            .split_names: ["train", "eval"]
            .version: 0
    .outputs
      ['model']: Channel of type 'Model' (1 artifact) at 0x7f97661e93d0
        .type_name: Model
        ._artifacts
          [0]: Artifact of type 'Model' (uri: /content/tfx/Trainer/model/5) at 0x7f9766771050
            .type: <class 'tfx.types.standard_artifacts.Model'>
            .uri: /content/tfx/Trainer/model/5
      ['model_run']: Channel of type 'ModelRun' (1 artifact) at 0x7f97661e9350
        .type_name: ModelRun
        ._artifacts
          [0]: Artifact of type 'ModelRun' (uri: /content/tfx/Trainer/model_run/5) at 0x7f9766771c10
            .type: <class 'tfx.types.standard_artifacts.ModelRun'>
            .uri: /content/tfx/Trainer/model_run/5
    .exec_properties
      ['train_args']: {}
      ['eval_args']: {}
      ['module_file']: /content/model_trainer.py
      ['run_fn']: None
      ['trainer_fn']: None
      ['custom_config']: {"epochs": 1}
  .component.inputs
    ['examples']: Channel of type 'Examples' (1 artifact) at 0x7f97bc88c8d0
      .type_name: Examples
      ._artifacts
        [0]: Artifact of type 'Examples' (uri: /content/tfx/CsvExampleGen/examples/1) at 0x7f9767e35f50
          .type: <class 'tfx.types.standard_artifacts.Examples'>
          .uri: /content/tfx/CsvExampleGen/examples/1
          .span: 0
          .split_names: ["train", "eval"]
          .version: 0
  .component.outputs
    ['model']: Channel of type 'Model' (1 artifact) at 0x7f97661e93d0
      .type_name: Model
      ._artifacts
        [0]: Artifact of type 'Model' (uri: /content/tfx/Trainer/model/5) at 0x7f9766771050
          .type: <class 'tfx.types.standard_artifacts.Model'>
          .uri: /content/tfx/Trainer/model/5
    ['model_run']: Channel of type 'ModelRun' (1 artifact) at 0x7f97661e9350
      .type_name: ModelRun
      ._artifacts
        [0]: Artifact of type 'ModelRun' (uri: /content/tfx/Trainer/model_run/5) at 0x7f9766771c10
          .type: <class 'tfx.types.standard_artifacts.ModelRun'>
          .uri: /content/tfx/Trainer/model_run/5
"],"text/plain":["ExecutionResult(\n"," component_id: Trainer\n"," execution_id: 5\n"," outputs:\n"," model: Channel(\n"," type_name: Model\n"," artifacts: [Artifact(artifact: id: 5\n"," type_id: 13\n"," uri: \"/content/tfx/Trainer/model/5\"\n"," custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"model\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"Trainer\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 13\n"," name: \"Model\"\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," )\n"," model_run: Channel(\n"," type_name: ModelRun\n"," artifacts: [Artifact(artifact: id: 6\n"," type_id: 14\n"," uri: \"/content/tfx/Trainer/model_run/5\"\n"," custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"model_run\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"Trainer\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 14\n"," name: \"ModelRun\"\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," ))"]},"metadata":{"tags":[]},"execution_count":25}]},{"cell_type":"markdown","metadata":{"id":"6Cql1G35StJp"},"source":["#### Analyze Training with TensorBoard\n","Take a peek at the trainer artifact. 
It points to a directory that contains the model subdirectories, including `serving_model_dir`."]},{"cell_type":"code","metadata":{"id":"bXe62WE0S0Ek"},"source":["model_artifact_dir = trainer.outputs['model'].get()[0].uri\n","pp.pprint(os.listdir(model_artifact_dir))\n","model_dir = os.path.join(model_artifact_dir, 'serving_model_dir')\n","pp.pprint(os.listdir(model_dir))"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"DfjOmSro6Q3Y"},"source":["Optionally, we can connect TensorBoard to the Trainer's `model_run` output to analyze the model's training curves."]},{"cell_type":"code","metadata":{"id":"-APzqz2NeAyj"},"source":["# model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri\n","\n","# %load_ext tensorboard\n","# %tensorboard --logdir {model_run_artifact_dir}"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"T8DYekCZlHfj"},"source":["### Pusher\n","The `Pusher` component usually sits at the end of a TFX pipeline. It checks whether a model has passed validation and, if so, exports the model to `_serving_model_dir`. No validation component is wired in here, so the model is pushed without validation, as the warning in the output notes."]},{"cell_type":"code","metadata":{"id":"r45nQ69eikc9","colab":{"base_uri":"https://localhost:8080/","height":256},"outputId":"6efcf191-16f7-428c-e8af-cdb67a0dbab7"},"source":["pusher = Pusher(\n"," model=trainer.outputs['model'],\n"," push_destination=pusher_pb2.PushDestination(\n"," filesystem=pusher_pb2.PushDestination.Filesystem(\n"," base_directory=_serving_model_dir)))\n","context.run(pusher)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.\n"],"name":"stderr"},{"output_type":"execute_result","data":{"text/html":["\n","\n","
ExecutionResult at 0x7f9766662610
  .execution_id: 6
  .component: Pusher at 0x7f976666bfd0
    .inputs
      ['model']: Channel of type 'Model' (1 artifact) at 0x7f97661e93d0
        .type_name: Model
        ._artifacts
          [0]: Artifact of type 'Model' (uri: /content/tfx/Trainer/model/5) at 0x7f9766771050
            .type: <class 'tfx.types.standard_artifacts.Model'>
            .uri: /content/tfx/Trainer/model/5
    .outputs
      ['pushed_model']: Channel of type 'PushedModel' (1 artifact) at 0x7f976666ba50
        .type_name: PushedModel
        ._artifacts
          [0]: Artifact of type 'PushedModel' (uri: /content/tfx/Pusher/pushed_model/6) at 0x7f976629ba50
            .type: <class 'tfx.types.standard_artifacts.PushedModel'>
            .uri: /content/tfx/Pusher/pushed_model/6
    .exec_properties
      ['push_destination']: {"filesystem": {"base_directory": "/content/tfx/serving_model"}}
      ['custom_config']: null
  .component.inputs
    ['model']: Channel of type 'Model' (1 artifact) at 0x7f97661e93d0
      .type_name: Model
      ._artifacts
        [0]: Artifact of type 'Model' (uri: /content/tfx/Trainer/model/5) at 0x7f9766771050
          .type: <class 'tfx.types.standard_artifacts.Model'>
          .uri: /content/tfx/Trainer/model/5
  .component.outputs
    ['pushed_model']: Channel of type 'PushedModel' (1 artifact) at 0x7f976666ba50
      .type_name: PushedModel
      ._artifacts
        [0]: Artifact of type 'PushedModel' (uri: /content/tfx/Pusher/pushed_model/6) at 0x7f976629ba50
          .type: <class 'tfx.types.standard_artifacts.PushedModel'>
          .uri: /content/tfx/Pusher/pushed_model/6
"],"text/plain":["ExecutionResult(\n"," component_id: Pusher\n"," execution_id: 6\n"," outputs:\n"," pushed_model: Channel(\n"," type_name: PushedModel\n"," artifacts: [Artifact(artifact: id: 7\n"," type_id: 16\n"," uri: \"/content/tfx/Pusher/pushed_model/6\"\n"," custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"pushed_model\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"Pusher\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"pushed\"\n"," value {\n"," int_value: 1\n"," }\n"," }\n"," custom_properties {\n"," key: \"pushed_destination\"\n"," value {\n"," string_value: \"/content/tfx/serving_model/1616858834\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"pushed_version\"\n"," value {\n"," string_value: \"1616858834\"\n"," }\n"," }\n"," custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n"," }\n"," state: LIVE\n"," , artifact_type: id: 16\n"," name: \"PushedModel\"\n"," )]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n"," ))"]},"metadata":{"tags":[]},"execution_count":26}]},{"cell_type":"markdown","metadata":{"id":"ctUErBYoTO9I"},"source":["Let's examine the output artifacts of `Pusher`. 
"]},{"cell_type":"code","metadata":{"id":"pRkWo-MzTSss","colab":{"base_uri":"https://localhost:8080/"},"outputId":"ed0faafc-46ba-4988-be8d-76cf7d2d51d7"},"source":["pusher.outputs"],"execution_count":null,"outputs":[{"output_type":"execute_result","data":{"text/plain":["{'pushed_model': Channel(\n"," type_name: PushedModel\n"," artifacts: [Artifact(artifact: id: 7\n","type_id: 16\n","uri: \"/content/tfx/Pusher/pushed_model/6\"\n","custom_properties {\n"," key: \"name\"\n"," value {\n"," string_value: \"pushed_model\"\n"," }\n","}\n","custom_properties {\n"," key: \"producer_component\"\n"," value {\n"," string_value: \"Pusher\"\n"," }\n","}\n","custom_properties {\n"," key: \"pushed\"\n"," value {\n"," int_value: 1\n"," }\n","}\n","custom_properties {\n"," key: \"pushed_destination\"\n"," value {\n"," string_value: \"/content/tfx/serving_model/1616858834\"\n"," }\n","}\n","custom_properties {\n"," key: \"pushed_version\"\n"," value {\n"," string_value: \"1616858834\"\n"," }\n","}\n","custom_properties {\n"," key: \"state\"\n"," value {\n"," string_value: \"published\"\n"," }\n","}\n","state: LIVE\n",", artifact_type: id: 16\n","name: \"PushedModel\"\n",")]\n"," additional_properties: {}\n"," additional_custom_properties: {}\n",")}"]},"metadata":{"tags":[]},"execution_count":27}]},{"cell_type":"markdown","metadata":{"id":"peH2PPS3VgkL"},"source":["In particular, the Pusher will export your model in the SavedModel format, which looks like this:"]},{"cell_type":"code","metadata":{"id":"4zyIqWl9TSdG","colab":{"base_uri":"https://localhost:8080/"},"outputId":"b7ac75f7-04b6-40ba-a519-124f431b0bb9"},"source":["push_uri = pusher.outputs.pushed_model.get()[0].uri\n","model = tf.saved_model.load(push_uri)\n","\n","for item in model.signatures.items():\n"," pp.pprint(item)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:tensorflow:5 out of the last 5 calls to .restored_function_body at 0x7f9766c76320> triggered tf.function retracing. 
Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.\n","WARNING:tensorflow:6 out of the last 6 calls to .restored_function_body at 0x7f9766b794d0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.\n","('serving_default',\n"," )\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"ZW7iMG0FkQJj"},"source":["### **Full Pipeline**\n","\n","The pipeline can be run on any of the following orchestrators:\n","1. Local\n","2. Airflow\n","3. Kubeflow"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"GAFjQv7hZ71T","outputId":"0336118f-fd7e-451f-defb-cba0be064389"},"source":["!rm -rf data.*\n","# !rm -rf *trainer.py ##EDIT: Python files have to be retained\n","!rm -rf *.csv\n","!sudo rm -r /content/tfx\n","\n","! cd /content/\n","! mkdir /content/tfx/\n","! mkdir /content/tfx/pipelines\n","! mkdir /content/tfx/metadata\n","! mkdir /content/tfx/logs\n","! 
mkdir /content/tfx/data\n","! mkdir /content/tfx/serving_model\n","\n","! mkdir /content/train_data/\n","! mkdir /content/eval_data/\n","\n","!wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv"],"execution_count":null,"outputs":[{"output_type":"stream","text":["--2021-03-27 15:29:18-- https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv\n","Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.110.133, ...\n","Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.\n","HTTP request sent, awaiting response... 200 OK\n","Length: 1922812 (1.8M) [text/plain]\n","Saving to: ‘data.csv’\n","\n","data.csv 100%[===================>] 1.83M --.-KB/s in 0.08s \n","\n","2021-03-27 15:29:18 (22.8 MB/s) - ‘data.csv’ saved [1922812/1922812]\n","\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"RtsNVH2QaWRA","outputId":"5d01b848-b21b-45b1-9caf-2b38d7058f02"},"source":["df = pd.read_csv('/content/data.csv')\n","\n","##Drop useless columns\n","df = df.drop(['trip_start_timestamp','trip_miles','pickup_census_tract',\n"," 'dropoff_census_tract','trip_seconds','payment_type','tips', \n"," 'company','dropoff_community_area','pickup_community_area'], axis=1)\n","\n","#Drop NA rows\n","df = df.dropna()\n","\n","##Keep a test set for final testing( TFX internally splits train and validation data )\n","np.random.seed(seed=2)\n","msk = np.random.rand(len(df)) < 0.9\n","traindf = df[msk]\n","evaldf = df[~msk]\n","\n","print(len(traindf))\n","print(len(evaldf))\n","\n","traindf.to_csv(\"/content/train_data/data.csv\", index=False, header=True)\n","evaldf.to_csv(\"/content/eval_data/eval.csv\", index=False, 
header=False)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["13077\n","1442\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"id":"6RLIo8_LeB9k"},"source":["# https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/\n","def create_final_pipeline(\n"," pipeline_name: Text,\n"," root_path: Text,\n"," data_path: Text,\n"," training_params: Dict[Text, Text],\n"," # beam_pipeline_args: List[Text],\n",") -> pipeline.Pipeline:\n","\n"," _pipeline_root = os.path.join(root_path, 'pipelines') # Join ~/tfx/pipelines/\n"," _metadata_db_root = os.path.join(root_path, 'metadata.db') # Join ~/tfx/metadata.db\n"," _log_root = os.path.join(root_path, 'logs')\n"," _model_root = os.path.join(root_path, 'model')\n"," _serving_model_dir = os.path.join(root_path, 'serving_model')\n","\n"," # Full pipeline\n"," example_gen = CsvExampleGen(input=external_input(data_path))\n","\n"," statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])\n","\n"," infer_schema = SchemaGen(\n"," statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)\n","\n"," validate_stats = ExampleValidator(\n"," statistics=statistics_gen.outputs['statistics'],\n"," schema=infer_schema.outputs['schema'])\n","\n"," trainer = Trainer(\n"," module_file=os.path.abspath(_model_trainer_module_file),\n"," custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),\n"," examples=example_gen.outputs['examples'],\n"," train_args=trainer_pb2.TrainArgs(),\n"," eval_args=trainer_pb2.EvalArgs(),\n"," custom_config=training_params\n"," )\n","\n"," pusher = Pusher(\n"," model=trainer.outputs['model'],\n"," push_destination=pusher_pb2.PushDestination(\n"," filesystem=pusher_pb2.PushDestination.Filesystem(\n"," base_directory=_serving_model_dir)))\n","\n"," # This pipeline object carries the business logic of the pipeline, but no runner-specific information\n"," # was included.\n"," return pipeline.Pipeline(\n"," pipeline_name= 
pipeline_name,\n"," pipeline_root= root_path,\n"," components=[\n"," example_gen, statistics_gen, infer_schema, validate_stats,\n"," trainer, pusher\n"," ],\n"," # metadata_connection_config = metadata.sqlite_metadata_connection_config(_metadata_db_root),\n"," metadata_connection_config = metadata.sqlite_metadata_connection_config(_metadata_db_root),\n"," enable_cache=True,\n"," beam_pipeline_args=['--direct_num_workers=%d' % 0],\n"," )"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"7cb_oBqYfjHr","outputId":"7d47d1a8-a092-40e7-fb86-24b78a009a4b"},"source":["#Run pipeline locally\n","from tfx.orchestration.local.local_dag_runner import LocalDagRunner\n","\n","##Define all paths\n","_tfx_root = os.path.join(os.getcwd(), 'tfx')\n","\n","#Config params\n","training_params = {\"epochs\": 50}\n","\n","#Create and run pipeline\n","p_ = create_final_pipeline(root_path = _tfx_root, \n"," pipeline_name=\"local_pipeline\", \n"," data_path=\"/content/train_data\",\n"," training_params=training_params)\n","\n","LocalDagRunner().run(p_)"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:absl:The \"input\" argument to the CsvExampleGen component has been deprecated by \"input_base\". 
Please update your usage as support for this argument will be removed soon.\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n","WARNING:absl:If direct_num_workers is not equal to 1, direct_running_mode should be `multi_processing` or `multi_threading` instead of `in_memory` in order for it to have the desired worker parallelism effect.\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n","WARNING:absl:If direct_num_workers is not equal to 1, direct_running_mode should be `multi_processing` or `multi_threading` instead of `in_memory` in order for it to have the desired worker parallelism effect.\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n"],"name":"stderr"},{"output_type":"stream","text":["WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","Model: \"model_1\"\n","__________________________________________________________________________________________________\n","Layer (type) Output Shape Param # Connected to 
\n","==================================================================================================\n","pickup_latitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","pickup_longitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","dropoff_latitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","dropoff_longitude (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","discretization_4 (Discretizatio (None, 1) 0 pickup_latitude[0][0] \n","__________________________________________________________________________________________________\n","discretization_5 (Discretizatio (None, 1) 0 pickup_longitude[0][0] \n","__________________________________________________________________________________________________\n","discretization_6 (Discretizatio (None, 1) 0 dropoff_latitude[0][0] \n","__________________________________________________________________________________________________\n","discretization_7 (Discretizatio (None, 1) 0 dropoff_longitude[0][0] \n","__________________________________________________________________________________________________\n","trip_start_day (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","trip_start_hour (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","category_encoding_11 (CategoryE (None, 21) 0 discretization_4[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_12 (CategoryE (None, 21) 0 discretization_5[0][0] 
\n","__________________________________________________________________________________________________\n","category_encoding_13 (CategoryE (None, 21) 0 discretization_6[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_14 (CategoryE (None, 21) 0 discretization_7[0][0] \n","__________________________________________________________________________________________________\n","string_lookup_3 (StringLookup) (None, 1) 0 trip_start_day[0][0] \n","__________________________________________________________________________________________________\n","string_lookup_4 (StringLookup) (None, 1) 0 trip_start_hour[0][0] \n","__________________________________________________________________________________________________\n","category_crossing_4 (CategoryCr (None, None) 0 category_encoding_11[0][0] \n"," category_encoding_12[0][0] \n","__________________________________________________________________________________________________\n","category_crossing_5 (CategoryCr (None, None) 0 category_encoding_13[0][0] \n"," category_encoding_14[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_8 (CategoryEn (None, 7) 0 string_lookup_3[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_9 (CategoryEn (None, 24) 0 string_lookup_4[0][0] \n","__________________________________________________________________________________________________\n","hashing_4 (Hashing) (None, None) 0 category_crossing_4[0][0] \n","__________________________________________________________________________________________________\n","hashing_5 (Hashing) (None, None) 0 category_crossing_5[0][0] \n","__________________________________________________________________________________________________\n","trip_start_month (InputLayer) [(None, 1)] 0 
\n","__________________________________________________________________________________________________\n","category_crossing_3 (CategoryCr (None, None) 0 category_encoding_8[0][0] \n"," category_encoding_9[0][0] \n","__________________________________________________________________________________________________\n","embedding_2 (Embedding) (None, None, 4) 1764 hashing_4[0][0] \n","__________________________________________________________________________________________________\n","embedding_3 (Embedding) (None, None, 4) 1764 hashing_5[0][0] \n","__________________________________________________________________________________________________\n","string_lookup_5 (StringLookup) (None, 1) 0 trip_start_month[0][0] \n","__________________________________________________________________________________________________\n","hashing_3 (Hashing) (None, None) 0 category_crossing_3[0][0] \n","__________________________________________________________________________________________________\n","distance (InputLayer) [(None, 1)] 0 \n","__________________________________________________________________________________________________\n","tf.math.reduce_sum_2 (TFOpLambd (None, 4) 0 embedding_2[0][0] \n","__________________________________________________________________________________________________\n","tf.math.reduce_sum_3 (TFOpLambd (None, 4) 0 embedding_3[0][0] \n","__________________________________________________________________________________________________\n","tf.strings.to_number_3 (TFOpLam (None, 1) 0 trip_start_day[0][0] \n","__________________________________________________________________________________________________\n","tf.strings.to_number_4 (TFOpLam (None, 1) 0 trip_start_hour[0][0] \n","__________________________________________________________________________________________________\n","tf.strings.to_number_5 (TFOpLam (None, 1) 0 trip_start_month[0][0] 
\n","__________________________________________________________________________________________________\n","category_encoding_10 (CategoryE (None, 12) 0 string_lookup_5[0][0] \n","__________________________________________________________________________________________________\n","category_encoding_15 (CategoryE (None, 168) 0 hashing_3[0][0] \n","__________________________________________________________________________________________________\n","concatenate_3 (Concatenate) (None, 16) 0 pickup_latitude[0][0] \n"," pickup_longitude[0][0] \n"," dropoff_latitude[0][0] \n"," dropoff_longitude[0][0] \n"," distance[0][0] \n"," tf.math.reduce_sum_2[0][0] \n"," tf.math.reduce_sum_3[0][0] \n"," tf.strings.to_number_3[0][0] \n"," tf.strings.to_number_4[0][0] \n"," tf.strings.to_number_5[0][0] \n","__________________________________________________________________________________________________\n","concatenate_4 (Concatenate) (None, 264) 0 category_encoding_10[0][0] \n"," category_encoding_15[0][0] \n"," category_encoding_11[0][0] \n"," category_encoding_12[0][0] \n"," category_encoding_13[0][0] \n"," category_encoding_14[0][0] \n","__________________________________________________________________________________________________\n","dense_4 (Dense) (None, 32) 544 concatenate_3[0][0] \n","__________________________________________________________________________________________________\n","dense_5 (Dense) (None, 32) 8480 concatenate_4[0][0] \n","__________________________________________________________________________________________________\n","batch_normalization_2 (BatchNor (None, 32) 128 dense_4[0][0] \n","__________________________________________________________________________________________________\n","concatenate_5 (Concatenate) (None, 64) 0 dense_5[0][0] \n"," batch_normalization_2[0][0] \n","__________________________________________________________________________________________________\n","dense_6 (Dense) (None, 32) 2080 concatenate_5[0][0] 
\n","__________________________________________________________________________________________________\n","batch_normalization_3 (BatchNor (None, 32) 128 dense_6[0][0] \n","__________________________________________________________________________________________________\n","dense_7 (Dense) (None, 1) 33 batch_normalization_3[0][0] \n","==================================================================================================\n","Total params: 14,921\n","Trainable params: 14,793\n","Non-trainable params: 128\n","__________________________________________________________________________________________________\n","None\n","Epoch 1/50\n"],"name":"stdout"},{"output_type":"stream","text":["/usr/local/lib/python3.7/dist-packages/tensorflow/python/keras/engine/functional.py:595: UserWarning: Input dict contained keys ['fare'] which did not match any model input. They will be ignored by the model.\n"," [n for n in tensors.keys() if n not in ref_input_names])\n"],"name":"stderr"},{"output_type":"stream","text":["WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","100/100 [==============================] - ETA: 0s - loss: 217.0832 - rmse: 14.7035WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting 
BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","100/100 [==============================] - 19s 169ms/step - loss: 217.1244 - rmse: 14.7052 - val_loss: 214.2426 - val_rmse: 14.6367\n","Epoch 2/50\n","100/100 [==============================] - 16s 162ms/step - loss: 194.8064 - rmse: 13.8886 - val_loss: 298.7778 - val_rmse: 17.2851\n","Epoch 3/50\n","100/100 [==============================] - 16s 160ms/step - loss: 182.8486 - rmse: 13.4537 - val_loss: 475.0226 - val_rmse: 21.7950\n","Epoch 4/50\n","100/100 [==============================] - 16s 160ms/step - loss: 120.3823 - rmse: 10.8827 - val_loss: 322.9235 - val_rmse: 17.9700\n","Epoch 5/50\n","100/100 [==============================] - 16s 162ms/step - loss: 81.6593 - rmse: 8.9703 - val_loss: 70.8273 - val_rmse: 8.4159\n","Epoch 6/50\n","100/100 [==============================] - 16s 160ms/step - loss: 57.8599 - rmse: 7.5910 - val_loss: 40.0003 - val_rmse: 6.3246\n","Epoch 7/50\n","100/100 [==============================] - 16s 160ms/step - loss: 164.7924 - rmse: 12.6626 - val_loss: 59.6570 - val_rmse: 7.7238\n","Epoch 8/50\n","100/100 [==============================] - 16s 162ms/step - loss: 194.9424 - rmse: 12.8607 - val_loss: 56.2903 - val_rmse: 7.5027\n","Epoch 9/50\n","100/100 [==============================] - 16s 160ms/step - loss: 33.9889 - rmse: 5.7552 - val_loss: 36.6010 - val_rmse: 6.0499\n","Epoch 10/50\n","100/100 [==============================] - 16s 161ms/step - loss: 63.0520 - rmse: 7.7663 - val_loss: 40.7972 - val_rmse: 6.3873\n","Epoch 11/50\n","100/100 [==============================] - 16s 160ms/step - loss: 41.0774 - rmse: 6.2706 - val_loss: 59.2725 - val_rmse: 7.6989\n","Epoch 12/50\n","100/100 [==============================] - 16s 161ms/step - loss: 75.1524 - rmse: 8.3895 - val_loss: 102.4453 - val_rmse: 10.1215\n","Epoch 13/50\n","100/100 [==============================] - 16s 160ms/step - loss: 89.3488 - rmse: 9.1501 - val_loss: 
105.3505 - val_rmse: 10.2640\n","Epoch 14/50\n","100/100 [==============================] - 16s 161ms/step - loss: 51.3950 - rmse: 6.9459 - val_loss: 198.9368 - val_rmse: 14.1045\n","\n","Epoch 00014: ReduceLROnPlateau reducing learning rate to 0.00020000000949949026.\n","Epoch 15/50\n","100/100 [==============================] - 16s 160ms/step - loss: 62.9374 - rmse: 7.6894 - val_loss: 104.5761 - val_rmse: 10.2262\n","Epoch 16/50\n","100/100 [==============================] - 16s 160ms/step - loss: 196.7807 - rmse: 12.6815 - val_loss: 57.5709 - val_rmse: 7.5875\n","Epoch 17/50\n","100/100 [==============================] - 16s 162ms/step - loss: 55.5935 - rmse: 7.2975 - val_loss: 44.2611 - val_rmse: 6.6529\n","Epoch 18/50\n","100/100 [==============================] - 16s 159ms/step - loss: 30.2384 - rmse: 5.4320 - val_loss: 34.3575 - val_rmse: 5.8615\n","Epoch 19/50\n","100/100 [==============================] - 16s 161ms/step - loss: 88.3100 - rmse: 9.0977 - val_loss: 33.1711 - val_rmse: 5.7594\n","Epoch 20/50\n","100/100 [==============================] - 16s 159ms/step - loss: 144.0971 - rmse: 10.9913 - val_loss: 30.8044 - val_rmse: 5.5502\n","Epoch 21/50\n","100/100 [==============================] - 16s 161ms/step - loss: 49.9575 - rmse: 6.7986 - val_loss: 28.9896 - val_rmse: 5.3842\n","Epoch 22/50\n","100/100 [==============================] - 16s 161ms/step - loss: 69.0783 - rmse: 7.9757 - val_loss: 29.1548 - val_rmse: 5.3995\n","Epoch 23/50\n","100/100 [==============================] - 16s 161ms/step - loss: 36.7930 - rmse: 5.6760 - val_loss: 29.8607 - val_rmse: 5.4645\n","Epoch 24/50\n","100/100 [==============================] - 16s 160ms/step - loss: 34.0130 - rmse: 5.7177 - val_loss: 28.2520 - val_rmse: 5.3153\n","Epoch 25/50\n","100/100 [==============================] - 16s 162ms/step - loss: 55.6878 - rmse: 7.2212 - val_loss: 29.9096 - val_rmse: 5.4690\n","Epoch 26/50\n","100/100 [==============================] - 16s 160ms/step - loss: 52.0275 - 
rmse: 6.9239 - val_loss: 28.9011 - val_rmse: 5.3760\n","Epoch 27/50\n","100/100 [==============================] - 16s 161ms/step - loss: 29.6021 - rmse: 5.3416 - val_loss: 27.8313 - val_rmse: 5.2755\n","Epoch 28/50\n","100/100 [==============================] - 16s 160ms/step - loss: 72.0424 - rmse: 8.1916 - val_loss: 30.4622 - val_rmse: 5.5192\n","Epoch 29/50\n","100/100 [==============================] - 16s 161ms/step - loss: 86.4868 - rmse: 9.0198 - val_loss: 28.6383 - val_rmse: 5.3515\n","Epoch 30/50\n","100/100 [==============================] - 16s 161ms/step - loss: 47.8823 - rmse: 6.6413 - val_loss: 28.2398 - val_rmse: 5.3141\n","Epoch 31/50\n","100/100 [==============================] - 16s 160ms/step - loss: 39.5788 - rmse: 6.1364 - val_loss: 31.4934 - val_rmse: 5.6119\n","Epoch 32/50\n","100/100 [==============================] - 16s 161ms/step - loss: 71.5713 - rmse: 8.1904 - val_loss: 31.4563 - val_rmse: 5.6086\n","\n","Epoch 00032: ReduceLROnPlateau reducing learning rate to 4.0000001899898055e-05.\n","Epoch 33/50\n","100/100 [==============================] - 16s 160ms/step - loss: 68.4022 - rmse: 7.9015 - val_loss: 27.0354 - val_rmse: 5.1996\n","Epoch 34/50\n","100/100 [==============================] - 16s 162ms/step - loss: 53.8253 - rmse: 7.0993 - val_loss: 27.0893 - val_rmse: 5.2047\n","Epoch 35/50\n","100/100 [==============================] - 16s 161ms/step - loss: 65.6114 - rmse: 7.7330 - val_loss: 27.2562 - val_rmse: 5.2207\n","Epoch 36/50\n","100/100 [==============================] - 16s 161ms/step - loss: 49.9068 - rmse: 6.8633 - val_loss: 26.9264 - val_rmse: 5.1891\n","Epoch 37/50\n","100/100 [==============================] - 16s 164ms/step - loss: 22.5058 - rmse: 4.7386 - val_loss: 26.7290 - val_rmse: 5.1700\n","Epoch 38/50\n","100/100 [==============================] - 16s 161ms/step - loss: 104.6029 - rmse: 9.8482 - val_loss: 27.9150 - val_rmse: 5.2835\n","Epoch 39/50\n","100/100 [==============================] - 16s 163ms/step - 
loss: 137.3150 - rmse: 10.6986 - val_loss: 28.0951 - val_rmse: 5.3005\n","Epoch 40/50\n","100/100 [==============================] - 16s 161ms/step - loss: 28.5468 - rmse: 5.2845 - val_loss: 26.5959 - val_rmse: 5.1571\n","Epoch 41/50\n","100/100 [==============================] - 16s 162ms/step - loss: 125.1656 - rmse: 10.4386 - val_loss: 27.3829 - val_rmse: 5.2329\n","Epoch 42/50\n","100/100 [==============================] - 16s 161ms/step - loss: 43.6788 - rmse: 6.3620 - val_loss: 27.7053 - val_rmse: 5.2636\n","Epoch 43/50\n","100/100 [==============================] - 16s 163ms/step - loss: 53.9686 - rmse: 7.0948 - val_loss: 26.9982 - val_rmse: 5.1960\n","Epoch 44/50\n","100/100 [==============================] - 16s 162ms/step - loss: 68.4513 - rmse: 7.9779 - val_loss: 26.8975 - val_rmse: 5.1863\n","Epoch 45/50\n","100/100 [==============================] - 16s 163ms/step - loss: 29.4764 - rmse: 5.2811 - val_loss: 27.5113 - val_rmse: 5.2451\n","\n","Epoch 00045: ReduceLROnPlateau reducing learning rate to 1e-05.\n","Epoch 46/50\n","100/100 [==============================] - 16s 163ms/step - loss: 63.5452 - rmse: 7.7999 - val_loss: 28.2573 - val_rmse: 5.3157\n","Epoch 47/50\n","100/100 [==============================] - 16s 161ms/step - loss: 37.5531 - rmse: 5.9227 - val_loss: 26.8318 - val_rmse: 5.1799\n","Epoch 48/50\n","100/100 [==============================] - 16s 162ms/step - loss: 35.7494 - rmse: 5.8155 - val_loss: 26.7775 - val_rmse: 5.1747\n","Epoch 49/50\n","100/100 [==============================] - 16s 162ms/step - loss: 63.8763 - rmse: 7.6909 - val_loss: 27.1194 - val_rmse: 5.2076\n","Epoch 50/50\n","100/100 [==============================] - 16s 163ms/step - loss: 54.1033 - rmse: 7.1340 - val_loss: 25.7967 - val_rmse: 5.0790\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting 
BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","WARNING:tensorflow:Using a while_loop for converting BoostedTreesBucketize\n","INFO:tensorflow:Assets written to: /content/tfx/Trainer/model/3/serving_model_dir/assets\n"],"name":"stdout"},{"output_type":"stream","text":["WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n","WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n","WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-7fb6cf45-63a7-4aa3-9d25-51f93b63ad96.json']\n"],"name":"stderr"}]},{"cell_type":"markdown","metadata":{"id":"YjkNK4tjThRD"},"source":["### **Inference** (saved_model_cli)"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"x1dvXxsiSSSJ","outputId":"7b2925d8-9fef-4feb-dbc4-46e13d8f7c77"},"source":["!saved_model_cli show --dir \"/content/tfx/Pusher/pushed_model/4\" --all"],"execution_count":null,"outputs":[{"output_type":"stream","text":["\n","MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:\n","\n","signature_def['__saved_model_init_op']:\n"," The given SavedModel SignatureDef contains the following input(s):\n"," The given SavedModel SignatureDef contains the following output(s):\n"," outputs['__saved_model_init_op'] tensor_info:\n"," dtype: DT_INVALID\n"," shape: unknown_rank\n"," name: NoOp\n"," Method name is: 
\n","\n","signature_def['serving_default']:\n"," The given SavedModel SignatureDef contains the following input(s):\n"," inputs['dropoff_latitude'] tensor_info:\n"," dtype: DT_FLOAT\n"," shape: (-1)\n"," name: serving_default_dropoff_latitude:0\n"," inputs['dropoff_longitude'] tensor_info:\n"," dtype: DT_FLOAT\n"," shape: (-1)\n"," name: serving_default_dropoff_longitude:0\n"," inputs['pickup_latitude'] tensor_info:\n"," dtype: DT_FLOAT\n"," shape: (-1)\n"," name: serving_default_pickup_latitude:0\n"," inputs['pickup_longitude'] tensor_info:\n"," dtype: DT_FLOAT\n"," shape: (-1)\n"," name: serving_default_pickup_longitude:0\n"," inputs['trip_start_day'] tensor_info:\n"," dtype: DT_STRING\n"," shape: (-1)\n"," name: serving_default_trip_start_day:0\n"," inputs['trip_start_hour'] tensor_info:\n"," dtype: DT_STRING\n"," shape: (-1)\n"," name: serving_default_trip_start_hour:0\n"," inputs['trip_start_month'] tensor_info:\n"," dtype: DT_STRING\n"," shape: (-1)\n"," name: serving_default_trip_start_month:0\n"," The given SavedModel SignatureDef contains the following output(s):\n"," outputs['output_0'] tensor_info:\n"," dtype: DT_FLOAT\n"," shape: (-1, 1)\n"," name: StatefulPartitionedCall:0\n"," Method name is: tensorflow/serving/predict\n","Traceback (most recent call last):\n"," File \"/usr/local/bin/saved_model_cli\", line 8, in \n"," sys.exit(main())\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/tools/saved_model_cli.py\", line 990, in main\n"," args.func(args)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/tools/saved_model_cli.py\", line 691, in show\n"," _show_all(args.dir)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/tools/saved_model_cli.py\", line 283, in _show_all\n"," _show_defined_functions(saved_model_dir)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/tools/saved_model_cli.py\", line 176, in _show_defined_functions\n"," trackable_object = 
load.load(saved_model_dir)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/saved_model/load.py\", line 528, in load\n"," return load_internal(export_dir, tags)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/saved_model/load.py\", line 552, in load_internal\n"," export_dir)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/saved_model/load.py\", line 114, in __init__\n"," meta_graph.graph_def.library))\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/saved_model/function_deserialization.py\", line 312, in load_function_def_library\n"," func_graph = function_def_lib.function_def_to_graph(copy)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/framework/function_def_to_graph.py\", line 59, in function_def_to_graph\n"," fdef, input_shapes)\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/framework/function_def_to_graph.py\", line 218, in function_def_to_graph_def\n"," op_def = default_graph._get_op_def(node_def.op) # pylint: disable=protected-access\n"," File \"/usr/local/lib/python2.7/dist-packages/tensorflow_core/python/framework/ops.py\", line 3712, in _get_op_def\n"," c_api.TF_GraphGetOpDef(self._c_graph, compat.as_bytes(type), buf)\n","tensorflow.python.framework.errors_impl.NotFoundError: Op type not registered 'DenseBincount' in binary running on 9fac1a128a27. Make sure the Op and Kernel are registered in the binary running in this process. Note that if you are loading a saved graph which used ops from tf.contrib, accessing (e.g.) 
`tf.contrib.resampler` should be done before importing the graph, as contrib ops are lazily registered when the module is first accessed.\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"Ws9TkuU5SpdU","outputId":"4d8d60aa-b2b0-4cdb-88bd-26e2fb1a8dbf"},"source":["# LOCAL: load the pushed SavedModel and predict through its serving signature\n","saved_mod = tf.saved_model.load(\"/content/tfx/Pusher/pushed_model/4\")\n","\n","# Get the prediction function from the serving signature\n","f = saved_mod.signatures['serving_default']\n","\n","# Run the prediction function on one example; every input is a keyword tensor of shape (1,)\n","f(dropoff_latitude=tf.convert_to_tensor([41.920452]),\n","  dropoff_longitude=tf.convert_to_tensor([-87.679955]),\n","  pickup_latitude=tf.convert_to_tensor([41.952823]),\n","  pickup_longitude=tf.convert_to_tensor([-87.653244]),\n","  trip_start_day=tf.convert_to_tensor([\"1\"]),\n","  trip_start_hour=tf.convert_to_tensor([\"5\"]),\n","  trip_start_month=tf.convert_to_tensor([\"6\"]))"],"execution_count":null,"outputs":[{"output_type":"stream","text":["WARNING:tensorflow:11 out of the last 11 calls to .restored_function_body at 0x7f9760982f80> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.\n","WARNING:tensorflow:11 out of the last 11 calls to .restored_function_body at 0x7f975d33f320> triggered tf.function retracing. 
Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. 
For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.\n","WARNING:tensorflow:11 out of the last 11 calls to .restored_function_body at 0x7f975bb3cef0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.\n","WARNING:tensorflow:11 out of the last 11 calls to .restored_function_body at 0x7f975bb4f290> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details.\n"],"name":"stdout"},{"output_type":"execute_result","data":{"text/plain":["{'output_0': }"]},"metadata":{"tags":[]},"execution_count":36}]}]}