In [None]:
!pip -q install langchain huggingface_hub openai google-search-results tiktoken cohere faiss-cpu

[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m518.3/518.3 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.1/200.1 kB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m70.3/70.3 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h Preparing metadata (setup.py) ... [?25l[?25hdone
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m27.7 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.0/17.0 MB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m20.7 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.0/90.0 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[2K [90m━━━━

# BabyAGI with LangChain

Task Creation 
Task Prioritization 
Execution

In [None]:
import os

os.environ["OPENAI_API_KEY"] = ""
os.environ["SERPAPI_API_KEY"] = ""

In [None]:
!pip show langchain

Name: langchain
Version: 0.0.137
Summary: Building applications with LLMs through composability
Home-page: https://www.github.com/hwchase17/langchain
Author: 
Author-email: 
License: MIT
Location: /usr/local/lib/python3.9/dist-packages
Requires: aiohttp, async-timeout, dataclasses-json, numpy, openapi-schema-pydantic, pydantic, PyYAML, requests, SQLAlchemy, tenacity
Required-by: 


### Setting up BabyAGI

In [None]:
import os
from collections import deque
from typing import Dict, List, Optional, Any

from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain

In [None]:
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore

In [None]:
# Define your embedding model
embeddings_model = OpenAIEmbeddings()

# Initialize the vectorstore as empty
import faiss
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})

## Chains

In [None]:
class TaskCreationChain(LLMChain):
 """Chain to generates tasks."""

 @classmethod
 def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
 """Get the response parser."""
 task_creation_template = (
 "You are an task creation AI that uses the result of an execution agent"
 " to create new tasks with the following objective: {objective},"
 " The last completed task has the result: {result}."
 " This result was based on this task description: {task_description}."
 " These are incomplete tasks: {incomplete_tasks}."
 " Based on the result, create new tasks to be completed"
 " by the AI system that do not overlap with incomplete tasks."
 " Return the tasks as an array."
 )
 prompt = PromptTemplate(
 template=task_creation_template,
 input_variables=["result", "task_description", "incomplete_tasks", "objective"],
 )
 return cls(prompt=prompt, llm=llm, verbose=verbose)

In [None]:
class TaskPrioritizationChain(LLMChain):
 """Chain to prioritize tasks."""

 @classmethod
 def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
 """Get the response parser."""
 task_prioritization_template = (
 "You are an task prioritization AI tasked with cleaning the formatting of and reprioritizing"
 " the following tasks: {task_names}."
 " Consider the ultimate objective of your team: {objective}."
 " Do not remove any tasks. Return the result as a numbered list, like:"
 " #. First task"
 " #. Second task"
 " Start the task list with number {next_task_id}."
 )
 prompt = PromptTemplate(
 template=task_prioritization_template,
 input_variables=["task_names", "next_task_id", "objective"],
 )
 return cls(prompt=prompt, llm=llm, verbose=verbose)

In [None]:
# class ExecutionChain(LLMChain):
# """Chain to execute tasks."""

# @classmethod
# def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
# """Get the response parser."""
# execution_template = (
# "You are an AI who performs one task based on the following objective: {objective}."
# " Take into account these previously completed tasks: {context}."
# " Your task: {task}."
# " Response:"
# )
# prompt = PromptTemplate(
# template=execution_template,
# input_variables=["objective", "context", "task"],
# )
# return cls(prompt=prompt, llm=llm, verbose=verbose)

#### With tools

In [None]:
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain import OpenAI, SerpAPIWrapper, LLMChain
todo_prompt = PromptTemplate.from_template("You are a planner who is an expert at coming up with a todo list for a given objective. Come up with a todo list for this objective: {objective}")
todo_chain = LLMChain(llm=OpenAI(temperature=0), prompt=todo_prompt)
search = SerpAPIWrapper()
tools = [
 Tool(
 name = "Search",
 func=search.run,
 description="useful for when you need to answer questions about current events"
 ),
 Tool(
 name = "TODO",
 func=todo_chain.run,
 description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!"
 )
]


prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}."""
suffix = """Question: {task}
{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
 tools, 
 prefix=prefix, 
 suffix=suffix, 
 input_variables=["objective", "task", "context","agent_scratchpad"]
)

## BabyAGI Controller

In [None]:
def get_next_task(task_creation_chain: LLMChain, result: Dict, task_description: str, task_list: List[str], objective: str) -> List[Dict]:
 """Get the next task."""
 incomplete_tasks = ", ".join(task_list)
 response = task_creation_chain.run(result=result, task_description=task_description, incomplete_tasks=incomplete_tasks, objective=objective)
 new_tasks = response.split('\n')
 return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]

In [None]:
def prioritize_tasks(task_prioritization_chain: LLMChain, this_task_id: int, task_list: List[Dict], objective: str) -> List[Dict]:
 """Prioritize tasks."""
 task_names = [t["task_name"] for t in task_list]
 next_task_id = int(this_task_id) + 1
 response = task_prioritization_chain.run(task_names=task_names, 
 next_task_id=next_task_id, 
 objective=objective)
 new_tasks = response.split('\n')
 prioritized_task_list = []
 for task_string in new_tasks:
 if not task_string.strip():
 continue
 task_parts = task_string.strip().split(".", 1)
 if len(task_parts) == 2:
 task_id = task_parts[0].strip()
 task_name = task_parts[1].strip()
 prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
 return prioritized_task_list

In [None]:
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
 """Get the top k tasks based on the query."""
 results = vectorstore.similarity_search_with_score(query, k=k)
 if not results:
 return []
 sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
 return [str(item.metadata['task']) for item in sorted_results]

def execute_task(vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5) -> str:
 """Execute a task."""
 context = _get_top_tasks(vectorstore, query=objective, k=k)
 return execution_chain.run(objective=objective, context=context, task=task)

In [None]:
# class BabyAGI(Chain, BaseModel):
# """Controller model for the BabyAGI agent."""

# task_list: deque = Field(default_factory=deque)
# task_creation_chain: TaskCreationChain = Field(...)
# task_prioritization_chain: TaskPrioritizationChain = Field(...)
# execution_chain: ExecutionChain = Field(...)
# task_id_counter: int = Field(1)
# vectorstore: VectorStore = Field(init=False)
# max_iterations: Optional[int] = None
 
# class Config:
# """Configuration for this pydantic object."""
# arbitrary_types_allowed = True

# def add_task(self, task: Dict):
# self.task_list.append(task)

# def print_task_list(self):
# print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
# for t in self.task_list:
# print(str(t["task_id"]) + ": " + t["task_name"])

# def print_next_task(self, task: Dict):
# print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
# print(str(task["task_id"]) + ": " + task["task_name"])

# def print_task_result(self, result: str):
# print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
# print(result)
 
# @property
# def input_keys(self) -> List[str]:
# return ["objective"]
 
# @property
# def output_keys(self) -> List[str]:
# return []

# def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
# """Run the agent."""
# objective = inputs['objective']
# first_task = inputs.get("first_task", "Make a todo list")
# self.add_task({"task_id": 1, "task_name": first_task})
# num_iters = 0
# while True:
# if self.task_list:
# self.print_task_list()

# # Step 1: Pull the first task
# task = self.task_list.popleft()
# self.print_next_task(task)

# # Step 2: Execute the task
# result = execute_task(
# self.vectorstore, self.execution_chain, objective, task["task_name"]
# )
# this_task_id = int(task["task_id"])
# self.print_task_result(result)

# # Step 3: Store the result in Pinecone
# result_id = f"result_{task['task_id']}"
# self.vectorstore.add_texts(
# texts=[result],
# metadatas=[{"task": task["task_name"]}],
# ids=[result_id],
# )

# # Step 4: Create new tasks and reprioritize task list
# new_tasks = get_next_task(
# self.task_creation_chain, result, task["task_name"], [t["task_name"] for t in self.task_list], objective
# )
# for new_task in new_tasks:
# self.task_id_counter += 1
# new_task.update({"task_id": self.task_id_counter})
# self.add_task(new_task)
# self.task_list = deque(
# prioritize_tasks(
# self.task_prioritization_chain, this_task_id, list(self.task_list), objective
# )
# )
# num_iters += 1
# if self.max_iterations is not None and num_iters == self.max_iterations:
# print("\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m")
# break
# return {}

# @classmethod
# def from_llm(
# cls,
# llm: BaseLLM,
# vectorstore: VectorStore,
# verbose: bool = False,
# **kwargs
# ) -> "BabyAGI":
# """Initialize the BabyAGI Controller."""
# task_creation_chain = TaskCreationChain.from_llm(
# llm, verbose=verbose
# )
# task_prioritization_chain = TaskPrioritizationChain.from_llm(
# llm, verbose=verbose
# )
# execution_chain = ExecutionChain.from_llm(llm, verbose=verbose)
# return cls(
# task_creation_chain=task_creation_chain,
# task_prioritization_chain=task_prioritization_chain,
# execution_chain=execution_chain,
# vectorstore=vectorstore,
# **kwargs
# )

In [None]:
class BabyAGI(Chain, BaseModel):
 """Controller model for the BabyAGI agent."""

 task_list: deque = Field(default_factory=deque)
 task_creation_chain: TaskCreationChain = Field(...)
 task_prioritization_chain: TaskPrioritizationChain = Field(...)
 execution_chain: AgentExecutor = Field(...)
 task_id_counter: int = Field(1)
 vectorstore: VectorStore = Field(init=False)
 max_iterations: Optional[int] = None
 
 class Config:
 """Configuration for this pydantic object."""
 arbitrary_types_allowed = True

 def add_task(self, task: Dict):
 self.task_list.append(task)

 def print_task_list(self):
 print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
 for t in self.task_list:
 print(str(t["task_id"]) + ": " + t["task_name"])

 def print_next_task(self, task: Dict):
 print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
 print(str(task["task_id"]) + ": " + task["task_name"])

 def print_task_result(self, result: str):
 print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
 print(result)
 
 @property
 def input_keys(self) -> List[str]:
 return ["objective"]
 
 @property
 def output_keys(self) -> List[str]:
 return []

 def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
 """Run the agent."""
 objective = inputs['objective']
 first_task = inputs.get("first_task", "Make a todo list")
 self.add_task({"task_id": 1, "task_name": first_task})
 num_iters = 0
 while True:
 if self.task_list:
 self.print_task_list()

 # Step 1: Pull the first task
 task = self.task_list.popleft()
 self.print_next_task(task)

 # Step 2: Execute the task
 result = execute_task(
 self.vectorstore, self.execution_chain, objective, task["task_name"]
 )
 this_task_id = int(task["task_id"])
 self.print_task_result(result)

 # Step 3: Store the result in Pinecone
 result_id = f"result_{task['task_id']}"
 self.vectorstore.add_texts(
 texts=[result],
 metadatas=[{"task": task["task_name"]}],
 ids=[result_id],
 )

 # Step 4: Create new tasks and reprioritize task list
 new_tasks = get_next_task(
 self.task_creation_chain, result, task["task_name"], [t["task_name"] for t in self.task_list], objective
 )
 for new_task in new_tasks:
 self.task_id_counter += 1
 new_task.update({"task_id": self.task_id_counter})
 self.add_task(new_task)
 self.task_list = deque(
 prioritize_tasks(
 self.task_prioritization_chain, this_task_id, list(self.task_list), objective
 )
 )
 num_iters += 1
 if self.max_iterations is not None and num_iters == self.max_iterations:
 print("\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m")
 break
 return {}

 @classmethod
 def from_llm(
 cls,
 llm: BaseLLM,
 vectorstore: VectorStore,
 verbose: bool = False,
 **kwargs
 ) -> "BabyAGI":
 """Initialize the BabyAGI Controller."""
 task_creation_chain = TaskCreationChain.from_llm(
 llm, verbose=verbose
 )
 task_prioritization_chain = TaskPrioritizationChain.from_llm(
 llm, verbose=verbose
 )
 llm_chain = LLMChain(llm=llm, prompt=prompt)
 tool_names = [tool.name for tool in tools]
 agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
 agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True)
 return cls(
 task_creation_chain=task_creation_chain,
 task_prioritization_chain=task_prioritization_chain,
 execution_chain=agent_executor,
 vectorstore=vectorstore,
 **kwargs
 )

## Run baby

In [None]:

OBJECTIVE = "Find the cheapest price and site to buy a Yubikey 5c online and give me the URL"

In [None]:
llm = OpenAI(temperature=0)

In [None]:
# Logging of LLMChains
verbose=False
# If None, will keep on going forever
max_iterations: Optional[int] = 7
baby_agi = BabyAGI.from_llm(
 llm=llm,
 vectorstore=vectorstore,
 verbose=verbose,
 max_iterations=max_iterations
)

In [None]:
baby_agi({"objective": OBJECTIVE})

[95m[1m
*****TASK LIST*****
[0m[0m
1: Make a todo list
[92m[1m
*****NEXT TASK*****
[0m[0m
1: Make a todo list


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: What do I need to do to make a todo list?
Action: TODO
Action Input: Find the cheapest price and site to buy a Yubikey 5c online and give me the URL.[0m
Observation: [33;1m[1;3m

Todo List:

1. Research online retailers that sell Yubikey 5c (e.g. Amazon, Best Buy, etc.).

2. Compare prices of Yubikey 5c across different retailers.

3. Check for any discounts or promotions available for Yubikey 5c.

4. Read customer reviews for each retailer to determine the best site to buy from.

5. Once you have identified the cheapest price and site, copy the URL and provide it to the customer.[0m
Thought:[32;1m[1;3m I now know the final answer
Final Answer: The cheapest price and site to buy a Yubikey 5c online is [URL].[0m

[1m> Finished chain.[0m
[93m[1m
*****TASK RESULT*****
[0m[0m
The cheapest pri

{'objective': 'Find the cheapest price and site to buy a Yubikey 5c online and give me the URL'}

In [None]:

OBJECTIVE = "Find the of a Yubikey 5c on Amazon and give me the URL"

baby_agi({"objective": OBJECTIVE})

[95m[1m
*****TASK LIST*****
[0m[0m
8: Compare the price of the Yubikey 5c across different online stores.
9: Check the availability of the Yubikey 5c from online stores.
10: Identify any discounts or promotions associated with purchasing the Yubikey 5c from online stores.
11: Compare the features of the Yubikey 5c across different online stores.
12: Research the customer service policies of the online stores selling the Yubikey 5c.
13: Compare the customer service ratings of the online stores selling the Yubikey 5c.
14: Find customer reviews of the Yubikey 5c from online stores.
15: Find customer reviews of the delivery of the Yubikey 5c from online stores.
16: Check the shipping cost and delivery time for the Yubikey 5c from online stores.
17: Research the payment options available for the Yubikey 5c from online stores.
18: Identify any additional fees associated with purchasing the Yubikey 5c from Yubi.
19: Research the return policies of the online stores selling the Yubikey 5c.

{'objective': 'Find the of a Yubikey 5c on Amazon and give me the URL'}