# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.

from __future__ import annotations

from typing import Any, Iterable, cast
from typing_extensions import Literal

import httpx

from . import agent_ as agent
from .runs import (
    RunsResource,
    AsyncRunsResource,
    RunsResourceWithRawResponse,
    AsyncRunsResourceWithRawResponse,
    RunsResourceWithStreamingResponse,
    AsyncRunsResourceWithStreamingResponse,
)
from ...types import agent_run_params, agent_list_params, agent_list_environments_params
from ..._types import Body, Omit, Query, Headers, NotGiven, omit, not_given
from ..._utils import path_template, maybe_transform, async_maybe_transform
from .sessions import (
    SessionsResource,
    AsyncSessionsResource,
    SessionsResourceWithRawResponse,
    AsyncSessionsResourceWithRawResponse,
    SessionsResourceWithStreamingResponse,
    AsyncSessionsResourceWithStreamingResponse,
)
from ..._compat import cached_property
from .schedules import (
    SchedulesResource,
    AsyncSchedulesResource,
    SchedulesResourceWithRawResponse,
    AsyncSchedulesResourceWithRawResponse,
    SchedulesResourceWithStreamingResponse,
    AsyncSchedulesResourceWithStreamingResponse,
)
from ..._resource import SyncAPIResource, AsyncAPIResource
from ..._response import (
    to_raw_response_wrapper,
    to_streamed_response_wrapper,
    async_to_raw_response_wrapper,
    async_to_streamed_response_wrapper,
)
from .conversations import (
    ConversationsResource,
    AsyncConversationsResource,
    ConversationsResourceWithRawResponse,
    AsyncConversationsResourceWithRawResponse,
    ConversationsResourceWithStreamingResponse,
    AsyncConversationsResourceWithStreamingResponse,
)
from ..._base_client import make_request_options
from ...types.agent_run_response import AgentRunResponse
from ...types.agent_list_response import AgentListResponse
from ...types.ambient_agent_config_param import AmbientAgentConfigParam
from ...types.agent_get_artifact_response import AgentGetArtifactResponse
from ...types.agent_list_environments_response import AgentListEnvironmentsResponse

__all__ = ["AgentResource", "AsyncAgentResource"]


class AgentResource(SyncAPIResource):
    """Operations for running and managing cloud agents"""

    @cached_property
    def runs(self) -> RunsResource:
        """Runs sub-resource: operations for running and managing cloud agents."""
        return RunsResource(self._client)

    @cached_property
    def schedules(self) -> SchedulesResource:
        """Schedules sub-resource: operations for creating and managing scheduled agents."""
        return SchedulesResource(self._client)

    @cached_property
    def agent(self) -> agent.AgentResource:
        """Nested agent sub-resource: operations for running and managing cloud agents."""
        # `agent` here is the module alias imported at the top of the file
        # (`from . import agent_ as agent`), not this property.
        return agent.AgentResource(self._client)

    @cached_property
    def sessions(self) -> SessionsResource:
        """Sessions sub-resource: operations for running and managing cloud agents."""
        return SessionsResource(self._client)

    @cached_property
    def conversations(self) -> ConversationsResource:
        """Conversations sub-resource: operations for running and managing cloud agents."""
        return ConversationsResource(self._client)

    @cached_property
    def with_raw_response(self) -> AgentResourceWithRawResponse:
        """
        Prefix any HTTP method call with this property to get the raw response object
        back instead of the parsed content.

        For more information, see https://www.github.com/warpdotdev/oz-sdk-python#accessing-raw-response-data-eg-headers
        """
        return AgentResourceWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> AgentResourceWithStreamingResponse:
        """
        Like `.with_raw_response`, except that the response body is not read eagerly.

        For more information, see https://www.github.com/warpdotdev/oz-sdk-python#with_streaming_response
        """
        return AgentResourceWithStreamingResponse(self)

    def list(
        self,
        *,
        include_malformed_skills: bool | Omit = omit,
        refresh: bool | Omit = omit,
        repo: str | Omit = omit,
        sort_by: Literal["name", "last_run"] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentListResponse:
        """
        List the agents (skills) that are available for running tasks.

        Agents are discovered from environments or from one specific repository.

        Args:
          include_malformed_skills: If true, skills whose SKILL.md file exists but is malformed are included too.
              Such variants carry a non-empty `error` field describing the parse failure.
              Defaults to false.

          refresh: If true, the agent list cache is cleared before fetching. Use this to force a
              refresh of the available agents.

          repo: Optional repository specification to list agents from (format: "owner/repo").
              When omitted, agents from all accessible environments are listed.

          sort_by: Sort order for the returned agents.

              - "name": Sort alphabetically by name (default)
              - "last_run": Sort by most recently used

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        query_params = maybe_transform(
            {
                "include_malformed_skills": include_malformed_skills,
                "refresh": refresh,
                "repo": repo,
                "sort_by": sort_by,
            },
            agent_list_params.AgentListParams,
        )
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            query=query_params,
        )
        return self._get("/agent", options=request_options, cast_to=AgentListResponse)

    def get_artifact(
        self,
        artifact_uid: str,
        *,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentGetArtifactResponse:
        """Fetch a single artifact identified by its UUID.

        Downloadable file-like artifacts come back as a time-limited signed download
        URL; plan artifacts come back with the current plan content inline.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds

        Raises:
          ValueError: If `artifact_uid` is empty.
        """
        if not artifact_uid:
            raise ValueError(f"Expected a non-empty value for `artifact_uid` but received {artifact_uid!r}")

        artifact_path = path_template("/agent/artifacts/{artifact_uid}", artifact_uid=artifact_uid)
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
        )
        # Union types cannot be passed in as arguments in the type system
        response_type = cast(Any, AgentGetArtifactResponse)
        artifact = self._get(artifact_path, options=request_options, cast_to=response_type)
        return cast(AgentGetArtifactResponse, artifact)

    def list_environments(
        self,
        *,
        sort_by: Literal["name", "last_updated"] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentListEnvironmentsResponse:
        """List the cloud environments accessible to the authenticated principal.

        The result covers environments the caller owns, has been granted guest access
        to, or has accessed via link sharing.

        Args:
          sort_by: Sort order for the returned environments.

              - `name`: alphabetical by environment name
              - `last_updated`: most recently updated first (default)

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        query_params = maybe_transform(
            {"sort_by": sort_by},
            agent_list_environments_params.AgentListEnvironmentsParams,
        )
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            query=query_params,
        )
        return self._get(
            "/agent/environments",
            options=request_options,
            cast_to=AgentListEnvironmentsResponse,
        )

    def run(
        self,
        *,
        agent_identity_uid: str | Omit = omit,
        attachments: Iterable[agent_run_params.Attachment] | Omit = omit,
        config: AmbientAgentConfigParam | Omit = omit,
        conversation_id: str | Omit = omit,
        interactive: bool | Omit = omit,
        mode: Literal["normal", "plan", "orchestrate"] | Omit = omit,
        parent_run_id: str | Omit = omit,
        prompt: str | Omit = omit,
        skill: str | Omit = omit,
        team: bool | Omit = omit,
        title: str | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentRunResponse:
        """Create a new agent run.

        This endpoint is an alias for POST /agent/run with identical behavior, and it
        is the preferred endpoint for creating new agent runs.

        Args:
          agent_identity_uid: Optional agent identity UID to use as the execution principal for the run.
              Only valid for runs that are team owned.

          attachments: Optional file attachments to include with the prompt (max 5). Attachments are
              uploaded to cloud storage and made available to the agent.

          config: Configuration for a cloud agent run

          conversation_id: Optional conversation ID to continue an existing conversation. When provided,
              the agent continues from where the previous run left off.

          interactive: Whether the run should be interactive. Defaults to false when not set.

          mode: Optional query mode for the run. Defaults to `normal` when omitted. The server
              does not infer the mode from prompt prefixes such as `/plan`, so callers should
              pass this field explicitly to request non-normal behavior.

          parent_run_id: Optional run ID of the parent that spawned this run. Used for orchestration
              hierarchies.

          prompt: The prompt/instruction for the agent to execute. Required unless a skill is
              specified via the skill field, config.skill_spec, or config.skills.

          skill: Skill specification to use as the base prompt for the agent. Supported formats:

              - "repo:skill_name" - Simple name in specific repo
              - "repo:skill_path" - Full path in specific repo
              - "org/repo:skill_name" - Simple name with org and repo
              - "org/repo:skill_path" - Full path with org and repo

              When provided, this takes precedence over config.skill_spec.

          team: Whether to create a team-owned run. Defaults to true for users on a single team.

          title: Custom title for the run (auto-generated if not provided)

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        payload = maybe_transform(
            {
                "agent_identity_uid": agent_identity_uid,
                "attachments": attachments,
                "config": config,
                "conversation_id": conversation_id,
                "interactive": interactive,
                "mode": mode,
                "parent_run_id": parent_run_id,
                "prompt": prompt,
                "skill": skill,
                "team": team,
                "title": title,
            },
            agent_run_params.AgentRunParams,
        )
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
        )
        return self._post(
            "/agent/runs",
            body=payload,
            options=request_options,
            cast_to=AgentRunResponse,
        )


class AsyncAgentResource(AsyncAPIResource):
    """Operations for running and managing cloud agents"""

    @cached_property
    def runs(self) -> AsyncRunsResource:
        """Runs sub-resource: operations for running and managing cloud agents."""
        return AsyncRunsResource(self._client)

    @cached_property
    def schedules(self) -> AsyncSchedulesResource:
        """Schedules sub-resource: operations for creating and managing scheduled agents."""
        return AsyncSchedulesResource(self._client)

    @cached_property
    def agent(self) -> agent.AsyncAgentResource:
        """Nested agent sub-resource: operations for running and managing cloud agents."""
        # `agent` here is the module alias imported at the top of the file
        # (`from . import agent_ as agent`), not this property.
        return agent.AsyncAgentResource(self._client)

    @cached_property
    def sessions(self) -> AsyncSessionsResource:
        """Sessions sub-resource: operations for running and managing cloud agents."""
        return AsyncSessionsResource(self._client)

    @cached_property
    def conversations(self) -> AsyncConversationsResource:
        """Conversations sub-resource: operations for running and managing cloud agents."""
        return AsyncConversationsResource(self._client)

    @cached_property
    def with_raw_response(self) -> AsyncAgentResourceWithRawResponse:
        """
        Prefix any HTTP method call with this property to get the raw response object
        back instead of the parsed content.

        For more information, see https://www.github.com/warpdotdev/oz-sdk-python#accessing-raw-response-data-eg-headers
        """
        return AsyncAgentResourceWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> AsyncAgentResourceWithStreamingResponse:
        """
        Like `.with_raw_response`, except that the response body is not read eagerly.

        For more information, see https://www.github.com/warpdotdev/oz-sdk-python#with_streaming_response
        """
        return AsyncAgentResourceWithStreamingResponse(self)

    async def list(
        self,
        *,
        include_malformed_skills: bool | Omit = omit,
        refresh: bool | Omit = omit,
        repo: str | Omit = omit,
        sort_by: Literal["name", "last_run"] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentListResponse:
        """
        List the agents (skills) that are available for running tasks.

        Agents are discovered from environments or from one specific repository.

        Args:
          include_malformed_skills: If true, skills whose SKILL.md file exists but is malformed are included too.
              Such variants carry a non-empty `error` field describing the parse failure.
              Defaults to false.

          refresh: If true, the agent list cache is cleared before fetching. Use this to force a
              refresh of the available agents.

          repo: Optional repository specification to list agents from (format: "owner/repo").
              When omitted, agents from all accessible environments are listed.

          sort_by: Sort order for the returned agents.

              - "name": Sort alphabetically by name (default)
              - "last_run": Sort by most recently used

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        query_params = await async_maybe_transform(
            {
                "include_malformed_skills": include_malformed_skills,
                "refresh": refresh,
                "repo": repo,
                "sort_by": sort_by,
            },
            agent_list_params.AgentListParams,
        )
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            query=query_params,
        )
        return await self._get("/agent", options=request_options, cast_to=AgentListResponse)

    async def get_artifact(
        self,
        artifact_uid: str,
        *,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentGetArtifactResponse:
        """Fetch a single artifact identified by its UUID.

        Downloadable file-like artifacts come back as a time-limited signed download
        URL; plan artifacts come back with the current plan content inline.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds

        Raises:
          ValueError: If `artifact_uid` is empty.
        """
        if not artifact_uid:
            raise ValueError(f"Expected a non-empty value for `artifact_uid` but received {artifact_uid!r}")

        artifact_path = path_template("/agent/artifacts/{artifact_uid}", artifact_uid=artifact_uid)
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
        )
        # Union types cannot be passed in as arguments in the type system
        response_type = cast(Any, AgentGetArtifactResponse)
        artifact = await self._get(artifact_path, options=request_options, cast_to=response_type)
        return cast(AgentGetArtifactResponse, artifact)

    async def list_environments(
        self,
        *,
        sort_by: Literal["name", "last_updated"] | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentListEnvironmentsResponse:
        """List the cloud environments accessible to the authenticated principal.

        The result covers environments the caller owns, has been granted guest access
        to, or has accessed via link sharing.

        Args:
          sort_by: Sort order for the returned environments.

              - `name`: alphabetical by environment name
              - `last_updated`: most recently updated first (default)

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        query_params = await async_maybe_transform(
            {"sort_by": sort_by},
            agent_list_environments_params.AgentListEnvironmentsParams,
        )
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            query=query_params,
        )
        return await self._get(
            "/agent/environments",
            options=request_options,
            cast_to=AgentListEnvironmentsResponse,
        )

    async def run(
        self,
        *,
        agent_identity_uid: str | Omit = omit,
        attachments: Iterable[agent_run_params.Attachment] | Omit = omit,
        config: AmbientAgentConfigParam | Omit = omit,
        conversation_id: str | Omit = omit,
        interactive: bool | Omit = omit,
        mode: Literal["normal", "plan", "orchestrate"] | Omit = omit,
        parent_run_id: str | Omit = omit,
        prompt: str | Omit = omit,
        skill: str | Omit = omit,
        team: bool | Omit = omit,
        title: str | Omit = omit,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> AgentRunResponse:
        """Create a new agent run.

        This endpoint is an alias for POST /agent/run with identical behavior, and it
        is the preferred endpoint for creating new agent runs.

        Args:
          agent_identity_uid: Optional agent identity UID to use as the execution principal for the run.
              Only valid for runs that are team owned.

          attachments: Optional file attachments to include with the prompt (max 5). Attachments are
              uploaded to cloud storage and made available to the agent.

          config: Configuration for a cloud agent run

          conversation_id: Optional conversation ID to continue an existing conversation. When provided,
              the agent continues from where the previous run left off.

          interactive: Whether the run should be interactive. Defaults to false when not set.

          mode: Optional query mode for the run. Defaults to `normal` when omitted. The server
              does not infer the mode from prompt prefixes such as `/plan`, so callers should
              pass this field explicitly to request non-normal behavior.

          parent_run_id: Optional run ID of the parent that spawned this run. Used for orchestration
              hierarchies.

          prompt: The prompt/instruction for the agent to execute. Required unless a skill is
              specified via the skill field, config.skill_spec, or config.skills.

          skill: Skill specification to use as the base prompt for the agent. Supported formats:

              - "repo:skill_name" - Simple name in specific repo
              - "repo:skill_path" - Full path in specific repo
              - "org/repo:skill_name" - Simple name with org and repo
              - "org/repo:skill_path" - Full path with org and repo

              When provided, this takes precedence over config.skill_spec.

          team: Whether to create a team-owned run. Defaults to true for users on a single team.

          title: Custom title for the run (auto-generated if not provided)

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        payload = await async_maybe_transform(
            {
                "agent_identity_uid": agent_identity_uid,
                "attachments": attachments,
                "config": config,
                "conversation_id": conversation_id,
                "interactive": interactive,
                "mode": mode,
                "parent_run_id": parent_run_id,
                "prompt": prompt,
                "skill": skill,
                "team": team,
                "title": title,
            },
            agent_run_params.AgentRunParams,
        )
        request_options = make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
        )
        return await self._post(
            "/agent/runs",
            body=payload,
            options=request_options,
            cast_to=AgentRunResponse,
        )


class AgentResourceWithRawResponse:
    """View of an `AgentResource` whose methods are wrapped with `to_raw_response_wrapper`."""

    def __init__(self, agent: AgentResource) -> None:
        # NOTE: inside this constructor the `agent` parameter shadows the module
        # alias `agent` imported at the top of the file.
        self._agent = agent

        self.list = to_raw_response_wrapper(agent.list)
        self.get_artifact = to_raw_response_wrapper(agent.get_artifact)
        self.list_environments = to_raw_response_wrapper(agent.list_environments)
        self.run = to_raw_response_wrapper(agent.run)

    @cached_property
    def runs(self) -> RunsResourceWithRawResponse:
        """Runs sub-resource: operations for running and managing cloud agents."""
        return RunsResourceWithRawResponse(self._agent.runs)

    @cached_property
    def schedules(self) -> SchedulesResourceWithRawResponse:
        """Schedules sub-resource: operations for creating and managing scheduled agents."""
        return SchedulesResourceWithRawResponse(self._agent.schedules)

    @cached_property
    def agent(self) -> agent.AgentResourceWithRawResponse:
        """Nested agent sub-resource: operations for running and managing cloud agents."""
        return agent.AgentResourceWithRawResponse(self._agent.agent)

    @cached_property
    def sessions(self) -> SessionsResourceWithRawResponse:
        """Sessions sub-resource: operations for running and managing cloud agents."""
        return SessionsResourceWithRawResponse(self._agent.sessions)

    @cached_property
    def conversations(self) -> ConversationsResourceWithRawResponse:
        """Conversations sub-resource: operations for running and managing cloud agents."""
        return ConversationsResourceWithRawResponse(self._agent.conversations)


class AsyncAgentResourceWithRawResponse:
    """View of an `AsyncAgentResource` whose methods are wrapped with `async_to_raw_response_wrapper`."""

    def __init__(self, agent: AsyncAgentResource) -> None:
        # NOTE: inside this constructor the `agent` parameter shadows the module
        # alias `agent` imported at the top of the file.
        self._agent = agent

        self.list = async_to_raw_response_wrapper(agent.list)
        self.get_artifact = async_to_raw_response_wrapper(agent.get_artifact)
        self.list_environments = async_to_raw_response_wrapper(agent.list_environments)
        self.run = async_to_raw_response_wrapper(agent.run)

    @cached_property
    def runs(self) -> AsyncRunsResourceWithRawResponse:
        """Runs sub-resource: operations for running and managing cloud agents."""
        return AsyncRunsResourceWithRawResponse(self._agent.runs)

    @cached_property
    def schedules(self) -> AsyncSchedulesResourceWithRawResponse:
        """Schedules sub-resource: operations for creating and managing scheduled agents."""
        return AsyncSchedulesResourceWithRawResponse(self._agent.schedules)

    @cached_property
    def agent(self) -> agent.AsyncAgentResourceWithRawResponse:
        """Nested agent sub-resource: operations for running and managing cloud agents."""
        return agent.AsyncAgentResourceWithRawResponse(self._agent.agent)

    @cached_property
    def sessions(self) -> AsyncSessionsResourceWithRawResponse:
        """Sessions sub-resource: operations for running and managing cloud agents."""
        return AsyncSessionsResourceWithRawResponse(self._agent.sessions)

    @cached_property
    def conversations(self) -> AsyncConversationsResourceWithRawResponse:
        """Conversations sub-resource: operations for running and managing cloud agents."""
        return AsyncConversationsResourceWithRawResponse(self._agent.conversations)


class AgentResourceWithStreamingResponse:
    """View of an `AgentResource` whose methods are wrapped with `to_streamed_response_wrapper`."""

    def __init__(self, agent: AgentResource) -> None:
        # NOTE: inside this constructor the `agent` parameter shadows the module
        # alias `agent` imported at the top of the file.
        self._agent = agent

        self.list = to_streamed_response_wrapper(agent.list)
        self.get_artifact = to_streamed_response_wrapper(agent.get_artifact)
        self.list_environments = to_streamed_response_wrapper(agent.list_environments)
        self.run = to_streamed_response_wrapper(agent.run)

    @cached_property
    def runs(self) -> RunsResourceWithStreamingResponse:
        """Runs sub-resource: operations for running and managing cloud agents."""
        return RunsResourceWithStreamingResponse(self._agent.runs)

    @cached_property
    def schedules(self) -> SchedulesResourceWithStreamingResponse:
        """Schedules sub-resource: operations for creating and managing scheduled agents."""
        return SchedulesResourceWithStreamingResponse(self._agent.schedules)

    @cached_property
    def agent(self) -> agent.AgentResourceWithStreamingResponse:
        """Nested agent sub-resource: operations for running and managing cloud agents."""
        return agent.AgentResourceWithStreamingResponse(self._agent.agent)

    @cached_property
    def sessions(self) -> SessionsResourceWithStreamingResponse:
        """Sessions sub-resource: operations for running and managing cloud agents."""
        return SessionsResourceWithStreamingResponse(self._agent.sessions)

    @cached_property
    def conversations(self) -> ConversationsResourceWithStreamingResponse:
        """Conversations sub-resource: operations for running and managing cloud agents."""
        return ConversationsResourceWithStreamingResponse(self._agent.conversations)


class AsyncAgentResourceWithStreamingResponse:
    """View of an `AsyncAgentResource` whose methods are wrapped with `async_to_streamed_response_wrapper`."""

    def __init__(self, agent: AsyncAgentResource) -> None:
        # NOTE: inside this constructor the `agent` parameter shadows the module
        # alias `agent` imported at the top of the file.
        self._agent = agent

        self.list = async_to_streamed_response_wrapper(agent.list)
        self.get_artifact = async_to_streamed_response_wrapper(agent.get_artifact)
        self.list_environments = async_to_streamed_response_wrapper(agent.list_environments)
        self.run = async_to_streamed_response_wrapper(agent.run)

    @cached_property
    def runs(self) -> AsyncRunsResourceWithStreamingResponse:
        """Runs sub-resource: operations for running and managing cloud agents."""
        return AsyncRunsResourceWithStreamingResponse(self._agent.runs)

    @cached_property
    def schedules(self) -> AsyncSchedulesResourceWithStreamingResponse:
        """Schedules sub-resource: operations for creating and managing scheduled agents."""
        return AsyncSchedulesResourceWithStreamingResponse(self._agent.schedules)

    @cached_property
    def agent(self) -> agent.AsyncAgentResourceWithStreamingResponse:
        """Nested agent sub-resource: operations for running and managing cloud agents."""
        return agent.AsyncAgentResourceWithStreamingResponse(self._agent.agent)

    @cached_property
    def sessions(self) -> AsyncSessionsResourceWithStreamingResponse:
        """Sessions sub-resource: operations for running and managing cloud agents."""
        return AsyncSessionsResourceWithStreamingResponse(self._agent.sessions)

    @cached_property
    def conversations(self) -> AsyncConversationsResourceWithStreamingResponse:
        """Conversations sub-resource: operations for running and managing cloud agents."""
        return AsyncConversationsResourceWithStreamingResponse(self._agent.conversations)