# Build a Data Analyst AI Agent from Scratch

## Project Setup  

This section outlines the steps required to set up the project:  
- Import necessary libraries  
- Load the required API keys  
- Establish the database connection  
- Load the sample data into the database  


In [1]:
!pip install openai

Defaulting to user installation because normal site-packages is not writeable


In [2]:
from openai import OpenAI
import json
import inspect
import requests
import pandas as pd
from teradataml import *
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Optional

In [3]:
# Load API keys and other settings from a local JSON file
with open('configs.json', 'r') as file:
    configs = json.load(file)
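
If `configs.json` is missing or lacks the API key, the failure only surfaces later, when the client is created or first used. The following is a minimal sketch of a loader that checks up front, assuming the file is a flat JSON object. `load_configs` and `REQUIRED_KEYS` are hypothetical names introduced here for illustration; they are not part of the notebook.

```python
import json
from pathlib import Path

# Assumption: "llm-api-key" is the only key the notebook reads from the file
REQUIRED_KEYS = ("llm-api-key",)


def load_configs(path: str = "configs.json") -> dict:
    """Load the JSON config file and fail early if a required key is missing."""
    config_path = Path(path)
    if not config_path.is_file():
        raise FileNotFoundError(f"Config file not found: {config_path}")
    with config_path.open("r", encoding="utf-8") as file:
        configs = json.load(file)
    # Treat empty values the same as absent ones
    missing = [key for key in REQUIRED_KEYS if not configs.get(key)]
    if missing:
        raise KeyError(f"Missing required config keys: {missing}")
    return configs
```

With this in place, `configs = load_configs()` could stand in for the `open`/`json.load` pair above.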

In [4]:
client = OpenAI(api_key = configs['llm-api-key'])

In [5]:
%run -i ../startup.ipynb
eng = create_context(host = 'host.docker.internal', username='demo_user', password = password)
print(eng)

Performing setup ...
Setup complete



Enter password:  ············


... Logon successful
Connected as: teradatasql://demo_user:xxxxx@host.docker.internal/dbc
Engine(teradatasql://demo_user:***@host.docker.internal)


In [None]:
data_loading_queries = [
'''
CREATE DATABASE teddy_retailers
AS PERMANENT = 50000000;
''',
'''
CREATE TABLE teddy_retailers.fct_order_details AS
(
  SELECT order_id,product_id,customer_id,unit_price,quantity,amount
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/fct_order_details.csv') as order_details
) WITH DATA;''',

'''
CREATE TABLE teddy_retailers.dim_customers AS
(
  SELECT customer_id,first_name,last_name,email
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/dim_customers.csv') as customers
) WITH DATA;
''',
'''
CREATE TABLE teddy_retailers.dim_orders AS
(
  SELECT order_id,order_date,order_status
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/dim_orders.csv') as orders
) WITH DATA;
''',
'''
CREATE TABLE teddy_retailers.dim_products AS
(
  SELECT product_id,product_name,product_category,price_dollars
     FROM (
		LOCATION='/s3/dev-rel-demos.s3.amazonaws.com/ai_agent_dbt/dim_products.csv') as products
) WITH DATA;
'''
]
for query in data_loading_queries:
    execute_sql(query)

## Agent Configuration  

This section covers the configuration of the agent, including:  
* Defining the data context that the agent will interact with  
* Setting up the routine the agent will follow as a system prompt (embedding the data context)  
* Establishing the list of tools available for the agent to complete its tasks  


In [6]:
def query_dbt_catalog(path):
    response = requests.get(path)
    catalog = response.json()
    return catalog
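
The system prompt embeds the entire catalog, even though the agent is told to use only the `dim_` and `fct_` tables. One option is to trim the catalog before embedding it. The sketch below assumes the usual dbt `catalog.json` layout (a top-level `nodes` mapping whose entries carry `metadata` and `columns`). `summarize_catalog` is a hypothetical helper, and the sample catalog is invented for illustration.

```python
def summarize_catalog(catalog: dict, prefixes: tuple = ("dim_", "fct_")) -> dict:
    """Map 'schema.table' to {column: type} for tables whose name starts with a prefix."""
    summary = {}
    for node in catalog.get("nodes", {}).values():
        meta = node.get("metadata", {})
        name = meta.get("name", "")
        # str.startswith accepts a tuple and matches any of its entries
        if not name.lower().startswith(prefixes):
            continue
        columns = {
            col["name"]: col.get("type")
            for col in node.get("columns", {}).values()
        }
        summary[f"{meta.get('schema')}.{name}"] = columns
    return summary


# Tiny invented catalog, for illustration only
sample_catalog = {
    "nodes": {
        "model.demo.dim_customers": {
            "metadata": {"schema": "teddy_retailers", "name": "dim_customers"},
            "columns": {"customer_id": {"name": "customer_id", "type": "INTEGER"}},
        },
        "model.demo.stg_customers": {
            "metadata": {"schema": "teddy_retailers", "name": "stg_customers"},
            "columns": {},
        },
    }
}
print(summarize_catalog(sample_catalog))
```

A smaller catalog means fewer prompt tokens and less chance that the model picks a staging table by mistake.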

In [7]:
dbt_catalog_path = 'https://raw.githubusercontent.com/Teradata/simple_data_ai_agent/refs/heads/main/catalogs/catalog.json'

In [8]:
system_prompt_full = f"""
You are an advanced data analyst for a retail company, specializing in analyzing data from a Teradata system. Your primary responsibility is to assist users by answering business-related questions using SQL queries on the Teradata database. Follow these steps:

1. Understanding User Requests
   - Users provide business questions in plain English.
   - Extract relevant data points needed to construct a meaningful response.

2. Generating SQL Queries
   - Construct an optimized Teradata SQL query to retrieve the necessary data.
   - The query must be a **single-line string** without carriage returns or line breaks.
   - The catalog of databases, tables, and columns to query is in the following JSON structure, which is a dbt catalog. Use only the dimensional and fact tables, which are prefixed with `dim_` and `fct_`, respectively, to build the query.  
     {query_dbt_catalog(dbt_catalog_path)}
   - Ensure that the SQL query adheres to **Teradata SQL syntax** and avoids unsupported keywords such as `LIMIT` and `FETCH`; use Teradata statements such as `SAMPLE` instead.
   - Apply necessary joins between tables if the user's question requires it.
   - Apply appropriate filtering, grouping, and ordering to enhance performance and accuracy.

3. Executing the Query
   - Run the query in the Teradata database.

4. Responding to the User
   - Respond to the user's question based on the result of running the query.
   - Show the user the query you ran and your reasoning for running it.
   
"""
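
The prompt asks the model for a single-line query, but nothing enforces it. A minimal sketch of a normalizer that could be applied before execution is shown below. `to_single_line` is a hypothetical helper, and it assumes the query contains no string literals with significant whitespace, since those would be collapsed as well.

```python
def to_single_line(sql: str) -> str:
    """Collapse every run of whitespace (newlines, tabs, spaces) into one space."""
    # str.split() with no arguments splits on any whitespace and drops empty pieces
    return " ".join(sql.split())


multi_line_query = """
SELECT customer_id,
       SUM(amount) AS total_spent
FROM teddy_retailers.fct_order_details
GROUP BY customer_id;
"""
print(to_single_line(multi_line_query))
```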

In [9]:
def function_to_schema(func) -> dict:
    type_map = {
        str: "string",
        int: "integer",
        float: "number",
        bool: "boolean",
        list: "array",
        dict: "object",
        type(None): "null",
    }

    try:
        signature = inspect.signature(func)
    except ValueError as e:
        raise ValueError(
            f"Failed to get signature for function {func.__name__}: {str(e)}"
        )

    parameters = {}
    for param in signature.parameters.values():
        # Unannotated or unmapped annotations (e.g. Optional[int]) fall back to "string"
        param_type = type_map.get(param.annotation, "string")
        parameters[param.name] = {"type": param_type}

    # Parameters without a default value are required
    required = [
        param.name
        for param in signature.parameters.values()
        if param.default is inspect.Parameter.empty
    ]

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": parameters,
                "required": required,
            },
        },
    }
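
The schema builder relies on `inspect.signature` to discover parameter names, annotations, and defaults. The standalone sketch below shows what that inspection yields. `sample_tool` is a hypothetical function used only for illustration.

```python
import inspect


def sample_tool(sql_statement: str, max_rows: int = 10):
    """Hypothetical tool used only to illustrate signature inspection."""


signature = inspect.signature(sample_tool)

# (name, annotation name, is_required) for each parameter
described = [
    (param.name, param.annotation.__name__, param.default is inspect.Parameter.empty)
    for param in signature.parameters.values()
]
print(described)
```

A parameter counts as required exactly when its default is the `inspect.Parameter.empty` sentinel, which is why `max_rows` is reported as optional here.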

In [10]:
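def serialize_teradata_types(obj: Any) -> Any:
    """Convert Teradata-specific types to JSON serializable formats."""
    # JSON-native values pass through unchanged so numbers and NULLs keep their type
    if obj is None or isinstance(obj, (bool, int, float, str)):
        return obj
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        # float() may lose precision for very large or high-scale decimals
        return float(obj)
    return str(obj)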

def rows_to_json(cursor_description: Any, rows: list[Any]) -> list[dict[str, Any]]:
    """Convert DB rows into JSON objects using column names as keys."""
    if not cursor_description or not rows:
        return []
    columns = [col[0] for col in cursor_description]
    out: list[dict[str, Any]] = []
    for row in rows:
        out.append({col: serialize_teradata_types(val) for col, val in zip(columns, row)})
    return out
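
An alternative to converting every value by hand is the `default` hook of `json.dumps`, which is invoked only for values the encoder cannot handle natively, so integers, strings, and `None` pass through unchanged. The sketch below uses an invented row, and `json_default` is a hypothetical name.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def json_default(obj):
    """Fallback for json.dumps, called only for values it cannot encode itself."""
    if isinstance(obj, (date, datetime)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    return str(obj)


# Invented row, for illustration only
row = {"order_date": date(2024, 1, 2), "amount": Decimal("19.50"), "quantity": 3}
encoded = json.dumps(row, default=json_default)
print(encoded)
```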

In [11]:
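def query_teradata_database(sql_statement):
    # Run the statement and collect every returned row
    result = execute_sql(sql_statement)
    data = rows_to_json(result.description, result.fetchall())
    # The tool result is sent back to the model as text, so return a JSON string
    return json.dumps(data)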

## Agent Runtime
This section covers the code executed while the agent is in action, including:
* Preparing the tools for use by the agent
* The agent's runtime function

In [12]:
tools = [query_teradata_database]
tool_schemas = [function_to_schema(tool) for tool in tools]
tools_map = {tool.__name__: tool for tool in tools}

In [13]:
tool_schemas

[{'type': 'function',
  'function': {'name': 'query_teradata_database',
   'description': '',
   'parameters': {'type': 'object',
    'properties': {'sql_statement': {'type': 'string'}},
    'required': ['sql_statement']}}}]

In [14]:
tools_map

{'query_teradata_database': <function __main__.query_teradata_database(sql_statement)>}

In [15]:
def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)
    print(f"Assistant: {name}({args})")
    # call corresponding function with provided arguments
    return tools_map[name](**args)
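
As written, an unknown tool name, malformed arguments, or a SQL error raises and ends the turn. A common alternative is to hand the error text back to the model so it can correct itself. The sketch below is one way to do that, not the notebook's method. `execute_tool_call_safely` is a hypothetical name, and `SimpleNamespace` stands in for the SDK's tool call object.

```python
import json
from types import SimpleNamespace


def execute_tool_call_safely(tool_call, tools_map) -> str:
    """Run a tool call and return an error payload instead of raising."""
    name = tool_call.function.name
    if name not in tools_map:
        return json.dumps({"error": f"Unknown tool: {name}"})
    try:
        args = json.loads(tool_call.function.arguments)
        return tools_map[name](**args)
    except Exception as exc:  # broad on purpose: any failure is reported to the model
        return json.dumps({"error": f"{type(exc).__name__}: {exc}"})


# Stand-in exposing only the attributes the function reads
fake_call = SimpleNamespace(
    function=SimpleNamespace(name="missing_tool", arguments="{}")
)
print(execute_tool_call_safely(fake_call, {}))
```

Because the error comes back as an ordinary tool result, the model sees it on the next request and can issue a revised query.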

In [16]:
def run_full_turn(system_message, messages):
    """Run the agent loop until the model replies without requesting a tool.

    Appends to `messages` in place and returns only the messages added here.
    """
    num_init_messages = len(messages)

    while True:
        print(f"just logging messages {messages}")
        response = client.chat.completions.create(
            model="gpt-4.1",
            messages=[{"role": "system", "content": system_message}] + messages,
            tools=tool_schemas or None,
            seed=2,
        )
        print(f"logging response {response}")
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print assistant response
            print("Assistant Response:", message.content)

        if not message.tool_calls:  # no tool calls requested: the turn is complete
            break

        # === handle tool calls ===
        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    return messages[num_init_messages:]
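
The loop runs until the model stops requesting tools, so a model that keeps retrying a failing query would never exit. The sketch below shows a turn limit in isolation. `run_bounded` and `fake_step` are hypothetical, with `fake_step` standing in for one model round trip that returns `True` once no tool calls remain.

```python
def run_bounded(step, messages: list, max_turns: int = 5) -> list:
    """Call step(messages) until it reports completion or max_turns is exhausted."""
    for _ in range(max_turns):
        if step(messages):
            return messages
    raise RuntimeError(f"Agent did not finish within {max_turns} turns")


# Stub: pretends the model needs two tool rounds before its final answer
def fake_step(messages: list) -> bool:
    messages.append({"role": "assistant", "content": f"turn {len(messages)}"})
    return len(messages) >= 3


history = run_bounded(fake_step, [])
print(len(history))
```

Raising once the limit is hit makes a runaway conversation visible instead of silently consuming API calls.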

## Running the Agent  
Ask a business question and receive a response. This is a deliberately simple agent, so it handles only basic questions; extending its capabilities is out of scope for this notebook.

In [17]:
messages = []
user_input = input("User: ")
messages.append({"role": "user", "content": user_input})
new_messages = run_full_turn(system_prompt_full, messages)

User:  Who are our most profitable customers


just logging messages [{'role': 'user', 'content': 'Who are our most profitable customers'}]
logging response ChatCompletion(id='chatcmpl-CYW5sm4GkSbC8BfnDRNdpHVfLAyDN', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content='To answer this question, I need to identify the customers who generated the highest total sales (profitability) for the company. This requires:\n\n- Calculating the total amount spent by each customer, which can be derived from the fct_order_details table (amount column).\n- Retrieving customer information from the dim_customers table.\n\nI will join fct_order_details with dim_customers using customer_id, sum the amount for each customer, and order by total amount in descending order.\n\nHere is the Teradata SQL query I will use:\nSELECT c.customer_id, c.first_name, c.last_name, c.email, SUM(f.amount) AS total_spent FROM teddy_retailers.fct_order_details f JOIN teddy_retailers.dim_customers c ON f.customer_id = c.

## Cleaning Up Test Data

In [None]:
data_cleaning_queries = [
'''
DELETE DATABASE teddy_retailers ALL;
''',
'''
DROP DATABASE teddy_retailers
'''
]
for query in data_cleaning_queries:
    execute_sql(query)