{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "KPtzOzgJ-Ak2" }, "source": [ "# Edge Impulse Python API Bindings Example" ] }, { "cell_type": "markdown", "metadata": { "id": "sKIz3w8K_dN1" }, "source": [ "[![View in Edge Impulse docs](https://raw.githubusercontent.com/edgeimpulse/notebooks/main/.assets/images/ei-badge.svg)](https://docs.edgeimpulse.com/docs/tutorials/api-examples/python-api-bindings-example)\n", "[![Open in Google Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/edgeimpulse/notebooks/blob/main/notebooks/python-api-bindings-example.ipynb)\n", "[![View on GitHub](https://raw.githubusercontent.com/edgeimpulse/notebooks/main/.assets/images/badge-view-on-github.svg)](https://github.com/edgeimpulse/notebooks/blob/main/notebooks/python-api-bindings-example.ipynb)\n", "[![Download notebook](https://raw.githubusercontent.com/edgeimpulse/notebooks/main/.assets/images/badge-download-notebook.svg)](https://raw.githubusercontent.com/edgeimpulse/notebooks/main/notebooks/python-api-bindings-example.ipynb)" ] }, { "cell_type": "markdown", "metadata": { "id": "gLWb8AoZ_ZyY" }, "source": [ "The [Python SDK](https://docs.edgeimpulse.com/docs/tools/edge-impulse-python-sdk) is built on top of the [Edge Impulse Python API bindings](https://pypi.org/project/edgeimpulse-api/), which is known as the _edgeimpulse_api_ package. These are Python wrappers for all of the [web API calls](https://docs.edgeimpulse.com/reference/edge-impulse-api/edge-impulse-api) that you can use to interact with Edge Impulse projects programmatically (i.e. 
without needing to use the Studio graphical interface).\n", "\n", "The API reference guide for using the Python API bindings can be found [here](https://docs.edgeimpulse.com/reference/python-api-bindings/edgeimpulse_api).\n", "\n", "This example will walk you through the process of using the Edge Impulse API bindings to upload data, define an impulse, process features, train a model, and deploy the impulse as a C++ library.\n", "\n", "After creating your project and copying the API key, feel free to leave the project open in a browser window so you can watch the changes as we make API calls. You might need to refresh the browser after each call to see the changes take effect.\n", "\n", "> **Important!** This notebook will add data and remove any current features and models in a project. We highly recommend creating a new project when running this notebook! Don't say we didn't warn you if you mess up an existing project." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "TFny1qVW99dN" }, "outputs": [], "source": [ "# Install the Edge Impulse API bindings and the requests package\n", "!python -m pip install edgeimpulse-api requests" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "kV6EOSOuC9nV" }, "outputs": [], "source": [ "import json\n", "import re\n", "import os\n", "import pprint\n", "import time\n", "\n", "import requests" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "IppiSCw4_0eH" }, "outputs": [], "source": [ "# Import the API objects we plan to use\n", "from edgeimpulse_api import (\n", " ApiClient,\n", " BuildOnDeviceModelRequest,\n", " Configuration,\n", " DeploymentApi,\n", " DSPApi,\n", " DSPConfigRequest,\n", " GenerateFeaturesRequest,\n", " Impulse,\n", " ImpulseApi,\n", " JobsApi,\n", " ProjectsApi,\n", " SetKerasParameterRequest,\n", " StartClassifyJobRequest,\n", " UpdateProjectRequest,\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "tHum_KkPAfhG" }, "source": [ 
"You will need to obtain an API key from an Edge Impulse project. Log into [edgeimpulse.com](https://edgeimpulse.com/) and create a new project. Open the project, navigate to **Dashboard** and click on the **Keys** tab to view your API keys. Double-click on the API key to highlight it, right-click, and select **Copy**.\n", "\n", "![Copy API key from Edge Impulse project](https://raw.githubusercontent.com/edgeimpulse/notebooks/main/.assets/images/python-sdk-copy-ei-api-key.png)\n", "\n", "Note that you do not actually need to use the project in the Edge Impulse Studio. We just need the API key.\n", "\n", "Paste that API key string in the `API_KEY` value in the following cell:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GpIaKwEJAhpI" }, "outputs": [], "source": [ "# Settings\n", "API_KEY = \"ei_dae2...\" # Change this to your Edge Impulse API key\n", "API_HOST = \"https://studio.edgeimpulse.com/v1\"\n", "DATASET_PATH = \"dataset/gestures\"\n", "OUTPUT_PATH = \".\"" ] }, { "cell_type": "markdown", "metadata": { "id": "W0qE0bWCrNvP" }, "source": [ "## Initialize API clients\n", "\n", "The Python API bindings use a series of submodules, each encapsulating one of the API subsections (e.g. Projects, DSP, Learn, etc.). To use these submodules, you need to instantiate a generic API module and use that to instantiate the individual API objects. We'll use these objects to make the API calls later.\n", "\n", "To configure a client, you generally create a configuration object (often from a dict) and then pass that object as an argument to the client." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NB0g7vxErNQF" }, "outputs": [], "source": [ "# Create top-level API client\n", "config = Configuration(\n", " host=API_HOST,\n", " api_key={\"ApiKeyAuthentication\": API_KEY}\n", ")\n", "client = ApiClient(config)\n", "\n", "# Instantiate sub-clients\n", "deployment_api = DeploymentApi(client)\n", "dsp_api = DSPApi(client)\n", "impulse_api = ImpulseApi(client)\n", "jobs_api = JobsApi(client)\n", "projects_api = ProjectsApi(client)" ] }, { "cell_type": "markdown", "metadata": { "id": "lPOr6bSjqse4" }, "source": [ "## Initialize project\n", "\n", "Before uploading data, we should make sure the project is in the regular impulse flow mode, rather than [BYOM mode](https://docs.edgeimpulse.com/docs/edge-impulse-studio/bring-your-own-model-byom). We'll also need the project ID for most of the other API calls in the future.\n", "\n", "Notice that the general pattern for calling API functions is to instantiate a configuration/request object and pass it to the API method that's part of the submodule. 
You can find which parameters a specific API call expects by looking at [the call's documentation page](https://docs.edgeimpulse.com/reference/edge-impulse-api/projects/update_project).\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [Projects / List (active) projects](https://docs.edgeimpulse.com/reference/edge-impulse-api/projects/list_active_projects)\n", " * [Projects / Update project](https://docs.edgeimpulse.com/reference/edge-impulse-api/projects/update_project)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AFOytMLU_ulh" }, "outputs": [], "source": [ "# Get the project ID, which we'll need for future API calls\n", "response = projects_api.list_projects()\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not obtain the project ID.\")\n", "else:\n", " project_id = response.projects[0].id\n", "\n", "# Print the project ID\n", "print(f\"Project ID: {project_id}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cWggMwaIqrpS" }, "outputs": [], "source": [ "# Create request object with the required parameters\n", "update_project_request = UpdateProjectRequest.from_dict({\n", " \"inPretrainedModelFlow\": False,\n", "})\n", "\n", "# Update the project and check the response for errors\n", "response = projects_api.update_project(\n", " project_id=project_id,\n", " update_project_request=update_project_request,\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not update the project.\")\n", "else:\n", " print(\"Project is now in impulse workflow.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "z_GzBa0YBzGo" }, "source": [ "## Upload dataset\n", "\n", "We'll start by downloading the gesture dataset from https://docs.edgeimpulse.com/docs/pre-built-datasets/continuous-gestures. 
Note that the [ingestion API](https://docs.edgeimpulse.com/reference/data-ingestion/ingestion-api) is separate from the regular Edge Impulse API: the URL and interface are different. As a result, we must construct the request manually and cannot rely on the Python API bindings.\n", "\n", "We rely on the ingestion service using the string before the first period in the filename to determine the label. For example, \"idle.1.cbor\" will be automatically assigned the label \"idle.\" If you wish to set a label manually, you must specify the `x-label` parameter in the headers. Note that you can only define a label this way when uploading a group of data at a time. For example, setting `\"x-label\": \"idle\"` in the headers would give all data uploaded with that call the label \"idle.\"\n", "\n", "API calls used with associated documentation:\n", "\n", " * [Ingestion service](https://docs.edgeimpulse.com/reference/data-ingestion/ingestion-api)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "InjgAOyRAn6z" }, "outputs": [], "source": [ "# Download and unzip gesture dataset\n", "!mkdir -p dataset/\n", "!wget -P dataset -q https://cdn.edgeimpulse.com/datasets/gestures.zip\n", "!unzip -q dataset/gestures.zip -d {DATASET_PATH}" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "OGMm_7ELHMFb" }, "outputs": [], "source": [ "def upload_files(api_key, path, subset):\n", "    \"\"\"\n", "    Upload files in the given path/subset (where subset is \"training\" or\n", "    \"testing\")\n", "    \"\"\"\n", "\n", "    # Construct request\n", "    url = f\"https://ingestion.edgeimpulse.com/api/{subset}/files\"\n", "    headers = {\n", "        \"x-api-key\": api_key,\n", "        \"x-disallow-duplicates\": \"true\",\n", "    }\n", "\n", "    # Get file handles and create dataset to upload\n", "    files = []\n", "    file_list = os.listdir(os.path.join(path, subset))\n", "    for file_name in file_list:\n", "        file_path = os.path.join(path, subset, file_name)\n", "        if 
os.path.isfile(file_path):\n", "            file_handle = open(file_path, \"rb\")\n", "            files.append((\"data\", (file_name, file_handle, \"multipart/form-data\")))\n", "\n", "    # Upload the files\n", "    response = requests.post(\n", "        url=url,\n", "        headers=headers,\n", "        files=files,\n", "    )\n", "\n", "    # Print any errors for files that did not upload\n", "    upload_responses = response.json()[\"files\"]\n", "    for resp in upload_responses:\n", "        if not resp[\"success\"]:\n", "            print(resp)\n", "\n", "    # Close all the handles\n", "    for handle in files:\n", "        handle[1][1].close()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8witLfBgH-Ay" }, "outputs": [], "source": [ "# Upload the dataset to the project\n", "print(\"Uploading training dataset...\")\n", "upload_files(API_KEY, DATASET_PATH, \"training\")\n", "print(\"Uploading testing dataset...\")\n", "upload_files(API_KEY, DATASET_PATH, \"testing\")" ] }, { "cell_type": "markdown", "metadata": { "id": "8isx_nKdOqSs" }, "source": [ "## Create an impulse\n", "\n", "Now that we've uploaded our data, it's time to create an impulse. An \"impulse\" is a combination of processing (feature extraction) and learning blocks. The general flow of data is:\n", "\n", "> data -> input block -> processing block(s) -> learning block(s)\n", "\n", "Only the processing and learning blocks make up the \"impulse.\" However, we must still specify the input block, as it allows us to perform preprocessing, like windowing (for time series data) or cropping/scaling (for image data).\n", "\n", "Your project will have one input block, but it can contain multiple processing and learning blocks. Specific outputs from the processing block can be specified as inputs to the learning blocks. However, for simplicity, we'll just show one processing block and one learning block.\n", "\n", "> **Note:** Historically, processing blocks were called \"DSP blocks,\" as they focused on time series data. 
In Studio, the name has been changed to \"Processing block,\" as the blocks work with different types of data, but you'll see it referred to as \"DSP block\" in the API.\n", "\n", "It's important that you define the input block with the same parameters as your captured data, especially the sampling rate! Additionally, the processing block axes names **must** match up with their names in the dataset.\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [Impulse / Get impulse blocks](https://docs.edgeimpulse.com/reference/edge-impulse-api/impulse/get_impulse_blocks)\n", " * [Impulse / Delete impulse](https://docs.edgeimpulse.com/reference/edge-impulse-api/impulse/delete_impulse)\n", " * [Impulse / Create impulse](https://docs.edgeimpulse.com/reference/edge-impulse-api/impulse/create_impulse)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Djn91Lq-ZpR8" }, "outputs": [], "source": [ "# To start, let's fetch a list of all the available blocks\n", "response = impulse_api.get_impulse_blocks(\n", " project_id=project_id\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not get impulse blocks.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "nTOB4175asrn" }, "outputs": [], "source": [ "# Print the available input blocks\n", "print(\"Input blocks\")\n", "print(json.dumps(json.loads(response.to_json())[\"inputBlocks\"], indent=2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "7UIhLBJLa2U-" }, "outputs": [], "source": [ "# Print the available processing blocks\n", "print(\"Processing blocks\")\n", "print(json.dumps(json.loads(response.to_json())[\"dspBlocks\"], indent=2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MYrjrUB7a7Et" }, "outputs": [], "source": [ "# Print the available learning blocks\n", "print(\"Learning blocks\")\n", 
"print(json.dumps(json.loads(response.to_json())[\"learnBlocks\"], indent=2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5j-g9mkrLB9k" }, "outputs": [], "source": [ "# Give our impulse blocks IDs, which we'll use later\n", "processing_id = 2\n", "learning_id = 3\n", "\n", "# Impulses (and their blocks) are defined as a collection of key/value pairs\n", "impulse = Impulse.from_dict({\n", " \"inputBlocks\": [\n", " {\n", " \"id\": 1,\n", " \"type\": \"time-series\",\n", " \"name\": \"Time series\",\n", " \"title\": \"Time series data\",\n", " \"windowSizeMs\": 1000,\n", " \"windowIncreaseMs\": 500,\n", " \"frequencyHz\": 62.5,\n", " \"padZeros\": True,\n", " }\n", " ],\n", " \"dspBlocks\": [\n", " {\n", " \"id\": processing_id,\n", " \"type\": \"spectral-analysis\",\n", " \"name\": \"Spectral Analysis\",\n", " \"implementationVersion\": 4,\n", " \"title\": \"processing\",\n", " \"axes\": [\"accX\", \"accY\", \"accZ\"],\n", " \"input\": 1,\n", " }\n", " ],\n", " \"learnBlocks\": [\n", " {\n", " \"id\": learning_id,\n", " \"type\": \"keras\",\n", " \"name\": \"Classifier\",\n", " \"title\": \"Classification\",\n", " \"dsp\": [processing_id],\n", " }\n", " ],\n", "})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NxgDfPVFRxAO" }, "outputs": [], "source": [ "# Delete the current impulse in the project\n", "response = impulse_api.delete_impulse(\n", " project_id=project_id\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not delete current impulse.\")\n", "\n", "# Add blocks to impulse\n", "response = impulse_api.create_impulse(\n", " project_id=project_id,\n", " impulse=impulse\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not create impulse.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "1vuJumLp58U1" }, "source": [ "## Configure processing 
block\n", "\n", "Before generating features, we need to configure the processing block. We'll start by printing all the available parameters for the `spectral-analysis` block, which we set when we created the impulse above.\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [DSP / Get config](https://docs.edgeimpulse.com/reference/edge-impulse-api/dsp/get_config)\n", " * [DSP / Set config](https://docs.edgeimpulse.com/reference/edge-impulse-api/dsp/set_config)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ht2LegOF1rYb" }, "outputs": [], "source": [ "# Get processing block config\n", "response = dsp_api.get_dsp_config(\n", " project_id=project_id,\n", " dsp_id=processing_id\n", ")\n", "\n", "# Construct user-readable parameters\n", "settings = []\n", "for group in response.config:\n", " for item in group.items:\n", " element = {}\n", " element[\"parameter\"] = item.param\n", " element[\"description\"] = item.help\n", " element[\"currentValue\"] = item.value\n", " element[\"defaultValue\"] = item.default_value\n", " element[\"type\"] = item.type\n", " if hasattr(item, \"select_options\") and \\\n", " getattr(item, \"select_options\") is not None:\n", " element[\"options\"] = [i.value for i in item.select_options]\n", " settings.append(element)\n", "\n", "# Print the settings\n", "print(json.dumps(settings, indent=2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "TPEuV3ku3vuN" }, "outputs": [], "source": [ "# Define processing block configuration\n", "config_request = DSPConfigRequest.from_dict({\n", " \"config\": {\n", " \"scale-axes\": 1.0,\n", " \"input-decimation-ratio\": 1,\n", " \"filter-type\": \"none\",\n", " \"analysis-type\": \"FFT\",\n", " \"fft-length\": 16,\n", " \"do-log\": True,\n", " \"do-fft-overlap\": True,\n", " \"extra-low-freq\": False,\n", " }\n", "})\n", "\n", "# Set processing block configuration\n", "response = dsp_api.set_dsp_config(\n", " 
project_id=project_id,\n", " dsp_id=processing_id,\n", " dsp_config_request=config_request\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not set processing block configuration.\")\n", "else:\n", " print(\"Processing block has been configured.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "dJxMwnVhRrYG" }, "source": [ "## Run processing block to generate features\n", "\n", "After we've defined the impulse, we then want to use our processing block(s) to extract features from our data. We'll skip feature importance and feature explorer to make this go faster.\n", "\n", "Generating features kicks off a job in Studio. A \"job\" involves instantiating a Docker container and running a custom script in the container to perform some action. In our case, that involves reading in data, extracting features from that data, and saving those features as NumPy (.npy) files in our project.\n", "\n", "Because jobs can take a while, the API call will return immediately. If the call was successful, the response will contain a job number. 
We can then monitor that job and wait for it to finish before continuing.\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [Jobs / Generate features](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/generate_features)\n", " * [Jobs / Get job status](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/get_job_status)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gdLwkXUS_QMR" }, "outputs": [], "source": [ "def poll_job(jobs_api, project_id, job_id):\n", "    \"\"\"Wait for job to complete\"\"\"\n", "\n", "    # Wait for job to complete\n", "    while True:\n", "\n", "        # Check on job status\n", "        response = jobs_api.get_job_status(\n", "            project_id=project_id,\n", "            job_id=job_id\n", "        )\n", "        if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", "            print(\"ERROR: Could not get job status\")\n", "            return False\n", "        else:\n", "            if hasattr(response, \"job\") and hasattr(response.job, \"finished\"):\n", "                if response.job.finished:\n", "                    print(f\"Job completed at {response.job.finished}\")\n", "                    return response.job.finished_successful\n", "            else:\n", "                print(\"ERROR: Response did not contain a 'job' field.\")\n", "                return False\n", "\n", "        # Print that we're still running and wait\n", "        print(f\"Waiting for job {job_id} to finish...\")\n", "        time.sleep(2.0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dxddUwKWWcj7" }, "outputs": [], "source": [ "# Define generate features request\n", "generate_features_request = GenerateFeaturesRequest.from_dict({\n", "    \"dspId\": processing_id,\n", "    \"calculate_feature_importance\": False,\n", "    \"skip_feature_explorer\": True,\n", "})\n", "\n", "# Generate features\n", "response = jobs_api.generate_features_job(\n", "    project_id=project_id,\n", "    generate_features_request=generate_features_request,\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", "    raise 
RuntimeError(\"Could not start feature generation job.\")\n", "\n", "# Extract job ID\n", "job_id = response.id\n", "\n", "# Wait for job to complete\n", "success = poll_job(jobs_api, project_id, job_id)\n", "if success:\n", " print(\"Features have been generated.\")\n", "else:\n", " print(f\"ERROR: Job failed. See https://studio.edgeimpulse.com/studio/{project_id}/jobs#show-job-{job_id} for more details.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0wk6uWvwAVia" }, "outputs": [], "source": [ "# Optional: download NumPy features (x: training data, y: training labels)\n", "print(\"Go here to download the generated features in NumPy format:\")\n", "print(f\"https://studio.edgeimpulse.com/v1/api/{project_id}/dsp-data/{processing_id}/x/training\")\n", "print(f\"https://studio.edgeimpulse.com/v1/api/{project_id}/dsp-data/{processing_id}/y/training\")" ] }, { "cell_type": "markdown", "metadata": { "id": "8q3_LLwwEoEA" }, "source": [ "## Use learning block to train model\n", "\n", "Now that we have generated features, we can run the learning block to train the model on those features. Note that Edge Impulse has a number of learning blocks, each with different methods of configuration. We'll be using the \"keras\" block, which uses TensorFlow and Keras under the hood.\n", "\n", "You can use the [get_keras](https://docs.edgeimpulse.com/reference/python-api-bindings/edgeimpulse_api/api/learn_api#get_keras) and [set_keras](https://docs.edgeimpulse.com/reference/python-api-bindings/edgeimpulse_api/api/learn_api#set_keras) functions to configure the granular settings. 
We'll use the defaults for that block and just set the number of epochs and learning rate for training.\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [Jobs / Train model (Keras)](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/train_model_-keras)\n", " * [Jobs / Get job status](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/get_job_status)\n", " * [Jobs / Get logs](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/get_logs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "_PtkJ0ikBf9l" }, "outputs": [], "source": [ " # Define training request\n", "keras_parameter_request = SetKerasParameterRequest.from_dict({\n", " \"mode\": \"visual\",\n", " \"training_cycles\": 10,\n", " \"learning_rate\": 0.001,\n", " \"train_test_split\": 0.8,\n", " \"skip_embeddings_and_memory\": True,\n", "})\n", "\n", "# Train model\n", "response = jobs_api.train_keras_job(\n", " project_id=project_id,\n", " learn_id=learning_id,\n", " set_keras_parameter_request=keras_parameter_request,\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not start training job.\")\n", "\n", "# Extract job ID\n", "job_id = response.id\n", "\n", "# Wait for job to complete\n", "success = poll_job(jobs_api, project_id, job_id)\n", "if success:\n", " print(\"Model has been trained.\")\n", "else:\n", " print(f\"ERROR: Job failed. See https://studio.edgeimpulse.com/studio/{project_id}/jobs#show-job-{job_id} for more details.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "8LAglLwn6Jma" }, "source": [ "Now that the model has been trained, we can go back to the job logs to find the accuracy metrics for both the float32 and int8 quantization levels. We'll need to parse the logs to find these. Because the logs are printed with the most recent events first, we'll work backwards through the log to find these metrics." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "y3fb0yfm6ceG" }, "outputs": [], "source": [ "def get_metrics(response, quantization=None):\n", "    \"\"\"\n", "    Parse the response to find the accuracy/training metrics for a given\n", "    quantization level. If quantization is None, return the first set of metrics\n", "    found.\n", "    \"\"\"\n", "    metrics = None\n", "    delimiter_str = \"calculate_classification_metrics\"\n", "\n", "    # Skip finding quantization metrics if not given\n", "    if quantization:\n", "        quantization_found = False\n", "    else:\n", "        quantization_found = True\n", "\n", "    # Parse logs\n", "    for log in reversed(response.to_dict()[\"stdout\"]):\n", "        data_field = log[\"data\"]\n", "        if quantization_found:\n", "            substrings = data_field.split(\"\\n\")\n", "            for substring in substrings:\n", "                substring = substring.strip()\n", "                if substring.startswith(delimiter_str):\n", "                    metrics = json.loads(substring[len(delimiter_str):])\n", "                    return metrics\n", "        else:\n", "            if data_field.startswith(f\"Calculating {quantization} accuracy\"):\n", "                quantization_found = True\n", "\n", "    return metrics" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AB47VpTXxwnL" }, "outputs": [], "source": [ "# Get the job logs for the previous job\n", "response = jobs_api.get_jobs_logs(\n", "    project_id=project_id,\n", "    job_id=job_id\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", "    raise RuntimeError(\"Could not get job log.\")\n", "\n", "# Print training metrics (quantization is \"float32\" or \"int8\")\n", "quantization = \"float32\"\n", "metrics = get_metrics(response, quantization)\n", "if metrics:\n", "    print(f\"Training metrics for {quantization} quantization:\")\n", "    pprint.pprint(metrics)\n", "else:\n", "    print(\"ERROR: Could not get training metrics.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "dIuT-Mhp-71J" }, "source": [ "## Test the impulse\n", "\n", "As with any good 
machine learning project, we should test the accuracy of the model using our holdout (\"testing\") set. We'll call the `classify` API function to make that happen and then parse the job logs to get the results.\n", "\n", "In most cases, using `int8` quantization will result in a faster, smaller model at the cost of a slight loss in accuracy.\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [Jobs / Classify](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/classify)\n", " * [Jobs / Get job status](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/get_job_status)\n", " * [Jobs / Get logs](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/get_logs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "HdEksW2M-7Ob" }, "outputs": [], "source": [ "# Set the model quantization level (\"float32\", \"int8\", or \"akida\")\n", "quantization = \"int8\"\n", "classify_request = StartClassifyJobRequest.from_dict({\n", " \"model_variants\": quantization\n", "})\n", "\n", "# Start model testing job\n", "response = jobs_api.start_classify_job(\n", " project_id=project_id,\n", " start_classify_job_request=classify_request\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not start classify job.\")\n", "\n", "# Extract job ID\n", "job_id = response.id\n", "\n", "# Wait for job to complete\n", "success = poll_job(jobs_api, project_id, job_id)\n", "if success:\n", " print(\"Inference performed on test set.\")\n", "else:\n", " print(f\"ERROR: Job failed. 
See https://studio.edgeimpulse.com/studio/{project_id}/jobs#show-job-{job_id} for more details.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RYTJl-7GCC65" }, "outputs": [], "source": [ "# Get the job logs for the previous job\n", "response = jobs_api.get_jobs_logs(\n", " project_id=project_id,\n", " job_id=job_id\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not get job log.\")\n", "\n", "# Print\n", "metrics = get_metrics(response)\n", "if metrics:\n", " print(f\"Test metrics for {quantization} quantization:\")\n", " pprint.pprint(metrics)\n", "else:\n", " print(\"ERROR: Could not get test metrics.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "MsEAb8V6MO2B" }, "source": [ "## Deploy the impulse\n", "\n", "Now that you've trained the model, let's build it as a C++ library and download it. We'll start by printing out the available target devices. Note that this list changes depending on how you've configured your impulse. For example, if you use a Syntiant-specific learning block, then you'll see Syntiant boards listed. We'll use the \"zip\" target, which gives us a generic C++ library that we can use for nearly any hardware.\n", "\n", "The `engine` must be one of:\n", "\n", "```\n", "tflite\n", "tflite-eon\n", "tflite-eon-ram-optimized\n", "tensorrt\n", "tensaiflow\n", "drp-ai\n", "tidl\n", "akida\n", "syntiant\n", "memryx\n", "neox\n", "```\n", "\n", "We'll use `tflite`, as that's the most ubiquitous.\n", "\n", "`modelType` is the quantization level. 
Your options are:\n", "\n", "```\n", "float32\n", "int8\n", "```\n", "\n", "In most cases, using `int8` quantization will result in a faster, smaller model at the cost of a slight loss in accuracy.\n", "\n", "API calls (links to associated documentation):\n", "\n", " * [Deployment / Deployment targets (data sources)](https://docs.edgeimpulse.com/reference/edge-impulse-api/deployment/deployment_targets_-data_sources)\n", " * [Jobs / Build on-device model](https://docs.edgeimpulse.com/reference/edge-impulse-api/jobs/build_on-device_model)\n", " * [Deployment / Download](https://docs.edgeimpulse.com/reference/edge-impulse-api/deployment/download)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9kePPtX7OsbM" }, "outputs": [], "source": [ "# Get the available devices\n", "response = deployment_api.list_deployment_targets_for_project_data_sources(\n", " project_id=project_id\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not get device list.\")\n", "\n", "# Print the available devices\n", "targets = [x.to_dict()[\"format\"] for x in response.targets]\n", "for target in targets:\n", " print(target)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "qInW3vE6OaN6" }, "outputs": [], "source": [ "# Choose the target hardware (from the list above), engine, and quantization level\n", "target_hardware = \"zip\"\n", "engine = \"tflite\"\n", "quantization = \"int8\"\n", "\n", "# Construct request\n", "device_model_request = BuildOnDeviceModelRequest.from_dict({\n", " \"engine\": engine,\n", " \"modelType\": quantization\n", "})\n", "\n", "# Start build job\n", "response = jobs_api.build_on_device_model_job(\n", " project_id=project_id,\n", " type=target_hardware,\n", " build_on_device_model_request=device_model_request,\n", ")\n", "if not hasattr(response, \"success\") or getattr(response, \"success\") is False:\n", " raise RuntimeError(\"Could not start build 
job.\")\n", "\n", "# Extract job ID\n", "job_id = response.id\n", "\n", "# Wait for job to complete\n", "success = poll_job(jobs_api, project_id, job_id)\n", "if success:\n", " print(\"Impulse built.\")\n", "else:\n", " print(f\"ERROR: Job failed. See https://studio.edgeimpulse.com/studio/{project_id}/jobs#show-job-{job_id} for more details.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "QLU8jDNFpv9T" }, "outputs": [], "source": [ "# Get the download link information\n", "response = deployment_api.download_build(\n", " project_id=project_id,\n", " type=target_hardware,\n", " model_type=quantization,\n", " engine=engine,\n", " _preload_content=False,\n", ")\n", "if response.status != 200:\n", " raise RuntimeError(\"Could not get download information.\")\n", "\n", "# Find the file name in the headers\n", "file_name = re.findall(r\"filename\\*?=(.+)\", response.headers[\"Content-Disposition\"])[0].replace(\"utf-8''\", \"\")\n", "file_path = os.path.join(OUTPUT_PATH, file_name)\n", "\n", "# Write the contents to a file\n", "with open(file_path, \"wb\") as f:\n", " f.write(response.data)" ] }, { "cell_type": "markdown", "metadata": { "id": "klQoJH9yvO2C" }, "source": [ "You should have a .zip file in the same directory as this notebook. Download or move it to somewhere else on your computer and unzip it. You can now follow [this guide](https://docs.edgeimpulse.com/docs/run-inference/cpp-library/deploy-your-model-as-a-c-library) to link and compile the library as part of an application." ] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }