
teamrun

Class info

Classes

- AgentResponse (llmling_agent.messaging.messages): Result from an agent's execution.
- BaseTeam (llmling_agent.delegation.base_team): Base class for Team and TeamRun.
- ChatMessage (llmling_agent.messaging.messages): Common message format for all UI types.
- ExtendedTeamTalk (llmling_agent.delegation.teamrun): TeamTalk that also provides TeamRunStats interface.
- Talk (llmling_agent.talk.talk): Manages message flow between agents/groups.
- TeamResponse (llmling_agent.messaging.messages): Results from a team execution.
- TeamRun (llmling_agent.delegation.teamrun): Handles team operations with monitoring.
- TeamTalk (llmling_agent.talk.talk): Group of connections with aggregate operations.


Sequential, ordered group of agents / nodes.

ExtendedTeamTalk dataclass

Bases: TeamTalk

TeamTalk that also provides TeamRunStats interface.

Source code in src/llmling_agent/delegation/teamrun.py
@dataclass(frozen=True, kw_only=True)
class ExtendedTeamTalk(TeamTalk):
    """TeamTalk that also provides TeamRunStats interface."""

    errors: list[tuple[str, str, datetime]] = field(default_factory=list)

    def clear(self):
        """Reset all tracking data."""
        super().clear()  # Clear base TeamTalk
        self.errors.clear()

    def add_error(self, agent: str, error: str):
        """Track errors from AgentResponses."""
        self.errors.append((agent, error, get_now()))

    @property
    def error_log(self) -> list[tuple[str, str, datetime]]:
        """Errors from failed responses."""
        return self.errors
              
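In practice a TeamRun creates and populates this object during execution; the sketch below (hypothetical agent names, and assuming the inherited TeamTalk fields all have defaults) only illustrates the error-tracking surface:

talk = ExtendedTeamTalk()  # normally owned by a TeamRun, not built by hand
talk.add_error("analyzer", "model call timed out")     # hypothetical agent/error
talk.add_error("writer", "output failed validation")

for agent, error, when in talk.error_log:
    print(f"{when:%H:%M:%S} {agent}: {error}")

talk.clear()  # resets base TeamTalk state and empties the error list
assert not talk.error_log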

error_log property

error_log: list[tuple[str, str, datetime]]

Errors from failed responses.

add_error

add_error(agent: str, error: str)

Track errors from AgentResponses.

Source code in src/llmling_agent/delegation/teamrun.py
def add_error(self, agent: str, error: str):
    """Track errors from AgentResponses."""
    self.errors.append((agent, error, get_now()))
              

clear

clear()

Reset all tracking data.

Source code in src/llmling_agent/delegation/teamrun.py
def clear(self):
    """Reset all tracking data."""
    super().clear()  # Clear base TeamTalk
    self.errors.clear()
              

TeamRun

Bases: BaseTeam[TDeps, TResult]

Handles team operations with monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
    """Handles team operations with monitoring."""

    @overload  # validator set: it defines the output
    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult],
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ): ...

    @overload
    def __init__(  # no validator, but all nodes same output type.
        self,
        agents: Sequence[MessageNode[TDeps, TResult]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ): ...

    @overload
    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
    ): ...

    def __init__(
        self,
        agents: Sequence[MessageNode[TDeps, Any]],
        *,
        name: str | None = None,
        description: str | None = None,
        shared_prompt: str | None = None,
        validator: MessageNode[Any, TResult] | None = None,
        picker: AnyAgent[Any, Any] | None = None,
        num_picks: int | None = None,
        pick_prompt: str | None = None,
        # result_mode: ResultMode = "last",
    ):
        super().__init__(
            agents,
            name=name,
            description=description,
            shared_prompt=shared_prompt,
            picker=picker,
            num_picks=num_picks,
            pick_prompt=pick_prompt,
        )
        self.validator = validator
        self.result_mode = "last"

    def __prompt__(self) -> str:
        """Format team info for prompts."""
        members = " -> ".join(a.name for a in self.agents)
        desc = f" - {self.description}" if self.description else ""
        return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"

    async def _run(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        wait_for_connections: bool | None = None,
        message_id: str | None = None,
        conversation_id: str | None = None,
        **kwargs: Any,
    ) -> ChatMessage[TResult]:
        """Run agents sequentially and return combined message.

        This method wraps execute and extracts the ChatMessage in order to fulfill
        the "message protocol".
        """
        message_id = message_id or str(uuid4())

        result = await self.execute(*prompts, **kwargs)
        all_messages = [r.message for r in result if r.message]
        assert all_messages, "Error during execution, returned None for TeamRun"
        # Determine content based on mode
        match self.result_mode:
            case "last":
                content = all_messages[-1].content
            # case "concat":
            #     content = "\n".join(msg.format() for msg in all_messages)
            case _:
                msg = f"Invalid result mode: {self.result_mode}"
                raise ValueError(msg)

        return ChatMessage(
            content=content,
            role="assistant",
            name=self.name,
            associated_messages=all_messages,
            message_id=message_id,
            conversation_id=conversation_id,
            metadata={
                "execution_order": [r.agent_name for r in result],
                "start_time": result.start_time.isoformat(),
                "errors": {name: str(error) for name, error in result.errors.items()},
            },
        )

    async def execute(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
        **kwargs: Any,
    ) -> TeamResponse[TResult]:
        """Start execution with optional monitoring."""
        self._team_talk.clear()
        start_time = get_now()
        final_prompt = list(prompts)
        if self.shared_prompt:
            final_prompt.insert(0, self.shared_prompt)

        responses = [
            i
            async for i in self.execute_iter(*final_prompt)
            if isinstance(i, AgentResponse)
        ]
        return TeamResponse(responses, start_time)

    async def run_iter(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[ChatMessage[Any]]:
        """Yield messages from the execution chain."""
        async for item in self.execute_iter(*prompts, **kwargs):
            match item:
                case AgentResponse():
                    if item.message:
                        yield item.message
                case Talk():
                    pass

    async def execute_iter(
        self,
        *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        **kwargs: Any,
    ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
        from toprompt import to_prompt

        connections: list[Talk[Any]] = []
        try:
            combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
            all_nodes = list(await self.pick_agents(combined_prompt))
            if self.validator:
                all_nodes.append(self.validator)
            first = all_nodes[0]
            connections = [s.connect_to(t, queued=True) for s, t in pairwise(all_nodes)]
            for conn in connections:
                self._team_talk.append(conn)

            # First agent
            start = perf_counter()
            message = await first.run(*prompt, **kwargs)
            timing = perf_counter() - start
            response = AgentResponse[Any](first.name, message=message, timing=timing)
            yield response

            # Process through chain
            for connection in connections:
                target = connection.targets[0]
                target_name = target.name
                yield connection

                # Let errors propagate - they break the chain
                start = perf_counter()
                messages = await connection.trigger()

                if target == all_nodes[-1]:
                    last_talk = Talk[Any](target, [], connection_type="run")
                    if response.message:
                        last_talk.stats.messages.append(response.message)
                    self._team_talk.append(last_talk)

                timing = perf_counter() - start
                msg = messages[0]
                response = AgentResponse[Any](target_name, message=msg, timing=timing)
                yield response

        finally:  # Always clean up connections
            for connection in connections:
                connection.disconnect()

    async def run_stream(
        self,
        *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
        require_all: bool = True,
        **kwargs: Any,
    ) -> AsyncIterator[
        tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]
    ]:
        """Stream responses through the chain of team members.

        Args:
            prompts: Input prompts to process through the chain
            require_all: If True, fail if any agent fails. If False,
                         continue with remaining agents.
            kwargs: Additional arguments passed to each agent

        Yields:
            Tuples of (agent, event) where agent is the Agent instance
            and event is the streaming event.
        """
        from pydantic_ai.messages import PartDeltaEvent, TextPartDelta

        from llmling_agent.agent.agent import StreamCompleteEvent

        current_message = prompts
        collected_content = []

        for agent in self.agents:
            try:
                agent_content = []

                # Use wrapper to normalize all streaming nodes to (agent, event) tuples
                def _raise_streaming_error(agent=agent):
                    msg = f"Agent {agent.name} does not support streaming"
                    raise ValueError(msg)  # noqa: TRY301

                if hasattr(agent, "run_stream"):
                    stream = normalize_stream_for_teams(agent, *current_message, **kwargs)
                else:
                    _raise_streaming_error()

                async for agent_event_tuple in stream:
                    actual_agent, event = agent_event_tuple
                    match event:
                        case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                            agent_content.append(delta)
                            collected_content.append(delta)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case StreamCompleteEvent(message=message):
                            # Use complete response as input for next agent
                            current_message = (message.content,)
                            yield (actual_agent, event)  # Yield tuple with agent context
                        case _:
                            yield (actual_agent, event)  # Yield tuple with agent context

            except Exception as e:
                if require_all:
                    msg = f"Chain broken at {agent.name}: {e}"
                    logger.exception(msg)
                    raise ValueError(msg) from e
                logger.warning("Chain handler %s failed: %s", agent.name, e)

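A hedged usage sketch (inside an async function; `analyzer`, `writer`, and `reviewer` stand in for already-configured agents, and it assumes the message-node base class exposes the public `run` entry point that delegates to `_run` above):

team = TeamRun(
    [analyzer, writer],           # members run in this order
    name="pipeline",
    shared_prompt="Be concise.",  # prepended to every input prompt
    validator=reviewer,           # optional final node; defines TResult
)

message = await team.run("Summarize the quarterly report")
print(message.content)                      # output of the last node ("last" result mode)
print(message.metadata["execution_order"])  # names of the members that actually ran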
__prompt__

__prompt__() -> str

Format team info for prompts.

Source code in src/llmling_agent/delegation/teamrun.py
def __prompt__(self) -> str:
    """Format team info for prompts."""
    members = " -> ".join(a.name for a in self.agents)
    desc = f" - {self.description}" if self.description else ""
    return f"Sequential Team {self.name!r}{desc}\nPipeline: {members}"
              
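For a hypothetical team named pipeline with description "Summarizes reports" and members analyzer and writer, the returned string looks like:

Sequential Team 'pipeline' - Summarizes reports
Pipeline: analyzer -> writer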

execute async

execute(
    *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
) -> TeamResponse[TResult]

Start execution with optional monitoring.

Source code in src/llmling_agent/delegation/teamrun.py
async def execute(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
    **kwargs: Any,
) -> TeamResponse[TResult]:
    """Start execution with optional monitoring."""
    self._team_talk.clear()
    start_time = get_now()
    final_prompt = list(prompts)
    if self.shared_prompt:
        final_prompt.insert(0, self.shared_prompt)

    responses = [
        i
        async for i in self.execute_iter(*final_prompt)
        if isinstance(i, AgentResponse)
    ]
    return TeamResponse(responses, start_time)
              
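A short consumption sketch (hypothetical `team` as above, inside an async function): TeamResponse is iterable and yields one AgentResponse per executed node, in order.

result = await team.execute("Draft a changelog")
for response in result:
    if response.message:  # message can be None if a node produced no output
        print(f"{response.agent_name} ({response.timing:.2f}s): {response.message.content}")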

run_iter async

run_iter(
    *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
) -> AsyncIterator[ChatMessage[Any]]

Yield messages from the execution chain.

Source code in src/llmling_agent/delegation/teamrun.py
async def run_iter(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    **kwargs: Any,
) -> AsyncIterator[ChatMessage[Any]]:
    """Yield messages from the execution chain."""
    async for item in self.execute_iter(*prompts, **kwargs):
        match item:
            case AgentResponse():
                if item.message:
                    yield item.message
            case Talk():
                pass
              
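A minimal sketch (configured `team` assumed). Unlike execute, which gathers everything into a TeamResponse at the end, this yields each node's ChatMessage as soon as it is produced:

async for message in team.run_iter("Draft a changelog"):
    # each item is the ChatMessage emitted by one node in the chain
    print(f"{message.name}: {message.content}")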

run_stream async

run_stream(
    *prompts: AnyPromptType | Image | PathLike[str],
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]]

Stream responses through the chain of team members.

Parameters:

- prompts (AnyPromptType | Image | PathLike[str], default ()): Input prompts to process through the chain
- require_all (bool, default True): If True, fail if any agent fails. If False, continue with remaining agents.
- kwargs (Any, default {}): Additional arguments passed to each agent

Yields:

- AsyncIterator[tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]]: Tuples of (agent, event) where agent is the Agent instance and event is the streaming event.

Source code in src/llmling_agent/delegation/teamrun.py
async def run_stream(
    self,
    *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
    require_all: bool = True,
    **kwargs: Any,
) -> AsyncIterator[
    tuple[MessageNode[Any, Any], AgentStreamEvent | StreamCompleteEvent]
]:
    """Stream responses through the chain of team members.

    Args:
        prompts: Input prompts to process through the chain
        require_all: If True, fail if any agent fails. If False,
                     continue with remaining agents.
        kwargs: Additional arguments passed to each agent

    Yields:
        Tuples of (agent, event) where agent is the Agent instance
        and event is the streaming event.
    """
    from pydantic_ai.messages import PartDeltaEvent, TextPartDelta

    from llmling_agent.agent.agent import StreamCompleteEvent

    current_message = prompts
    collected_content = []

    for agent in self.agents:
        try:
            agent_content = []

            # Use wrapper to normalize all streaming nodes to (agent, event) tuples
            def _raise_streaming_error(agent=agent):
                msg = f"Agent {agent.name} does not support streaming"
                raise ValueError(msg)  # noqa: TRY301

            if hasattr(agent, "run_stream"):
                stream = normalize_stream_for_teams(agent, *current_message, **kwargs)
            else:
                _raise_streaming_error()

            async for agent_event_tuple in stream:
                actual_agent, event = agent_event_tuple
                match event:
                    case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
                        agent_content.append(delta)
                        collected_content.append(delta)
                        yield (actual_agent, event)  # Yield tuple with agent context
                    case StreamCompleteEvent(message=message):
                        # Use complete response as input for next agent
                        current_message = (message.content,)
                        yield (actual_agent, event)  # Yield tuple with agent context
                    case _:
                        yield (actual_agent, event)  # Yield tuple with agent context

        except Exception as e:
            if require_all:
                msg = f"Chain broken at {agent.name}: {e}"
                logger.exception(msg)
                raise ValueError(msg) from e
            logger.warning("Chain handler %s failed: %s", agent.name, e)
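
A hedged streaming sketch (configured `team` assumed, inside an async function; the event imports mirror the ones used inside run_stream itself):

from pydantic_ai.messages import PartDeltaEvent, TextPartDelta

from llmling_agent.agent.agent import StreamCompleteEvent

async for node, event in team.run_stream("Draft a changelog"):
    match event:
        case PartDeltaEvent(delta=TextPartDelta(content_delta=delta)):
            print(delta, end="", flush=True)          # incremental text from the current node
        case StreamCompleteEvent():
            print(f"\n--- {node.name} finished ---")  # the node's full message is on event.message
        case _:
            pass  # other stream events are passed through unchanged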