from __future__ import annotations import ast import asyncio import copy import dataclasses import inspect import json import math import weakref from collections.abc import Awaitable, Callable, Mapping from dataclasses import dataclass, field from enum import Enum from types import UnionType from typing import ( TYPE_CHECKING, Annotated, Any, Concatenate, Generic, Literal, Protocol, TypeVar, Union, cast, get_args, get_origin, get_type_hints, overload, ) from openai.types.responses import CustomToolParam from openai.types.responses.file_search_tool_param import Filters, RankingOptions from openai.types.responses.response_computer_tool_call import ( PendingSafetyCheck, ResponseComputerToolCall, ) from openai.types.responses.response_output_item import LocalShellCall, McpApprovalRequest from openai.types.responses.tool_param import CodeInterpreter, ImageGeneration, Mcp from openai.types.responses.web_search_tool import Filters as WebSearchToolFilters from openai.types.responses.web_search_tool_param import UserLocation from pydantic import BaseModel, TypeAdapter, ValidationError, model_validator from typing_extensions import NotRequired, ParamSpec, TypedDict from . import _debug from ._tool_identity import ( get_explicit_function_tool_namespace, tool_qualified_name, validate_function_tool_lookup_configuration, validate_function_tool_namespace_shape, ) from .computer import AsyncComputer, Computer from .editor import ApplyPatchEditor, ApplyPatchOperation from .exceptions import ModelBehaviorError, ToolTimeoutError, UserError from .function_schema import DocstringStyle, function_schema from .logger import logger from .run_context import RunContextWrapper from .strict_schema import ensure_strict_json_schema from .tool_context import ToolContext from .tool_guardrails import ToolInputGuardrail, ToolOutputGuardrail from .tracing import SpanError from .util import _error_tracing from .util._tool_errors import get_trace_tool_error from .util._types import MaybeAwaitable if TYPE_CHECKING: from .agent import Agent, AgentBase from .items import RunItem, ToolApprovalItem ToolParams = ParamSpec("ToolParams") ToolFunctionWithoutContext = Callable[ToolParams, Any] ToolFunctionWithContext = Callable[Concatenate[RunContextWrapper[Any], ToolParams], Any] ToolFunctionWithToolContext = Callable[Concatenate[ToolContext, ToolParams], Any] ToolFunction = ( ToolFunctionWithoutContext[ToolParams] | ToolFunctionWithContext[ToolParams] | ToolFunctionWithToolContext[ToolParams] ) DEFAULT_APPROVAL_REJECTION_MESSAGE = "Tool execution was not approved." ToolTimeoutBehavior = Literal["error_as_result", "raise_exception"] ToolErrorFunction = Callable[[RunContextWrapper[Any], Exception], MaybeAwaitable[str]] CustomToolExecutor = Callable[[ToolContext[Any], str], MaybeAwaitable[Any]] CustomToolApprovalFunction = Callable[[RunContextWrapper[Any], str, str], MaybeAwaitable[bool]] _SYNC_FUNCTION_TOOL_MARKER = "__agents_sync_function_tool__" _UNSET_FAILURE_ERROR_FUNCTION = object() class ToolOutputText(BaseModel): """Represents a tool output that should be sent to the model as text.""" type: Literal["text"] = "text" text: str class ToolOutputTextDict(TypedDict, total=False): """TypedDict variant for text tool outputs.""" type: Literal["text"] text: str class ToolOutputImage(BaseModel): """Represents a tool output that should be sent to the model as an image. You can provide either an `image_url` (URL or data URL) or a `file_id` for previously uploaded content. The optional `detail` can control vision detail. """ type: Literal["image"] = "image" image_url: str | None = None file_id: str | None = None detail: Literal["low", "high", "auto"] | None = None @model_validator(mode="after") def check_at_least_one_required_field(self) -> ToolOutputImage: """Validate that at least one of image_url or file_id is provided.""" if self.image_url is None and self.file_id is None: raise ValueError("At least one of image_url or file_id must be provided") return self class ToolOutputImageDict(TypedDict, total=False): """TypedDict variant for image tool outputs.""" type: Literal["image"] image_url: NotRequired[str] file_id: NotRequired[str] detail: NotRequired[Literal["low", "high", "auto"]] class ToolOutputFileContent(BaseModel): """Represents a tool output that should be sent to the model as a file. Provide one of `file_data` (base64), `file_url`, or `file_id`. You may also provide an optional `filename` when using `file_data` to hint file name. """ type: Literal["file"] = "file" file_data: str | None = None file_url: str | None = None file_id: str | None = None filename: str | None = None @model_validator(mode="after") def check_at_least_one_required_field(self) -> ToolOutputFileContent: """Validate that at least one of file_data, file_url, or file_id is provided.""" if self.file_data is None and self.file_url is None and self.file_id is None: raise ValueError("At least one of file_data, file_url, or file_id must be provided") return self class ToolOutputFileContentDict(TypedDict, total=False): """TypedDict variant for file content tool outputs.""" type: Literal["file"] file_data: NotRequired[str] file_url: NotRequired[str] file_id: NotRequired[str] filename: NotRequired[str] ValidToolOutputPydanticModels = ToolOutputText | ToolOutputImage | ToolOutputFileContent ValidToolOutputPydanticModelsTypeAdapter: TypeAdapter[ValidToolOutputPydanticModels] = TypeAdapter( ValidToolOutputPydanticModels ) class ToolOriginType(str, Enum): """Enumerates the runtime source of a function-tool-backed run item.""" FUNCTION = "function" MCP = "mcp" AGENT_AS_TOOL = "agent_as_tool" @dataclass(frozen=True) class ToolOrigin: """Serializable metadata describing where a function-tool-backed item came from.""" type: ToolOriginType mcp_server_name: str | None = None agent_name: str | None = None agent_tool_name: str | None = None def to_json_dict(self) -> dict[str, str]: """Convert the metadata to a JSON-compatible dict.""" result: dict[str, str] = {"type": self.type.value} if self.mcp_server_name is not None: result["mcp_server_name"] = self.mcp_server_name if self.agent_name is not None: result["agent_name"] = self.agent_name if self.agent_tool_name is not None: result["agent_tool_name"] = self.agent_tool_name return result @classmethod def from_json_dict(cls, data: Any) -> ToolOrigin | None: """Deserialize tool origin metadata from JSON-compatible data.""" if not isinstance(data, Mapping): return None raw_type = data.get("type") if not isinstance(raw_type, str): return None try: origin_type = ToolOriginType(raw_type) except ValueError: return None def _optional_string(key: str) -> str | None: value = data.get(key) return value if isinstance(value, str) else None return cls( type=origin_type, mcp_server_name=_optional_string("mcp_server_name"), agent_name=_optional_string("agent_name"), agent_tool_name=_optional_string("agent_tool_name"), ) ComputerLike = Computer | AsyncComputer ComputerT = TypeVar("ComputerT", bound=ComputerLike) ComputerT_co = TypeVar("ComputerT_co", bound=ComputerLike, covariant=True) ComputerT_contra = TypeVar("ComputerT_contra", bound=ComputerLike, contravariant=True) class ComputerCreate(Protocol[ComputerT_co]): """Initializes a computer for the current run context.""" def __call__(self, *, run_context: RunContextWrapper[Any]) -> MaybeAwaitable[ComputerT_co]: ... class ComputerDispose(Protocol[ComputerT_contra]): """Cleans up a computer initialized for a run context.""" def __call__( self, *, run_context: RunContextWrapper[Any], computer: ComputerT_contra, ) -> MaybeAwaitable[None]: ... @dataclass class ComputerProvider(Generic[ComputerT]): """Configures create/dispose hooks for per-run computer lifecycle management.""" create: ComputerCreate[ComputerT] dispose: ComputerDispose[ComputerT] | None = None ComputerConfig = ComputerLike | ComputerCreate[Any] | ComputerProvider[Any] @dataclass class FunctionToolResult: tool: FunctionTool """The tool that was run.""" output: Any """The output of the tool.""" run_item: RunItem | None """The run item that was produced as a result of the tool call. This can be None when the tool run is interrupted and no output item should be emitted yet. """ interruptions: list[ToolApprovalItem] = field(default_factory=list) """Interruptions from nested agent runs (for agent-as-tool).""" agent_run_result: Any = None # RunResult | None, but avoid circular import """Nested agent run result (for agent-as-tool).""" @dataclass class FunctionTool: """A tool that wraps a function. In most cases, you should use the `function_tool` helpers to create a FunctionTool, as they let you easily wrap a Python function. """ name: str """The name of the tool, as shown to the LLM. Generally the name of the function.""" description: str """A description of the tool, as shown to the LLM.""" params_json_schema: dict[str, Any] """The JSON schema for the tool's parameters.""" on_invoke_tool: Callable[[ToolContext[Any], str], Awaitable[Any]] """A function that invokes the tool with the given context and parameters. The params passed are: 1. The tool run context. 2. The arguments from the LLM, as a JSON string. You must return a one of the structured tool output types (e.g. ToolOutputText, ToolOutputImage, ToolOutputFileContent) or a string representation of the tool output, or a list of them, or something we can call `str()` on. In case of errors, you can either raise an Exception (which will cause the run to fail) or return a string error message (which will be sent back to the LLM). """ strict_json_schema: bool = True """Whether the JSON schema is in strict mode. We **strongly** recommend setting this to True, as it increases the likelihood of correct JSON input.""" is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True """Whether the tool is enabled. Either a bool or a Callable that takes the run context and agent and returns whether the tool is enabled. You can use this to dynamically enable/disable a tool based on your context/state.""" # Keep guardrail fields before needs_approval to preserve v0.7.0 positional # constructor compatibility for public FunctionTool callers. # Tool-specific guardrails. tool_input_guardrails: list[ToolInputGuardrail[Any]] | None = None """Optional list of input guardrails to run before invoking this tool.""" tool_output_guardrails: list[ToolOutputGuardrail[Any]] | None = None """Optional list of output guardrails to run after invoking this tool.""" needs_approval: ( bool | Callable[[RunContextWrapper[Any], dict[str, Any], str], Awaitable[bool]] ) = False """Whether the tool needs approval before execution. If True, the run will be interrupted and the tool call will need to be approved using RunState.approve() or rejected using RunState.reject() before continuing. Can be a bool (always/never needs approval) or a function that takes (run_context, tool_parameters, call_id) and returns whether this specific call needs approval.""" # Keep timeout fields after needs_approval to preserve positional constructor compatibility. timeout_seconds: float | None = None """Optional timeout (seconds) for each tool invocation.""" timeout_behavior: ToolTimeoutBehavior = "error_as_result" """How to handle timeout events. - "error_as_result": return a model-visible timeout error string. - "raise_exception": raise a ToolTimeoutError and fail the run. """ timeout_error_function: ToolErrorFunction | None = None """Optional formatter for timeout errors when timeout_behavior is "error_as_result".""" defer_loading: bool = False """Whether the Responses API should hide this tool definition until tool search loads it.""" _failure_error_function: ToolErrorFunction | None = field( default=None, kw_only=True, repr=False, ) """Internal error formatter metadata used for synthetic tool-failure outputs.""" _use_default_failure_error_function: bool = field( default=True, kw_only=True, repr=False, ) """Whether runtime-generated tool failures should use the default formatter.""" _is_agent_tool: bool = field(default=False, kw_only=True, repr=False) """Internal flag indicating if this tool is an agent-as-tool.""" _is_codex_tool: bool = field(default=False, kw_only=True, repr=False) """Internal flag indicating if this tool is a Codex tool wrapper.""" _agent_instance: Any = field(default=None, kw_only=True, repr=False) """Internal reference to the agent instance if this is an agent-as-tool.""" _tool_namespace: str | None = field(default=None, kw_only=True, repr=False) """Internal namespace metadata used to group function tools for the Responses API.""" _tool_namespace_description: str | None = field(default=None, kw_only=True, repr=False) """Internal namespace description used when serializing grouped function tools.""" _mcp_title: str | None = field(default=None, kw_only=True, repr=False) """Internal MCP display title used for ToolCallItem metadata.""" _tool_origin: ToolOrigin | None = field(default=None, kw_only=True, repr=False) """Internal scalar metadata describing the origin of function-tool-backed items.""" _emit_tool_origin: bool = field(default=True, kw_only=True, repr=False) """Whether runtime item generation should emit tool origin metadata for this tool.""" @property def qualified_name(self) -> str: """Return the public qualified name used to identify this function tool.""" return ( tool_qualified_name(self.name, get_explicit_function_tool_namespace(self)) or self.name ) def __post_init__(self): bind_to_function_tool = getattr(self.on_invoke_tool, "__agents_bind_function_tool__", None) if callable(bind_to_function_tool): self.on_invoke_tool = bind_to_function_tool(self) if self.strict_json_schema: self.params_json_schema = ensure_strict_json_schema( copy.deepcopy(self.params_json_schema) ) _validate_function_tool_timeout_config(self) def __copy__(self) -> FunctionTool: copied_tool = dataclasses.replace(self) dataclass_field_names = {tool_field.name for tool_field in dataclasses.fields(FunctionTool)} for tool_field in dataclasses.fields(FunctionTool): if tool_field.init: continue setattr(copied_tool, tool_field.name, getattr(self, tool_field.name)) for attr_name, attr_value in self.__dict__.items(): if attr_name not in dataclass_field_names: setattr(copied_tool, attr_name, attr_value) return copied_tool class _FailureHandlingFunctionToolInvoker: """Internal callable that rebinds wrapper error handling for copied FunctionTools.""" def __init__( self, invoke_tool_impl: Callable[[ToolContext[Any], str], Awaitable[Any]], on_handled_error: Callable[[FunctionTool, Exception, str, ToolContext[Any]], None], *, function_tool: FunctionTool | None = None, ) -> None: self._invoke_tool_impl = invoke_tool_impl self._on_handled_error = on_handled_error self._function_tool = function_tool def __agents_bind_function_tool__( self, function_tool: FunctionTool ) -> _FailureHandlingFunctionToolInvoker: if self._function_tool is function_tool: return self bound_invoker = _FailureHandlingFunctionToolInvoker( self._invoke_tool_impl, self._on_handled_error, function_tool=function_tool, ) if getattr(self, _SYNC_FUNCTION_TOOL_MARKER, False): setattr(bound_invoker, _SYNC_FUNCTION_TOOL_MARKER, True) return bound_invoker async def __call__(self, ctx: ToolContext[Any], input: str) -> Any: try: return await self._invoke_tool_impl(ctx, input) except Exception as e: assert self._function_tool is not None result = await maybe_invoke_function_tool_failure_error_function( function_tool=self._function_tool, context=ctx, error=e, ) if result is None: raise self._on_handled_error(self._function_tool, e, input, ctx) return result def with_function_tool_failure_error_handler( invoke_tool_impl: Callable[[ToolContext[Any], str], Awaitable[Any]], on_handled_error: Callable[[FunctionTool, Exception, str], None], ) -> Callable[[ToolContext[Any], str], Awaitable[Any]]: """Wrap a tool invoker so copied FunctionTools resolve failure policy against themselves.""" def _on_handled_error_with_context( function_tool: FunctionTool, error: Exception, input_json: str, _context: ToolContext[Any], ) -> None: on_handled_error(function_tool, error, input_json) return _with_context_function_tool_failure_error_handler( invoke_tool_impl, _on_handled_error_with_context, ) def _with_context_function_tool_failure_error_handler( invoke_tool_impl: Callable[[ToolContext[Any], str], Awaitable[Any]], on_handled_error: Callable[[FunctionTool, Exception, str, ToolContext[Any]], None], ) -> Callable[[ToolContext[Any], str], Awaitable[Any]]: """Wrap a tool invoker with context-aware handled-error reporting.""" return _FailureHandlingFunctionToolInvoker(invoke_tool_impl, on_handled_error) def _build_wrapped_function_tool( *, name: str, description: str, params_json_schema: dict[str, Any], invoke_tool_impl: Callable[[ToolContext[Any], str], Awaitable[Any]], on_handled_error: Callable[[FunctionTool, Exception, str, ToolContext[Any]], None], failure_error_function: ToolErrorFunction | None | object = _UNSET_FAILURE_ERROR_FUNCTION, strict_json_schema: bool = True, is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True, tool_input_guardrails: list[ToolInputGuardrail[Any]] | None = None, tool_output_guardrails: list[ToolOutputGuardrail[Any]] | None = None, needs_approval: ( bool | Callable[[RunContextWrapper[Any], dict[str, Any], str], Awaitable[bool]] ) = False, timeout_seconds: float | None = None, timeout_behavior: ToolTimeoutBehavior = "error_as_result", timeout_error_function: ToolErrorFunction | None = None, defer_loading: bool = False, sync_invoker: bool = False, mcp_title: str | None = None, tool_origin: ToolOrigin | None = None, ) -> FunctionTool: """Create a FunctionTool with copied-tool-aware failure handling bound in one place.""" on_invoke_tool = _with_context_function_tool_failure_error_handler( invoke_tool_impl, on_handled_error, ) if sync_invoker: setattr(on_invoke_tool, _SYNC_FUNCTION_TOOL_MARKER, True) return set_function_tool_failure_error_function( FunctionTool( name=name, description=description, params_json_schema=params_json_schema, on_invoke_tool=on_invoke_tool, strict_json_schema=strict_json_schema, is_enabled=is_enabled, tool_input_guardrails=tool_input_guardrails, tool_output_guardrails=tool_output_guardrails, needs_approval=needs_approval, timeout_seconds=timeout_seconds, timeout_behavior=timeout_behavior, timeout_error_function=timeout_error_function, defer_loading=defer_loading, _mcp_title=mcp_title, _tool_origin=tool_origin, ), failure_error_function, ) def get_function_tool_origin(function_tool: FunctionTool) -> ToolOrigin | None: """Return scalar origin metadata for a function tool.""" if not function_tool._emit_tool_origin: return None return function_tool._tool_origin or ToolOrigin(type=ToolOriginType.FUNCTION) @dataclass class FileSearchTool: """A hosted tool that lets the LLM search through a vector store. Currently only supported with OpenAI models, using the Responses API. """ vector_store_ids: list[str] """The IDs of the vector stores to search.""" max_num_results: int | None = None """The maximum number of results to return.""" include_search_results: bool = False """Whether to include the search results in the output produced by the LLM.""" ranking_options: RankingOptions | None = None """Ranking options for search.""" filters: Filters | None = None """A filter to apply based on file attributes.""" @property def name(self): return "file_search" @dataclass class WebSearchTool: """A hosted tool that lets the LLM search the web. Currently only supported with OpenAI models, using the Responses API. """ user_location: UserLocation | None = None """Optional location for the search. Lets you customize results to be relevant to a location.""" filters: WebSearchToolFilters | None = None """A filter to apply based on file attributes.""" search_context_size: Literal["low", "medium", "high"] = "medium" """The amount of context to use for the search.""" external_web_access: bool | None = None """Whether the web search tool may fetch live internet content. When omitted, the API default is used. Set to `False` to request cached or indexed-only behavior where supported. """ @property def name(self): return "web_search" @dataclass(eq=False) class ComputerTool(Generic[ComputerT]): """A local computer harness exposed through the Responses API computer tool.""" computer: ComputerT | ComputerCreate[ComputerT] | ComputerProvider[ComputerT] """The computer implementation, or a factory that produces a computer per run.""" on_safety_check: Callable[[ComputerToolSafetyCheckData], MaybeAwaitable[bool]] | None = None """Optional callback to acknowledge computer tool safety checks.""" def __post_init__(self) -> None: _store_computer_initializer(self) @property def name(self): # Keep the released preview-era runtime name for hooks and persisted # RunState compatibility. The Responses serializer selects the actual # wire tool type separately. return "computer_use_preview" @property def trace_name(self): # Tracing should display the GA tool alias even while runtime names preserve compatibility. return "computer" @dataclass class _ResolvedComputer: computer: ComputerLike dispose: ComputerDispose[ComputerLike] | None = None _computer_cache: weakref.WeakKeyDictionary[ ComputerTool[Any], weakref.WeakKeyDictionary[RunContextWrapper[Any], _ResolvedComputer], ] = weakref.WeakKeyDictionary() _computer_initializer_map: weakref.WeakKeyDictionary[ComputerTool[Any], ComputerConfig] = ( weakref.WeakKeyDictionary() ) _computers_by_run_context: weakref.WeakKeyDictionary[ RunContextWrapper[Any], dict[ComputerTool[Any], _ResolvedComputer] ] = weakref.WeakKeyDictionary() async def resolve_computer( *, tool: ComputerTool[Any], run_context: RunContextWrapper[Any] ) -> ComputerLike: """Resolve a computer for a given run context, initializing it if needed.""" per_context = _computer_cache.get(tool) if per_context is None: per_context = weakref.WeakKeyDictionary() _computer_cache[tool] = per_context cached = per_context.get(run_context) if cached is not None: _track_resolved_computer(tool=tool, run_context=run_context, resolved=cached) return cached.computer initializer_config = _get_computer_initializer(tool) lifecycle: ComputerProvider[Any] | None = ( cast(ComputerProvider[Any], initializer_config) if _is_computer_provider(initializer_config) else None ) initializer: ComputerCreate[Any] | None = None disposer: ComputerDispose[Any] | None = lifecycle.dispose if lifecycle else None if lifecycle is not None: initializer = lifecycle.create elif callable(initializer_config): initializer = initializer_config elif _is_computer_provider(tool.computer): lifecycle_provider = cast(ComputerProvider[Any], tool.computer) initializer = lifecycle_provider.create disposer = lifecycle_provider.dispose if initializer: computer_candidate = initializer(run_context=run_context) computer = ( await computer_candidate if inspect.isawaitable(computer_candidate) else computer_candidate ) else: computer = cast(ComputerLike, tool.computer) if not isinstance(computer, Computer | AsyncComputer): raise UserError("The computer tool did not provide a computer instance.") resolved = _ResolvedComputer(computer=computer, dispose=disposer) per_context[run_context] = resolved _track_resolved_computer(tool=tool, run_context=run_context, resolved=resolved) tool.computer = computer return computer async def dispose_resolved_computers(*, run_context: RunContextWrapper[Any]) -> None: """Dispose any computer instances created for the provided run context.""" resolved_by_tool = _computers_by_run_context.pop(run_context, None) if not resolved_by_tool: return disposers: list[tuple[ComputerDispose[ComputerLike], ComputerLike]] = [] for tool, _resolved in resolved_by_tool.items(): per_context = _computer_cache.get(tool) if per_context is not None: per_context.pop(run_context, None) initializer = _get_computer_initializer(tool) if initializer is not None: tool.computer = initializer if _resolved.dispose is not None: disposers.append((_resolved.dispose, _resolved.computer)) for dispose, computer in disposers: try: result = dispose(run_context=run_context, computer=computer) if inspect.isawaitable(result): await result except Exception as exc: logger.warning("Failed to dispose computer for run context: %s", exc) @dataclass class ComputerToolSafetyCheckData: """Information about a computer tool safety check.""" ctx_wrapper: RunContextWrapper[Any] """The run context.""" agent: Agent[Any] """The agent performing the computer action.""" tool_call: ResponseComputerToolCall """The computer tool call.""" safety_check: PendingSafetyCheck """The pending safety check to acknowledge.""" @dataclass class MCPToolApprovalRequest: """A request to approve a tool call.""" ctx_wrapper: RunContextWrapper[Any] """The run context.""" data: McpApprovalRequest """The data from the MCP tool approval request.""" class MCPToolApprovalFunctionResult(TypedDict): """The result of an MCP tool approval function.""" approve: bool """Whether to approve the tool call.""" reason: NotRequired[str] """An optional reason, if rejected.""" MCPToolApprovalFunction = Callable[ [MCPToolApprovalRequest], MaybeAwaitable[MCPToolApprovalFunctionResult] ] """A function that approves or rejects a tool call.""" ShellApprovalFunction = Callable[ [RunContextWrapper[Any], "ShellActionRequest", str], MaybeAwaitable[bool] ] """A function that determines whether a shell action requires approval. Takes (run_context, action, call_id) and returns whether approval is needed. """ class ShellOnApprovalFunctionResult(TypedDict): """The result of a shell tool on_approval callback.""" approve: bool """Whether to approve the tool call.""" reason: NotRequired[str] """An optional reason, if rejected.""" ShellOnApprovalFunction = Callable[ [RunContextWrapper[Any], "ToolApprovalItem"], MaybeAwaitable[ShellOnApprovalFunctionResult] ] """A function that auto-approves or rejects a shell tool call when approval is needed. Takes (run_context, approval_item) and returns approval decision. """ ApplyPatchApprovalFunction = Callable[ [RunContextWrapper[Any], ApplyPatchOperation, str], MaybeAwaitable[bool] ] """A function that determines whether an apply_patch operation requires approval. Takes (run_context, operation, call_id) and returns whether approval is needed. """ class ApplyPatchOnApprovalFunctionResult(TypedDict): """The result of an apply_patch tool on_approval callback.""" approve: bool """Whether to approve the tool call.""" reason: NotRequired[str] """An optional reason, if rejected.""" ApplyPatchOnApprovalFunction = Callable[ [RunContextWrapper[Any], "ToolApprovalItem"], MaybeAwaitable[ApplyPatchOnApprovalFunctionResult] ] """A function that auto-approves or rejects an apply_patch tool call when approval is needed. Takes (run_context, approval_item) and returns approval decision. """ class CustomToolOnApprovalFunctionResult(TypedDict): """The result of a custom tool on_approval callback.""" approve: bool """Whether to approve the tool call.""" reason: NotRequired[str] """An optional reason, if rejected.""" CustomToolOnApprovalFunction = Callable[ [RunContextWrapper[Any], "ToolApprovalItem"], MaybeAwaitable[CustomToolOnApprovalFunctionResult] ] """A function that auto-approves or rejects a custom tool call when approval is needed. Takes (run_context, approval_item) and returns approval decision. """ @dataclass class HostedMCPTool: """A tool that allows the LLM to use a remote MCP server. The LLM will automatically list and call tools, without requiring a round trip back to your code. If you want to run MCP servers locally via stdio, in a VPC or other non-publicly-accessible environment, or you just prefer to run tool calls locally, then you can instead use the servers in `agents.mcp` and pass `Agent(mcp_servers=[...])` to the agent.""" tool_config: Mcp """The MCP tool config, which includes the server URL and other settings.""" on_approval_request: MCPToolApprovalFunction | None = None """An optional function that will be called if approval is requested for an MCP tool. If not provided, you will need to manually add approvals/rejections to the input and call `Runner.run(...)` again.""" @property def name(self): return "hosted_mcp" @dataclass class CodeInterpreterTool: """A tool that allows the LLM to execute code in a sandboxed environment.""" tool_config: CodeInterpreter """The tool config, which includes the container and other settings.""" @property def name(self): return "code_interpreter" @dataclass class ImageGenerationTool: """A tool that allows the LLM to generate images.""" tool_config: ImageGeneration """The tool config, which image generation settings.""" @property def name(self): return "image_generation" @dataclass class LocalShellCommandRequest: """A request to execute a command on a shell.""" ctx_wrapper: RunContextWrapper[Any] """The run context.""" data: LocalShellCall """The data from the local shell tool call.""" LocalShellExecutor = Callable[[LocalShellCommandRequest], MaybeAwaitable[str]] """A function that executes a command on a shell.""" @dataclass class LocalShellTool: """A tool that allows the LLM to execute commands on a shell. For more details, see: https://platform.openai.com/docs/guides/tools-local-shell """ executor: LocalShellExecutor """A function that executes a command on a shell.""" @property def name(self): return "local_shell" class ShellToolLocalSkill(TypedDict): """Skill metadata for local shell environments.""" description: str name: str path: str class ShellToolSkillReference(TypedDict): """Reference to a hosted shell skill.""" type: Literal["skill_reference"] skill_id: str version: NotRequired[str] class ShellToolInlineSkillSource(TypedDict): """Inline skill source payload.""" data: str media_type: Literal["application/zip"] type: Literal["base64"] class ShellToolInlineSkill(TypedDict): """Inline hosted shell skill bundle.""" description: str name: str source: ShellToolInlineSkillSource type: Literal["inline"] ShellToolContainerSkill = ShellToolSkillReference | ShellToolInlineSkill """Container skill configuration.""" class ShellToolContainerNetworkPolicyDomainSecret(TypedDict): """A secret bound to a single domain in allowlist mode.""" domain: str name: str value: str class ShellToolContainerNetworkPolicyAllowlist(TypedDict): """Allowlist network policy for hosted containers.""" allowed_domains: list[str] type: Literal["allowlist"] domain_secrets: NotRequired[list[ShellToolContainerNetworkPolicyDomainSecret]] class ShellToolContainerNetworkPolicyDisabled(TypedDict): """Disabled network policy for hosted containers.""" type: Literal["disabled"] ShellToolContainerNetworkPolicy = ( ShellToolContainerNetworkPolicyAllowlist | ShellToolContainerNetworkPolicyDisabled ) """Network policy configuration for hosted shell containers.""" class ShellToolLocalEnvironment(TypedDict): """Local shell execution environment.""" type: Literal["local"] skills: NotRequired[list[ShellToolLocalSkill]] class ShellToolContainerAutoEnvironment(TypedDict): """Auto-provisioned hosted container environment.""" type: Literal["container_auto"] file_ids: NotRequired[list[str]] memory_limit: NotRequired[Literal["1g", "4g", "16g", "64g"] | None] network_policy: NotRequired[ShellToolContainerNetworkPolicy] skills: NotRequired[list[ShellToolContainerSkill]] class ShellToolContainerReferenceEnvironment(TypedDict): """Reference to an existing hosted container.""" type: Literal["container_reference"] container_id: str ShellToolHostedEnvironment = ( ShellToolContainerAutoEnvironment | ShellToolContainerReferenceEnvironment ) """Hosted shell environment variants.""" ShellToolEnvironment = ShellToolLocalEnvironment | ShellToolHostedEnvironment """All supported shell environments.""" @dataclass class ShellCallOutcome: """Describes the terminal condition of a shell command.""" type: Literal["exit", "timeout"] exit_code: int | None = None @dataclass class ShellCommandOutput: """Structured output for a single shell command execution.""" stdout: str = "" stderr: str = "" outcome: ShellCallOutcome = field(default_factory=lambda: ShellCallOutcome(type="exit")) command: str | None = None provider_data: dict[str, Any] | None = None @property def exit_code(self) -> int | None: return self.outcome.exit_code @property def status(self) -> Literal["completed", "timeout"]: return "timeout" if self.outcome.type == "timeout" else "completed" @dataclass class ShellResult: """Result returned by a shell executor.""" output: list[ShellCommandOutput] max_output_length: int | None = None provider_data: dict[str, Any] | None = None @dataclass class ShellActionRequest: """Action payload for a next-generation shell call.""" commands: list[str] timeout_ms: int | None = None max_output_length: int | None = None @dataclass class ShellCallData: """Normalized shell call data provided to shell executors.""" call_id: str action: ShellActionRequest status: Literal["in_progress", "completed"] | None = None raw: Any | None = None @dataclass class ShellCommandRequest: """A request to execute a modern shell call.""" ctx_wrapper: RunContextWrapper[Any] data: ShellCallData ShellExecutor = Callable[[ShellCommandRequest], MaybeAwaitable[str | ShellResult]] """Executes a shell command sequence and returns either text or structured output.""" def _normalize_shell_tool_environment( environment: ShellToolEnvironment | None, ) -> ShellToolEnvironment: """Normalize shell environment into a predictable mapping shape.""" if environment is None: return {"type": "local"} if not isinstance(environment, Mapping): raise UserError("ShellTool environment must be a mapping.") normalized = dict(environment) if "type" not in normalized: normalized["type"] = "local" return cast(ShellToolEnvironment, normalized) @dataclass class ShellTool: """Next-generation shell tool. LocalShellTool will be deprecated in favor of this.""" executor: ShellExecutor | None = None name: str = "shell" needs_approval: bool | ShellApprovalFunction = False """Whether the shell tool needs approval before execution. If True, the run will be interrupted and the tool call will need to be approved using RunState.approve() or rejected using RunState.reject() before continuing. Can be a bool (always/never needs approval) or a function that takes (run_context, action, call_id) and returns whether this specific call needs approval. """ on_approval: ShellOnApprovalFunction | None = None """Optional handler to auto-approve or reject when approval is required. If provided, it will be invoked immediately when an approval is needed. """ environment: ShellToolEnvironment | None = None """Execution environment for shell commands. If omitted, local mode is used. """ def __post_init__(self) -> None: """Validate shell tool configuration and normalize environment fields.""" normalized_environment = _normalize_shell_tool_environment(self.environment) self.environment = normalized_environment environment_type = normalized_environment["type"] if environment_type == "local": if self.executor is None: raise UserError("ShellTool with local environment requires an executor.") return if self.executor is not None: raise UserError("ShellTool with hosted environment does not accept an executor.") if self.needs_approval is not False or self.on_approval is not None: raise UserError( "ShellTool with hosted environment does not support needs_approval or on_approval." ) self.needs_approval = False self.on_approval = None @property def type(self) -> str: return "shell" @dataclass class ApplyPatchTool: """Hosted apply_patch tool. Lets the model request file mutations via unified diffs.""" editor: ApplyPatchEditor name: str = "apply_patch" needs_approval: bool | ApplyPatchApprovalFunction = False """Whether the apply_patch tool needs approval before execution. If True, the run will be interrupted and the tool call will need to be approved using RunState.approve() or rejected using RunState.reject() before continuing. Can be a bool (always/never needs approval) or a function that takes (run_context, operation, call_id) and returns whether this specific call needs approval. """ on_approval: ApplyPatchOnApprovalFunction | None = None """Optional handler to auto-approve or reject when approval is required. If provided, it will be invoked immediately when an approval is needed. """ @property def type(self) -> str: return "apply_patch" @dataclass class CustomTool: """A Responses custom tool that uses one raw string input instead of JSON arguments.""" name: str description: str on_invoke_tool: CustomToolExecutor format: object | None = None needs_approval: bool | CustomToolApprovalFunction = False """Whether the raw custom tool call needs approval before execution.""" on_approval: CustomToolOnApprovalFunction | None = None """Optional handler to auto-approve or reject when approval is required.""" defer_loading: bool = False tool_config: CustomToolParam = field(init=False, repr=False) def __post_init__(self) -> None: tool_config: CustomToolParam = { "type": "custom", "name": self.name, "description": self.description, } if self.format is not None: tool_config["format"] = self.format # type: ignore[typeddict-item] if self.defer_loading: tool_config["defer_loading"] = True self.tool_config = tool_config def runtime_needs_approval(self) -> bool | CustomToolApprovalFunction: """Return the callable/bool approval setting used by runtime execution.""" return self.needs_approval def runtime_on_approval(self) -> CustomToolOnApprovalFunction | None: """Return the approval callback used by runtime execution.""" return self.on_approval @property def type(self) -> str: return "custom" @dataclass class ToolSearchTool: """A hosted Responses API tool that lets the model search deferred tools by namespace. `execution="client"` is supported for manual Responses orchestration, but the standard OpenAI Agents runner does not auto-execute client tool search calls. """ description: str | None = None execution: Literal["server", "client"] | None = None parameters: object | None = None @property def name(self) -> str: return "tool_search" Tool = ( FunctionTool | FileSearchTool | WebSearchTool | ComputerTool[Any] | HostedMCPTool | CustomTool | ShellTool | ApplyPatchTool | LocalShellTool | ImageGenerationTool | CodeInterpreterTool | ToolSearchTool ) """A tool that can be used in an agent.""" def tool_namespace( *, name: str, description: str | None, tools: list[FunctionTool], ) -> list[FunctionTool]: """Attach namespace metadata to function tools for OpenAI Responses tool search.""" if not isinstance(name, str) or not name.strip(): raise UserError("tool_namespace() requires a non-empty namespace name.") if not isinstance(description, str) or not description.strip(): raise UserError("tool_namespace() requires a non-empty description.") if any(not isinstance(tool, FunctionTool) for tool in tools): raise UserError("tool_namespace() only supports FunctionTool instances.") namespace_name = name.strip() normalized_description = description.strip() namespaced_tools: list[FunctionTool] = [] for tool in tools: validate_function_tool_namespace_shape(tool.name, namespace_name) namespaced_tool = copy.copy(tool) namespaced_tool._tool_namespace = namespace_name namespaced_tool._tool_namespace_description = normalized_description namespaced_tools.append(namespaced_tool) return namespaced_tools def get_function_tool_responses_only_features(tool: FunctionTool) -> tuple[str, ...]: """Return Responses-only features used by a function tool.""" features: list[str] = [] if get_explicit_function_tool_namespace(tool) is not None: features.append("tool_namespace()") if tool.defer_loading: features.append("defer_loading=True") return tuple(features) def ensure_function_tool_supports_responses_only_features( tool: FunctionTool, *, backend_name: str, ) -> None: """Reject Responses-only function-tool features on unsupported backends.""" unsupported_features = get_function_tool_responses_only_features(tool) if not unsupported_features: return tool_name = tool.qualified_name raise UserError( "The following function-tool features are only supported with OpenAI Responses " f"models: {', '.join(unsupported_features)}. " f"Tool `{tool_name}` cannot be used with {backend_name}." ) def ensure_tool_choice_supports_backend( tool_choice: Literal["auto", "required", "none"] | str | Any | None, *, backend_name: str, ) -> None: """Backend-specific converters should validate reserved tool choices.""" return None def is_responses_tool_search_surface(tool: Tool) -> bool: """Return True when a tool can be exposed through hosted Responses tool search.""" if isinstance(tool, FunctionTool): return tool.defer_loading or get_explicit_function_tool_namespace(tool) is not None if isinstance(tool, HostedMCPTool): return bool(tool.tool_config.get("defer_loading")) return False def has_responses_tool_search_surface(tools: list[Tool]) -> bool: """Return True when tool search has at least one eligible searchable surface.""" return any(is_responses_tool_search_surface(tool) for tool in tools) def is_required_tool_search_surface(tool: Tool) -> bool: """Return True when a tool requires ToolSearchTool() to stay reachable.""" if isinstance(tool, FunctionTool): return tool.defer_loading if isinstance(tool, HostedMCPTool): return bool(tool.tool_config.get("defer_loading")) return False def has_required_tool_search_surface(tools: list[Tool]) -> bool: """Return True when any enabled surface requires ToolSearchTool().""" return any(is_required_tool_search_surface(tool) for tool in tools) def validate_responses_tool_search_configuration( tools: list[Tool], *, allow_opaque_search_surface: bool = False, ) -> None: """Validate the Responses-only tool_search and defer-loading contract.""" tool_search_tools = [tool for tool in tools if isinstance(tool, ToolSearchTool)] tool_search_count = len(tool_search_tools) has_tool_search = tool_search_count > 0 has_tool_search_surface = has_responses_tool_search_surface(tools) has_required_tool_search = has_required_tool_search_surface(tools) if tool_search_count > 1: raise UserError("Only one ToolSearchTool() is allowed when using OpenAI Responses models.") validate_function_tool_lookup_configuration(tools) if has_required_tool_search and not has_tool_search: raise UserError( "Deferred-loading Responses tools require ToolSearchTool() when using OpenAI " "Responses models." ) if has_tool_search and not has_tool_search_surface and not allow_opaque_search_surface: raise UserError( "ToolSearchTool() requires at least one searchable Responses surface: a " "tool_namespace(...) function tool, a deferred-loading function tool " "(`function_tool(..., defer_loading=True)`), or a deferred-loading hosted MCP " "server (`HostedMCPTool(tool_config={..., 'defer_loading': True})`)." ) def prune_orphaned_tool_search_tools(tools: list[Tool]) -> list[Tool]: """Preserve explicit ToolSearchTool entries until request conversion validates them. Whether a tool_search definition is valid can depend on prompt-managed surfaces that are only known during request conversion, so pruning here hides misconfiguration instead of surfacing a clear error. """ return tools def _extract_json_decode_error(error: BaseException) -> json.JSONDecodeError | None: current: BaseException | None = error while current is not None: if isinstance(current, json.JSONDecodeError): return current current = current.__cause__ or current.__context__ return None def _extract_tool_argument_json_error(error: Exception) -> json.JSONDecodeError | None: if not isinstance(error, ModelBehaviorError): return None if not str(error).startswith("Invalid JSON input for tool"): return None return _extract_json_decode_error(error) def _build_handled_function_tool_error_handler( *, span_message: str, log_label: str, span_message_for_json_decode_error: str | None = None, include_input_json_in_logs: bool = True, include_tool_name_in_log_messages: bool = True, ) -> Callable[[FunctionTool, Exception, str, ToolContext[Any]], None]: """Create a consistent handled-error reporter for wrapped FunctionTools.""" def _on_handled_error( function_tool: FunctionTool, error: Exception, input_json: str, context: ToolContext[Any], ) -> None: json_decode_error = _extract_tool_argument_json_error(error) if json_decode_error is not None and span_message_for_json_decode_error is not None: resolved_span_message = span_message_for_json_decode_error span_error_detail = str(json_decode_error) else: resolved_span_message = span_message span_error_detail = str(error) trace_include_sensitive_data = ( context.run_config is None or context.run_config.trace_include_sensitive_data ) trace_error = get_trace_tool_error( trace_include_sensitive_data=trace_include_sensitive_data, error_message=span_error_detail, ) _error_tracing.attach_error_to_current_span( SpanError( message=resolved_span_message, data={ "tool_name": function_tool.name, "error": trace_error, }, ) ) log_prefix = ( f"{log_label} {function_tool.name}" if include_tool_name_in_log_messages else log_label ) if _debug.DONT_LOG_TOOL_DATA: logger.debug(f"{log_prefix} failed") return if include_input_json_in_logs: logger.error(f"{log_prefix} failed: {input_json} {error}", exc_info=error) else: logger.error(f"{log_prefix} failed: {error}", exc_info=error) return _on_handled_error def _parse_function_tool_json_input(*, tool_name: str, input_json: str) -> dict[str, Any]: """Decode raw tool arguments with consistent diagnostics.""" try: parsed = json.loads(input_json) if input_json else {} except Exception as exc: if _debug.DONT_LOG_TOOL_DATA: logger.debug(f"Invalid JSON input for tool {tool_name}") else: logger.debug(f"Invalid JSON input for tool {tool_name}: {input_json}") raise ModelBehaviorError(f"Invalid JSON input for tool {tool_name}: {input_json}") from exc if not isinstance(parsed, dict): raise ModelBehaviorError(f"Invalid JSON input for tool {tool_name}: expected a JSON object") return parsed def _log_function_tool_invocation(*, tool_name: str, input_json: str) -> None: """Log the start of a tool invocation with the current redaction policy.""" if _debug.DONT_LOG_TOOL_DATA: logger.debug(f"Invoking tool {tool_name}") else: logger.debug(f"Invoking tool {tool_name} with input {input_json}") def default_tool_error_function(ctx: RunContextWrapper[Any], error: Exception) -> str: """The default tool error function, which just returns a generic error message.""" json_decode_error = _extract_tool_argument_json_error(error) if json_decode_error is not None: return ( "An error occurred while parsing tool arguments. " "Please try again with valid JSON. " f"Error: {json_decode_error}" ) return f"An error occurred while running the tool. Please try again. Error: {str(error)}" _FUNCTION_TOOL_TIMEOUT_BEHAVIORS: tuple[ToolTimeoutBehavior, ...] = ( "error_as_result", "raise_exception", ) def default_tool_timeout_error_message(*, tool_name: str, timeout_seconds: float) -> str: """Build the default message returned to the model when a tool times out.""" return f"Tool '{tool_name}' timed out after {timeout_seconds:g} seconds." def set_function_tool_failure_error_function( function_tool: FunctionTool, failure_error_function: ToolErrorFunction | None | object = _UNSET_FAILURE_ERROR_FUNCTION, ) -> FunctionTool: """Store internal failure formatter config for tool wrappers and runtime fallbacks.""" function_tool._use_default_failure_error_function = ( failure_error_function is _UNSET_FAILURE_ERROR_FUNCTION ) function_tool._failure_error_function = ( None if failure_error_function is _UNSET_FAILURE_ERROR_FUNCTION else cast(ToolErrorFunction | None, failure_error_function) ) return function_tool def resolve_function_tool_failure_error_function( function_tool: FunctionTool, ) -> ToolErrorFunction | None: """Return the configured tool failure formatter for runtime-generated error handling.""" if function_tool._use_default_failure_error_function: return default_tool_error_function return function_tool._failure_error_function class _FunctionToolCancelledError(Exception): """Adapter that preserves the public ToolErrorFunction Exception contract on cancellation.""" cancelled_error: asyncio.CancelledError def __init__(self, cancelled_error: asyncio.CancelledError): self.cancelled_error = cancelled_error message = str(cancelled_error) or "Tool execution cancelled." super().__init__(message) def _coerce_tool_error_for_failure_error_function(error: BaseException) -> Exception: """Convert runtime failures into the public Exception contract expected by tool formatters.""" if isinstance(error, Exception): return error if isinstance(error, asyncio.CancelledError): return _FunctionToolCancelledError(error) return Exception(str(error) or error.__class__.__name__) async def maybe_invoke_function_tool_failure_error_function( *, function_tool: FunctionTool, context: RunContextWrapper[Any], error: BaseException, ) -> str | None: """Invoke the configured failure formatter, if one exists.""" failure_error_function = resolve_function_tool_failure_error_function(function_tool) if failure_error_function is None: return None formatter_error = _coerce_tool_error_for_failure_error_function(error) result = failure_error_function(context, formatter_error) if inspect.isawaitable(result): return await result return result def _annotation_expr_name(expr: ast.expr) -> str | None: """Return the unqualified type name for a string annotation expression node.""" if isinstance(expr, ast.Name): return expr.id if isinstance(expr, ast.Attribute): return expr.attr return None def _string_annotation_mentions_context_type(annotation: str, *, type_name: str) -> bool: """Return True when a string annotation structurally references the given context type.""" try: expression = ast.parse(annotation, mode="eval").body except SyntaxError: return False return _annotation_expr_mentions_context_type(expression, type_name=type_name) def _annotation_expr_mentions_context_type(expr: ast.expr, *, type_name: str) -> bool: """Return True when an annotation expression structurally references the given context type.""" if isinstance(expr, ast.Constant) and isinstance(expr.value, str): return _string_annotation_mentions_context_type(expr.value, type_name=type_name) if _annotation_expr_name(expr) == type_name: return True if isinstance(expr, ast.BinOp) and isinstance(expr.op, ast.BitOr): return _annotation_expr_mentions_context_type( expr.left, type_name=type_name ) or _annotation_expr_mentions_context_type(expr.right, type_name=type_name) if isinstance(expr, ast.Subscript): wrapper_name = _annotation_expr_name(expr.value) args = expr.slice.elts if isinstance(expr.slice, ast.Tuple) else (expr.slice,) if wrapper_name == "Annotated": return bool(args) and _annotation_expr_mentions_context_type( args[0], type_name=type_name ) if wrapper_name in {"Optional", "Union"}: return any( _annotation_expr_mentions_context_type(arg, type_name=type_name) for arg in args ) return _annotation_expr_mentions_context_type(expr.value, type_name=type_name) return False def _annotation_mentions_context_type(annotation: Any, *, context_type: type[Any]) -> bool: """Return True when an annotation structurally references the given context type.""" if annotation is inspect.Signature.empty: return False if isinstance(annotation, str): return _string_annotation_mentions_context_type(annotation, type_name=context_type.__name__) origin = get_origin(annotation) if annotation is context_type or origin is context_type: return True if origin is Annotated: args = get_args(annotation) return bool(args) and _annotation_mentions_context_type(args[0], context_type=context_type) if origin in (Union, UnionType): return any( _annotation_mentions_context_type(arg, context_type=context_type) for arg in get_args(annotation) ) return False def _get_function_tool_invoke_context( function_tool: FunctionTool, context: ToolContext[Any], ) -> ToolContext[Any] | RunContextWrapper[Any]: """Choose the runtime context object to pass into a function tool wrapper. Third-party wrappers may declare a narrower `RunContextWrapper` contract and then serialize that object downstream. In those cases, passing the richer `ToolContext` can leak runtime-only metadata such as agents or run config into incompatible serializers. When the wrapper explicitly declares `RunContextWrapper`, preserve only the base context state. """ try: parameters = tuple(inspect.signature(function_tool.on_invoke_tool).parameters.values()) except (TypeError, ValueError): return context if not parameters: return context context_annotation = parameters[0].annotation try: resolved_annotations = get_type_hints(function_tool.on_invoke_tool, include_extras=True) except Exception: pass else: context_annotation = resolved_annotations.get(parameters[0].name, context_annotation) if _annotation_mentions_context_type(context_annotation, context_type=ToolContext): return context if _annotation_mentions_context_type(context_annotation, context_type=RunContextWrapper): return context._fork_with_tool_input(context.tool_input) return context async def invoke_function_tool( *, function_tool: FunctionTool, context: ToolContext[Any], arguments: str, ) -> Any: """Invoke a function tool, enforcing timeout configuration when provided.""" invoke_context = _get_function_tool_invoke_context(function_tool, context) timeout_seconds = function_tool.timeout_seconds if timeout_seconds is None: return await function_tool.on_invoke_tool(cast(Any, invoke_context), arguments) tool_task: asyncio.Future[Any] = asyncio.ensure_future( function_tool.on_invoke_tool(cast(Any, invoke_context), arguments) ) try: return await asyncio.wait_for(tool_task, timeout=timeout_seconds) except asyncio.TimeoutError as exc: if tool_task.done() and not tool_task.cancelled(): tool_exception = tool_task.exception() if tool_exception is None: return tool_task.result() raise tool_exception from None timeout_error = ToolTimeoutError( tool_name=function_tool.name, timeout_seconds=timeout_seconds, ) if function_tool.timeout_behavior == "raise_exception": raise timeout_error from exc timeout_error_function = function_tool.timeout_error_function if timeout_error_function is None: return default_tool_timeout_error_message( tool_name=function_tool.name, timeout_seconds=timeout_seconds, ) timeout_result = timeout_error_function(context, timeout_error) if inspect.isawaitable(timeout_result): return await timeout_result return timeout_result @overload def function_tool( func: ToolFunction[...], *, name_override: str | None = None, description_override: str | None = None, docstring_style: DocstringStyle | None = None, use_docstring_info: bool = True, failure_error_function: ToolErrorFunction | None = None, strict_mode: bool = True, is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True, needs_approval: bool | Callable[[RunContextWrapper[Any], dict[str, Any], str], Awaitable[bool]] = False, tool_input_guardrails: list[ToolInputGuardrail[Any]] | None = None, tool_output_guardrails: list[ToolOutputGuardrail[Any]] | None = None, timeout: float | None = None, timeout_behavior: ToolTimeoutBehavior = "error_as_result", timeout_error_function: ToolErrorFunction | None = None, defer_loading: bool = False, ) -> FunctionTool: """Overload for usage as @function_tool (no parentheses).""" ... @overload def function_tool( *, name_override: str | None = None, description_override: str | None = None, docstring_style: DocstringStyle | None = None, use_docstring_info: bool = True, failure_error_function: ToolErrorFunction | None = None, strict_mode: bool = True, is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True, needs_approval: bool | Callable[[RunContextWrapper[Any], dict[str, Any], str], Awaitable[bool]] = False, tool_input_guardrails: list[ToolInputGuardrail[Any]] | None = None, tool_output_guardrails: list[ToolOutputGuardrail[Any]] | None = None, timeout: float | None = None, timeout_behavior: ToolTimeoutBehavior = "error_as_result", timeout_error_function: ToolErrorFunction | None = None, defer_loading: bool = False, ) -> Callable[[ToolFunction[...]], FunctionTool]: """Overload for usage as @function_tool(...).""" ... def function_tool( func: ToolFunction[...] | None = None, *, name_override: str | None = None, description_override: str | None = None, docstring_style: DocstringStyle | None = None, use_docstring_info: bool = True, failure_error_function: ToolErrorFunction | None | object = _UNSET_FAILURE_ERROR_FUNCTION, strict_mode: bool = True, is_enabled: bool | Callable[[RunContextWrapper[Any], AgentBase], MaybeAwaitable[bool]] = True, needs_approval: bool | Callable[[RunContextWrapper[Any], dict[str, Any], str], Awaitable[bool]] = False, tool_input_guardrails: list[ToolInputGuardrail[Any]] | None = None, tool_output_guardrails: list[ToolOutputGuardrail[Any]] | None = None, timeout: float | None = None, timeout_behavior: ToolTimeoutBehavior = "error_as_result", timeout_error_function: ToolErrorFunction | None = None, defer_loading: bool = False, ) -> FunctionTool | Callable[[ToolFunction[...]], FunctionTool]: """ Decorator to create a FunctionTool from a function. By default, we will: 1. Parse the function signature to create a JSON schema for the tool's parameters. 2. Use the function's docstring to populate the tool's description. 3. Use the function's docstring to populate argument descriptions. The docstring style is detected automatically, but you can override it. If the function takes a `RunContextWrapper` as the first argument, it *must* match the context type of the agent that uses the tool. Args: func: The function to wrap. name_override: If provided, use this name for the tool instead of the function's name. description_override: If provided, use this description for the tool instead of the function's docstring. docstring_style: If provided, use this style for the tool's docstring. If not provided, we will attempt to auto-detect the style. use_docstring_info: If True, use the function's docstring to populate the tool's description and argument descriptions. failure_error_function: If provided, use this function to generate an error message when the tool call fails. The error message is sent to the LLM. If you pass None, then no error message will be sent and instead an Exception will be raised. strict_mode: Whether to enable strict mode for the tool's JSON schema. We *strongly* recommend setting this to True, as it increases the likelihood of correct JSON input. If False, it allows non-strict JSON schemas. For example, if a parameter has a default value, it will be optional, additional properties are allowed, etc. See here for more: https://platform.openai.com/docs/guides/structured-outputs?api-mode=responses#supported-schemas is_enabled: Whether the tool is enabled. Can be a bool or a callable that takes the run context and agent and returns whether the tool is enabled. Disabled tools are hidden from the LLM at runtime. needs_approval: Whether the tool needs approval before execution. If True, the run will be interrupted and the tool call will need to be approved using RunState.approve() or rejected using RunState.reject() before continuing. Can be a bool (always/never needs approval) or a function that takes (run_context, tool_parameters, call_id) and returns whether this specific call needs approval. tool_input_guardrails: Optional list of guardrails to run before invoking the tool. tool_output_guardrails: Optional list of guardrails to run after the tool returns. timeout: Optional timeout in seconds for each tool call. timeout_behavior: Timeout handling mode. "error_as_result" returns a model-visible message, while "raise_exception" raises ToolTimeoutError and fails the run. timeout_error_function: Optional formatter used for timeout messages when timeout_behavior="error_as_result". defer_loading: Whether to hide this tool definition until Responses API tool search explicitly loads it. """ def _create_function_tool(the_func: ToolFunction[...]) -> FunctionTool: is_sync_function_tool = not inspect.iscoroutinefunction(the_func) schema = function_schema( func=the_func, name_override=name_override, description_override=description_override, docstring_style=docstring_style, use_docstring_info=use_docstring_info, strict_json_schema=strict_mode, ) async def _on_invoke_tool_impl(ctx: ToolContext[Any], input: str) -> Any: tool_name = ctx.tool_name json_data = _parse_function_tool_json_input(tool_name=tool_name, input_json=input) _log_function_tool_invocation(tool_name=tool_name, input_json=input) try: parsed = ( schema.params_pydantic_model(**json_data) if json_data else schema.params_pydantic_model() ) except ValidationError as e: raise ModelBehaviorError(f"Invalid JSON input for tool {tool_name}: {e}") from e args, kwargs_dict = schema.to_call_args(parsed) if not _debug.DONT_LOG_TOOL_DATA: logger.debug(f"Tool call args: {args}, kwargs: {kwargs_dict}") if not is_sync_function_tool: if schema.takes_context: result = await the_func(ctx, *args, **kwargs_dict) else: result = await the_func(*args, **kwargs_dict) else: if schema.takes_context: result = await asyncio.to_thread(the_func, ctx, *args, **kwargs_dict) else: result = await asyncio.to_thread(the_func, *args, **kwargs_dict) if _debug.DONT_LOG_TOOL_DATA: logger.debug(f"Tool {tool_name} completed.") else: logger.debug(f"Tool {tool_name} returned {result}") return result function_tool = _build_wrapped_function_tool( name=schema.name, description=schema.description or "", params_json_schema=schema.params_json_schema, invoke_tool_impl=_on_invoke_tool_impl, on_handled_error=_build_handled_function_tool_error_handler( span_message="Error running tool (non-fatal)", span_message_for_json_decode_error="Error running tool", log_label="Tool", ), failure_error_function=failure_error_function, strict_json_schema=strict_mode, is_enabled=is_enabled, needs_approval=needs_approval, tool_input_guardrails=tool_input_guardrails, tool_output_guardrails=tool_output_guardrails, timeout_seconds=timeout, timeout_behavior=timeout_behavior, timeout_error_function=timeout_error_function, defer_loading=defer_loading, sync_invoker=is_sync_function_tool, ) return function_tool # If func is actually a callable, we were used as @function_tool with no parentheses if callable(func): return _create_function_tool(func) # Otherwise, we were used as @function_tool(...), so return a decorator def decorator(real_func: ToolFunction[...]) -> FunctionTool: return _create_function_tool(real_func) return decorator # -------------------------- # Private helpers # -------------------------- def _is_computer_provider(candidate: object) -> bool: if isinstance(candidate, ComputerProvider): return True if isinstance(candidate, Computer | AsyncComputer): # A resolved computer instance is never a provider, even if a subclass # happens to expose a callable `create` attribute. return False return hasattr(candidate, "create") and callable(candidate.create) def _validate_function_tool_timeout_config(tool: FunctionTool) -> None: timeout_seconds = tool.timeout_seconds if timeout_seconds is not None: if isinstance(timeout_seconds, bool) or not isinstance(timeout_seconds, int | float): raise TypeError( "FunctionTool timeout_seconds must be a positive number in seconds or None." ) timeout_seconds = float(timeout_seconds) if not math.isfinite(timeout_seconds): raise ValueError("FunctionTool timeout_seconds must be a finite number.") if timeout_seconds <= 0: raise ValueError("FunctionTool timeout_seconds must be greater than 0.") if getattr(tool.on_invoke_tool, _SYNC_FUNCTION_TOOL_MARKER, False): raise ValueError( "FunctionTool timeout_seconds is only supported for async @function_tool handlers." ) tool.timeout_seconds = timeout_seconds if tool.timeout_behavior not in _FUNCTION_TOOL_TIMEOUT_BEHAVIORS: raise ValueError( "FunctionTool timeout_behavior must be one of: " + ", ".join(_FUNCTION_TOOL_TIMEOUT_BEHAVIORS) ) if tool.timeout_error_function is not None and not callable(tool.timeout_error_function): raise TypeError("FunctionTool timeout_error_function must be callable or None.") def _store_computer_initializer(tool: ComputerTool[Any]) -> None: config = tool.computer if callable(config) or _is_computer_provider(config): _computer_initializer_map[tool] = config def _get_computer_initializer(tool: ComputerTool[Any]) -> ComputerConfig | None: if tool in _computer_initializer_map: return _computer_initializer_map[tool] if callable(tool.computer) or _is_computer_provider(tool.computer): return tool.computer return None def _track_resolved_computer( *, tool: ComputerTool[Any], run_context: RunContextWrapper[Any], resolved: _ResolvedComputer, ) -> None: resolved_by_run = _computers_by_run_context.get(run_context) if resolved_by_run is None: resolved_by_run = {} _computers_by_run_context[run_context] = resolved_by_run resolved_by_run[tool] = resolved