
teamrun

Class info

Classes

Name              Module                               Description
AgentResponse     llmling_agent.messaging.messages     Result from an agent's execution.
BaseTeam          llmling_agent.delegation.base_team   Base class for Team and TeamRun.
ChatMessage       llmling_agent.messaging.messages     Common message format for all UI types.
ExtendedTeamTalk  llmling_agent.delegation.teamrun     TeamTalk that also provides TeamRunStats interface.
Talk              llmling_agent.talk.talk              Manages message flow between agents/groups.
TeamResponse      llmling_agent.messaging.messages     Results from a team execution.
TeamRun           llmling_agent.delegation.teamrun     Handles team operations with monitoring.
TeamTalk          llmling_agent.talk.talk              Group of connections with aggregate operations.

              DocStrings

              Team execution management and monitoring.

              ExtendedTeamTalk dataclass

              Bases: TeamTalk

              TeamTalk that also provides TeamRunStats interface.

              Source code in src/llmling_agent/delegation/teamrun.py
              @dataclass(frozen=True, kw_only=True)
              class ExtendedTeamTalk(TeamTalk):
                  """TeamTalk that also provides TeamRunStats interface."""
              
                  errors: list[tuple[str, str, datetime]] = field(default_factory=list)
              
                  def clear(self):
                      """Reset all tracking data."""
                      super().clear()  # Clear base TeamTalk
                      self.errors.clear()
              
                  def add_error(self, agent: str, error: str):
                      """Track errors from AgentResponses."""
                      self.errors.append((agent, error, datetime.now()))
              
                  @property
                  def error_log(self) -> list[tuple[str, str, datetime]]:
                      """Errors from failed responses."""
                      return self.errors
              

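              A minimal sketch of the error-tracking interface, assuming TeamTalk (whose own
              fields are not shown on this page) can be constructed without arguments:

                  from llmling_agent.delegation.teamrun import ExtendedTeamTalk

                  talk = ExtendedTeamTalk()
                  talk.add_error("analyzer", "timeout while waiting for the model")
                  for agent, error, when in talk.error_log:  # (agent, error, timestamp) tuples
                      print(f"{when:%H:%M:%S} {agent}: {error}")
                  talk.clear()  # resets base TeamTalk state and the error log
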
              error_log property

              error_log: list[tuple[str, str, datetime]]
              

              Errors from failed responses.

              add_error

              add_error(agent: str, error: str)
              

              Track errors from AgentResponses.

              Source code in src/llmling_agent/delegation/teamrun.py
              def add_error(self, agent: str, error: str):
                  """Track errors from AgentResponses."""
                  self.errors.append((agent, error, datetime.now()))
              

              clear

              clear()
              

              Reset all tracking data.

              Source code in src/llmling_agent/delegation/teamrun.py
              def clear(self):
                  """Reset all tracking data."""
                  super().clear()  # Clear base TeamTalk
                  self.errors.clear()
              

              TeamRun

              Bases: BaseTeam[TDeps, TResult]

              Handles team operations with monitoring.

              Source code in src/llmling_agent/delegation/teamrun.py
              class TeamRun[TDeps, TResult](BaseTeam[TDeps, TResult]):
                  """Handles team operations with monitoring."""
              
                  def __init__(
                      self,
                      agents: Sequence[MessageNode[TDeps, Any]],
                      *,
                      name: str | None = None,
                      description: str | None = None,
                      shared_prompt: str | None = None,
                      validator: MessageNode[Any, TResult] | None = None,
                      picker: AnyAgent[Any, Any] | None = None,
                      num_picks: int | None = None,
                      pick_prompt: str | None = None,
                      # result_mode: ResultMode = "last",
                  ):
                      super().__init__(
                          agents,
                          name=name,
                          description=description,
                          shared_prompt=shared_prompt,
                          picker=picker,
                          num_picks=num_picks,
                          pick_prompt=pick_prompt,
                      )
                      self.validator = validator
                      self.result_mode = "last"
              
                  def __prompt__(self) -> str:
                      """Format team info for prompts."""
                      members = " -> ".join(a.name for a in self.agents)
                      desc = f" - {self.description}" if self.description else ""
                      return f"Sequential Team '{self.name}'{desc}\nPipeline: {members}"
              
                  async def _run(
                      self,
                      *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                      wait_for_connections: bool | None = None,
                      message_id: str | None = None,
                      conversation_id: str | None = None,
                      **kwargs: Any,
                  ) -> ChatMessage[TResult]:
                      """Run agents sequentially and return combined message.
              
                       This method wraps execute and extracts the ChatMessage in order to fulfill
                      the "message protocol".
                      """
                      message_id = message_id or str(uuid4())
              
                      result = await self.execute(*prompts, **kwargs)
                      all_messages = [r.message for r in result if r.message]
                      assert all_messages, "Error during execution, returned None for TeamRun"
                      # Determine content based on mode
                      match self.result_mode:
                          case "last":
                              content = all_messages[-1].content
                          # case "concat":
                          #     content = "\n".join(msg.format() for msg in all_messages)
                          case _:
                              msg = f"Invalid result mode: {self.result_mode}"
                              raise ValueError(msg)
              
                      return ChatMessage(
                          content=content,
                          role="assistant",
                          name=self.name,
                          associated_messages=all_messages,
                          message_id=message_id,
                          conversation_id=conversation_id,
                          metadata={
                              "execution_order": [r.agent_name for r in result],
                              "start_time": result.start_time.isoformat(),
                              "errors": {name: str(error) for name, error in result.errors.items()},
                          },
                      )
              
                  async def execute(
                      self,
                      *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                      **kwargs: Any,
                  ) -> TeamResponse[TResult]:
                      """Start execution with optional monitoring."""
                      self._team_talk.clear()
                      start_time = datetime.now()
                      final_prompt = list(prompts)
                      if self.shared_prompt:
                          final_prompt.insert(0, self.shared_prompt)
              
                      responses = [
                          i
                          async for i in self.execute_iter(*final_prompt)
                          if isinstance(i, AgentResponse)
                      ]
                      return TeamResponse(responses, start_time)
              
                  async def run_iter(
                      self,
                      *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                      **kwargs: Any,
                  ) -> AsyncIterator[ChatMessage[Any]]:
                      """Yield messages from the execution chain."""
                      async for item in self.execute_iter(*prompts, **kwargs):
                          match item:
                              case AgentResponse():
                                  if item.message:
                                      yield item.message
                              case Talk():
                                  pass
              
                  async def execute_iter(
                      self,
                      *prompt: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                      **kwargs: Any,
                  ) -> AsyncIterator[Talk[Any] | AgentResponse[Any]]:
                      from toprompt import to_prompt
              
                      connections: list[Talk[Any]] = []
                      try:
                          combined_prompt = "\n".join([await to_prompt(p) for p in prompt])
                          all_nodes = list(await self.pick_agents(combined_prompt))
                          if self.validator:
                              all_nodes.append(self.validator)
                          first = all_nodes[0]
                          connections = [
                              source.connect_to(target, queued=True)
                              for source, target in pairwise(all_nodes)
                          ]
                          for conn in connections:
                              self._team_talk.append(conn)
              
                          # First agent
                          start = perf_counter()
                          message = await first.run(*prompt, **kwargs)
                          timing = perf_counter() - start
                          response = AgentResponse[Any](first.name, message=message, timing=timing)
                          yield response
              
                          # Process through chain
                          for connection in connections:
                              target = connection.targets[0]
                              target_name = target.name
                              yield connection
              
                              # Let errors propagate - they break the chain
                              start = perf_counter()
                              messages = await connection.trigger()
              
                              # If this is the last node
                              if target == all_nodes[-1]:
                                  last_talk = Talk[Any](target, [], connection_type="run")
                                  if response.message:
                                      last_talk.stats.messages.append(response.message)
                                  self._team_talk.append(last_talk)
              
                              timing = perf_counter() - start
                              msg = messages[0]
                              response = AgentResponse[Any](target_name, message=msg, timing=timing)
                              yield response
              
                      finally:
                          # Always clean up connections
                          for connection in connections:
                              connection.disconnect()
              
                  @asynccontextmanager
                  async def chain_stream(
                      self,
                      *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                      require_all: bool = True,
                      **kwargs: Any,
                  ) -> AsyncIterator[StreamingResponseProtocol]:
                      """Stream results through chain of team members."""
                      from llmling_agent.agent import Agent, StructuredAgent
                      from llmling_agent.delegation import TeamRun
                      from llmling_agent_providers.base import StreamingResponseProtocol
              
                      async with AsyncExitStack() as stack:
                          streams: list[StreamingResponseProtocol[str]] = []
                          current_message = prompts
              
                          # Set up all streams
                          for agent in self.agents:
                              try:
                                  assert isinstance(agent, TeamRun | Agent | StructuredAgent), (
                                      "Cannot stream teams!"
                                  )
                                  stream = await stack.enter_async_context(
                                      agent.run_stream(*current_message, **kwargs)
                                  )
                                  streams.append(stream)  # type: ignore
                                  # Wait for complete response for next agent
                                  async for chunk in stream.stream():
                                      current_message = chunk
                                      if stream.is_complete:
                                          current_message = (stream.formatted_content,)  # type: ignore
                                          break
                              except Exception as e:
                                  if require_all:
                                      msg = f"Chain broken at {agent.name}: {e}"
                                      raise ValueError(msg) from e
                                  logger.warning("Chain handler %s failed: %s", agent.name, e)
              
                          # Create a stream-like interface for the chain
                          class ChainStream(StreamingResponseProtocol[str]):
                              def __init__(self):
                                  self.streams = streams
                                  self.current_stream_idx = 0
                                  self.is_complete = False
                                  self.model_name = None
              
                              def usage(self) -> Usage:
                                  @dataclass
                                  class Usage:
                                      total_tokens: int | None
                                      request_tokens: int | None
                                      response_tokens: int | None
              
                                  return Usage(0, 0, 0)
              
                              async def stream(self) -> AsyncIterator[str]:  # type: ignore
                                  for idx, stream in enumerate(self.streams):
                                      self.current_stream_idx = idx
                                      async for chunk in stream.stream():
                                          yield chunk
                                          if idx == len(self.streams) - 1 and stream.is_complete:
                                              self.is_complete = True
              
                          yield ChainStream()
              
                  @asynccontextmanager
                  async def run_stream(
                      self,
                      *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                      **kwargs: Any,
                  ) -> AsyncIterator[StreamingResponseProtocol[TResult]]:
                      """Stream responses through the chain.
              
                      Provides same interface as Agent.run_stream.
                      """
                      async with self.chain_stream(*prompts, **kwargs) as stream:
                          yield stream
              

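              An end-to-end sketch based on the constructor above; researcher, writer, and
              reviewer stand in for pre-built agents (MessageNode instances) whose setup is
              out of scope here:

                  import asyncio

                  from llmling_agent.delegation import TeamRun

                  async def main() -> None:
                      team = TeamRun(
                          [researcher, writer],                # executed in order: researcher -> writer
                          name="report-pipeline",
                          shared_prompt="Answer in English.",  # prepended to the prompts of every run
                          validator=reviewer,                  # appended as the final node of the chain
                      )
                      result = await team.execute("Summarize the quarterly numbers")
                      for response in result:                  # TeamResponse iterates AgentResponses
                          print(response.agent_name, response.timing)

                  asyncio.run(main())
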
              __prompt__

              __prompt__() -> str
              

              Format team info for prompts.

              Source code in src/llmling_agent/delegation/teamrun.py
              def __prompt__(self) -> str:
                  """Format team info for prompts."""
                  members = " -> ".join(a.name for a in self.agents)
                  desc = f" - {self.description}" if self.description else ""
                  return f"Sequential Team '{self.name}'{desc}\nPipeline: {members}"
              

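              For two hypothetical members named analyzer and reporter, the rendered prompt
              looks like this:

                  team = TeamRun([analyzer, reporter], name="analysis")
                  print(team.__prompt__())
                  # Sequential Team 'analysis'
                  # Pipeline: analyzer -> reporter
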
              _run async

              _run(
                  *prompts: AnyPromptType | Image | PathLike[str] | None,
                  wait_for_connections: bool | None = None,
                  message_id: str | None = None,
                  conversation_id: str | None = None,
                  **kwargs: Any,
              ) -> ChatMessage[TResult]
              

              Run agents sequentially and return combined message.

              This method wraps execute and extracts the ChatMessage in order to fulfill the "message protocol".

              Source code in src/llmling_agent/delegation/teamrun.py
              async def _run(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                  wait_for_connections: bool | None = None,
                  message_id: str | None = None,
                  conversation_id: str | None = None,
                  **kwargs: Any,
              ) -> ChatMessage[TResult]:
                  """Run agents sequentially and return combined message.
              
                   This method wraps execute and extracts the ChatMessage in order to fulfill
                  the "message protocol".
                  """
                  message_id = message_id or str(uuid4())
              
                  result = await self.execute(*prompts, **kwargs)
                  all_messages = [r.message for r in result if r.message]
                  assert all_messages, "Error during execution, returned None for TeamRun"
                  # Determine content based on mode
                  match self.result_mode:
                      case "last":
                          content = all_messages[-1].content
                      # case "concat":
                      #     content = "\n".join(msg.format() for msg in all_messages)
                      case _:
                          msg = f"Invalid result mode: {self.result_mode}"
                          raise ValueError(msg)
              
                  return ChatMessage(
                      content=content,
                      role="assistant",
                      name=self.name,
                      associated_messages=all_messages,
                      message_id=message_id,
                      conversation_id=conversation_id,
                      metadata={
                          "execution_order": [r.agent_name for r in result],
                          "start_time": result.start_time.isoformat(),
                          "errors": {name: str(error) for name, error in result.errors.items()},
                      },
                  )
              

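              Callers normally reach _run through the public run() entry point (assumed here
              to be inherited from BaseTeam; it is not shown on this page). Inside an async
              context, the returned ChatMessage carries the metadata built above:

                  message = await team.run("Review this pull request")
                  print(message.content)                      # content of the final agent's reply
                  print(message.metadata["execution_order"])  # e.g. ["researcher", "writer"]
                  print(message.metadata["errors"])           # per-agent errors, stringified
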
              chain_stream async

              chain_stream(
                  *prompts: AnyPromptType | Image | PathLike[str] | None,
                  require_all: bool = True,
                  **kwargs: Any,
              ) -> AsyncIterator[StreamingResponseProtocol]
              

              Stream results through chain of team members.

              Source code in src/llmling_agent/delegation/teamrun.py
              @asynccontextmanager
              async def chain_stream(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                  require_all: bool = True,
                  **kwargs: Any,
              ) -> AsyncIterator[StreamingResponseProtocol]:
                  """Stream results through chain of team members."""
                  from llmling_agent.agent import Agent, StructuredAgent
                  from llmling_agent.delegation import TeamRun
                  from llmling_agent_providers.base import StreamingResponseProtocol
              
                  async with AsyncExitStack() as stack:
                      streams: list[StreamingResponseProtocol[str]] = []
                      current_message = prompts
              
                      # Set up all streams
                      for agent in self.agents:
                          try:
                              assert isinstance(agent, TeamRun | Agent | StructuredAgent), (
                                  "Cannot stream teams!"
                              )
                              stream = await stack.enter_async_context(
                                  agent.run_stream(*current_message, **kwargs)
                              )
                              streams.append(stream)  # type: ignore
                              # Wait for complete response for next agent
                              async for chunk in stream.stream():
                                  current_message = chunk
                                  if stream.is_complete:
                                      current_message = (stream.formatted_content,)  # type: ignore
                                      break
                          except Exception as e:
                              if require_all:
                                  msg = f"Chain broken at {agent.name}: {e}"
                                  raise ValueError(msg) from e
                              logger.warning("Chain handler %s failed: %s", agent.name, e)
              
                      # Create a stream-like interface for the chain
                      class ChainStream(StreamingResponseProtocol[str]):
                          def __init__(self):
                              self.streams = streams
                              self.current_stream_idx = 0
                              self.is_complete = False
                              self.model_name = None
              
                          def usage(self) -> Usage:
                              @dataclass
                              class Usage:
                                  total_tokens: int | None
                                  request_tokens: int | None
                                  response_tokens: int | None
              
                              return Usage(0, 0, 0)
              
                          async def stream(self) -> AsyncIterator[str]:  # type: ignore
                              for idx, stream in enumerate(self.streams):
                                  self.current_stream_idx = idx
                                  async for chunk in stream.stream():
                                      yield chunk
                                      if idx == len(self.streams) - 1 and stream.is_complete:
                                          self.is_complete = True
              
                      yield ChainStream()
              

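              A consumption sketch, inside an async context: each member streams in turn, and
              the completed output of one member becomes the prompt of the next:

                  async with team.chain_stream("Draft an introduction") as stream:
                      async for chunk in stream.stream():  # chunks from every member, in pipeline order
                          print(chunk, end="", flush=True)
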
              execute async

              execute(
                  *prompts: AnyPromptType | Image | PathLike[str] | None, **kwargs: Any
              ) -> TeamResponse[TResult]
              

              Start execution with optional monitoring.

              Source code in src/llmling_agent/delegation/teamrun.py
              async def execute(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str] | None,
                  **kwargs: Any,
              ) -> TeamResponse[TResult]:
                  """Start execution with optional monitoring."""
                  self._team_talk.clear()
                  start_time = datetime.now()
                  final_prompt = list(prompts)
                  if self.shared_prompt:
                      final_prompt.insert(0, self.shared_prompt)
              
                  responses = [
                      i
                      async for i in self.execute_iter(*final_prompt)
                      if isinstance(i, AgentResponse)
                  ]
                  return TeamResponse(responses, start_time)
              

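              Inside an async context, the returned TeamResponse also exposes the run's start
              time and collected errors (attribute names inferred from their use in _run above):

                  response = await team.execute("Classify these tickets")
                  print(response.start_time)                   # captured when execute() began
                  for name, error in response.errors.items():  # per-agent failures
                      print(f"{name} failed: {error}")
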
              run_iter async

              run_iter(
                  *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
              ) -> AsyncIterator[ChatMessage[Any]]
              

              Yield messages from the execution chain.

              Source code in src/llmling_agent/delegation/teamrun.py
              async def run_iter(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                  **kwargs: Any,
              ) -> AsyncIterator[ChatMessage[Any]]:
                  """Yield messages from the execution chain."""
                  async for item in self.execute_iter(*prompts, **kwargs):
                      match item:
                          case AgentResponse():
                              if item.message:
                                  yield item.message
                          case Talk():
                              pass
              

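              A sketch of consuming the chain message by message as members finish, instead of
              waiting for the full TeamResponse (inside an async context):

                  async for message in team.run_iter("Audit the configuration"):
                      print(f"[{message.name}] {message.content}")  # one ChatMessage per member
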
              run_stream async

              run_stream(
                  *prompts: AnyPromptType | Image | PathLike[str], **kwargs: Any
              ) -> AsyncIterator[StreamingResponseProtocol[TResult]]
              

              Stream responses through the chain.

              Provides same interface as Agent.run_stream.

              Source code in src/llmling_agent/delegation/teamrun.py
              @asynccontextmanager
              async def run_stream(
                  self,
                  *prompts: AnyPromptType | PIL.Image.Image | os.PathLike[str],
                  **kwargs: Any,
              ) -> AsyncIterator[StreamingResponseProtocol[TResult]]:
                  """Stream responses through the chain.
              
                  Provides same interface as Agent.run_stream.
                  """
                  async with self.chain_stream(*prompts, **kwargs) as stream:
                      yield stream