{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Use a table pipeline as a reusable function\n", "\n", "Convert a table with computed columns into a callable function for multi-agent workflows and pipeline composition." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Problem\n", "\n", "You have a table that runs a complex pipeline—LLM calls, tool use, post-processing—and you want to reuse that entire pipeline from other tables. Copy-pasting computed column definitions is error-prone and hard to maintain.\n", "\n", "| Scenario | Pipeline | Consumers |\n", "|----------|----------|-----------|\n", "| Research agent | Web search → LLM analysis → Summary | Report generator, Dashboard |\n", "| Data enrichment | Lookup → Transform → Validate | Orders table, Analytics |\n", "| Content moderation | Vision analysis → Text check → Score | User uploads, Comments |" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Solution\n", "\n", "**What's in this recipe:**\n", "\n", "- Create an \"agent\" table with computed columns\n", "- Convert the table to a callable UDF with `pxt.udf(table, return_value=...)`\n", "- Use the table UDF in other tables' computed columns\n", "\n", "You wrap an entire table pipeline as a function. When you call this function from another table, it inserts a row into the agent table, runs all computed columns, and returns the specified output column." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -qU pixeltable openai" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "import os\n", "\n", "if 'OPENAI_API_KEY' not in os.environ:\n", " os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API Key: ')" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/pjlb/.pixeltable/pgdata\n", "Created directory 'table_udf_demo'.\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pixeltable as pxt\n", "from pixeltable.functions.openai import chat_completions\n", "\n", "# Create a fresh directory\n", "pxt.drop_dir('table_udf_demo', force=True)\n", "pxt.create_dir('table_udf_demo')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an agent table with computed columns\n", "\n", "You create a table that encapsulates a complete pipeline. This example builds a summarization agent:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Created table 'summarizer'.\n" ] } ], "source": [ "# Create the agent table with input column\n", "summarizer = pxt.create_table(\n", " 'table_udf_demo/summarizer', {'text': pxt.String}\n", ")" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Added 0 column values with 0 errors.\n" ] }, { "data": { "text/plain": [ "No rows affected." ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Add the LLM call as a computed column\n", "summarizer.add_computed_column(\n", " response=chat_completions(\n", " messages=[\n", " {\n", " 'role': 'user',\n", " 'content': 'Summarize this in one sentence:\\n\\n'\n", " + summarizer.text,\n", " }\n", " ],\n", " model='gpt-4o-mini',\n", " )\n", ")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Added 0 column values with 0 errors.\n" ] }, { "data": { "text/plain": [ "No rows affected." ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Extract the summary text\n", "summarizer.add_computed_column(\n", " summary=summarizer.response.choices[0].message.content\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Convert the table to a UDF\n", "\n", "You use `pxt.udf(table, return_value=...)` to convert the table into a callable function. The `return_value` specifies which column to return:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# Convert the summarizer table into a callable UDF\n", "summarize = pxt.udf(summarizer, return_value=summarizer.summary)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use the table UDF in another table\n", "\n", "You can now use `summarize()` as a computed column in any other table:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Created table 'articles'.\n" ] } ], "source": [ "# Create a table that uses the summarizer\n", "articles = pxt.create_table(\n", " 'table_udf_demo/articles',\n", " {'title': pxt.String, 'content': pxt.String},\n", ")" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Added 0 column values with 0 errors.\n" ] }, { "data": { "text/plain": [ "No rows affected." ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Add the table UDF as a computed column\n", "articles.add_computed_column(summary=summarize(text=articles.content))" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Inserting rows into `articles`: 2 rows [00:00, 196.58 rows/s]\n", "Inserted 2 rows with 0 errors.\n" ] }, { "data": { "text/plain": [ "2 rows inserted, 6 values computed." ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Insert articles - summaries are generated automatically\n", "articles.insert(\n", " [\n", " {\n", " 'title': 'Climate Report',\n", " 'content': 'Global temperatures rose by 1.2 degrees Celsius above pre-industrial levels last year, marking the hottest year on record. Scientists attribute this to continued greenhouse gas emissions and a strong El Nino pattern. The report calls for immediate action to reduce carbon emissions.',\n", " },\n", " {\n", " 'title': 'Tech Merger',\n", " 'content': 'Two major semiconductor companies announced a merger valued at $50 billion. The combined entity will control 30% of the global chip market. Regulators in multiple countries will review the deal over the next 18 months.',\n", " },\n", " ]\n", ")" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
titlesummary
Climate ReportLast year was the hottest on record, with global temperatures rising by 1.2 degrees Celsius above pre-industrial levels due to ongoing greenhouse gas emissions and a strong El Nino pattern, prompting calls for urgent action to reduce carbon emissions.
Tech MergerTwo major semiconductor companies are merging in a \\$50 billion deal that will give them a 30% share of the global chip market, pending regulatory review in several countries over the next 18 months.
" ], "text/plain": [ " title summary\n", "0 Climate Report Last year was the hottest on record, with glob...\n", "1 Tech Merger Two major semiconductor companies are merging ..." ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# View results\n", "articles.select(articles.title, articles.summary).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Explanation\n", "\n", "**How table UDFs work:**\n", "\n", "```\n", "Consumer table row → Table UDF called → Agent table inserts row →\n", "Computed columns run → Return value extracted → Consumer gets result\n", "```\n", "\n", "**When to use table UDFs vs `@pxt.query`:**\n", "\n", "| Pattern | Use when |\n", "|---------|----------|\n", "| Table UDF | You need to run a multi-step pipeline (LLM → process → LLM) |\n", "| `@pxt.query` | You need to retrieve/search existing data |\n", "| `@pxt.udf` | You need a pure Python transformation |\n", "\n", "**Key benefits:**\n", "\n", "- **Encapsulation**: Hide complex pipeline details behind a simple function call\n", "- **Reusability**: Use the same agent from multiple consumer tables\n", "- **Persistence**: All intermediate results are stored in the agent table for debugging\n", "- **Composition**: Chain agents together for multi-stage workflows" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## See also\n", "\n", "- [Look up structured data](https://docs.pixeltable.com/howto/cookbooks/agents/pattern-data-lookup) - Simple key-based lookups with `retrieval_udf`\n", "- [Build a RAG pipeline](https://docs.pixeltable.com/howto/cookbooks/agents/pattern-rag-pipeline) - Retrieval with `@pxt.query`\n", "- [Use tool calling with LLMs](https://docs.pixeltable.com/howto/cookbooks/agents/llm-tool-calling) - Add tools to agent tables" ] } ], "metadata": { "kernelspec": { "display_name": "pixeltable", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.11" } }, "nbformat": 4, "nbformat_minor": 2 }