class Agent[TDeps = None, OutputDataT = str](MessageNode[TDeps, OutputDataT]):
    """The main agent class.

    Generically typed with: LLMLingAgent[Type of Dependencies, Type of Result]
    """

    @dataclass(frozen=True)
    class AgentReset:
        """Emitted when agent is reset."""

        agent_name: AgentName
        previous_tools: dict[str, bool]
        new_tools: dict[str, bool]
        timestamp: datetime = field(default_factory=get_now)

    run_failed = Signal(str, Exception)
    agent_reset = Signal(AgentReset)

    def __init__(  # we don't use AgentKwargs here so that we can work with explicit ones in the ctor
        self,
        name: str = "llmling-agent",
        *,
        deps_type: type[TDeps] | None = None,
        model: ModelType = None,
        output_type: OutputSpec[OutputDataT] = str,  # type: ignore[assignment]
        context: AgentContext[TDeps] | None = None,
        session: SessionIdType | SessionQuery | MemoryConfig | bool | int = None,
        system_prompt: AnyPromptType | Sequence[AnyPromptType] = (),
        description: str | None = None,
        tools: Sequence[ToolType | Tool] | None = None,
        toolsets: Sequence[ResourceProvider] | None = None,
        mcp_servers: Sequence[str | MCPServerConfig] | None = None,
        resources: Sequence[PromptType | str] = (),
        retries: int = 1,
        output_retries: int | None = None,
        end_strategy: EndStrategy = "early",
        input_provider: InputProvider | None = None,
        parallel_init: bool = True,
        debug: bool = False,
        event_handlers: Sequence[IndividualEventHandler] | None = None,
    ):
        """Initialize agent.

        Args:
            name: Name of the agent for logging and identification
            deps_type: Type of dependencies to use
            model: The default model to use (defaults to GPT-5)
            output_type: The default output type to use (defaults to str)
            context: Agent context with configuration
            session: Memory configuration.
                - None: Default memory config
                - False: Disable message history (max_messages=0)
                - int: Max tokens for memory
                - str/UUID: Session identifier
                - MemoryConfig: Full memory configuration
                - MemoryProvider: Custom memory provider
                - SessionQuery: Session query
            system_prompt: System prompts for the agent
            description: Description of the Agent ("what it can do")
            tools: List of tools to register with the agent
            toolsets: List of toolset resource providers for the agent
            mcp_servers: MCP servers to connect to
            resources: Additional resources to load
            retries: Default number of retries for failed operations
            output_retries: Max retries for result validation (defaults to retries)
            end_strategy: Strategy for handling tool calls that are requested
                alongside a final result
            input_provider: Provider for human input (tool confirmation / HumanProviders)
            parallel_init: Whether to initialize resources in parallel
            debug: Whether to enable debug mode
            event_handlers: Sequence of event handlers to register with the agent
        """
        from llmling_agent.agent import AgentContext
        from llmling_agent.agent.conversation import ConversationManager
        from llmling_agent.agent.interactions import Interactions
        from llmling_agent.agent.sys_prompts import SystemPrompts
        from llmling_agent.observability import registry

        self.task_manager = TaskManager()
        self._infinite = False
        # save some stuff for async init
        self.deps_type = deps_type
        # prepare context
        ctx = context or AgentContext[TDeps].create_default(
            name,
            input_provider=input_provider,
        )
        self._context = ctx
        # TODO: use to_structured with tool_name / description?
        self._output_type = to_type(output_type, ctx.definition.responses)
        memory_cfg = (
            session
            if isinstance(session, MemoryConfig)
            else MemoryConfig.from_value(session)
        )
        # Initialize progress queue before super().__init__()
        self._progress_queue = asyncio.Queue[ToolCallProgressEvent]()
        super().__init__(
            name=name,
            context=ctx,
            description=description,
            enable_logging=memory_cfg.enable,
            mcp_servers=mcp_servers,
            progress_handler=create_queuing_progress_handler(self._progress_queue),
        )
        # Initialize tool manager
        self.event_handler = MultiEventHandler[IndividualEventHandler](event_handlers)
        all_tools = list(tools or [])
        self.tools = ToolManager(all_tools)
        # MCP manager will be initialized in __aenter__ and providers added there
        if builtin_tools := ctx.config.get_tool_provider():
            self.tools.add_provider(builtin_tools)
        for toolset_provider in toolsets or []:
            self.tools.add_provider(toolset_provider)
        # Initialize conversation manager
        resources = list(resources)
        if ctx.config.knowledge:
            resources.extend(ctx.config.knowledge.get_resources())
        self.conversation = ConversationManager(self, memory_cfg, resources=resources)
        # Store pydantic-ai configuration
        if model and not isinstance(model, str):
            assert isinstance(model, models.Model)
        self._model = model
        self._retries = retries
        self._end_strategy: EndStrategy = end_strategy
        self._output_retries = output_retries
        self.skills_registry = SkillsRegistry()
        if ctx and ctx.definition:
            registry.configure_observability(ctx.definition.observability)
        # init variables
        self._debug = debug
        self.parallel_init = parallel_init
        self._name = name
        self._background_task: asyncio.Task[Any] | None = None
        self.talk = Interactions(self)
        # Set up system prompts
        config_prompts = ctx.config.system_prompts if ctx else []
        all_prompts: list[AnyPromptType] = list(config_prompts)
        if isinstance(system_prompt, list):
            all_prompts.extend(system_prompt)
        else:
            all_prompts.append(system_prompt)
        self.sys_prompts = SystemPrompts(all_prompts, context=ctx)

    def __repr__(self) -> str:
        desc = f", {self.description!r}" if self.description else ""
        return f"Agent({self.name!r}, model={self._model!r}{desc})"

    async def __prompt__(self) -> str:
        typ = self.__class__.__name__
        model = self.model_name or "default"
        parts = [f"Agent: {self.name}", f"Type: {typ}", f"Model: {model}"]
        if self.description:
            parts.append(f"Description: {self.description}")
        parts.extend([await self.tools.__prompt__(), self.conversation.__prompt__()])
        return "\n".join(parts)

    async def __aenter__(self) -> Self:
        """Enter async context and set up MCP servers."""
        try:
            # Collect all coroutines that need to be run
            coros: list[Coroutine[Any, Any, Any]] = []
            coros.append(super().__aenter__())
            coros.extend(self.conversation.get_initialization_tasks())
            # Execute coroutines either in parallel or sequentially
            if self.parallel_init and coros:
                await asyncio.gather(*coros)
            else:
                for coro in coros:
                    await coro
            # Add MCP aggregating provider after manager is initialized
            aggregating_provider = self.mcp.get_aggregating_provider()
            self.tools.add_provider(aggregating_provider)
            for toolset_provider in self.context.config.get_toolsets():
                self.tools.add_provider(toolset_provider)
        except Exception as e:
            msg = "Failed to initialize agent"
            raise RuntimeError(msg) from e
        else:
            return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ):
        """Exit async context."""
        await super().__aexit__(exc_type, exc_val, exc_tb)

    @overload
    def __and__(  # if other doesn't define deps, we take the agent's one
        self, other: ProcessorCallback[Any] | Team[TDeps] | Agent[TDeps, Any]
    ) -> Team[TDeps]: ...

    @overload
    def __and__(  # otherwise, we don't know and deps is Any
        self, other: ProcessorCallback[Any] | Team[Any] | Agent[Any, Any]
    ) -> Team[Any]: ...

    def __and__(
        self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
    ) -> Team[Any]:
        """Create agent team using & operator.

        Example:
            group = analyzer & planner & executor  # Create group of 3
            group = analyzer & existing_group  # Add to existing group
        """
        from llmling_agent.delegation.team import Team

        match other:
            case Team():
                return Team([self, *other.agents])
            case Callable():
                agent_2 = Agent.from_callback(other)
                agent_2.context.pool = self.context.pool
                return Team([self, agent_2])
            case MessageNode():
                return Team([self, other])
            case _:
                msg = f"Invalid agent type: {type(other)}"
                raise ValueError(msg)

    @overload
    def __or__(self, other: MessageNode[TDeps, Any]) -> TeamRun[TDeps, Any]: ...

    @overload
    def __or__[TOtherDeps](
        self,
        other: MessageNode[TOtherDeps, Any],
    ) -> TeamRun[Any, Any]: ...

    @overload
    def __or__(self, other: ProcessorCallback[Any]) -> TeamRun[Any, Any]: ...

    def __or__(
        self, other: MessageNode[Any, Any] | ProcessorCallback[Any]
    ) -> TeamRun:
        # Create new execution with sequential mode (for piping)
        from llmling_agent import TeamRun

        if callable(other):
            other = Agent.from_callback(other)
            other.context.pool = self.context.pool
        return TeamRun([self, other])

    @classmethod
    def from_callback[TResult](
        cls,
        callback: ProcessorCallback[TResult],
        *,
        name: str | None = None,
        **kwargs: Any,
    ) -> Agent[None, TResult]:
        """Create an agent from a processing callback.

        Args:
            callback: Function to process messages. Can be:
                - sync or async
                - with or without context
                - must return str for pipeline compatibility
            name: Optional name for the agent
            kwargs: Additional arguments for agent
        """
        name = name or callback.__name__ or "processor"
        model = function_to_model(callback)
        return_type = get_type_hints(callback).get("return")
        if (  # If async, unwrap from Awaitable
            return_type
            and hasattr(return_type, "__origin__")
            and return_type.__origin__ is Awaitable
        ):
            return_type = return_type.__args__[0]
        return Agent(
            model=model,
            name=name,
            output_type=return_type or str,
            **kwargs,
        )

    @property
    def name(self) -> str:
        """Get agent name."""
        return self._name or "llmling-agent"

    @property
    def context(self) -> AgentContext[TDeps]:
        """Get agent context."""
        return self._context

    @context.setter
    def context(self, value: AgentContext[TDeps]):
        """Set agent context and propagate to provider."""
        self._context = value
        self.mcp.context = value

    def to_structured[NewOutputDataT](
        self,
        output_type: type[NewOutputDataT],
        *,
        tool_name: str | None = None,
        tool_description: str | None = None,
    ) -> Agent[TDeps, NewOutputDataT]:
        """Convert this agent to a structured agent.

        Args:
            output_type: Type for structured responses. Can be:
                - A Python type (Pydantic model)
            tool_name: Optional override for result tool name
            tool_description: Optional override for result tool description

        Returns:
            Typed Agent
        """
        self.log.debug("Setting result type", output_type=output_type)
        self._output_type = to_type(output_type)
        return self  # type: ignore

    def is_busy(self) -> bool:
        """Check if agent is currently processing tasks."""
        return bool(self.task_manager._pending_tasks or self._background_task)

    @property
    def model_name(self) -> str | None:
        """Get the model name in a consistent format."""
        return self._model.model_name if isinstance(self._model, Model) else self._model

    def to_tool(
        self,
        *,
        name: str | None = None,
        reset_history_on_run: bool = True,
        pass_message_history: bool = False,
        parent: Agent[Any, Any] | None = None,
    ) -> Tool[OutputDataT]:
        """Create a tool from this agent.

        Args:
            name: Optional tool name override
            reset_history_on_run: Clear agent's history before each run
            pass_message_history: Pass parent's message history to agent
            parent: Optional parent agent for history/context sharing
        """
        tool_name = name or f"ask_{self.name}"
        # Create wrapper function with correct return type annotation
        output_type = self._output_type or Any

        async def wrapped_tool(prompt: str):
            if pass_message_history and not parent:
                msg = "Parent agent required for message history sharing"
                raise ToolError(msg)
            if reset_history_on_run:
                self.conversation.clear()
            history = None
            if pass_message_history and parent:
                history = parent.conversation.get_history()
                old = self.conversation.get_history()
                self.conversation.set_history(history)
            result = await self.run(prompt)
            if history:
                self.conversation.set_history(old)
            return result.data

        # Set the correct return annotation dynamically
        wrapped_tool.__annotations__ = {"prompt": str, "return": output_type}
        normalized_name = self.name.replace("_", " ").title()
        docstring = f"Get expert answer from specialized agent: {normalized_name}"
        if self.description:
            docstring = f"{docstring}\n\n{self.description}"
        wrapped_tool.__doc__ = docstring
        wrapped_tool.__name__ = tool_name
        return Tool.from_callable(
            wrapped_tool,
            name_override=tool_name,
            description_override=docstring,
            source="agent",
        )

    async def get_agentlet(
        self,
        tools: list[Tool],
        model: ModelType = None,
        output_type: type[Any] = str,
        deps_type: type[Any] | None = None,
        input_provider: InputProvider | None = None,
    ):
        """Create pydantic-ai agent from current state."""
        # Monkey patch pydantic-ai to recognize AgentContext
        from llmling_agent.agent.tool_wrapping import wrap_tool

        actual_model = model or self._model
        model_ = (
            infer_model(actual_model)
            if isinstance(actual_model, str)
            else actual_model
        )
        agent = PydanticAgent(
            name=self.name,
            model=model_,
            instructions=await self.sys_prompts.format_system_prompt(self),
            retries=self._retries,
            end_strategy=self._end_strategy,
            output_retries=self._output_retries,
            deps_type=deps_type or NoneType,
            output_type=output_type,
        )
        # If input_provider override is provided, create modified context
        context_for_tools = (
            self.context
            if input_provider is None
            else replace(self.context, input_provider=input_provider)
        )
        for tool in tools:
            wrapped = wrap_tool(tool, context_for_tools)
            if get_argument_key(wrapped, RunContext):
                logger.info("Registering tool: with context", tool_name=tool.name)
                agent.tool(wrapped)
            else:
                logger.info("Registering tool: no context", tool_name=tool.name)
                agent.tool_plain(wrapped)
        return agent

    async def _run(
        self,
        *prompts: PromptCompatible | ChatMessage[Any],
        output_type: type[OutputDataT] | None = None,
        model: ModelType = None,
        store_history: bool = True,
        tool_choice: str | list[str] | None = None,
        usage_limits: UsageLimits | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        messages: list[ChatMessage[Any]] | None = None,
        deps: TDeps | None = None,
        input_provider: InputProvider | None = None,
        wait_for_connections: bool | None = None,
    ) -> ChatMessage[OutputDataT]:
        """Run agent with prompt and get response.

        Args:
            prompts: User query or instruction
            output_type: Optional type for structured responses
            model: Optional model override
            store_history: Whether the message exchange should
                be added to the context window
            tool_choice: Filter tool choice by name
            usage_limits: Optional usage limits for the model
            message_id: Optional message id for the returned message.
                Automatically generated if not provided.
            conversation_id: Optional conversation id for the returned message.
            messages: Optional list of messages to replace the conversation history
            deps: Optional dependencies for the agent
            input_provider: Optional input provider for the agent
            wait_for_connections: Whether to wait for connected agents to complete

        Returns:
            Result containing response and run information

        Raises:
            UnexpectedModelBehavior: If the model fails or behaves unexpectedly
        """
        message_id = message_id or str(uuid4())
        tools = await self.tools.get_tools(state="enabled", names=tool_choice)
        final_type = to_type(output_type) if output_type else self._output_type
        start_time = time.perf_counter()
        message_history = (
            messages if messages is not None else self.conversation.get_history()
        )
        try:
            # Create pydantic-ai agent for this run
            agentlet = await self.get_agentlet(
                tools, model, final_type, self.deps_type, input_provider
            )
            converted_prompts = await convert_prompts(prompts)

            # Merge internal and external event handlers like the old provider did
            async def event_distributor(
                ctx: RunContext, events: AsyncIterable[AgentStreamEvent]
            ):
                async for event in events:
                    for handler in self.event_handler._wrapped_handlers:
                        await handler(ctx, event)

            result = await agentlet.run(
                [
                    i if isinstance(i, str) else i.to_pydantic_ai()
                    for i in converted_prompts
                ],
                deps=deps,
                message_history=[
                    m for run in message_history for m in run.to_pydantic_ai()
                ],
                output_type=final_type or str,
                usage_limits=usage_limits,
                event_stream_handler=event_distributor,
            )
            response_time = time.perf_counter() - start_time
            response_msg = await ChatMessage.from_run_result(
                result,
                agent_name=self.name,
                message_id=message_id,
                conversation_id=conversation_id,
                response_time=response_time,
            )
        except Exception as e:
            self.log.exception("Agent run failed")
            self.run_failed.emit("Agent run failed", e)
            raise
        else:
            return response_msg

    @method_spawner
    async def run_stream(
        self,
        *prompt: PromptCompatible,
        output_type: type[OutputDataT] | None = None,
        model: ModelType = None,
        tool_choice: str | list[str] | None = None,
        store_history: bool = True,
        usage_limits: UsageLimits | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        messages: list[ChatMessage[Any]] | None = None,
        input_provider: InputProvider | None = None,
        wait_for_connections: bool | None = None,
        deps: TDeps | None = None,
    ) -> AsyncIterator[RichAgentStreamEvent[OutputDataT]]:
        """Run agent with prompt and get a streaming response.

        Args:
            prompt: User query or instruction
            output_type: Optional type for structured responses
            model: Optional model override
            tool_choice: Filter tool choice by name
            store_history: Whether the message exchange should
                be added to the context window
            usage_limits: Optional usage limits for the model
            message_id: Optional message id for the returned message.
                Automatically generated if not provided.
            conversation_id: Optional conversation id for the returned message.
            messages: Optional list of messages to replace the conversation history
            input_provider: Optional input provider for the agent
            wait_for_connections: Whether to wait for connected agents to complete
            deps: Optional dependencies for the agent

        Returns:
            An async iterator yielding streaming events with final message embedded.

        Raises:
            UnexpectedModelBehavior: If the model fails or behaves unexpectedly
        """
        message_id = message_id or str(uuid4())
        user_msg, prompts = await self.pre_run(*prompt)
        final_type = to_type(output_type) if output_type else self._output_type
        start_time = time.perf_counter()
        tools = await self.tools.get_tools(state="enabled", names=tool_choice)
        message_history = (
            messages if messages is not None else self.conversation.get_history()
        )
        try:
            agentlet = await self.get_agentlet(
                tools, model, final_type, self.deps_type, input_provider
            )
            content = await convert_prompts(prompts)
            # Initialize variables for final response
            response_msg = None
            # Create tool dict for signal emission
            converted = [
                i if isinstance(i, str) else i.to_pydantic_ai() for i in content
            ]
            stream_events = agentlet.run_stream_events(
                converted,
                deps=deps,
                message_history=[
                    m for run in message_history for m in run.to_pydantic_ai()
                ],
                output_type=final_type or str,
                usage_limits=usage_limits,
            )
            # Stream events through merge_queue for progress events
            async with merge_queue_into_iterator(
                stream_events, self._progress_queue
            ) as events:
                # Track tool call starts to combine with results later
                pending_tcs: dict[str, ToolCallPart] = {}
                async for event in events:
                    # Process events and emit signals
                    yield event
                    match event:
                        case (
                            PartStartEvent(part=ToolCallPart() as tool_part)
                            | FunctionToolCallEvent(part=tool_part)
                        ):
                            # Store tool call start info for later combination with result
                            pending_tcs[tool_part.tool_call_id] = tool_part
                        case FunctionToolResultEvent(tool_call_id=call_id) as result_event:
                            # Check if we have a pending tool call to combine with
                            if call_info := pending_tcs.pop(call_id, None):
                                # Create and yield combined event
                                combined_event = ToolCallCompleteEvent(
                                    tool_name=call_info.tool_name,
                                    tool_call_id=call_id,
                                    tool_input=call_info.args_as_dict(),
                                    tool_result=result_event.result.content
                                    if isinstance(result_event.result, ToolReturnPart)
                                    else result_event.result,
                                    agent_name=self.name,
                                    message_id=message_id,
                                )
                                yield combined_event
                        case AgentRunResultEvent():
                            # Capture final result data and build final response message
                            response_time = time.perf_counter() - start_time
                            response_msg = await ChatMessage.from_run_result(
                                event.result,
                                agent_name=self.name,
                                message_id=message_id,
                                conversation_id=conversation_id,
                                response_time=response_time,
                            )
            # Send additional enriched completion event
            assert response_msg
            yield StreamCompleteEvent(message=response_msg)
            self.message_sent.emit(response_msg)
            await self.log_message(response_msg)
            if store_history:
                self.conversation.add_chat_messages([user_msg, response_msg])
            await self.connections.route_message(
                response_msg,
                wait=wait_for_connections,
            )
        except Exception as e:
            self.log.exception("Agent stream failed")
            self.run_failed.emit("Agent stream failed", e)
            raise

    async def run_iter(
        self,
        *prompt_groups: Sequence[PromptCompatible],
        output_type: type[OutputDataT] | None = None,
        model: ModelType = None,
        store_history: bool = True,
        wait_for_connections: bool | None = None,
    ) -> AsyncIterator[ChatMessage[OutputDataT]]:
        """Run agent sequentially on multiple prompt groups.

        Args:
            prompt_groups: Groups of prompts to process sequentially
            output_type: Optional type for structured responses
            model: Optional model override
            store_history: Whether to store in conversation history
            wait_for_connections: Whether to wait for connected agents

        Yields:
            Response messages in sequence

        Example:
            questions = [
                ["What is your name?"],
                ["How old are you?", image1],
                ["Describe this image", image2],
            ]
            async for response in agent.run_iter(*questions):
                print(response.content)
        """
        for prompts in prompt_groups:
            response = await self.run(
                *prompts,
                output_type=output_type,
                model=model,
                store_history=store_history,
                wait_for_connections=wait_for_connections,
            )
            yield response  # pyright: ignore

    @method_spawner
    async def run_job(
        self,
        job: Job[TDeps, str | None],
        *,
        store_history: bool = True,
        include_agent_tools: bool = True,
    ) -> ChatMessage[OutputDataT]:
        """Execute a pre-defined task.

        Args:
            job: Job configuration to execute
            store_history: Whether the message exchange should
                be added to the context window
            include_agent_tools: Whether to include agent tools

        Returns:
            Job execution result

        Raises:
            JobError: If task execution fails
            ValueError: If task configuration is invalid
        """
        if job.required_dependency is not None:  # noqa: SIM102
            if not isinstance(self.context.data, job.required_dependency):
                msg = (
                    f"Agent dependencies ({type(self.context.data)}) "
                    f"don't match job requirement ({job.required_dependency})"
                )
                raise JobError(msg)
        # Load task knowledge
        if job.knowledge:
            # Add knowledge sources to context
            for source in list(job.knowledge.paths):
                await self.conversation.load_context_source(source)
            for prompt in job.knowledge.prompts:
                await self.conversation.load_context_source(prompt)
        try:
            # Register task tools temporarily
            tools = job.get_tools()
            async with self.tools.temporary_tools(
                tools, exclusive=not include_agent_tools
            ):
                # Execute job with job-specific tools
                return await self.run(
                    await job.get_prompt(), store_history=store_history
                )
        except Exception as e:
            self.log.exception("Task execution failed", error=str(e))
            msg = f"Task execution failed: {e}"
            raise JobError(msg) from e

    async def run_in_background(
        self,
        *prompt: PromptCompatible,
        max_count: int | None = None,
        interval: float = 1.0,
        block: bool = False,
        **kwargs: Any,
    ) -> ChatMessage[OutputDataT] | None:
        """Run agent continuously in background with prompt or dynamic prompt function.

        Args:
            prompt: Static prompt or function that generates prompts
            max_count: Maximum number of runs (None = infinite)
            interval: Seconds between runs
            block: Whether to block until completion
            **kwargs: Arguments passed to run()
        """
        self._infinite = max_count is None

        async def _continuous():
            count = 0
            self.log.debug(
                "Starting continuous run", max_count=max_count, interval=interval
            )
            latest = None
            while max_count is None or count < max_count:
                try:
                    current_prompts = [
                        call_with_context(p, self.context, **kwargs)
                        if callable(p)
                        else p
                        for p in prompt
                    ]
                    self.log.debug("Generated prompt", iteration=count)
                    latest = await self.run(current_prompts, **kwargs)
                    self.log.debug("Run continuous result", iteration=count)
                    count += 1
                    await asyncio.sleep(interval)
                except asyncio.CancelledError:
                    self.log.debug("Continuous run cancelled")
                    break
                except Exception:
                    self.log.exception("Background run failed")
                    await asyncio.sleep(interval)
            self.log.debug("Continuous run completed", iterations=count)
            return latest

        # Cancel any existing background task
        await self.stop()
        task = asyncio.create_task(_continuous(), name=f"background_{self.name}")
        if block:
            try:
                return await task  # type: ignore
            finally:
                if not task.done():
                    task.cancel()
        else:
            self.log.debug("Started background task", task_name=task.get_name())
            self._background_task = task
            return None

    async def stop(self):
        """Stop continuous execution if running."""
        if self._background_task and not self._background_task.done():
            self._background_task.cancel()
            await self._background_task
            self._background_task = None

    async def wait(self) -> ChatMessage[OutputDataT]:
        """Wait for background execution to complete."""
        if not self._background_task:
            msg = "No background task running"
            raise RuntimeError(msg)
        if self._infinite:
            msg = "Cannot wait on infinite execution"
            raise RuntimeError(msg)
        try:
            return await self._background_task
        finally:
            self._background_task = None

    async def share(
        self,
        target: Agent[TDeps, Any],
        *,
        tools: list[str] | None = None,
        history: bool | int | None = None,  # bool or number of messages
        token_limit: int | None = None,
    ):
        """Share capabilities and knowledge with another agent.

        Args:
            target: Agent to share with
            tools: List of tool names to share
            history: Share conversation history:
                - True: Share full history
                - int: Number of most recent messages to share
                - None: Don't share history
            token_limit: Optional max tokens for history

        Raises:
            ValueError: If requested items don't exist
            RuntimeError: If runtime not available for resources
        """
        # Share tools if requested
        for name in tools or []:
            if tool := await self.tools.get_tool(name):
                meta = {"shared_from": self.name}
                target.tools.register_tool(tool.callable, metadata=meta)
            else:
                msg = f"Tool not found: {name}"
                raise ValueError(msg)
        # Share history if requested
        if history:
            history_text = await self.conversation.format_history(
                max_tokens=token_limit,
                num_messages=history if isinstance(history, int) else None,
            )
            target.conversation.add_context_message(
                history_text, source=self.name, metadata={"type": "shared_history"}
            )

    def register_worker(
        self,
        worker: MessageNode[Any, Any],
        *,
        name: str | None = None,
        reset_history_on_run: bool = True,
        pass_message_history: bool = False,
    ) -> Tool:
        """Register another agent as a worker tool."""
        return self.tools.register_worker(
            worker,
            name=name,
            reset_history_on_run=reset_history_on_run,
            pass_message_history=pass_message_history,
            parent=self if pass_message_history else None,
        )

    def set_model(self, model: ModelType):
        """Set the model for this agent.

        Args:
            model: New model to use (name or instance)
        """
        self._model = model

    async def reset(self):
        """Reset agent state (conversation history and tool states)."""
        old_tools = await self.tools.list_tools()
        self.conversation.clear()
        await self.tools.reset_states()
        new_tools = await self.tools.list_tools()
        event = self.AgentReset(
            agent_name=self.name,
            previous_tools=old_tools,
            new_tools=new_tools,
        )
        self.agent_reset.emit(event)

    async def get_stats(self) -> MessageStats:
        """Get message statistics (async version)."""
        messages = await self.get_message_history()
        return MessageStats(messages=messages)

    @asynccontextmanager
    async def temporary_state[T](
        self,
        *,
        system_prompts: list[AnyPromptType] | None = None,
        output_type: type[T] | None = None,
        replace_prompts: bool = False,
        tools: list[ToolType] | None = None,
        replace_tools: bool = False,
        history: list[AnyPromptType] | SessionQuery | None = None,
        replace_history: bool = False,
        pause_routing: bool = False,
        model: ModelType | None = None,
    ) -> AsyncIterator[Self | Agent[T]]:
        """Temporarily modify agent state.

        Args:
            system_prompts: Temporary system prompts to use
            output_type: Temporary output type to use
            replace_prompts: Whether to replace existing prompts
            tools: Temporary tools to make available
            replace_tools: Whether to replace existing tools
            history: Conversation history (prompts or query)
            replace_history: Whether to replace existing history
            pause_routing: Whether to pause message routing
            model: Temporary model override
        """
        old_model = self._model
        if output_type:
            old_type = self._output_type
            self.to_structured(output_type)
        async with AsyncExitStack() as stack:
            if system_prompts is not None:  # System prompts
                await stack.enter_async_context(
                    self.sys_prompts.temporary_prompt(
                        system_prompts, exclusive=replace_prompts
                    )
                )
            if tools is not None:  # Tools
                await stack.enter_async_context(
                    self.tools.temporary_tools(tools, exclusive=replace_tools)
                )
            if history is not None:  # History
                await stack.enter_async_context(
                    self.conversation.temporary_state(
                        history, replace_history=replace_history
                    )
                )
            if pause_routing:  # Routing
                await stack.enter_async_context(self.connections.paused_routing())
            if model is not None:  # Model
                self._model = model
            try:
                yield self
            finally:
                # Restore model
                if model is not None and old_model:
                    self._model = old_model
                if output_type:
                    self.to_structured(old_type)

    async def validate_against(
        self,
        prompt: str,
        criteria: type[OutputDataT],
        **kwargs: Any,
    ) -> bool:
        """Check if agent's response satisfies stricter criteria."""
        result = await self.run(prompt, **kwargs)
        try:
            criteria.model_validate(result.content.model_dump())  # type: ignore
        except ValidationError:
            return False
        else:
            return True
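
A minimal usage sketch, assuming `Agent` is exported from the package root and that `run` accepts a plain string prompt; the model identifier is illustrative:

import asyncio

from llmling_agent import Agent  # assumed top-level export


async def main() -> None:
    agent = Agent(
        name="summarizer",
        model="openai:gpt-4o-mini",  # illustrative model identifier
        system_prompt="Summarize the user's input in one sentence.",
    )
    async with agent:  # sets up MCP servers, toolsets, and resources
        message = await agent.run("LLMs are neural networks trained on text.")
        print(message.content)


asyncio.run(main())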
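The `__and__` and `__or__` operators compose agents into teams. A hedged sketch, where `analyzer`, `planner`, and `executor` stand in for configured `Agent` instances that share a pool:

group = analyzer & planner & executor     # Team: a group of three nodes
pipeline = analyzer | planner | executor  # TeamRun: output piped stage to stage


# Plain callables are adopted automatically via Agent.from_callback:
def shout(text: str) -> str:
    return text.upper()


noisy = analyzer | shout  # the callback becomes a trailing pipeline stage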
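`from_callback` infers the output type from the callback's return annotation (unwrapping `Awaitable[...]` for async callbacks) and wraps the function as the agent's "model". A sketch with a hypothetical callback:

async def translate(text: str) -> str:
    """Pretend translation step."""
    return f"[translated] {text}"


translator = Agent.from_callback(translate, name="translator")
# translator behaves as an Agent[None, str] backed by the function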
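`run_in_background` spawns a polling task; `wait` only works for bounded runs (`max_count` set), since an infinite loop never yields a final message. A sketch, assuming the agent is already inside its async context:

await agent.run_in_background(
    "Report the current queue length.",  # illustrative prompt
    max_count=3,    # three runs...
    interval=30.0,  # ...30 seconds apart
)
# ... do other work ...
final = await agent.wait()  # ChatMessage from the last of the three runs
await agent.stop()          # no-op here; cancels a still-running loop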
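Consuming the stream amounts to matching on event types. A hedged sketch; the import path for the event classes is an assumption, and only two of the yielded event types are handled:

from llmling_agent.agent.events import (  # assumed import path
    StreamCompleteEvent,
    ToolCallCompleteEvent,
)

async for event in agent.run_stream("What's the weather in Paris?"):
    match event:
        case ToolCallCompleteEvent():
            print(f"{event.tool_name}({event.tool_input}) -> {event.tool_result}")
        case StreamCompleteEvent():
            print(event.message.content)  # the final, enriched ChatMessage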
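`share` copies tools and, optionally, formatted history into another agent. A sketch with two assumed agents, `senior` and `junior`:

await senior.share(
    junior,
    tools=["search_docs"],  # must already be registered on senior
    history=20,             # last 20 messages...
    token_limit=2_000,      # ...capped at roughly 2k tokens
)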
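`temporary_state` stacks the individual overrides on an `AsyncExitStack`, so they are unwound together when the block exits. A sketch:

async with agent.temporary_state(
    system_prompts=["Answer in French."],
    replace_prompts=True,  # drop the configured prompts for this block
    pause_routing=True,    # connected agents see nothing meanwhile
) as scoped:
    reply = await scoped.run("What is the capital of Italy?")
# prompts, routing, model, and output type are restored here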
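`to_structured` mutates the agent's output type in place and returns `self` with a narrowed type parameter. A sketch with a hypothetical Pydantic model, assuming `run` then parses the result payload into that model:

from pydantic import BaseModel


class Sentiment(BaseModel):
    label: str
    confidence: float


structured = agent.to_structured(Sentiment)
result = await structured.run("I love this library!")
print(result.content.label)  # content is now a Sentiment instance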
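`to_tool` lets one agent delegate to another by exposing it as an `ask_<name>` tool. A sketch wiring a specialist into a coordinator (both names are illustrative):

specialist = Agent(name="sql_expert", system_prompt="You write SQL.")
coordinator = Agent(
    name="coordinator",
    tools=[specialist.to_tool()],  # exposed as "ask_sql_expert"
)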
@dataclass(kw_only=True)
class AgentContext[TDeps = Any](NodeContext[TDeps]):
    """Runtime context for agent execution.

    Generically typed with AgentContext[Type of Dependencies]
    """

    config: AgentConfig
    """Current agent's specific configuration."""

    model_settings: dict[str, Any] = field(default_factory=dict)
    """Model-specific settings."""

    data: TDeps | None = None
    """Custom context data."""

    tool_name: str | None = None
    """Name of the currently executing tool."""

    tool_call_id: str | None = None
    """ID of the current tool call."""

    tool_input: dict[str, Any] = field(default_factory=dict)
    """Input arguments for the current tool call."""

    @classmethod
    def create_default(
        cls,
        name: str,
        deps: TDeps | None = None,
        pool: AgentPool | None = None,
        input_provider: InputProvider | None = None,
    ) -> AgentContext[TDeps]:
        """Create a default agent context with minimal privileges.

        Args:
            name: Name of the agent
            deps: Optional dependencies for the agent
            pool: Optional pool the agent is part of
            input_provider: Optional input provider for the agent
        """
        from llmling_agent.models import AgentConfig, AgentsManifest

        defn = AgentsManifest()
        cfg = AgentConfig(name=name)
        return cls(
            input_provider=input_provider,
            node_name=name,
            definition=defn,
            config=cfg,
            data=deps,
            pool=pool,
        )

    @cached_property
    def converter(self) -> ConversionManager:
        """Get conversion manager from global config."""
        return ConversionManager(self.definition.conversion)

    # TODO: perhaps add agent directly to context?
    @property
    def agent(self) -> Agent[TDeps, Any]:
        """Get the agent instance from the pool."""
        assert self.pool, "No agent pool available"
        assert self.node_name, "No agent name available"
        return self.pool.agents[self.node_name]

    @property
    def process_manager(self):
        """Get process manager from pool."""
        assert self.pool, "No agent pool available"
        return self.pool.process_manager

    async def handle_confirmation(
        self,
        tool: Tool,
        args: dict[str, Any],
    ) -> ConfirmationResult:
        """Handle tool execution confirmation.

        Returns "allow" if:
        - No confirmation handler is set
        - Handler confirms the execution
        """
        provider = self.get_input_provider()
        mode = self.config.requires_tool_confirmation
        if (mode == "per_tool" and not tool.requires_confirmation) or mode == "never":
            return "allow"
        history = self.agent.conversation.get_history() if self.pool else []
        return await provider.get_tool_confirmation(self, tool, args, history)

    async def handle_elicitation(
        self,
        params: types.ElicitRequestParams,
    ) -> types.ElicitResult | types.ErrorData:
        """Handle elicitation request for additional information."""
        provider = self.get_input_provider()
        history = self.agent.conversation.get_history() if self.pool else []
        return await provider.get_elicitation(self, params, history)

    async def report_progress(self, progress: float, total: float | None, message: str):
        """Access progress reporting from pool server if available."""
        logger.info(
            "Reporting tool call progress",
            progress=progress,
            total=total,
            message=message,
        )
        if self.pool:
            await self.pool.progress_handlers(progress, total, message)
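`create_default` builds a throwaway manifest and config, which is what `Agent.__init__` falls back to when no context is passed. A minimal sketch:

ctx = AgentContext.create_default("scratch-agent")
assert ctx.config.name == "scratch-agent"
assert ctx.data is None  # no dependencies wired in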
class ConversationManager:
    """Manages conversation state and system prompts."""

    @dataclass(frozen=True)
    class HistoryCleared:
        """Emitted when chat history is cleared."""

        session_id: str
        timestamp: datetime = field(default_factory=get_now)

    history_cleared = Signal(HistoryCleared)

    def __init__(
        self,
        agent: Agent[Any, Any],
        session_config: MemoryConfig | None = None,
        *,
        resources: Sequence[PromptType | str] = (),
    ):
        """Initialize conversation manager.

        Args:
            agent: instance to manage
            session_config: Optional MemoryConfig
            resources: Optional paths to load as context
        """
        self._agent = agent
        self.chat_messages = ChatMessageContainer()
        self._last_messages: list[ChatMessage] = []
        self._pending_messages: deque[ChatMessage] = deque()
        self._config = session_config
        self._resources = list(resources)  # Store for async loading
        # Generate new ID if none provided
        self.id = str(uuid4())
        if session_config is not None and session_config.session is not None:
            storage = self._agent.context.storage
            self._current_history = storage.filter_messages.sync(session_config.session)
            if session_config.session.name:
                self.id = session_config.session.name
        # Note: max_messages and max_tokens will be handled in add_message/get_history
        # to maintain the rolling window during conversation

    def get_initialization_tasks(self) -> list[Coroutine[Any, Any, Any]]:
        """Get all initialization coroutines."""
        resources = self._resources
        self._resources = []  # Clear so we don't load again on async init
        return [self.load_context_source(source) for source in resources]

    async def __aenter__(self) -> Self:
        """Initialize when used standalone."""
        if tasks := self.get_initialization_tasks():
            await asyncio.gather(*tasks)
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ):
        """Clean up any pending messages."""
        self._pending_messages.clear()

    def __bool__(self) -> bool:
        return bool(self._pending_messages) or bool(self.chat_messages)

    def __repr__(self) -> str:
        return f"ConversationManager(id={self.id!r})"

    def __prompt__(self) -> str:
        if not self.chat_messages:
            return "No conversation history"
        last_msgs = self.chat_messages[-2:]
        parts = ["Recent conversation:"]
        parts.extend(msg.format() for msg in last_msgs)
        return "\n".join(parts)

    def __contains__(self, item: Any) -> bool:
        """Check if item is in history."""
        return item in self.chat_messages

    def __len__(self) -> int:
        """Get length of history."""
        return len(self.chat_messages)

    def get_message_tokens(self, message: ChatMessage) -> int:
        """Get token count for a single message."""
        content = "\n".join(message.format())
        return count_tokens(content, message.model_name)

    async def format_history(
        self,
        *,
        max_tokens: int | None = None,
        include_system: bool = False,
        format_template: str | None = None,
        num_messages: int | None = None,
    ) -> str:
        """Format conversation history as a single context message.

        Args:
            max_tokens: Optional limit to include only last N tokens
            include_system: Whether to include system messages
            format_template: Optional custom format (defaults to agent/message pairs)
            num_messages: Optional limit to include only last N messages
        """
        template = format_template or "Agent {agent}: {content}\n"
        messages: list[str] = []
        token_count = 0
        # Get messages, optionally limited
        history: Sequence[ChatMessage[Any]] = self.chat_messages
        if num_messages:
            history = history[-num_messages:]
        if max_tokens:
            history = list(reversed(history))  # Start from newest when token limited
        for msg in history:
            # Check role directly from ChatMessage
            if not include_system and msg.role == "system":
                continue
            name = msg.name or msg.role.title()
            formatted = template.format(agent=name, content=str(msg.content))
            if max_tokens:
                # Count tokens in this message
                if msg.cost_info:
                    msg_tokens = msg.cost_info.token_usage.total_tokens
                else:
                    # Fallback to tiktoken if no cost info
                    msg_tokens = self.get_message_tokens(msg)
                if token_count + msg_tokens > max_tokens:
                    break
                token_count += msg_tokens
                # Add to front since we're going backwards
                messages.insert(0, formatted)
            else:
                messages.append(formatted)
        return "\n".join(messages)

    async def load_context_source(self, source: PromptType | str):
        """Load context from a single source."""
        try:
            match source:
                case str():
                    await self.add_context_from_path(source)
                case BasePrompt():
                    await self.add_context_from_prompt(source)
        except Exception:
            logger.exception(
                "Failed to load context",
                source="file" if isinstance(source, str) else source.type,
            )

    def load_history_from_database(
        self,
        session: SessionIdType | SessionQuery = None,
        *,
        since: datetime | None = None,
        until: datetime | None = None,
        roles: set[MessageRole] | None = None,
        limit: int | None = None,
    ):
        """Load conversation history from database.

        Args:
            session: Session ID or query config
            since: Only include messages after this time (override)
            until: Only include messages before this time (override)
            roles: Only include messages with these roles (override)
            limit: Maximum number of messages to return (override)
        """
        storage = self._agent.context.storage
        match session:
            case SessionQuery() as query:
                # Override query params if provided
                if since is not None or until is not None or roles or limit:
                    update = {
                        "since": since.isoformat() if since else None,
                        "until": until.isoformat() if until else None,
                        "roles": roles,
                        "limit": limit,
                    }
                    query = query.model_copy(update=update)
                if query.name:
                    self.id = query.name
            case str() | UUID():
                self.id = str(session)
                query = SessionQuery(
                    name=self.id,
                    since=since.isoformat() if since else None,
                    until=until.isoformat() if until else None,
                    roles=roles,
                    limit=limit,
                )
            case None:
                # Use current session ID
                query = SessionQuery(
                    name=self.id,
                    since=since.isoformat() if since else None,
                    until=until.isoformat() if until else None,
                    roles=roles,
                    limit=limit,
                )
            case _ as unreachable:
                assert_never(unreachable)
        self.chat_messages.clear()
        self.chat_messages.extend(storage.filter_messages.sync(query))

    def get_history(
        self,
        include_pending: bool = True,
        do_filter: bool = True,
    ) -> list[ChatMessage]:
        """Get conversation history.

        Args:
            include_pending: Whether to include pending messages
            do_filter: Whether to apply memory config limits (max_tokens, max_messages)

        Returns:
            Filtered list of messages in chronological order
        """
        if include_pending and self._pending_messages:
            self.chat_messages.extend(self._pending_messages)
            self._pending_messages.clear()
        # Start with original history
        history: Sequence[ChatMessage[Any]] = self.chat_messages
Only filter if neededifdo_filterandself._config:# First filter by message count (simple slice)ifself._config.max_messages:history=history[-self._config.max_messages:]# Then filter by tokens if neededifself._config.max_tokens:token_count=0filtered=[]# Collect messages from newest to oldest until we hit the limitformsginreversed(history):msg_tokens=self.get_message_tokens(msg)iftoken_count+msg_tokens>self._config.max_tokens:breaktoken_count+=msg_tokensfiltered.append(msg)history=list(reversed(filtered))returnlist(history)defget_pending_messages(self)->list[ChatMessage]:"""Get messages that will be included in next interaction."""returnlist(self._pending_messages)defclear_pending(self):"""Clear pending messages without adding them to history."""self._pending_messages.clear()defset_history(self,history:list[ChatMessage]):"""Update conversation history after run."""self.chat_messages.clear()self.chat_messages.extend(history)defclear(self):"""Clear conversation history and prompts."""self.chat_messages=ChatMessageContainer()self._last_messages=[]event=self.HistoryCleared(session_id=str(self.id))self.history_cleared.emit(event)@asynccontextmanagerasyncdeftemporary_state(self,history:list[AnyPromptType]|SessionQuery|None=None,*,replace_history:bool=False,)->AsyncIterator[Self]:"""Temporarily set conversation history. Args: history: Optional list of prompts to use as temporary history. Can be strings, BasePrompts, or other prompt types. replace_history: If True, only use provided history. If False, append to existing history. """fromtopromptimportto_promptold_history=self.chat_messages.copy()try:messages:Sequence[ChatMessage[Any]]=ChatMessageContainer()ifhistoryisnotNone:ifisinstance(history,SessionQuery):messages=awaitself._agent.context.storage.filter_messages(history)else:messages=[ChatMessage.user_prompt(message=prompt)forpinhistoryif(prompt:=awaitto_prompt(p))]ifreplace_history:self.chat_messages=ChatMessageContainer(messages)else:self.chat_messages.extend(messages)yieldselffinally:self.chat_messages=old_historydefadd_chat_messages(self,messages:Sequence[ChatMessage]):"""Add new messages to history and update last_messages."""self._last_messages=list(messages)self.chat_messages.extend(messages)@propertydeflast_run_messages(self)->list[ChatMessage]:"""Get messages from the last run converted to our format."""returnself._last_messagesdefadd_context_message(self,content:str,source:str|None=None,**metadata:Any,):"""Add a context message. Args: content: Text content to add source: Description of content source **metadata: Additional metadata to include with the message """meta_str=""ifmetadata:meta_str="\n".join(f"{k}: {v}"fork,vinmetadata.items())meta_str=f"\nMetadata:\n{meta_str}\n"header=f"Content from {source}:"ifsourceelse"Additional context:"formatted=f"{header}{meta_str}\n{content}\n"chat_message=ChatMessage(content=formatted,role="user",name="user",metadata=metadata,conversation_id="context",# TODO: should probably allow DB field to be NULL)self._pending_messages.append(chat_message)asyncdefadd_context_from_path(self,path:JoinablePathLike,*,convert_to_md:bool=False,**metadata:Any,):"""Add file or URL content as context message. 
Args: path: Any UPath-supported path convert_to_md: Whether to convert content to markdown **metadata: Additional metadata to include with the message Raises: ValueError: If content cannot be loaded or converted """path_obj=to_upath(path)ifconvert_to_md:content=awaitself._agent.context.converter.convert_file(path)source=f"markdown:{path_obj.name}"else:content=awaitread_path(path)source=f"{path_obj.protocol}:{path_obj.name}"self.add_context_message(content,source=source,**metadata)asyncdefadd_context_from_prompt(self,prompt:PromptType,metadata:dict[str,Any]|None=None,**kwargs:Any,):"""Add rendered prompt content as context message. Args: prompt: LLMling prompt (static, dynamic, or file-based) metadata: Additional metadata to include with the message kwargs: Optional kwargs for prompt formatting """try:# Format the prompt using LLMling's prompt systemmessages=awaitprompt.format(kwargs)# Extract text content from all messagescontent="\n\n".join(msg.get_text_content()formsginmessages)self.add_context_message(content,source=f"prompt:{prompt.nameorprompt.type}",prompt_args=kwargs,**(metadataor{}),)exceptExceptionase:msg=f"Failed to format prompt: {e}"raiseValueError(msg)fromedefget_history_tokens(self)->int:"""Get token count for current history."""# Use cost_info if availablereturnself.chat_messages.get_history_tokens()
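Example usage (a minimal sketch; the agent name and prompt text are illustrative, and it assumes a default-configured Agent, whose conversation attribute is a ConversationManager):

import asyncio

from llmling_agent import Agent


async def main() -> None:
    async with Agent(name="demo") as agent:
        conv = agent.conversation
        # Queue context; it stays a pending user message until the next run
        conv.add_context_message("The project uses Python 3.12.", source="notes")
        await agent.run("Which Python version does the project use?")
        # Render the most recent history as a single string
        print(await conv.format_history(num_messages=4))
        # Apply memory-config limits (max_messages / max_tokens), if configured
        print(len(conv.get_history()))


asyncio.run(main())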
Source code in src/llmling_agent/agent/interactions.py

class Interactions[TDeps, TResult]:
    """Manages agent communication patterns."""

    def __init__(self, agent: Agent[TDeps, TResult]):
        self.agent = agent

    async def conversation(
        self,
        other: MessageNode[Any, Any],
        initial_message: AnyPromptType,
        *,
        max_rounds: int | None = None,
        end_condition: Callable[[list[ChatMessage[Any]], ChatMessage[Any]], bool]
        | None = None,
        store_history: bool = True,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Maintain conversation between two agents.

        Args:
            other: Agent to converse with
            initial_message: Message to start conversation with
            max_rounds: Optional maximum number of exchanges
            end_condition: Optional predicate to check for conversation end
            store_history: Whether to store in conversation history

        Yields:
            Messages from both agents in conversation order
        """
        rounds = 0
        messages: list[ChatMessage[Any]] = []
        current_message = initial_message
        current_node: MessageNode[Any, Any] = self.agent
        while True:
            if max_rounds and rounds >= max_rounds:
                logger.debug("Conversation ended", max_rounds=max_rounds)
                return
            response = await current_node.run(current_message, store_history=store_history)
            messages.append(response)
            yield response
            if end_condition and end_condition(messages, response):
                logger.debug("Conversation ended: end condition met")
                return
            # Switch agents for next round
            current_node = other if current_node == self.agent else self.agent
            current_message = response.content
            rounds += 1

    @overload
    async def pick[T: AnyPromptType](
        self,
        selections: Sequence[T],
        task: str,
        prompt: AnyPromptType | None = None,
    ) -> Pick[T]: ...

    @overload
    async def pick[T: AnyPromptType](
        self,
        selections: Mapping[str, T],
        task: str,
        prompt: AnyPromptType | None = None,
    ) -> Pick[T]: ...

    @overload
    async def pick(
        self,
        selections: AgentPool,
        task: str,
        prompt: AnyPromptType | None = None,
    ) -> Pick[Agent[Any, Any]]: ...

    @overload
    async def pick(
        self,
        selections: BaseTeam[TDeps, Any],
        task: str,
        prompt: AnyPromptType | None = None,
    ) -> Pick[MessageNode[TDeps, Any]]: ...

    async def pick[T](
        self,
        selections: Sequence[T] | Mapping[str, T] | AgentPool | BaseTeam[TDeps, Any],
        task: str,
        prompt: AnyPromptType | None = None,
    ) -> Pick[T]:
        """Pick from available options with reasoning.

        Args:
            selections: What to pick from:
                - Sequence of items (auto-labeled)
                - Dict mapping labels to items
                - AgentPool
                - Team
            task: Task/decision description
            prompt: Optional custom selection prompt

        Returns:
            Decision with selected item and reasoning

        Raises:
            ValueError: If no choices available or invalid selection
        """
        # Get items and create label mapping
        from toprompt import to_prompt

        from llmling_agent import AgentPool
        from llmling_agent.delegation.base_team import BaseTeam

        match selections:
            case dict():
                label_map = selections
                items: list[Any] = list(selections.values())
            case BaseTeam():
                items = list(selections.agents)
                label_map = {get_label(item): item for item in items}
            case AgentPool():
                items = list(selections.agents.values())
                label_map = {get_label(item): item for item in items}
            case _:
                items = list(selections)
                label_map = {get_label(item): item for item in items}
        if not items:
            msg = "No choices available"
            raise ValueError(msg)
        # Get descriptions for all items
        descriptions = []
        for label, item in label_map.items():
            item_desc = await to_prompt(item)
            descriptions.append(f"{label}:\n{item_desc}")
        default_prompt = f"""Task/Decision: {task}
Available options:
{"-" * 40}
{"\n\n".join(descriptions)}
{"-" * 40}
Select ONE option by its exact label."""
        # Get LLM's string-based decision
        result = await self.agent.run(prompt or default_prompt, output_type=LLMPick)
        # Convert to type-safe decision
        if result.content.selection not in label_map:
            msg = f"Invalid selection: {result.content.selection}"
            raise ValueError(msg)
        selected = cast(T, label_map[result.content.selection])
        return Pick(selection=selected, reason=result.content.reason)

    @overload
    async def pick_multiple[T: AnyPromptType](
        self,
        selections: Sequence[T],
        task: str,
        *,
        min_picks: int = 1,
        max_picks: int | None = None,
        prompt: AnyPromptType | None = None,
    ) -> MultiPick[T]: ...

    @overload
    async def pick_multiple[T: AnyPromptType](
        self,
        selections: Mapping[str, T],
        task: str,
        *,
        min_picks: int = 1,
        max_picks: int | None = None,
        prompt: AnyPromptType | None = None,
    ) -> MultiPick[T]: ...

    @overload
    async def pick_multiple(
        self,
        selections: BaseTeam[TDeps, Any],
        task: str,
        *,
        min_picks: int = 1,
        max_picks: int | None = None,
        prompt: AnyPromptType | None = None,
    ) -> MultiPick[MessageNode[TDeps, Any]]: ...

    @overload
    async def pick_multiple(
        self,
        selections: AgentPool,
        task: str,
        *,
        min_picks: int = 1,
        max_picks: int | None = None,
        prompt: AnyPromptType | None = None,
    ) -> MultiPick[Agent[Any, Any]]: ...

    async def pick_multiple[T](
        self,
        selections: Sequence[T] | Mapping[str, T] | AgentPool | BaseTeam[TDeps, Any],
        task: str,
        *,
        min_picks: int = 1,
        max_picks: int | None = None,
        prompt: AnyPromptType | None = None,
    ) -> MultiPick[T]:
        """Pick multiple options from available choices.

        Args:
            selections: What to pick from
            task: Task/decision description
            min_picks: Minimum number of selections required
            max_picks: Maximum number of selections (None for unlimited)
            prompt: Optional custom selection prompt
        """
        from toprompt import to_prompt

        from llmling_agent import AgentPool
        from llmling_agent.delegation.base_team import BaseTeam

        match selections:
            case Mapping():
                label_map = selections
                items: list[Any] = list(selections.values())
            case BaseTeam():
                items = list(selections.agents)
                label_map = {get_label(item): item for item in items}
            case AgentPool():
                items = list(selections.agents.values())
                label_map = {get_label(item): item for item in items}
            case _:
                items = list(selections)
                label_map = {get_label(item): item for item in items}
        if not items:
            msg = "No choices available"
            raise ValueError(msg)
        if max_picks is not None and max_picks < min_picks:
            msg = f"max_picks ({max_picks}) cannot be less than min_picks ({min_picks})"
            raise ValueError(msg)
        descriptions = []
        for label, item in label_map.items():
            item_desc = await to_prompt(item)
            descriptions.append(f"{label}:\n{item_desc}")
        picks_info = (
            f"Select between {min_picks} and {max_picks}"
            if max_picks is not None
            else f"Select at least {min_picks}"
        )
        default_prompt = f"""Task/Decision: {task}
Available options:
{"-" * 40}
{"\n\n".join(descriptions)}
{"-" * 40}
{picks_info} options by their exact labels.
List your selections, one per line, followed by your reasoning."""
        result = await self.agent.run(prompt or default_prompt, output_type=LLMMultiPick)
        # Validate selections
        invalid = [s for s in result.content.selections if s not in label_map]
        if invalid:
            msg = f"Invalid selections: {', '.join(invalid)}"
            raise ValueError(msg)
        num_picks = len(result.content.selections)
        if num_picks < min_picks:
            msg = f"Too few selections: got {num_picks}, need {min_picks}"
            raise ValueError(msg)
        if max_picks and num_picks > max_picks:
            msg = f"Too many selections: got {num_picks}, max {max_picks}"
            raise ValueError(msg)
        selected = [cast(T, label_map[label]) for label in result.content.selections]
        return MultiPick(selections=selected, reason=result.content.reason)

    async def extract[T](
        self,
        text: str,
        as_type: type[T],
        *,
        mode: ExtractionMode = "structured",
        prompt: AnyPromptType | None = None,
        include_tools: bool = False,
    ) -> T:
        """Extract single instance of type from text.

        Args:
            text: Text to extract from
            as_type: Type to extract
            mode: Extraction approach:
                - "structured": Use Pydantic models (more robust)
                - "tool_calls": Use tool calls (more flexible)
            prompt: Optional custom prompt
            include_tools: Whether to include other tools (tool_calls mode only)
        """
        # Create model for single instance
        item_model = Schema.for_class_ctor(as_type)
        # Create extraction prompt
        final_prompt = prompt or f"Extract {as_type.__name__} from: {text}"
        schema_obj = create_constructor_schema(as_type)
        schema = schema_obj.model_dump_openai()["function"]
        if mode == "structured":

            class Extraction(Schema):
                instance: item_model  # type: ignore

            result = await self.agent.run(final_prompt, output_type=Extraction)
            # Convert model instance to actual type
            return as_type(**result.content.instance.model_dump())  # type: ignore

        # Legacy tool-calls approach
        async def construct(**kwargs: Any) -> T:
            """Construct instance from extracted data."""
            return as_type(**kwargs)

        tool = Tool.from_callable(
            construct,
            name_override=schema["name"],
            description_override=schema["description"],
        )
        async with self.agent.tools.temporary_tools(tool, exclusive=not include_tools):
            result = await self.agent.run(final_prompt, output_type=item_model)  # type: ignore
        return result.content  # type: ignore

    async def extract_multiple[T](
        self,
        text: str,
        as_type: type[T],
        *,
        mode: ExtractionMode = "structured",
        min_items: int = 1,
        max_items: int | None = None,
        prompt: AnyPromptType | None = None,
        include_tools: bool = False,
    ) -> list[T]:
        """Extract multiple instances of type from text.

        Args:
            text: Text to extract from
            as_type: Type to extract
            mode: Extraction approach:
                - "structured": Use Pydantic models (more robust)
                - "tool_calls": Use tool calls (more flexible)
            min_items: Minimum number of instances to extract
            max_items: Maximum number of instances (None=unlimited)
            prompt: Optional custom prompt
            include_tools: Whether to include other tools (tool_calls mode only)
        """
        item_model = Schema.for_class_ctor(as_type)
        instances: list[T] = []
        schema_obj = create_constructor_schema(as_type)
        final_prompt = prompt or "\n".join([
            f"Extract {as_type.__name__} instances from text.",
            "\nText to analyze:",
            text,
        ])
        if mode == "structured":
            # Create model for individual instance

            class Extraction(Schema):
                instances: list[item_model]  # type: ignore

            result = await self.agent.run(final_prompt, output_type=Extraction)
            # Validate counts
            num_instances = len(result.content.instances)
            if num_instances < min_items:
                msg = f"Found only {num_instances} instances, need {min_items}"
                raise ValueError(msg)
            if max_items and num_instances > max_items:
                msg = f"Found {num_instances} instances, max is {max_items}"
                raise ValueError(msg)
            # Convert model instances to actual type
            return [
                as_type(
                    **(instance.data if hasattr(instance, "data") else instance.model_dump())  # type: ignore
                )
                for instance in result.content.instances
            ]

        # Legacy tool-calls approach
        async def add_instance(**kwargs: Any) -> str:
            """Add an extracted instance."""
            if max_items and len(instances) >= max_items:
                msg = f"Maximum number of items ({max_items}) reached"
                raise ValueError(msg)
            instance = as_type(**kwargs)
            instances.append(instance)
            return f"Added {instance}"

        add_instance.__annotations__ = schema_obj.get_annotations()
        add_instance.__signature__ = schema_obj.to_python_signature()  # type: ignore
        async with self.agent.temporary_state(
            tools=[add_instance],
            replace_tools=not include_tools,
            output_type=item_model,
        ):
            await self.agent.run(final_prompt)
        if len(instances) < min_items:
            msg = f"Found only {len(instances)} instances, need at least {min_items}"
            raise ValueError(msg)
        return instances
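Example usage (a minimal sketch; the Person dataclass, option labels, and prompt text are illustrative, and it assumes the agent's talk attribute is an Interactions instance, as wired up in the Agent constructor):

import asyncio
from dataclasses import dataclass

from llmling_agent import Agent


@dataclass
class Person:
    name: str
    age: int


async def main() -> None:
    async with Agent(name="picker") as agent:
        # Pick one option with reasoning; labels are derived automatically
        pick = await agent.talk.pick(
            ["merge now", "wait for review", "close the PR"],
            task="How should we handle this pull request?",
        )
        print(pick.selection, "-", pick.reason)
        # Structured extraction into a plain Python type
        person = await agent.talk.extract(
            "Alice is 42 years old.", Person, mode="structured"
        )
        print(person)


asyncio.run(main())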
Wrapper around Agent that handles slash commands in streams.
Uses the "commands first" strategy from the ACP adapter:
1. Execute all slash commands first
2. Then process the remaining content through the wrapped agent
3. If the input contains only commands, end without LLM processing
Source code in src/llmling_agent/agent/slashed_agent.py
class SlashedAgent[TDeps, OutputDataT]:
    """Wrapper around Agent that handles slash commands in streams.

    Uses the "commands first" strategy from the ACP adapter:
    1. Execute all slash commands first
    2. Then process remaining content through wrapped agent
    3. If only commands, end without LLM processing
    """

    def __init__(
        self,
        agent: Agent[TDeps, OutputDataT],
        command_store: CommandStore | None = None,
        *,
        context_data_factory: Callable[[], Any] | None = None,
    ) -> None:
        """Initialize with wrapped agent and command store.

        Args:
            agent: The agent to wrap
            command_store: Command store for slash commands (creates default if None)
            context_data_factory: Optional factory for creating command context data
        """
        self.agent = agent
        self._context_data_factory = context_data_factory
        self._event_queue: asyncio.Queue[CommandStoreEvent] | None = None
        # Create store with our streaming event handler
        if command_store is None:
            from slashed import CommandStore

            from llmling_agent_commands import get_commands

            self.command_store = CommandStore(
                event_handler=self._bridge_events_to_queue,
                commands=get_commands(),
            )
        else:
            self.command_store = command_store

    async def _bridge_events_to_queue(self, event: CommandStoreEvent) -> None:
        """Bridge store events to async queue during command execution."""
        if self._event_queue:
            await self._event_queue.put(event)

    def _is_slash_command(self, text: str) -> bool:
        """Check if text starts with a slash command.

        Args:
            text: Text to check

        Returns:
            True if text is a slash command
        """
        return bool(SLASH_PATTERN.match(text.strip()))

    async def _execute_slash_command_streaming(
        self, command_text: str
    ) -> AsyncGenerator[CommandOutputEvent | CommandCompleteEvent]:
        """Execute a single slash command and yield events as they happen.

        Args:
            command_text: Full command text including slash

        Yields:
            Command output and completion events
        """
        parsed = _parse_slash_command(command_text)
        if not parsed:
            logger.warning("Invalid slash command", command=command_text)
            yield CommandCompleteEvent(command="unknown", success=False)
            return
        cmd_name, args = parsed
        # Set up event queue for this command execution
        self._event_queue = asyncio.Queue()
        # Create command context
        context_data = (
            self._context_data_factory()
            if self._context_data_factory
            else self.agent.context
        )
        cmd_ctx = self.command_store.create_context(data=context_data)
        command_str = f"{cmd_name}{args}".strip()
        execute_task = asyncio.create_task(
            self.command_store.execute_command(command_str, cmd_ctx)
        )
        success = True
        try:
            # Yield events from queue as command runs
            while not execute_task.done():
                try:
                    # Wait for events with short timeout to check task completion
                    event = await asyncio.wait_for(self._event_queue.get(), timeout=0.1)
                    # Convert store events to our stream events
                    match event:
                        case SlashedCommandOutputEvent(output=output):
                            yield CommandOutputEvent(command=cmd_name, output=output)
                        case CommandExecutedEvent(success=False, error=error) if error:
                            yield CommandOutputEvent(
                                command=cmd_name,
                                output=f"Command error: {error}",
                            )
                            success = False
                except TimeoutError:
                    # No events in queue, continue checking if command is done
                    continue
            # Ensure command task completes and handle any remaining events
            try:
                await execute_task
            except Exception as e:
                logger.exception("Command execution failed", command=cmd_name)
                success = False
                yield CommandOutputEvent(command=cmd_name, output=f"Command error: {e}")
            # Drain any remaining events from queue
            while not self._event_queue.empty():
                try:
                    match self._event_queue.get_nowait():
                        case SlashedCommandOutputEvent(output=output):
                            yield CommandOutputEvent(command=cmd_name, output=output)
                except asyncio.QueueEmpty:
                    break
            # Always yield completion event
            yield CommandCompleteEvent(command=cmd_name, success=success)
        finally:
            # Clean up event queue
            self._event_queue = None

    async def run_stream(
        self,
        *prompts: PromptCompatible,
        **kwargs: Any,
    ) -> AsyncGenerator[SlashedAgentStreamEvent[OutputDataT]]:
        """Run agent with slash command support.

        Separates slash commands from regular prompts, executes commands first,
        then processes remaining content through the wrapped agent.

        Args:
            *prompts: Input prompts (may include slash commands)
            **kwargs: Additional arguments passed to agent.run_stream

        Yields:
            Stream events from command execution and agent processing
        """
        # Separate slash commands from regular content
        commands: list[str] = []
        regular_prompts: list[Any] = []
        for prompt in prompts:
            if isinstance(prompt, str) and self._is_slash_command(prompt):
                logger.debug("Found slash command", command=prompt)
                commands.append(prompt.strip())
            else:
                regular_prompts.append(prompt)
        # Execute all commands first with streaming
        if commands:
            for command in commands:
                logger.info("Processing slash command", command=command)
                async for cmd_event in self._execute_slash_command_streaming(command):
                    yield cmd_event
        # If we have regular content, process it through the agent
        if regular_prompts:
            logger.debug(
                "Processing prompts through agent", num_prompts=len(regular_prompts)
            )
            async for event in self.agent.run_stream(*regular_prompts, **kwargs):
                yield event
        # If we only had commands and no regular content, we're done
        # (no additional events needed)

    def __getattr__(self, name: str) -> Any:
        """Delegate attribute access to wrapped agent.

        Args:
            name: Attribute name

        Returns:
            Attribute value from wrapped agent
        """
        return getattr(self.agent, name)
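Example usage (a minimal sketch of the commands-first flow; the /help command and the event handling shown are illustrative, and it assumes the default command store provides a help command):

import asyncio

from llmling_agent import Agent
from llmling_agent.agent.slashed_agent import SlashedAgent


async def main() -> None:
    async with Agent(name="cli") as agent:
        slashed = SlashedAgent(agent)  # default CommandStore with builtin commands
        # "/help" is executed as a command; the second prompt goes to the LLM
        async for event in slashed.run_stream("/help", "Summarize our discussion"):
            print(type(event).__name__, event)


asyncio.run(main())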
Source code in src/llmling_agent/agent/sys_prompts.py

class SystemPrompts:
    """Manages system prompts for an agent."""

    def __init__(
        self,
        prompts: AnyPromptType | list[AnyPromptType] | None = None,
        template: str | None = None,
        dynamic: bool = True,
        context: AgentContext | None = None,
        inject_agent_info: bool = True,
        inject_tools: ToolInjectionMode = "off",
        tool_usage_style: ToolUsageStyle = "suggestive",
    ):
        """Initialize prompt manager."""
        from jinjarope import Environment
        from toprompt import to_prompt

        match prompts:
            case list():
                self.prompts = prompts
            case None:
                self.prompts = []
            case _:
                self.prompts = [prompts]
        self.context = context
        self.template = template
        self.dynamic = dynamic
        self.inject_agent_info = inject_agent_info
        self.inject_tools = inject_tools
        self.tool_usage_style = tool_usage_style
        self._cached = False
        self._env = Environment(enable_async=True)
        self._env.filters["to_prompt"] = to_prompt

    def __repr__(self) -> str:
        return (
            f"SystemPrompts(prompts={len(self.prompts)}, "
            f"dynamic={self.dynamic}, inject_agent_info={self.inject_agent_info}, "
            f"inject_tools={self.inject_tools!r})"
        )

    def __len__(self) -> int:
        return len(self.prompts)

    def __getitem__(self, idx: int | slice) -> AnyPromptType | list[AnyPromptType]:
        return self.prompts[idx]

    async def add_by_reference(self, reference: str):
        """Add a system prompt using reference syntax.

        Args:
            reference: [provider:]identifier[@version][?var1=val1,...]

        Examples:
            await sys_prompts.add_by_reference("code_review?language=python")
            await sys_prompts.add_by_reference("langfuse:expert@v2")
        """
        if not self.context:
            msg = "No context available to resolve prompts"
            raise RuntimeError(msg)
        try:
            content = await self.context.prompt_manager.get(reference)
            self.prompts.append(content)
        except Exception as e:
            msg = f"Failed to add prompt {reference!r}"
            raise RuntimeError(msg) from e

    async def add(
        self,
        identifier: str,
        *,
        provider: str | None = None,
        version: str | None = None,
        variables: dict[str, Any] | None = None,
    ):
        """Add a system prompt using explicit parameters.

        Args:
            identifier: Prompt identifier/name
            provider: Provider name (None = builtin)
            version: Optional version string
            variables: Optional template variables

        Examples:
            await sys_prompts.add("code_review", variables={"language": "python"})
            await sys_prompts.add("expert", provider="langfuse", version="v2")
        """
        if not self.context:
            msg = "No context available to resolve prompts"
            raise RuntimeError(msg)
        try:
            content = await self.context.prompt_manager.get_from(
                identifier,
                provider=provider,
                version=version,
                variables=variables,
            )
            self.prompts.append(content)
        except Exception as e:
            ref = f"{provider + ':' if provider else ''}{identifier}"
            msg = f"Failed to add prompt {ref!r}"
            raise RuntimeError(msg) from e

    def clear(self):
        """Clear all system prompts."""
        self.prompts = []

    async def refresh_cache(self):
        """Force re-evaluation of prompts."""
        from toprompt import to_prompt

        evaluated = []
        for prompt in self.prompts:
            result = await to_prompt(prompt)
            evaluated.append(result)
        self.prompts = evaluated
        self._cached = True

    @asynccontextmanager
    async def temporary_prompt(
        self, prompt: AnyPromptType, exclusive: bool = False
    ) -> AsyncIterator[None]:
        """Temporarily override system prompts.

        Args:
            prompt: Single prompt or sequence of prompts to use temporarily
            exclusive: Whether to only use given prompt. If False, prompt
                will be appended to the agent's prompts temporarily.
        """
        from toprompt import to_prompt

        original_prompts = self.prompts.copy()
        new_prompt = await to_prompt(prompt)
        # Use only the new prompt when exclusive, otherwise append it
        self.prompts = [new_prompt] if exclusive else [*self.prompts, new_prompt]
        try:
            yield
        finally:
            self.prompts = original_prompts

    async def format_system_prompt(self, agent: Agent[Any, Any]) -> str:
        """Format complete system prompt."""
        if not self.dynamic and not self._cached:
            await self.refresh_cache()
        template = self._env.from_string(self.template or DEFAULT_TEMPLATE)
        result = await template.render_async(
            agent=agent,
            prompts=self.prompts,
            dynamic=self.dynamic,
            inject_agent_info=self.inject_agent_info,
            inject_tools=self.inject_tools,
            tool_usage_style=self.tool_usage_style,
        )
        return result.strip()