classSystemPrompts:"""Manages system prompts for an agent."""def__init__(self,prompts:AnyPromptType|list[AnyPromptType]|None=None,template:str|None=None,dynamic:bool=True,context:AgentContext|None=None,inject_agent_info:bool=True,inject_tools:ToolInjectionMode="off",tool_usage_style:ToolUsageStyle="suggestive",):"""Initialize prompt manager."""fromjinjaropeimportEnvironmentfromtopromptimportto_promptmatchprompts:caselist():self.prompts=promptscaseNone:self.prompts=[]case_:self.prompts=[prompts]self.context=contextself.template=templateself.dynamic=dynamicself.inject_agent_info=inject_agent_infoself.inject_tools=inject_toolsself.tool_usage_style=tool_usage_styleself._cached=Falseself._env=Environment(enable_async=True)self._env.filters["to_prompt"]=to_promptdef__repr__(self)->str:return(f"SystemPrompts(prompts={len(self.prompts)}, "f"dynamic={self.dynamic}, inject_agent_info={self.inject_agent_info}, "f"inject_tools={self.inject_tools!r})")def__len__(self)->int:returnlen(self.prompts)def__getitem__(self,idx:int|slice)->AnyPromptType|list[AnyPromptType]:returnself.prompts[idx]asyncdefadd_by_reference(self,reference:str):"""Add a system prompt using reference syntax. Args: reference: [provider:]identifier[@version][?var1=val1,...] Examples: await sys_prompts.add_by_reference("code_review?language=python") await sys_prompts.add_by_reference("langfuse:expert@v2") """ifnotself.context:msg="No context available to resolve prompts"raiseRuntimeError(msg)try:content=awaitself.context.prompt_manager.get(reference)self.prompts.append(content)exceptExceptionase:msg=f"Failed to add prompt {reference!r}"raiseRuntimeError(msg)fromeasyncdefadd(self,identifier:str,*,provider:str|None=None,version:str|None=None,variables:dict[str,Any]|None=None,):"""Add a system prompt using explicit parameters. 
Args: identifier: Prompt identifier/name provider: Provider name (None = builtin) version: Optional version string variables: Optional template variables Examples: await sys_prompts.add("code_review", variables={"language": "python"}) await sys_prompts.add("expert", provider="langfuse", version="v2") """ifnotself.context:msg="No context available to resolve prompts"raiseRuntimeError(msg)try:content=awaitself.context.prompt_manager.get_from(identifier,provider=provider,version=version,variables=variables,)self.prompts.append(content)exceptExceptionase:ref=f"{provider+':'ifproviderelse''}{identifier}"msg=f"Failed to add prompt {ref!r}"raiseRuntimeError(msg)fromedefclear(self):"""Clear all system prompts."""self.prompts=[]asyncdefrefresh_cache(self):"""Force re-evaluation of prompts."""fromtopromptimportto_promptevaluated=[]forpromptinself.prompts:result=awaitto_prompt(prompt)evaluated.append(result)self.prompts=evaluatedself._cached=True@asynccontextmanagerasyncdeftemporary_prompt(self,prompt:AnyPromptType,exclusive:bool=False)->AsyncIterator[None]:"""Temporarily override system prompts. Args: prompt: Single prompt or sequence of prompts to use temporarily exclusive: Whether to only use given prompt. If False, prompt will be appended to the agents prompts temporarily. """fromtopromptimportto_promptoriginal_prompts=self.prompts.copy()new_prompt=awaitto_prompt(prompt)self.prompts=[new_prompt]ifnotexclusiveelse[*self.prompts,new_prompt]try:yieldfinally:self.prompts=original_promptsasyncdefformat_system_prompt(self,agent:Agent[Any,Any])->str:"""Format complete system prompt."""ifnotself.dynamicandnotself._cached:awaitself.refresh_cache()template=self._env.from_string(self.templateorDEFAULT_TEMPLATE)result=awaittemplate.render_async(agent=agent,prompts=self.prompts,dynamic=self.dynamic,inject_agent_info=self.inject_agent_info,inject_tools=self.inject_tools,tool_usage_style=self.tool_usage_style,)returnresult.strip()
async def add_by_reference(self, reference: str):
    """Resolve a prompt reference and append the result to the prompt list.

    Args:
        reference: [provider:]identifier[@version][?var1=val1,...]

    Raises:
        RuntimeError: If no context is set, or if resolving the reference
            fails (the underlying exception is chained as the cause).

    Examples:
        await sys_prompts.add_by_reference("code_review?language=python")
        await sys_prompts.add_by_reference("langfuse:expert@v2")
    """
    ctx = self.context
    if not ctx:
        raise RuntimeError("No context available to resolve prompts")

    try:
        resolved = await ctx.prompt_manager.get(reference)
        self.prompts.append(resolved)
    except Exception as exc:
        # Any failure during lookup or append is reported uniformly.
        failure = f"Failed to add prompt {reference!r}"
        raise RuntimeError(failure) from exc
Source code in src/llmling_agent/agent/sys_prompts.py
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
async def format_system_prompt(self, agent: Agent[Any, Any]) -> str:
    """Render the full system prompt text for ``agent``.

    When the manager is not dynamic and the prompts have not been cached yet,
    they are evaluated first via ``refresh_cache``. The template is
    ``self.template`` if truthy, otherwise ``DEFAULT_TEMPLATE``.

    Args:
        agent: Agent object exposed to the template as ``agent``.

    Returns:
        The rendered text with leading and trailing whitespace removed.
    """
    needs_evaluation = not (self.dynamic or self._cached)
    if needs_evaluation:
        await self.refresh_cache()

    source = self.template or DEFAULT_TEMPLATE
    compiled = self._env.from_string(source)
    # Values exposed to the template, collected after any cache refresh.
    render_args = {
        "agent": agent,
        "prompts": self.prompts,
        "dynamic": self.dynamic,
        "inject_agent_info": self.inject_agent_info,
        "inject_tools": self.inject_tools,
        "tool_usage_style": self.tool_usage_style,
    }
    rendered = await compiled.render_async(**render_args)
    return rendered.strip()
Source code in src/llmling_agent/agent/sys_prompts.py
164 165 166 167 168 169 170 171 172 173
async def refresh_cache(self):
    """Evaluate every stored prompt and keep the evaluated results.

    Each entry of ``self.prompts`` is passed through ``to_prompt``; the
    results replace the list and the manager is flagged as cached.
    """
    from toprompt import to_prompt

    # Awaited one at a time, in list order; the list is only replaced once
    # every entry has been converted.
    self.prompts = [await to_prompt(item) for item in self.prompts]
    self._cached = True
@asynccontextmanager
async def temporary_prompt(
    self, prompt: AnyPromptType, exclusive: bool = False
) -> AsyncIterator[None]:
    """Temporarily override system prompts.

    The previous prompt list is restored when the context exits, even if the
    body raises.

    Args:
        prompt: Prompt to use temporarily; it is converted with ``to_prompt``
            before being installed.
        exclusive: Whether to only use given prompt. If False, prompt will be
            appended to the agents prompts temporarily.
    """
    from toprompt import to_prompt

    original_prompts = self.prompts.copy()
    new_prompt = await to_prompt(prompt)
    # exclusive=True replaces the whole list; otherwise the new prompt is
    # appended after the existing ones.
    self.prompts = [new_prompt] if exclusive else [*self.prompts, new_prompt]
    try:
        yield
    finally:
        self.prompts = original_prompts